/[xulu]/trunk/src/appl/parallel/server/XuluServer.java
ViewVC logotype

Contents of /trunk/src/appl/parallel/server/XuluServer.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 114 - (show annotations)
Mon Jul 11 11:31:25 2011 UTC (13 years, 5 months ago) by mojays
File size: 33553 byte(s)
SCHMITZM library updated to current version (2.6-SNAPSHOT)
Added gt-xsd-filter.jar, gt-xsd-gml2.jar, picocontainer.jar and xsd.jar from Geotools 2.6.5
1 package appl.parallel.server;
2
3 import java.awt.Rectangle;
4 import java.io.IOException;
5 import java.net.DatagramPacket;
6 import java.net.InetAddress;
7 import java.net.MalformedURLException;
8 import java.net.MulticastSocket;
9 import java.net.URL;
10 import java.net.UnknownHostException;
11 import java.rmi.Naming;
12 import java.rmi.NotBoundException;
13 import java.rmi.RMISecurityManager;
14 import java.rmi.Remote;
15 import java.rmi.RemoteException;
16 import java.rmi.server.ServerNotActiveException;
17 import java.rmi.server.UnicastRemoteObject;
18 import java.util.HashMap;
19 import java.util.concurrent.Callable;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.Future;
24
25 import net.jini.loader.pref.PreferredClassLoader;
26
27 import org.apache.log4j.BasicConfigurator;
28 import org.apache.log4j.Level;
29 import org.apache.log4j.LogManager;
30 import org.apache.log4j.Logger;
31 import org.apache.log4j.PropertyConfigurator;
32 import org.apache.log4j.net.SocketAppender;
33
34 import appl.ext.XuluConfig;
35 import appl.parallel.ComputingResourceProperties;
36 import appl.parallel.client.ClientDataServer;
37 import appl.parallel.event.CommEvent.CommType;
38 import appl.parallel.event.CommEventSink;
39 import appl.parallel.event.TimeEvent;
40 import appl.parallel.services.RemoteEventProxy;
41 import appl.parallel.spmd.AdvancedSPMDServerController;
42 import appl.parallel.spmd.SPMDTask;
43 import appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode;
44 import appl.parallel.spmd.split.SplitMap;
45 import appl.parallel.spmd.split.SplitMap1DVertical;
46 import appl.parallel.util.Helper;
47 import appl.util.benchmark.Benchmark;
48
49 /**
50 * The Server manages the remote execution of program code. It may be started
51 * using the {@link #main(String[])} method.<br>
52 * The Server reads the following properties from {@link XuluConfig} - the
53 * values given should be the defaults.<br>
54 * <table border="1">
55 * <tr>
56 * <th>Property</th>
57 * <th>Default</th>
58 * <th>Desc</th>
59 * </tr>
60 * <tr>
61 * <td>XuluServer.useCodeDownloading</td>
62 * <td>false</td>
63 * <td>if true, the server tries to download task class files from a
64 * HTTP-server at the client-url</td>
65 * </tr>
66 * <tr>
67 * <td>XuluServer.registryPort</td>
68 * <td>1099</td>
69 * <td>The port at which a new registry is created, if no running registry was
70 * found</td>
71 * </tr>
72 * <tr>
73 * <td>XuluServer.multicastgroup</td>
74 * <td>239.1.1.1</td>
75 * <td>The default multicast group</td>
76 * </tr>
77 * <tr>
78 * <td> XuluServer.multicastport</td>
79 * <td>10000</td>
80 * <td>The default multicast port</td>
81 * </tr>
82 * <tr>
83 * <td>XuluServer.eventDelay</td>
84 * <td>50</td>
85 * <td>Events generated on server side may be delayed to collect multiple
86 * Events and send them at once (this saves communication time).Time is given in
87 * milliseconds</td>
88 * </tr>
89 * <tr>
90 * <td>XuluServer.log4j.logLevel</td>
91 * <td>info</td>
92 * <td>the level can be '<i>debug</i>','<i>info</i>','<i>warn</i>','<i>error</i>'
93 * or '<i>fatal</i>' and can be overridden by commandline - Parameters<br>
94 * <br>
95 * </td>
96 * </tr>
97 * <tr><td>XuluServer.log4j.mode</td>
98 * <td>console</td>
99 * <td>the mode can be '<i>file</i>','<i>chainsaw</i>' or '<i>console</i>'.
100 * File means that the configurationfile xuluserverlog4j.cfg is read. <br>
101 * <i>'chainsaw'</i> means that the server tries to configure the logger each
102 * time a new client for a chainsaw instance running at the client ip
103 * (standardports) <br>
104 * </td>
105 * </tr>
106 * <tr><td>XuluServer.benchmarkclass</td>
107 * <td>appl.util.benchmark.SimpleBenchmark</td>
108 * <td>The Benchmark to get a rating for this machine. Notice that you can
109 * override the rating with the -rating parameter</td>
110 * </tr>
111 * <tr>
112 * <td>XuluServer.useThreads</td>
113 * <td>max</td>
114 * <td>The number of threads to be used by the server for tasks that support
115 * multi-threading. Per default one thread is created for every available
116 * Processors (value='max'). More threads than processors may be useful for
117 * testing tasks which support multithreading or for processors which support
118 * hyperthreading.</td>
119 * </tr>
120 * <tr>
121 * <td>XuluServer.runbench</td>
122 * <td>true</td>
123 * <td>the benchmark is run automatically</td>
124 * </tr>
125 * </table> <br>
126 * <b>Multicasting:</b> <br>
127 * If multicasting does not work, try to disable your firewall or configure it
128 * to allow multicast UDP packets. Notice also, that multicasting may not work
129 * in virtual machines like VMWare or MS Virtual PC 2004/7. <br>
130 * <br>
131 * <b>Assumptions</b>
132 * Only one client is connected to the server (and can run Tasks) <br>
133 *<br>
134 * See {@link #main(String[])} for details about command line parameters
135 * @see ServerMulticastReceiver
136 * @author Dominik Appl
137 */
138 public class XuluServer extends UnicastRemoteObject implements SPMDResource {
139
140 /**
141 * This Thread is used when multiple threads should be executed simultaneous during multithreading
142 *
143 * @author Dominik Appl
144 */
145 class SimpleCallThread implements Callable {
146
147 private final SPMDTask task;
148
149 private final Object[] parameters;
150
151 /**
152 * @param task the task to be executed
153 * @param parameters the parameters for the run method of the task
154 */
155 public SimpleCallThread(SPMDTask task, Object... parameters) {
156 this.task = task;
157 this.parameters = parameters;
158 }
159
160 /*
161 * (non-Javadoc)
162 *
163 * @see java.util.concurrent.Callable#call()
164 */
165 public Object call() throws Exception {
166 return task.run(parameters);
167 }
168 }
169
170 private static final Logger LOG = LogManager
171 .getLogger("appl.parallel.server.XuluServer");
172
173 private static String bindingName = "XuluServer";
174
175 private static final String helloFromServerMessage = "Hello Xulu from XuluServer";
176
177 private static String log4jfilename = "xuluserverlog4j.cfg";
178
179 private static boolean chainsaw = false;
180
181 private static SocketAppender chainsawAppender;
182
183 /**
184 * @param mode the mode of the execution
185 */
186 private static void log4jConfig(String mode) {
187 if (mode.equals("chainsaw")) {
188 BasicConfigurator.resetConfiguration();
189 chainsawAppender = new SocketAppender();
190 BasicConfigurator.configure(chainsawAppender);
191 System.out.println("Server configured for chainsaw.");
192 chainsaw = true;
193 } else if (mode.equals("file")) {
194 BasicConfigurator.resetConfiguration();
195 PropertyConfigurator.configure(log4jfilename);
196 } else if (mode.equals("console")) {
197 BasicConfigurator.resetConfiguration();
198 BasicConfigurator.configure();
199 } else {
200 System.err.println("Invalid log4j-Config parameter found: " + mode);
201 printInfoMessage();
202 System.exit(0);
203 }
204 }
205
206 /**
207 * configures the log4j level
208 *
209 * @see #main(String[])
210 */
211 private static void log4jLevel(String level) {
212 if (level.equals("info")) {
213 LOG.setLevel(Level.INFO);
214 } else if (level.equals("debug")) {
215 LOG.setLevel(Level.DEBUG);
216 } else if (level.equals("error")) {
217 LOG.setLevel(Level.ERROR);
218 } else if (level.equals("fatal")) {
219 LOG.setLevel(Level.FATAL);
220 } else if (level.equals("warn")) {
221 LOG.setLevel(Level.WARN);
222 } else if (level.equals("off")) {
223 LOG.setLevel(Level.OFF);
224 } else {
225 System.err.println("Invalid log4j-Level found: " + level);
226 printInfoMessage();
227 System.exit(0);
228 }
229 System.out.println("Setting logLevel to " + level);
230 }
231
232 /**
233 * Starts the server.
234 *
235 * @param args
236 * The following parameters are valid: <br>
237 * <br>
238 * -loglevel:<code>LEVEL</code> - where <code>LEVEL</code> is '<i>debug</i>','<i>info</i>'
239 * (DEFAULT),'<i>warn</i>','<i>error</i>' or '<i>fatal</i>' <br>
240 * <br>
241 * -logconfig:<code>CONFIG</code> - where <code>CONFIG</code> is '<i>file</i>','<i>chainsaw</i>' or '<i>console</i>'
242 * <li>'chainsaw' means that the server tries to configure the logger each time a
243 * new client for a chainsaw instance running at the client ip
244 * (standardports) </li><br>
245 * <li>'<i>file</i>' means that the file '<i>xuluserverlog4j.cfg</i>'
246 * in the home-directory is used for configuration <br> </li>
247 * <li>'<i>console</i>'
248 * means of course, that the console is used for output <br> </li>
249 * <br>
250 * -port:PORTNUMBER - binds the server to the specified port (default 1099)
251 *
252
253 */
254 public static void main(String[] args) {
255 // parse arguments
256 int serverPort = 0;
257 for (String arg : args) {
258 if (arg.toLowerCase().startsWith("-loglevel:"))
259 log4jLevel(arg.substring(10).toLowerCase());
260 else if (arg.toLowerCase().startsWith("-logconfig:"))
261 log4jConfig(arg.substring(11).toLowerCase());
262 else if (arg.toLowerCase().startsWith("-port:"))
263 serverPort = Integer.valueOf(arg.substring(6));
264 else {
265 System.err.println("Invalid Argument found: " + arg);
266 printInfoMessage();
267 System.exit(0);
268 }
269 }
270
271 if (System.getSecurityManager() == null) {
272 System.setSecurityManager(new RMISecurityManager());
273 }
274 Remote server;
275 try {
276 server = new XuluServer(serverPort);
277 System.out.println("Xulu Server is up and running!");
278 } catch (RemoteException e) {
279 System.err.println("Xulu Server could not start!");
280 e.printStackTrace();
281 }
282 }
283
284 private static void printInfoMessage() {
285 System.out
286 .println("The following arguments are valid: \n"
287 + "-loglevel:<LEVEL> \n"
288 + " where <LEVEL> is 'off','debug','info' (DEFAULT),'warn','error' or 'fatal'\n\n"
289 + "-logconfig:<CONFIG> \n"
290 + " where <CONFIG> is 'file','chainsaw' or 'console'(DEFAULT) \n"
291 + " 'chainsaw' means that the server tries to configure the logger each\n"
292 + " time a new client for a chainsaw instance running at the client ip\n"
293 + " (standardports) \n"
294 + " 'file' means that the file 'xuluserverlog4j.cfg' in the home-directory "
295 + " is used for configuration \n"
296 + " 'console' means of course, that the console is used for output"
297 + "-port:<portnumber> \n"
298 + " The Server is bound to the specified port");
299 }
300
301 private static void unbind() {
302 Helper.unbind(bindingName);
303 }
304
305 /**
306 * The {@link Class} objects of the currently active connection. Every time
307 * {@link #runSPMDModelTask(String, int, Object[])} is called the
308 * newest class is loaded from the client. For performance reasons this will
309 * happen only one time per connection. In this map the current active
310 * classes are stored.
311 */
312 private HashMap<String, SPMDTask[]> spmdTaskInstances;
313
314 /* standard is should be 1099 */
315 private int registryPort = 1099;
316
317 private int multicastPort;
318
319 private String multiCastIP;
320
321 ServerMulticastReceiver multicastThread;
322
323 InetAddress multiCastGroup;
324
325 // says whether a client is connected
326 private boolean connected = false;
327
328 private long starttime = System.currentTimeMillis();
329
330 private String connectedClient;
331
332 private PartitionDataManager dataManager;
333
334 private RemoteEventProxy remoteEventReceiver;
335
336 // User specified name of the Server or hostname
337 private String serverName;
338
339 private int machineRating = 0;
340
341 private final boolean loggingEnabled;
342
343 private int taskPriority = Thread.NORM_PRIORITY;
344
345 private int availableProcessors = 1;
346
347 private int toUseThreads = 1;
348
349 private final boolean isInternalServer;
350
351 private ClientDataServer localSPMDClient;
352
353 private boolean useCodeDownloading;
354
355 private ExecutorService executor;
356
357 // used for execution results
358 private Future[] resultCalls;
359
/**
 * Creates a remote (non-internal) server with logging enabled that is bound
 * at the default registry port.
 *
 * @throws RemoteException
 *             if the server could not be created
 */
public XuluServer() throws RemoteException {
    // logging on, not internal, port 0 = keep the default port
    this(true, false, 0);
}
368
369 /**
370 * @param configLogging
371 * enable or disable logging
372 * @param isInternalServer
373 * set to true if you want to make method calls purely local.
374 * This is for performance reasons (see
375 * {@link UnicastRemoteObject#getClientHost()}
376 * @param port
377 * port the server is bound to (0 for default)
378 * @throws RemoteException
379 */
380 public XuluServer(boolean configLogging, boolean isInternalServer, int port)
381 throws RemoteException {
382 super();
383 loggingEnabled = configLogging;
384 this.isInternalServer = isInternalServer;
385 initConfiguration();
386 //the executor is used for multithreading
387 executor = Executors.newCachedThreadPool();
388 spmdTaskInstances = new HashMap<String, SPMDTask[]>(10);
389 if (port != 0)
390 this.registryPort = port;
391 bind();
392
393 initializeMultiCasting();
394 serverName = "unknown server";
395 try {
396 if (isInternalServer)
397 serverName = "internal Server";
398 else
399 serverName = InetAddress.getLocalHost().getHostName();
400 } catch (UnknownHostException e) {
401 // TODO Auto-generated catch block
402 e.printStackTrace();
403 }
404 if (port != 0)
405 serverName = serverName + " (port " + registryPort + ")";
406 runBenchmark();
407 }
408
/**
 * Creates an internal server, i.e. a server running inside the Xulu-Client.
 * This has performance advantages (direct access - no TCP/IP).
 *
 * @param configLogging
 *            enable or disable logging
 * @param clientDataServer
 *            a local client for faster access
 * @throws RemoteException
 *             if the server could not be created
 */
public XuluServer(boolean configLogging, ClientDataServer clientDataServer)
        throws RemoteException {
    // internal server at the default port
    this(configLogging, true, 0);
    localSPMDClient = clientDataServer;
}
424
/**
 * Creates a remote (non-internal) server with logging enabled.
 *
 * @param port
 *            the port to which the Server is bound (0 for default)
 * @throws RemoteException
 *             if the server could not be created
 */
public XuluServer(int port) throws RemoteException {
    // logging on, not internal
    this(true, false, port);
}
433
434 /**
435 * Bind the remote object's stub in the registry. Creates a registry if no
436 * running registry is found.
437 */
438 private void bind() throws RemoteException {
439 String name = bindingName;
440 Helper.bind(name, this, registryPort);
441 }
442
443 private boolean checkConnected(String callingHost) {
444
445 if (callingHost.equals(connectedClient))
446 return true;
447 LOG
448 .warn(callingHost
449 + " has tried to execute a critical operation, but was not connected "
450 + ". Actual connected client: " + connectedClient);
451 return false;
452 }
453
454 /**
455 * Only a connected client has access to all functionality. When connecting
456 * or reconnecting, all data of the server and the corresponding
457 * {@link PartitionDataServer} is reset!
458 *
459 * @see appl.parallel.ComputingResource#connect()
460 */
461 public synchronized boolean connect() throws RemoteException {
462 /**
463 * determine the calling host. Internal Server requires special
464 * handling: notice that this happens very often in this class, but can
465 * not externalized due to the behavior of #getClientHost();
466 */
467 String callingHost = "error";
468 try {
469 callingHost = isInternalServer ? "localhost" : getClientHost();
470 } catch (ServerNotActiveException e) {
471 e.printStackTrace();
472 }
473 /** ********************************************************************** */
474
475 if (connectedClient == null || callingHost.equals(connectedClient)) {
476 if (connectedClient == null)
477 LOG.info("Client " + callingHost + " connected!");
478 else
479 LOG.info("Client " + connectedClient + " reconnected! ");
480 // reset data
481 reset();
482 connectedClient = callingHost;
483 // initalize event system & chainsaw
484 initEventSystem();
485 initChainsaw(connectedClient);
486 return true;
487 } else if (connectedClient != null) {
488 LOG.warn(callingHost + " has tried to "
489 + "connect to server, but was not connected, "
490 + "because this server is in use by " + connectedClient);
491 }
492
493 return false;
494 }
495
496 /*
497 * (non-Javadoc)
498 *
499 * @see appl.parallel.server.ComputingResource#createDataServer()
500 */
501 public PartitionDataServer createDataServer(String[] IPs)
502 throws RemoteException {
503 /**
504 * determine the calling host. Internal Server requires special
505 * handling: notice that this happens very often in this class, but can
506 * not externalized due to the behavior of #getClientHost();
507 */
508 String callingHost = "localhost";
509 try {
510 callingHost = isInternalServer ? "localhost" : getClientHost();
511 } catch (ServerNotActiveException e) {
512 e.printStackTrace();
513 }
514 /** ********************************************************************** */
515 if (checkConnected(callingHost)) {
516 if (isInternalServer)
517 dataManager = new PartitionDataManager(IPs,
518 remoteEventReceiver, getRegistryPort(), localSPMDClient);
519 else
520 dataManager = new PartitionDataManager(IPs,
521 remoteEventReceiver, getRegistryPort());
522 return dataManager;
523 }
524 return null;
525 }
526
527 /* (non-Javadoc)
528 * @see appl.parallel.server.ComputingResource#disconnect()
529 */
530 public synchronized void disconnect() throws RemoteException {
531 /**
532 * determine the calling host. Internal Server requires special
533 * handling: notice that this happens very often in this class, but can
534 * not externalized due to the behavior of #getClientHost();
535 */
536 String callingHost = "localhost";
537 try {
538 callingHost = isInternalServer ? "localhost" : getClientHost();
539 } catch (ServerNotActiveException e) {
540 e.printStackTrace();
541 }
542 /** ********************************************************************** */
543 if (checkConnected(callingHost)) {
544 reset();
545 LOG.info("Client " + connectedClient
546 + " successfully disconnected from Server");
547 }
548 }
549
550 /**
551 * @return a rating for the machine as discovered by the benchmark
552 */
553 public int getRating() {
554 return machineRating;
555 }
556
557 /**
558 * @return the registry port the server is bound to
559 */
560 public int getRegistryPort() {
561 return registryPort;
562 }
563
564 /*
565 * (non-Javadoc)
566 *
567 * @see appl.parallel.server.XuluServer#getResourceInformation()
568 */
569 public ComputingResourceProperties getResourceInformation()
570 throws RemoteException {
571 return new XuluServerProperties(this, serverName);
572 }
573
574 /**
575 * @return the time in s the server is running
576 */
577 int getUptime() {
578 return (int) (System.currentTimeMillis() - starttime);
579 }
580
581 private void initChainsaw(String connectedClient) {
582 if (chainsaw && connectedClient != null) {
583 chainsawAppender = new SocketAppender(connectedClient, 4445);
584 BasicConfigurator.resetConfiguration();
585 BasicConfigurator.configure(chainsawAppender);
586 }
587 }
588
589 /**
590 * inits task priority, ports and loglevel
591 */
592 private void initConfiguration() {
593 XuluConfig config = XuluConfig.getXuluConfig();
594 // a lower priority will run the server more in the background
595 taskPriority = config.getIntProperty("XuluServer.priority");
596
597 switch (taskPriority) {
598 default:
599 taskPriority = Thread.NORM_PRIORITY;
600 break;
601 case 1:
602 taskPriority = Thread.MIN_PRIORITY;
603 break;
604 case 2:
605 taskPriority = Thread.NORM_PRIORITY - 1;
606 break;
607 case 4:
608 taskPriority = Thread.NORM_PRIORITY + 1;
609 break;
610 case 5:
611 taskPriority = Thread.MAX_PRIORITY;
612 break;
613 }
614
615 availableProcessors = Runtime.getRuntime().availableProcessors();
616 // default: try to use one thread per processor
617 toUseThreads = availableProcessors;
618 // check if there is an entry in the XuluConfig saying how many threads
619 // should be created.
620 // (more threads than processors may be useful for the use of
621 // hyperthreading)
622 String configProz = config.getProperty("XuluServer.useThreads");
623 if ((configProz != null) && !configProz.equals("max")) {
624 toUseThreads = config.getIntProperty("XuluServer.useThreads");
625 // error checking:
626 if (toUseThreads < 1 || toUseThreads > 128){
627 toUseThreads = availableProcessors;
628 System.out
629 .println("Error: Number of threads must be between 1 and 128!");
630 }
631 }
632 // init correspondig arrays
633 resultCalls = new Future[toUseThreads];
634
635 int port = config.getIntProperty("XuluServer.registryport");
636 if (port != 0)
637 registryPort = port;
638
639 useCodeDownloading = config
640 .getBooleanProperty("XuluServer.useCodeDownloading");
641 if (useCodeDownloading)
642 System.out.println("Code downloading is enabled");
643 else
644 System.out.println("Code downloading is disabled");
645
646 int multicastport = config.getIntProperty("XuluServer.multicastport");
647 if (multicastport != 0)
648 multicastPort = multicastport;
649 String IP = config.getProperty("XuluServer.multicastgroup");
650 if (IP != null)
651 multiCastIP = IP;
652 try {
653 multiCastGroup = InetAddress.getByName(multiCastIP);
654 } catch (UnknownHostException e) {
655 e.printStackTrace();
656 }
657 if (loggingEnabled) {
658
659 String mode = config.getProperty("XuluServer.log4j.mode");
660 if (mode != null)
661 log4jConfig(mode);
662 }
663 String level = config.getProperty("XuluServer.log4j.logLevel");
664 if (level != null)
665 log4jLevel(level.toLowerCase());
666 else
667 log4jLevel("info");
668 }
669
670 /**
671 * Connects to the RemoteEventReceiver at the calling client
672 */
673 private void initEventSystem() throws RemoteException {
674 String lookupName = "rmi://" + connectedClient + "/RemoteEventReceiver";
675 try {
676 int delay = XuluConfig.getXuluConfig().getIntProperty(
677 "XuluServer.eventDelay");
678 System.out.println("looking up " + lookupName);
679 CommEventSink eventSink = (CommEventSink) Naming.lookup(lookupName);
680 if (remoteEventReceiver != null)
681 remoteEventReceiver.stopService();
682 remoteEventReceiver = new RemoteEventProxy(eventSink, delay);
683 remoteEventReceiver.startService();
684 } catch (MalformedURLException e) {
685 e.printStackTrace();
686 } catch (NotBoundException e) {
687 LOG.warn("Could not find the remote event receiver at "
688 + lookupName, e);
689 System.out.println(e.getMessage());
690 remoteEventReceiver = new RemoteEventProxy(null, 0);
691 }
692 }
693
694 /**
695 * inits the multicast unit
696 */
697 private void initializeMultiCasting() {
698 try {
699 MulticastSocket s = new MulticastSocket(multicastPort);
700 s.joinGroup(multiCastGroup);
701 // send hello message
702 String IP = XuluConfig.getXuluConfig().getProperty(
703 "XuluServer.multicastgroup");
704 DatagramPacket answer = new DatagramPacket(helloFromServerMessage
705 .getBytes(), helloFromServerMessage.length(), InetAddress
706 .getByName(IP), multicastPort);
707 s.send(answer);
708 // start thread that waits on client messages
709 multicastThread = new ServerMulticastReceiver(s);
710 multicastThread.start();
711 } catch (IOException e) {
712 // TODO Auto-generated catch block
713 LOG.warn("Could not initalize multicasting! Reason was "
714 + e.getMessage(), e);
715
716 }
717 }
718
719 /**
720 * @return whether this server is available or in use by another client
721 * @see appl.parallel.ComputingResource#isAvailable()
722 */
723 public boolean isAvailable() throws RemoteException {
724 /**
725 * determine the calling host. Internal Server requires special
726 * handling: notice that this happens very often in this class, but can
727 * not externalized due to the behavior of #getClientHost();
728 */
729 String callingHost = "localhost";
730 try {
731 callingHost = isInternalServer ? "localhost" : getClientHost();
732 } catch (ServerNotActiveException e) {
733 e.printStackTrace();
734 }
735 /** ********************************************************************** */
736 if (callingHost.equals(connectedClient))
737 return true;
738
739 if (connectedClient == null)
740 return true;
741 return false;
742 }
743
744 /**
745 * @return if a client is connected
746 */
747 public boolean isClientConnected() {
748 return connected;
749
750 }
751
752 /**
753 * Loads a {@link SPMDTask} from the specified URL and returns an array of
754 * instances of this class. There are so many instances, as there are
755 * available processors on this machine!. This class also makes sure that
756 * this happens only one time per connection (else this would of course be a
757 * performance killer in fine grained parallelization).
758 *
759 * @param location
760 * the URL of the rootDirectory/networkLocation
761 * @param spmdTaskClassName
762 * the name of the SPMDTask for which the newest class should be
763 * found
764 * @return the instance of the class or <code>null</code> if loading fails
765 */
766 private SPMDTask[] loadSPMDTasksfromURL(String location,
767 String spmdTaskClassName) {
768
769 SPMDTask[] spmdClassInstances = (SPMDTask[]) spmdTaskInstances
770 .get(spmdTaskClassName);
771
772 // if already loaded for this connection simply return the instances
773 if (spmdClassInstances != null)
774 return (SPMDTask[]) spmdClassInstances;
775 spmdClassInstances = new SPMDTask[toUseThreads];
776 Class spmdClass = null;
777 if (useCodeDownloading) {
778 try {
779 // lookup client and make sure that the newest class of the task
780 // is instantiated. Will avoid loading older cached or local
781 // filesystem version
782 URL[] locations = { new URL("http://" + location + "/") };
783 PreferredClassLoader classLoader = new PreferredClassLoader(
784 locations, Thread.currentThread()
785 .getContextClassLoader(), null, false);
786 LOG.debug("Try to load " + spmdTaskClassName + " from "
787 + location);
788 spmdClass = classLoader.loadClass(spmdTaskClassName);
789 } catch (Exception e) {
790 LOG.error("Could not load class for " + spmdTaskClassName
791 + " from " + location, e);
792 e.printStackTrace();
793 }
794 }
795 // if retrieval failed for some reason or code downloading disabled
796 // then use the local class
797 try {
798 if (spmdClass == null) {
799 spmdClass = Class.forName(spmdTaskClassName);
800 }
801
802 // instantiate the classes
803 for (int i = 0; i < spmdClassInstances.length; i++) {
804 Object task = spmdClass.newInstance();
805 if (!(task instanceof SPMDTask)) {
806 LOG.fatal(spmdTaskClassName
807 + " was not an instance of SPMDTask!");
808 throw new UnsupportedOperationException(spmdTaskClassName
809 + " was not an instance of SPMDTask!");
810 }
811 spmdClassInstances[i] = (SPMDTask) task;
812 }
813
814 spmdTaskInstances.put(spmdTaskClassName, spmdClassInstances);
815 return spmdClassInstances;
816
817 } catch (ClassNotFoundException e) {
818 // TODO Auto-generated catch block
819 e.printStackTrace();
820 } catch (InstantiationException e) {
821 // TODO Auto-generated catch block
822 e.printStackTrace();
823 } catch (IllegalAccessException e) {
824 // TODO Auto-generated catch block
825 e.printStackTrace();
826 }
827 throw new UnsupportedOperationException("Loading of class "
828 + spmdTaskClassName
829 + " failed on serverside! Execution canceled");
830 }
831
832 /*
833 * (non-Javadoc)
834 *
835 * @see appl.parallel.server.XuluServer#ping(java.lang.Object[])
836 */
837 public Object ping(Object... o) throws RemoteException {
838 System.out.println("received ping");
839 return o;
840 }
841
842 /**
843 * deletes all client-specific data data, so that a newly connected client
844 * can use a clean system
845 */
846 private void reset() {
847 if (dataManager != null)
848 dataManager.stop();
849 dataManager = null;
850 connectedClient = null;
851 spmdTaskInstances.clear();
852 }
853
854 /**
855 * runs the benchmark defined in XuluServer.benchmarkclass
856 */
857 private void runBenchmark() {
858 // see if the benchmark should be run
859
860 boolean runbench = XuluConfig.getXuluConfig().getBooleanProperty(
861 "XuluServer.runbench");
862 if (runbench) {
863 String classname = XuluConfig.getXuluConfig().getProperty(
864 "XuluServer.benchmarkclass");
865 if (classname != null) {
866 try {
867 // getBenchmarkClass
868 Benchmark bench = (Benchmark) Class.forName(classname)
869 .newInstance();
870 machineRating = bench.bench();
871 System.out
872 .println("Successfully run benchmark. Machine Rating is "
873 + machineRating);
874 } catch (Exception e) {
875 System.err
876 .println("Could not load Benchmarkclass "
877 + classname
878 + ". Setting rating to 0 (which means average). Reason was: "
879 + e.getMessage());
880 machineRating = 0;
881 }
882 } else
883 System.out.println("Benchmark disabled");
884 }
885 }
886
887 /**
888 * Every time {@link #runSPMDModelTask(String, int, Object[])} is called the
889 * newest class is loaded from the client. For performance reasons this will
890 * happen only one time per connection.
891 *
892 * @see appl.parallel.server.SPMDResource#runSPMDModelTask(String, int,
893 * Object[])
894 */
895 @SuppressWarnings("unchecked")
896 public Object[] runSPMDModelTask(String spmdTaskName, int referenceID,
897 Object... parameters) throws RemoteException {
898 /**
899 * determine the calling host. Internal Server requires special
900 * handling: notice that this happens very often in this class, but can
901 * not externalized due to the behavior of #getClientHost();
902 */
903 String callingHost = "localhost";
904 try {
905 callingHost = isInternalServer ? "localhost" : getClientHost();
906 } catch (ServerNotActiveException e) {
907 e.printStackTrace();
908 }
909 /** ********************************************************************** */
910 Thread.currentThread().setPriority(taskPriority);
911 String clientIP = "<unknown Client>";
912 long time = System.nanoTime();
913
914 clientIP = callingHost;
915 if (!clientIP.equals(connectedClient)) {
916 LOG.warn(clientIP
917 + " has tried to run a Task, but was not connected. "
918 + "This server is in use by " + connectedClient);
919 throw new RemoteException(
920 "Run of SPMDTask failed! This server is in use by "
921 + connectedClient);
922 }
923 if (LOG.isDebugEnabled())
924 LOG.debug("received task from " + clientIP
925 + ". Starting evaluation now...");
926 // if no datamanager is found: exit
927 if (dataManager == null)
928 throw new UnsupportedOperationException(
929 "No DataManager found. Create one first using 'createDataManager' on this Server");
930
931 // lookup newest version of the class from the server at the url
932 // (or from local disk if remote class loading is disabled)
933 SPMDTask[] tasks = loadSPMDTasksfromURL(clientIP, spmdTaskName);
934
935 // The number of threads allowed for execution can be different
936 // from the number of user assigned threads
937 // because tasks may not support multithreadeding
938 int allowedThreads = 1;
939 if (tasks[0].supportsMultiThreading())
940 allowedThreads = toUseThreads;
941
942 Object results[] = new Object[allowedThreads];
943
944 // the following loop is only executed the first time a task is used!
945 if (!tasks[0].isInitialized()) {
946 if (LOG.isInfoEnabled())
947 LOG.info("Trying to use " + allowedThreads
948 + " threads for execution of "
949 + tasks[0].getClass().getName());
950 // calculate splitting for multiple threads:
951 int splitMapPos = dataManager.getInfo(referenceID).getSplitMapPos();
952 SplitMap splitMap = dataManager.getInfo(referenceID).getSplitMap();
953 SplitMap1DVertical map1DVertical = new SplitMap1DVertical(splitMap
954 .getPartitionCalculationBounds(splitMapPos).width, splitMap
955 .getPartitionCalculationBounds(splitMapPos).height, 0,
956 allowedThreads, NeighborhoodBoxingMode.outBoxing);
957
958 // create controllers and init the tasks
959 for (int i = 0; i < allowedThreads; i++) {
960 AdvancedSPMDServerController serverController = new AdvancedSPMDServerController(
961 dataManager, referenceID);
962 // set first thread as master thread
963 serverController.setMasterThread(i == 0);
964 // now assign the calculation area. Because we have assumed that
965 // the (Thread)-calculation
966 // area begins at (0,0) we must move it relative to the real
967 // local calculation area
968 Rectangle calculationBounds = map1DVertical
969 .getPartitionCalculationBounds(i);
970 int absolutX = (int) (serverController.getLocalCalcMinX() + calculationBounds
971 .getMinX());
972 int absolutY = (int) (serverController.getLocalCalcMinY() + calculationBounds
973 .getMinY());
974 calculationBounds.setLocation(absolutX, absolutY);
975 serverController.setLocalCalculationBounds(calculationBounds);
976 tasks[i].setSPMDServerController(serverController);
977 tasks[i].initialize();
978 }
979 }
980 if (allowedThreads == 1) // only for saving resources: no new thread
981 { // if only one thread is used
982 Object executionResult = tasks[0].run(parameters);
983 results = new Object[] { executionResult };
984 } else {
985 // for more than 1 processor execute the multiple tasks in separate
986 // threads
987 for (int i = 0; i < allowedThreads; i++) {
988 resultCalls[i] = executor.submit(new SimpleCallThread(tasks[i],
989 parameters));
990 }
991 // wait for the Threads to finish
992 for (int i = 0; i < allowedThreads; i++) {
993 try {
994 results[i] = resultCalls[i].get();
995 } catch (InterruptedException e) {
996 // TODO Auto-generated catch block
997 e.printStackTrace();
998 } catch (ExecutionException e) {
999 // TODO Auto-generated catch block
1000 e.printStackTrace();
1001 }
1002 }
1003 }
1004 // generate a event for the execution
1005 if (remoteEventReceiver.isTimeMonitoringEnabled())
1006 remoteEventReceiver.fireRemoteEvent(new TimeEvent((System
1007 .nanoTime() - time), serverName, "DataServer",
1008 CommType.REMOTE_EXECUTION));
1009 return results;
1010 }
1011
1012 /**
1013 * This method frees all resources.<br>
1014 * Notice that this method is not remotely accessible!
1015 */
1016 public void stopServer() {
1017 unbind();
1018 if (dataManager != null)
1019 dataManager.stop();
1020 dataManager = null;
1021 if (multicastThread != null)
1022 multicastThread.stopThread();
1023 }
1024 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26