/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/server/XuluServer.java
ViewVC logotype

Contents of /branches/1.8-gt2-2.6/src/appl/parallel/server/XuluServer.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 47 - (show annotations)
Mon Aug 31 14:23:19 2009 UTC (15 years, 3 months ago) by mojays
File size: 33991 byte(s)
Branch 1.8-gt2-2.6 (from rev 45) for geotools 2.6 migration
1 package appl.parallel.server;
2
3 import java.awt.Rectangle;
4 import java.io.IOException;
5 import java.net.DatagramPacket;
6 import java.net.InetAddress;
7 import java.net.MalformedURLException;
8 import java.net.MulticastSocket;
9 import java.net.URL;
10 import java.net.UnknownHostException;
11 import java.rmi.MarshalledObject;
12 import java.rmi.Naming;
13 import java.rmi.NotBoundException;
14 import java.rmi.RMISecurityManager;
15 import java.rmi.Remote;
16 import java.rmi.RemoteException;
17 import java.rmi.activation.Activatable;
18 import java.rmi.activation.ActivationID;
19 import java.rmi.server.ServerNotActiveException;
20 import java.rmi.server.UnicastRemoteObject;
21 import java.util.HashMap;
22 import java.util.Vector;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28
29 import net.jini.loader.pref.PreferredClassLoader;
30
31 import org.apache.log4j.BasicConfigurator;
32 import org.apache.log4j.Level;
33 import org.apache.log4j.LogManager;
34 import org.apache.log4j.Logger;
35 import org.apache.log4j.PropertyConfigurator;
36 import org.apache.log4j.net.SocketAppender;
37
38 import appl.ext.XuluConfig;
39 import appl.parallel.ComputingResourceProperties;
40 import appl.parallel.client.DataServer;
41 import appl.parallel.client.ClientDataServer;
42 import appl.parallel.event.CommEventSink;
43 import appl.parallel.event.TimeEvent;
44 import appl.parallel.event.CommEvent.CommType;
45 import appl.parallel.services.RemoteEventProxy;
46 import appl.parallel.spmd.AdvancedSPMDServerController;
47 import appl.parallel.spmd.AdvancedSPMDServerInterface;
48 import appl.parallel.spmd.SPMDServerController;
49 import appl.parallel.spmd.SPMDServerInterface;
50 import appl.parallel.spmd.SPMDTask;
51 import appl.parallel.spmd.split.SinglePartitionInfo;
52 import appl.parallel.spmd.split.SplitMap;
53 import appl.parallel.spmd.split.SplitMap1DVertical;
54 import appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode;
55 import appl.parallel.thread.OneMethodThread;
56 import appl.parallel.util.Helper;
57 import appl.util.benchmark.Benchmark;
58
59 /**
60 * The Server manages the remote execution of program code. It may be started
61 * using the {@link #main(String[])} method.<br>
62 * The Server reads the following properties from {@link XuluConfig} - the
63 * values given should be the defaults.<br>
64 * <table border="1">
65 * <tr>
66 * <th>Property</th>
67 * <th>Default</th>
68 * <th>Desc</th>
69 * </tr>
70 * <tr>
71 * <td>XuluServer.useCodeDownloading</td>
72 * <td>false</td>
73 * <td>if true, the server tries to download task class files from a
74 * HTTP-server at the client-url</td>
75 * </tr>
76 * <tr>
77 * <td>XuluServer.registryPort</td>
78 * <td>1099</td>
79 * <td>The port at which a new registry is created, if no running registry was
80 * found</td>
81 * </tr>
82 * <tr>
83 * <td>XuluServer.multicastgroup</td>
84 * <td>239.1.1.1</td>
85 * <td>The default multicast group</td>
86 * </tr>
87 * <tr>
88 * <td> XuluServer.multicastport</td>
89 * <td>10000</td>
90 * <td>The default multicast port</td>
91 * </tr>
92 * <tr>
93 * <td>XuluServer.eventDelay</td>
94 * <td>50</td>
95 * <td>Events generated on server side may be delayed to collect multiple
96 * Events and send them at once (this saves communication time).Time is given in
97 * milliseconds</td>
98 * </tr>
99 * <tr>
100 * <td>XuluServer.log4j.logLevel</td>
101 * <td>info</td>
102 * <td>the level can be '<i>debug</i>','<i>info</i>','<i>warn</i>','<i>error</i>'
103 * or '<i>fatal</i>' and can be overridden by commandline - Parameters<br>
104 * <br>
105 * </td>
106 * </tr>
107 * <td>XuluServer.log4j.mode</td>
108 * <td>console</td>
109 * <td>the mode can be '<i>file</i>','<i>chainsaw</i>' or '<i>console</i>'.
110 * File means that the configurationfile xuluserverlog4j.cfg is read. <br>
111 * <i>'chainsaw'</i> means that the server tries to configure the logger each
112 * time a new client for a chainsaw instance running at the client ip
113 * (standardports) <br>
114 * </td>
115 * </tr>
116 * <td>XuluServer.benchmarkclass</td>
117 * <td>appl.util.benchmark.SimpleBenchmark</td>
118 * <td>The Benchmark to get a rating for this machine. Notice that you can
119 * override the rating with the -rating parameter</td>
120 * </tr>
121 * <tr>
122 * <td>XuluServer.useThreads</td>
123 * <td>max</td>
124 * <td>The number of threads to be used by the server for tasks that support
125 * multithreading. By default one thread is created for every available
126 * processor (value='max'). More threads than processors may be useful for
127 * testing tasks which support multithreading or for processors which support
128 * hyperthreading.</td>
129 * </tr>
130 * <tr>
131 * <td>XuluServer.runbench</td>
132 * <td>true</td>
133 * <td>the benchmark is run automatically</td>
134 * </tr>
135 * </table> <br>
136 * <b>Multicasting:</b> <br>
137 * If multicasting does not work, try to disable your firewall or configure it
138 * to allow multicast UDP packages. Notice also, that multicasting may not work
139 * in virtual machines like VMWare or MS Virtual PC 2004/7. <br>
140 * <br>
141 * <b>Assumptions</b>
142 * Only one client is connected to the server (and can run Tasks) <br>
143 *<br>
144 * See {@link #main(String[])} for details about command line parameters
145 * @see ServerMulticastReceiver
146 * @author Dominik Appl
147 */
148 public class XuluServer extends UnicastRemoteObject implements SPMDResource {
149
150 /**
151 * This Thread is used when multiple threads should be executed simultaneous during multithreading
152 *
153 * @author Dominik Appl
154 */
155 class SimpleCallThread implements Callable {
156
157 private final SPMDTask task;
158
159 private final Object[] parameters;
160
161 /**
162 * @param task the task to be executed
163 * @param parameters the parameters for the run method of the task
164 */
165 public SimpleCallThread(SPMDTask task, Object... parameters) {
166 this.task = task;
167 this.parameters = parameters;
168 }
169
170 /*
171 * (non-Javadoc)
172 *
173 * @see java.util.concurrent.Callable#call()
174 */
175 public Object call() throws Exception {
176 return task.run(parameters);
177 }
178 }
179
180 private static final Logger LOG = LogManager
181 .getLogger("appl.parallel.server.XuluServer");
182
183 private static String bindingName = "XuluServer";
184
185 private static final String helloFromServerMessage = "Hello Xulu from XuluServer";
186
187 private static String log4jfilename = "xuluserverlog4j.cfg";
188
189 private static boolean chainsaw = false;
190
191 private static SocketAppender chainsawAppender;
192
193 /**
194 * @param mode the mode of the execution
195 */
196 private static void log4jConfig(String mode) {
197 if (mode.equals("chainsaw")) {
198 BasicConfigurator.resetConfiguration();
199 chainsawAppender = new SocketAppender();
200 BasicConfigurator.configure(chainsawAppender);
201 System.out.println("Server configured for chainsaw.");
202 chainsaw = true;
203 } else if (mode.equals("file")) {
204 BasicConfigurator.resetConfiguration();
205 PropertyConfigurator.configure(log4jfilename);
206 } else if (mode.equals("console")) {
207 BasicConfigurator.resetConfiguration();
208 BasicConfigurator.configure();
209 } else {
210 System.err.println("Invalid log4j-Config parameter found: " + mode);
211 printInfoMessage();
212 System.exit(0);
213 }
214 }
215
216 /**
217 * configures the log4j level
218 *
219 * @see #main(String[])
220 */
221 private static void log4jLevel(String level) {
222 if (level.equals("info")) {
223 LOG.setLevel(Level.INFO);
224 } else if (level.equals("debug")) {
225 LOG.setLevel(Level.DEBUG);
226 } else if (level.equals("error")) {
227 LOG.setLevel(Level.ERROR);
228 } else if (level.equals("fatal")) {
229 LOG.setLevel(Level.FATAL);
230 } else if (level.equals("warn")) {
231 LOG.setLevel(Level.WARN);
232 } else if (level.equals("off")) {
233 LOG.setLevel(Level.OFF);
234 } else {
235 System.err.println("Invalid log4j-Level found: " + level);
236 printInfoMessage();
237 System.exit(0);
238 }
239 System.out.println("Setting logLevel to " + level);
240 }
241
242 /**
243 * Starts the server.
244 *
245 * @param args
246 * The following parameters are valid: <br>
247 * <br>
248 * -loglevel:<code>LEVEL</code> - where <code>LEVEL</code> is '<i>debug</i>','<i>info</i>'
249 * (DEFAULT),'<i>warn</i>','<i>error</i>' or '<i>fatal</i>' <br>
250 * <br>
251 * -logconfig:<code>CONFIG</code> - where <code>CONFIG</code> is '<i>file</i>','<i>chainsaw</i>' or '<i>console</i>'
252 * <li>'chainsaw' means that the server tries to configure the logger each time a
253 * new client for a chainsaw instance running at the client ip
254 * (standardports) </li><br>
255 * <li>'<i>file</i>' means that the file '<i>xuluserverlog4j.cfg</i>'
256 * in the home-directory is used for configuration <br> </li>
257 * <li>'<i>console</i>'
258 * means of course, that the console is used for output <br> </li>
259 * <br>
260 * -port:PORTNUMBER - binds the server to the specified port (default 1099)
261 *
262
263 */
264 public static void main(String[] args) {
265 // parse arguments
266 int serverPort = 0;
267 for (String arg : args) {
268 if (arg.toLowerCase().startsWith("-loglevel:"))
269 log4jLevel(arg.substring(10).toLowerCase());
270 else if (arg.toLowerCase().startsWith("-logconfig:"))
271 log4jConfig(arg.substring(11).toLowerCase());
272 else if (arg.toLowerCase().startsWith("-port:"))
273 serverPort = Integer.valueOf(arg.substring(6));
274 else {
275 System.err.println("Invalid Argument found: " + arg);
276 printInfoMessage();
277 System.exit(0);
278 }
279 }
280
281 if (System.getSecurityManager() == null) {
282 System.setSecurityManager(new RMISecurityManager());
283 }
284 Remote server;
285 try {
286 server = new XuluServer(serverPort);
287 System.out.println("Xulu Server is up and running!");
288 } catch (RemoteException e) {
289 System.err.println("Xulu Server could not start!");
290 e.printStackTrace();
291 }
292 }
293
294 private static void printInfoMessage() {
295 System.out
296 .println("The following arguments are valid: \n"
297 + "-loglevel:<LEVEL> \n"
298 + " where <LEVEL> is 'off','debug','info' (DEFAULT),'warn','error' or 'fatal'\n\n"
299 + "-logconfig:<CONFIG> \n"
300 + " where <CONFIG> is 'file','chainsaw' or 'console'(DEFAULT) \n"
301 + " 'chainsaw' means that the server tries to configure the logger each\n"
302 + " time a new client for a chainsaw instance running at the client ip\n"
303 + " (standardports) \n"
304 + " 'file' means that the file 'xuluserverlog4j.cfg' in the home-directory "
305 + " is used for configuration \n"
306 + " 'console' means of course, that the console is used for output"
307 + "-port:<portnumber> \n"
308 + " The Server is bound to the specified port");
309 }
310
311 private static void unbind() {
312 Helper.unbind(bindingName);
313 }
314
315 /**
316 * The {@link Class} objects of the currently active connection. Every time
317 * {@link #runSPMDModelTask(String, int, Object[])} is called the
318 * newest class is loaded from the client. For performance reasons this will
319 * happen only one time per connection. In this map the current active
320 * classes are stored.
321 */
322 private HashMap<String, SPMDTask[]> spmdTaskInstances;
323
324 /* standard is should be 1099 */
325 private int registryPort = 1099;
326
327 private int multicastPort;
328
329 private String multiCastIP;
330
331 ServerMulticastReceiver multicastThread;
332
333 InetAddress multiCastGroup;
334
335 // says whether a client is connected
336 private boolean connected = false;
337
338 private long starttime = System.currentTimeMillis();
339
340 private String connectedClient;
341
342 private PartitionDataManager dataManager;
343
344 private RemoteEventProxy remoteEventReceiver;
345
346 // User specified name of the Server or hostname
347 private String serverName;
348
349 private int machineRating = 0;
350
351 private final boolean loggingEnabled;
352
353 private int taskPriority = Thread.NORM_PRIORITY;
354
355 private int availableProcessors = 1;
356
357 private int toUseThreads = 1;
358
359 private final boolean isInternalServer;
360
361 private ClientDataServer localSPMDClient;
362
363 private boolean useCodeDownloading;
364
365 private ExecutorService executor;
366
367 // used for execution results
368 private Future[] resultCalls;
369
/**
 * Creates a new, non-internal instance with logging enabled that is bound
 * to the default port.
 *
 * @throws RemoteException if the server could not be created
 */
public XuluServer() throws RemoteException {
    this(true, false, 0);
}
378
379 /**
380 * @param configLogging
381 * enable or disable logging
382 * @param isInternalServer
383 * set to true if you want to make method calls purely local.
384 * This is for performance reasons (see
385 * {@link UnicastRemoteObject#getClientHost()}
386 * @param port
387 * port the server is bound to (0 for default)
388 * @throws RemoteException
389 */
390 public XuluServer(boolean configLogging, boolean isInternalServer, int port)
391 throws RemoteException {
392 super();
393 loggingEnabled = configLogging;
394 this.isInternalServer = isInternalServer;
395 initConfiguration();
396 //the executor is used for multithreading
397 executor = Executors.newCachedThreadPool();
398 spmdTaskInstances = new HashMap<String, SPMDTask[]>(10);
399 if (port != 0)
400 this.registryPort = port;
401 bind();
402
403 initializeMultiCasting();
404 serverName = "unknown server";
405 try {
406 if (isInternalServer)
407 serverName = "internal Server";
408 else
409 serverName = InetAddress.getLocalHost().getHostName();
410 } catch (UnknownHostException e) {
411 // TODO Auto-generated catch block
412 e.printStackTrace();
413 }
414 if (port != 0)
415 serverName = serverName + " (port " + registryPort + ")";
416 runBenchmark();
417 }
418
/**
 * Creates an internal server, i.e. a server that runs inside the
 * Xulu-Client. Such a server has performance advantages (direct access - no
 * TCP/IP).
 *
 * @param configLogging
 *            enable or disable logging
 * @param clientDataServer
 *            a local client for faster access
 * @throws RemoteException if the server could not be created
 */
public XuluServer(boolean configLogging, ClientDataServer clientDataServer)
        throws RemoteException {
    this(configLogging, true, 0);
    localSPMDClient = clientDataServer;
}
434
/**
 * Creates a new, non-internal instance with logging enabled.
 *
 * @param port
 *            the port to which the Server is bound (0 for default)
 * @throws RemoteException if the server could not be created
 */
public XuluServer(int port) throws RemoteException {
    this(true, false, port);
}
443
444 /**
445 * Bind the remote object's stub in the registry. Creates a registry if no
446 * running registry is found.
447 */
448 private void bind() throws RemoteException {
449 String name = bindingName;
450 Helper.bind(name, this, registryPort);
451 }
452
453 private boolean checkConnected(String callingHost) {
454
455 if (callingHost.equals(connectedClient))
456 return true;
457 LOG
458 .warn(callingHost
459 + " has tried to execute a critical operation, but was not connected "
460 + ". Actual connected client: " + connectedClient);
461 return false;
462 }
463
464 /**
465 * Only a connected client has access to all functionality. When connecting
466 * or reconnecting, all data of the server and the corresponding
467 * {@link PartitionDataServer} is reset!
468 *
469 * @see appl.parallel.ComputingResource#connect()
470 */
471 public synchronized boolean connect() throws RemoteException {
472 /**
473 * determine the calling host. Internal Server requires special
474 * handling: notice that this happens very often in this class, but can
475 * not externalized due to the behavior of #getClientHost();
476 */
477 String callingHost = "error";
478 try {
479 callingHost = isInternalServer ? "localhost" : getClientHost();
480 } catch (ServerNotActiveException e) {
481 e.printStackTrace();
482 }
483 /** ********************************************************************** */
484
485 if (connectedClient == null || callingHost.equals(connectedClient)) {
486 if (connectedClient == null)
487 LOG.info("Client " + callingHost + " connected!");
488 else
489 LOG.info("Client " + connectedClient + " reconnected! ");
490 // reset data
491 reset();
492 connectedClient = callingHost;
493 // initalize event system & chainsaw
494 initEventSystem();
495 initChainsaw(connectedClient);
496 return true;
497 } else if (connectedClient != null) {
498 LOG.warn(callingHost + " has tried to "
499 + "connect to server, but was not connected, "
500 + "because this server is in use by " + connectedClient);
501 }
502
503 return false;
504 }
505
506 /*
507 * (non-Javadoc)
508 *
509 * @see appl.parallel.server.ComputingResource#createDataServer()
510 */
511 public PartitionDataServer createDataServer(String[] IPs)
512 throws RemoteException {
513 /**
514 * determine the calling host. Internal Server requires special
515 * handling: notice that this happens very often in this class, but can
516 * not externalized due to the behavior of #getClientHost();
517 */
518 String callingHost = "localhost";
519 try {
520 callingHost = isInternalServer ? "localhost" : getClientHost();
521 } catch (ServerNotActiveException e) {
522 e.printStackTrace();
523 }
524 /** ********************************************************************** */
525 if (checkConnected(callingHost)) {
526 if (isInternalServer)
527 dataManager = new PartitionDataManager(IPs,
528 remoteEventReceiver, getRegistryPort(), localSPMDClient);
529 else
530 dataManager = new PartitionDataManager(IPs,
531 remoteEventReceiver, getRegistryPort());
532 return dataManager;
533 }
534 return null;
535 }
536
537 /* (non-Javadoc)
538 * @see appl.parallel.server.ComputingResource#disconnect()
539 */
540 public synchronized void disconnect() throws RemoteException {
541 /**
542 * determine the calling host. Internal Server requires special
543 * handling: notice that this happens very often in this class, but can
544 * not externalized due to the behavior of #getClientHost();
545 */
546 String callingHost = "localhost";
547 try {
548 callingHost = isInternalServer ? "localhost" : getClientHost();
549 } catch (ServerNotActiveException e) {
550 e.printStackTrace();
551 }
552 /** ********************************************************************** */
553 if (checkConnected(callingHost)) {
554 reset();
555 LOG.info("Client " + connectedClient
556 + " successfully disconnected from Server");
557 }
558 }
559
560 /**
561 * @return a rating for the machine as discovered by the benchmark
562 */
563 public int getRating() {
564 return machineRating;
565 }
566
567 /**
568 * @return the registry port the server is bound to
569 */
570 public int getRegistryPort() {
571 return registryPort;
572 }
573
574 /*
575 * (non-Javadoc)
576 *
577 * @see appl.parallel.server.XuluServer#getResourceInformation()
578 */
579 public ComputingResourceProperties getResourceInformation()
580 throws RemoteException {
581 return new XuluServerProperties(this, serverName);
582 }
583
584 /**
585 * @return the time in s the server is running
586 */
587 int getUptime() {
588 return (int) (System.currentTimeMillis() - starttime);
589 }
590
591 private void initChainsaw(String connectedClient) {
592 if (chainsaw && connectedClient != null) {
593 chainsawAppender = new SocketAppender(connectedClient, 4445);
594 BasicConfigurator.resetConfiguration();
595 BasicConfigurator.configure(chainsawAppender);
596 }
597 }
598
599 /**
600 * inits task priority, ports and loglevel
601 */
602 private void initConfiguration() {
603 XuluConfig config = XuluConfig.getXuluConfig();
604 // a lower priority will run the server more in the background
605 taskPriority = config.getIntProperty("XuluServer.priority");
606
607 switch (taskPriority) {
608 default:
609 taskPriority = Thread.NORM_PRIORITY;
610 break;
611 case 1:
612 taskPriority = Thread.MIN_PRIORITY;
613 break;
614 case 2:
615 taskPriority = Thread.NORM_PRIORITY - 1;
616 break;
617 case 4:
618 taskPriority = Thread.NORM_PRIORITY + 1;
619 break;
620 case 5:
621 taskPriority = Thread.MAX_PRIORITY;
622 break;
623 }
624
625 availableProcessors = Runtime.getRuntime().availableProcessors();
626 // default: try to use one thread per processor
627 toUseThreads = availableProcessors;
628 // check if there is an entry in the XuluConfig saying how many threads
629 // should be created.
630 // (more threads than processors may be useful for the use of
631 // hyperthreading)
632 String configProz = config.getProperty("XuluServer.useThreads");
633 if ((configProz != null) && !configProz.equals("max")) {
634 toUseThreads = config.getIntProperty("XuluServer.useThreads");
635 // error checking:
636 if (toUseThreads < 1 || toUseThreads > 128){
637 toUseThreads = availableProcessors;
638 System.out
639 .println("Error: Number of threads must be between 1 and 128!");
640 }
641 }
642 // init correspondig arrays
643 resultCalls = new Future[toUseThreads];
644
645 int port = config.getIntProperty("XuluServer.registryport");
646 if (port != 0)
647 registryPort = port;
648
649 useCodeDownloading = config
650 .getBooleanProperty("XuluServer.useCodeDownloading");
651 if (useCodeDownloading)
652 System.out.println("Code downloading is enabled");
653 else
654 System.out.println("Code downloading is disabled");
655
656 int multicastport = config.getIntProperty("XuluServer.multicastport");
657 if (multicastport != 0)
658 multicastPort = multicastport;
659 String IP = config.getProperty("XuluServer.multicastgroup");
660 if (IP != null)
661 multiCastIP = IP;
662 try {
663 multiCastGroup = InetAddress.getByName(multiCastIP);
664 } catch (UnknownHostException e) {
665 e.printStackTrace();
666 }
667 if (loggingEnabled) {
668
669 String mode = config.getProperty("XuluServer.log4j.mode");
670 if (mode != null)
671 log4jConfig(mode);
672 }
673 String level = config.getProperty("XuluServer.log4j.logLevel");
674 if (level != null)
675 log4jLevel(level.toLowerCase());
676 else
677 log4jLevel("info");
678 }
679
680 /**
681 * Connects to the RemoteEventReceiver at the calling client
682 */
683 private void initEventSystem() throws RemoteException {
684 String lookupName = "rmi://" + connectedClient + "/RemoteEventReceiver";
685 try {
686 int delay = XuluConfig.getXuluConfig().getIntProperty(
687 "XuluServer.eventDelay");
688 System.out.println("looking up " + lookupName);
689 CommEventSink eventSink = (CommEventSink) Naming.lookup(lookupName);
690 if (remoteEventReceiver != null)
691 remoteEventReceiver.stopService();
692 remoteEventReceiver = new RemoteEventProxy(eventSink, delay);
693 remoteEventReceiver.startService();
694 } catch (MalformedURLException e) {
695 e.printStackTrace();
696 } catch (NotBoundException e) {
697 LOG.warn("Could not find the remote event receiver at "
698 + lookupName, e);
699 System.out.println(e.getMessage());
700 remoteEventReceiver = new RemoteEventProxy(null, 0);
701 }
702 }
703
704 /**
705 * inits the multicast unit
706 */
707 private void initializeMultiCasting() {
708 try {
709 MulticastSocket s = new MulticastSocket(multicastPort);
710 s.joinGroup(multiCastGroup);
711 // send hello message
712 String IP = XuluConfig.getXuluConfig().getProperty(
713 "XuluServer.multicastgroup");
714 DatagramPacket answer = new DatagramPacket(helloFromServerMessage
715 .getBytes(), helloFromServerMessage.length(), InetAddress
716 .getByName(IP), multicastPort);
717 s.send(answer);
718 // start thread that waits on client messages
719 multicastThread = new ServerMulticastReceiver(s);
720 multicastThread.start();
721 } catch (IOException e) {
722 // TODO Auto-generated catch block
723 LOG.warn("Could not initalize multicasting! Reason was "
724 + e.getMessage(), e);
725
726 }
727 }
728
729 /**
730 * @return whether this server is available or in use by another client
731 * @see appl.parallel.ComputingResource#isAvailable()
732 */
733 public boolean isAvailable() throws RemoteException {
734 /**
735 * determine the calling host. Internal Server requires special
736 * handling: notice that this happens very often in this class, but can
737 * not externalized due to the behavior of #getClientHost();
738 */
739 String callingHost = "localhost";
740 try {
741 callingHost = isInternalServer ? "localhost" : getClientHost();
742 } catch (ServerNotActiveException e) {
743 e.printStackTrace();
744 }
745 /** ********************************************************************** */
746 if (callingHost.equals(connectedClient))
747 return true;
748
749 if (connectedClient == null)
750 return true;
751 return false;
752 }
753
754 /**
755 * @return if a client is connected
756 */
757 public boolean isClientConnected() {
758 return connected;
759
760 }
761
762 /**
763 * Loads a {@link SPMDTask} from the specified URL and returns an array of
764 * instances of this class. There are so many instances, as there are
765 * available processors on this machine!. This class also makes sure that
766 * this happens only one time per connection (else this would of course be a
767 * performance killer in fine grained parallelization).
768 *
769 * @param location
770 * the URL of the rootDirectory/networkLocation
771 * @param spmdTaskClassName
772 * the name of the SPMDTask for which the newest class should be
773 * found
774 * @return the instance of the class or <code>null</code> if loading fails
775 */
776 private SPMDTask[] loadSPMDTasksfromURL(String location,
777 String spmdTaskClassName) {
778
779 SPMDTask[] spmdClassInstances = (SPMDTask[]) spmdTaskInstances
780 .get(spmdTaskClassName);
781
782 // if already loaded for this connection simply return the instances
783 if (spmdClassInstances != null)
784 return (SPMDTask[]) spmdClassInstances;
785 spmdClassInstances = new SPMDTask[toUseThreads];
786 Class spmdClass = null;
787 if (useCodeDownloading) {
788 try {
789 // lookup client and make sure that the newest class of the task
790 // is instantiated. Will avoid loading older cached or local
791 // filesystem version
792 URL[] locations = { new URL("http://" + location + "/") };
793 PreferredClassLoader classLoader = new PreferredClassLoader(
794 locations, Thread.currentThread()
795 .getContextClassLoader(), null, false);
796 LOG.debug("Try to load " + spmdTaskClassName + " from "
797 + location);
798 spmdClass = classLoader.loadClass(spmdTaskClassName);
799 } catch (Exception e) {
800 LOG.error("Could not load class for " + spmdTaskClassName
801 + " from " + location, e);
802 e.printStackTrace();
803 }
804 }
805 // if retrieval failed for some reason or code downloading disabled
806 // then use the local class
807 try {
808 if (spmdClass == null) {
809 spmdClass = Class.forName(spmdTaskClassName);
810 }
811
812 // instantiate the classes
813 for (int i = 0; i < spmdClassInstances.length; i++) {
814 Object task = spmdClass.newInstance();
815 if (!(task instanceof SPMDTask)) {
816 LOG.fatal(spmdTaskClassName
817 + " was not an instance of SPMDTask!");
818 throw new UnsupportedOperationException(spmdTaskClassName
819 + " was not an instance of SPMDTask!");
820 }
821 spmdClassInstances[i] = (SPMDTask) task;
822 }
823
824 spmdTaskInstances.put(spmdTaskClassName, spmdClassInstances);
825 return spmdClassInstances;
826
827 } catch (ClassNotFoundException e) {
828 // TODO Auto-generated catch block
829 e.printStackTrace();
830 } catch (InstantiationException e) {
831 // TODO Auto-generated catch block
832 e.printStackTrace();
833 } catch (IllegalAccessException e) {
834 // TODO Auto-generated catch block
835 e.printStackTrace();
836 }
837 throw new UnsupportedOperationException("Loading of class "
838 + spmdTaskClassName
839 + " failed on serverside! Execution canceled");
840 }
841
842 /*
843 * (non-Javadoc)
844 *
845 * @see appl.parallel.server.XuluServer#ping(java.lang.Object[])
846 */
847 public Object ping(Object... o) throws RemoteException {
848 System.out.println("received ping");
849 return o;
850 }
851
852 /**
853 * deletes all client-specific data data, so that a newly connected client
854 * can use a clean system
855 */
856 private void reset() {
857 if (dataManager != null)
858 dataManager.stop();
859 dataManager = null;
860 connectedClient = null;
861 spmdTaskInstances.clear();
862 }
863
864 /**
865 * runs the benchmark defined in XuluServer.benchmarkclass
866 */
867 private void runBenchmark() {
868 // see if the benchmark should be run
869
870 boolean runbench = XuluConfig.getXuluConfig().getBooleanProperty(
871 "XuluServer.runbench");
872 if (runbench) {
873 String classname = XuluConfig.getXuluConfig().getProperty(
874 "XuluServer.benchmarkclass");
875 if (classname != null) {
876 try {
877 // getBenchmarkClass
878 Benchmark bench = (Benchmark) Class.forName(classname)
879 .newInstance();
880 machineRating = bench.bench();
881 System.out
882 .println("Successfully run benchmark. Machine Rating is "
883 + machineRating);
884 } catch (Exception e) {
885 System.err
886 .println("Could not load Benchmarkclass "
887 + classname
888 + ". Setting rating to 0 (which means average). Reason was: "
889 + e.getMessage());
890 machineRating = 0;
891 }
892 } else
893 System.out.println("Benchmark disabled");
894 }
895 }
896
897 /**
898 * Every time {@link #runSPMDModelTask(String, int, Object[])} is called the
899 * newest class is loaded from the client. For performance reasons this will
900 * happen only one time per connection.
901 *
902 * @see appl.parallel.server.SPMDResource#runSPMDModelTask(String, int,
903 * Object[])
904 */
905 @SuppressWarnings("unchecked")
906 public Object[] runSPMDModelTask(String spmdTaskName, int referenceID,
907 Object... parameters) throws RemoteException {
908 /**
909 * determine the calling host. Internal Server requires special
910 * handling: notice that this happens very often in this class, but can
911 * not externalized due to the behavior of #getClientHost();
912 */
913 String callingHost = "localhost";
914 try {
915 callingHost = isInternalServer ? "localhost" : getClientHost();
916 } catch (ServerNotActiveException e) {
917 e.printStackTrace();
918 }
919 /** ********************************************************************** */
920 Thread.currentThread().setPriority(taskPriority);
921 String clientIP = "<unknown Client>";
922 long time = System.nanoTime();
923
924 clientIP = callingHost;
925 if (!clientIP.equals(connectedClient)) {
926 LOG.warn(clientIP
927 + " has tried to run a Task, but was not connected. "
928 + "This server is in use by " + connectedClient);
929 throw new RemoteException(
930 "Run of SPMDTask failed! This server is in use by "
931 + connectedClient);
932 }
933 if (LOG.isDebugEnabled())
934 LOG.debug("received task from " + clientIP
935 + ". Starting evaluation now...");
936 // if no datamanager is found: exit
937 if (dataManager == null)
938 throw new UnsupportedOperationException(
939 "No DataManager found. Create one first using 'createDataManager' on this Server");
940
941 // lookup newest version of the class from the server at the url
942 // (or from local disk if remote class loading is disabled)
943 SPMDTask[] tasks = loadSPMDTasksfromURL(clientIP, spmdTaskName);
944
945 // The number of threads allowed for execution can be different
946 // from the number of user assigned threads
947 // because tasks may not support multithreadeding
948 int allowedThreads = 1;
949 if (tasks[0].supportsMultiThreading())
950 allowedThreads = toUseThreads;
951
952 Object results[] = new Object[allowedThreads];
953
954 // the following loop is only executed the first time a task is used!
955 if (!tasks[0].isInitialized()) {
956 if (LOG.isInfoEnabled())
957 LOG.info("Trying to use " + allowedThreads
958 + " threads for execution of "
959 + tasks[0].getClass().getName());
960 // calculate splitting for multiple threads:
961 int splitMapPos = dataManager.getInfo(referenceID).getSplitMapPos();
962 SplitMap splitMap = dataManager.getInfo(referenceID).getSplitMap();
963 SplitMap1DVertical map1DVertical = new SplitMap1DVertical(splitMap
964 .getPartitionCalculationBounds(splitMapPos).width, splitMap
965 .getPartitionCalculationBounds(splitMapPos).height, 0,
966 allowedThreads, NeighborhoodBoxingMode.outBoxing);
967
968 // create controllers and init the tasks
969 for (int i = 0; i < allowedThreads; i++) {
970 AdvancedSPMDServerController serverController = new AdvancedSPMDServerController(
971 dataManager, referenceID);
972 // set first thread as master thread
973 serverController.setMasterThread(i == 0);
974 // now assign the calculation area. Because we have assumed that
975 // the (Thread)-calculation
976 // area begins at (0,0) we must move it relative to the real
977 // local calculation area
978 Rectangle calculationBounds = map1DVertical
979 .getPartitionCalculationBounds(i);
980 int absolutX = (int) (serverController.getLocalCalcMinX() + calculationBounds
981 .getMinX());
982 int absolutY = (int) (serverController.getLocalCalcMinY() + calculationBounds
983 .getMinY());
984 calculationBounds.setLocation(absolutX, absolutY);
985 serverController.setLocalCalculationBounds(calculationBounds);
986 tasks[i].setSPMDServerController(serverController);
987 tasks[i].initialize();
988 }
989 }
990 if (allowedThreads == 1) // only for saving resources: no new thread
991 { // if only one thread is used
992 Object executionResult = tasks[0].run(parameters);
993 results = new Object[] { executionResult };
994 } else {
995 // for more than 1 processor execute the multiple tasks in separate
996 // threads
997 for (int i = 0; i < allowedThreads; i++) {
998 resultCalls[i] = executor.submit(new SimpleCallThread(tasks[i],
999 parameters));
1000 }
1001 // wait for the Threads to finish
1002 for (int i = 0; i < allowedThreads; i++) {
1003 try {
1004 results[i] = resultCalls[i].get();
1005 } catch (InterruptedException e) {
1006 // TODO Auto-generated catch block
1007 e.printStackTrace();
1008 } catch (ExecutionException e) {
1009 // TODO Auto-generated catch block
1010 e.printStackTrace();
1011 }
1012 }
1013 }
1014 // generate a event for the execution
1015 if (remoteEventReceiver.isTimeMonitoringEnabled())
1016 remoteEventReceiver.fireRemoteEvent(new TimeEvent((System
1017 .nanoTime() - time), serverName, "DataServer",
1018 CommType.REMOTE_EXECUTION));
1019 return results;
1020 }
1021
1022 /**
1023 * This method frees all resources.<br>
1024 * Notice that this method is not remotely accessible!
1025 */
1026 public void stopServer() {
1027 unbind();
1028 if (dataManager != null)
1029 dataManager.stop();
1030 dataManager = null;
1031 if (multicastThread != null)
1032 multicastThread.stopThread();
1033 }
1034 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26