/[xulu]/trunk.bakup/src/appl/parallel/server/XuluServer.java
ViewVC logotype

Annotation of /trunk.bakup/src/appl/parallel/server/XuluServer.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 77 - (hide annotations)
Wed Feb 10 16:14:27 2010 UTC (14 years, 11 months ago) by alfonx
File size: 33991 byte(s)
backup of trunk
1 mojays 2 package appl.parallel.server;
2    
3     import java.awt.Rectangle;
4     import java.io.IOException;
5     import java.net.DatagramPacket;
6     import java.net.InetAddress;
7     import java.net.MalformedURLException;
8     import java.net.MulticastSocket;
9     import java.net.URL;
10     import java.net.UnknownHostException;
11     import java.rmi.MarshalledObject;
12     import java.rmi.Naming;
13     import java.rmi.NotBoundException;
14     import java.rmi.RMISecurityManager;
15     import java.rmi.Remote;
16     import java.rmi.RemoteException;
17     import java.rmi.activation.Activatable;
18     import java.rmi.activation.ActivationID;
19     import java.rmi.server.ServerNotActiveException;
20     import java.rmi.server.UnicastRemoteObject;
21     import java.util.HashMap;
22     import java.util.Vector;
23     import java.util.concurrent.Callable;
24     import java.util.concurrent.ExecutionException;
25     import java.util.concurrent.ExecutorService;
26     import java.util.concurrent.Executors;
27     import java.util.concurrent.Future;
28    
29     import net.jini.loader.pref.PreferredClassLoader;
30    
31     import org.apache.log4j.BasicConfigurator;
32     import org.apache.log4j.Level;
33     import org.apache.log4j.LogManager;
34     import org.apache.log4j.Logger;
35     import org.apache.log4j.PropertyConfigurator;
36     import org.apache.log4j.net.SocketAppender;
37    
38     import appl.ext.XuluConfig;
39     import appl.parallel.ComputingResourceProperties;
40     import appl.parallel.client.DataServer;
41     import appl.parallel.client.ClientDataServer;
42     import appl.parallel.event.CommEventSink;
43     import appl.parallel.event.TimeEvent;
44     import appl.parallel.event.CommEvent.CommType;
45     import appl.parallel.services.RemoteEventProxy;
46     import appl.parallel.spmd.AdvancedSPMDServerController;
47     import appl.parallel.spmd.AdvancedSPMDServerInterface;
48     import appl.parallel.spmd.SPMDServerController;
49     import appl.parallel.spmd.SPMDServerInterface;
50     import appl.parallel.spmd.SPMDTask;
51     import appl.parallel.spmd.split.SinglePartitionInfo;
52     import appl.parallel.spmd.split.SplitMap;
53     import appl.parallel.spmd.split.SplitMap1DVertical;
54     import appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode;
55     import appl.parallel.thread.OneMethodThread;
56     import appl.parallel.util.Helper;
57     import appl.util.benchmark.Benchmark;
58    
59     /**
60     * The Server manages the remote execution of program code. It may be started
61     * using the {@link #main(String[])} method.<br>
62     * The Server reads the following properties from {@link XuluConfig} - the
63     * values given should be the defaults.<br>
64     * <table border="1">
65     * <tr>
66     * <th>Property</th>
67     * <th>Default</th>
68     * <th>Desc</th>
69     * </tr>
70     * <tr>
71     * <td>XuluServer.useCodeDownloading</td>
72     * <td>false</td>
73     * <td>if true, the server tries to download task class files from a
74     * HTTP-server at the client-url</td>
75     * </tr>
76     * <tr>
77     * <td>XuluServer.registryPort</td>
78     * <td>1099</td>
79     * <td>The port at which a new registry is created, if no running registry was
80     * found</td>
81     * </tr>
82     * <tr>
83     * <td>XuluServer.multicastgroup</td>
84     * <td>239.1.1.1</td>
85     * <td>The default multicast group</td>
86     * </tr>
87     * <tr>
88     * <td> XuluServer.multicastport</td>
89     * <td>10000</td>
90     * <td>The default multicast port</td>
91     * </tr>
92     * <tr>
93     * <td>XuluServer.eventDelay</td>
94     * <td>50</td>
95     * <td>Events generated on server side may be delayed to collect multiple
96     * Events and send them at once (this saves communication time). Time is given in
97     * milliseconds</td>
98     * </tr>
99     * <tr>
100     * <td>XuluServer.log4j.logLevel</td>
101     * <td>info</td>
102     * <td>the level can be '<i>debug</i>','<i>info</i>','<i>warn</i>','<i>error</i>'
103     * or '<i>fatal</i>' and can be overridden by commandline - Parameters<br>
104     * <br>
105     * </td>
106     * </tr>
107     * <td>XuluServer.log4j.mode</td>
108     * <td>console</td>
109     * <td>the mode can be '<i>file</i>','<i>chainsaw</i>' or '<i>console</i>'.
110     * File means that the configurationfile xuluserverlog4j.cfg is read. <br>
111     * <i>'chainsaw'</i> means that the server tries to configure the logger each
112     * time a new client for a chainsaw instance running at the client ip
113     * (standardports) <br>
114     * </td>
115     * </tr>
116     * <td>XuluServer.benchmarkclass</td>
117     * <td>appl.util.benchmark.SimpleBenchmark</td>
118     * <td>The Benchmark to get a rating for this machine. Notice that you can
119     * override the rating with the -rating parameter</td>
120     * </tr>
121     * <tr>
122     * <td>XuluServer.useThreads</td>
123     * <td>max</td>
124     * <td>The number of threads to be used by the server for tasks that support
125     * multi-threading. Per default one thread is created for every available
126     * Processors (value='max'). More threads than processors may be useful for
127     * testing tasks which support multithreading or for processors which support
128     * hyperthreading.</td>
129     * </tr>
130     * <tr>
131     * <td>XuluServer.runbench</td>
132     * <td>true</td>
133     * <td>the benchmark is run automatically</td>
134     * </tr>
135     * </table> <br>
136     * <b>Multicasting:</b> <br>
137     * If multicasting does not work, try to disable your firewall or configure it
138     * to allow multicast UDP packets. Notice also, that multicasting may not work
139     * in virtual machines like VMWare or MS Virtual PC 2004/7. <br>
140     * <br>
141     * <b>Assumptions</b>
142     * Only one client is connected to the server (and can run Tasks) <br>
143     *<br>
144     * See {@link #main(String[])} for details about command line parameters
145     * @see ServerMulticastReceiver
146     * @author Dominik Appl
147     */
148     public class XuluServer extends UnicastRemoteObject implements SPMDResource {
149    
150     /**
151     * This Thread is used when multiple threads should be executed simultaneous during multithreading
152     *
153     * @author Dominik Appl
154     */
155     class SimpleCallThread implements Callable {
156    
157     private final SPMDTask task;
158    
159     private final Object[] parameters;
160    
161     /**
162     * @param task the task to be executed
163     * @param parameters the parameters for the run method of the task
164     */
165     public SimpleCallThread(SPMDTask task, Object... parameters) {
166     this.task = task;
167     this.parameters = parameters;
168     }
169    
170     /*
171     * (non-Javadoc)
172     *
173     * @see java.util.concurrent.Callable#call()
174     */
175     public Object call() throws Exception {
176     return task.run(parameters);
177     }
178     }
179    
180     private static final Logger LOG = LogManager
181     .getLogger("appl.parallel.server.XuluServer");
182    
183     private static String bindingName = "XuluServer";
184    
185     private static final String helloFromServerMessage = "Hello Xulu from XuluServer";
186    
187     private static String log4jfilename = "xuluserverlog4j.cfg";
188    
189     private static boolean chainsaw = false;
190    
191     private static SocketAppender chainsawAppender;
192    
193     /**
194     * @param mode the mode of the execution
195     */
196     private static void log4jConfig(String mode) {
197     if (mode.equals("chainsaw")) {
198     BasicConfigurator.resetConfiguration();
199     chainsawAppender = new SocketAppender();
200     BasicConfigurator.configure(chainsawAppender);
201     System.out.println("Server configured for chainsaw.");
202     chainsaw = true;
203     } else if (mode.equals("file")) {
204     BasicConfigurator.resetConfiguration();
205     PropertyConfigurator.configure(log4jfilename);
206     } else if (mode.equals("console")) {
207     BasicConfigurator.resetConfiguration();
208     BasicConfigurator.configure();
209     } else {
210     System.err.println("Invalid log4j-Config parameter found: " + mode);
211     printInfoMessage();
212     System.exit(0);
213     }
214     }
215    
216     /**
217     * configures the log4j level
218     *
219     * @see #main(String[])
220     */
221     private static void log4jLevel(String level) {
222     if (level.equals("info")) {
223     LOG.setLevel(Level.INFO);
224     } else if (level.equals("debug")) {
225     LOG.setLevel(Level.DEBUG);
226     } else if (level.equals("error")) {
227     LOG.setLevel(Level.ERROR);
228     } else if (level.equals("fatal")) {
229     LOG.setLevel(Level.FATAL);
230     } else if (level.equals("warn")) {
231     LOG.setLevel(Level.WARN);
232     } else if (level.equals("off")) {
233     LOG.setLevel(Level.OFF);
234     } else {
235     System.err.println("Invalid log4j-Level found: " + level);
236     printInfoMessage();
237     System.exit(0);
238     }
239     System.out.println("Setting logLevel to " + level);
240     }
241    
242     /**
243     * Starts the server.
244     *
245     * @param args
246     * The following parameters are valid: <br>
247     * <br>
248     * -loglevel:<code>LEVEL</code> - where <code>LEVEL</code> is '<i>debug</i>','<i>info</i>'
249     * (DEFAULT),'<i>warn</i>','<i>error</i>' or '<i>fatal</i>' <br>
250     * <br>
251     * -logconfig:<code>CONFIG</code> - where <code>CONFIG</code> is '<i>file</i>','<i>chainsaw</i>' or '<i>console</i>'
252     * <li>'chainsaw' means that the server tries to configure the logger each time a
253     * new client for a chainsaw instance running at the client ip
254     * (standardports) </li><br>
255     * <li>'<i>file</i>' means that the file '<i>xuluserverlog4j.cfg</i>'
256     * in the home-directory is used for configuration <br> </li>
257     * <li>'<i>console</i>'
258     * means of course, that the console is used for output <br> </li>
259     * <br>
260     * -port:PORTNUMBER - binds the server to the specified port (default 1099)
261     *
262    
263     */
264     public static void main(String[] args) {
265     // parse arguments
266     int serverPort = 0;
267     for (String arg : args) {
268     if (arg.toLowerCase().startsWith("-loglevel:"))
269     log4jLevel(arg.substring(10).toLowerCase());
270     else if (arg.toLowerCase().startsWith("-logconfig:"))
271     log4jConfig(arg.substring(11).toLowerCase());
272     else if (arg.toLowerCase().startsWith("-port:"))
273     serverPort = Integer.valueOf(arg.substring(6));
274     else {
275     System.err.println("Invalid Argument found: " + arg);
276     printInfoMessage();
277     System.exit(0);
278     }
279     }
280    
281     if (System.getSecurityManager() == null) {
282     System.setSecurityManager(new RMISecurityManager());
283     }
284     Remote server;
285     try {
286     server = new XuluServer(serverPort);
287     System.out.println("Xulu Server is up and running!");
288     } catch (RemoteException e) {
289     System.err.println("Xulu Server could not start!");
290     e.printStackTrace();
291     }
292     }
293    
294     private static void printInfoMessage() {
295     System.out
296     .println("The following arguments are valid: \n"
297     + "-loglevel:<LEVEL> \n"
298     + " where <LEVEL> is 'off','debug','info' (DEFAULT),'warn','error' or 'fatal'\n\n"
299     + "-logconfig:<CONFIG> \n"
300     + " where <CONFIG> is 'file','chainsaw' or 'console'(DEFAULT) \n"
301     + " 'chainsaw' means that the server tries to configure the logger each\n"
302     + " time a new client for a chainsaw instance running at the client ip\n"
303     + " (standardports) \n"
304     + " 'file' means that the file 'xuluserverlog4j.cfg' in the home-directory "
305     + " is used for configuration \n"
306     + " 'console' means of course, that the console is used for output"
307     + "-port:<portnumber> \n"
308     + " The Server is bound to the specified port");
309     }
310    
311     private static void unbind() {
312     Helper.unbind(bindingName);
313     }
314    
315     /**
316     * The {@link Class} objects of the currently active connection. Every time
317     * {@link #runSPMDModelTask(String, int, Object[])} is called the
318     * newest class is loaded from the client. For performance reasons this will
319     * happen only one time per connection. In this map the current active
320     * classes are stored.
321     */
322     private HashMap<String, SPMDTask[]> spmdTaskInstances;
323    
324     /* standard is should be 1099 */
325     private int registryPort = 1099;
326    
327     private int multicastPort;
328    
329     private String multiCastIP;
330    
331     ServerMulticastReceiver multicastThread;
332    
333     InetAddress multiCastGroup;
334    
335     // says whether a client is connected
336     private boolean connected = false;
337    
338     private long starttime = System.currentTimeMillis();
339    
340     private String connectedClient;
341    
342     private PartitionDataManager dataManager;
343    
344     private RemoteEventProxy remoteEventReceiver;
345    
346     // User specified name of the Server or hostname
347     private String serverName;
348    
349     private int machineRating = 0;
350    
351     private final boolean loggingEnabled;
352    
353     private int taskPriority = Thread.NORM_PRIORITY;
354    
355     private int availableProcessors = 1;
356    
357     private int toUseThreads = 1;
358    
359     private final boolean isInternalServer;
360    
361     private ClientDataServer localSPMDClient;
362    
363     private boolean useCodeDownloading;
364    
365     private ExecutorService executor;
366    
367     // used for execution results
368     private Future[] resultCalls;
369    
/**
 * Creates a new, non-internal instance with logging configuration enabled
 * and no explicit port (port argument 0).
 *
 * @throws RemoteException
 *             if the server could not be exported or bound
 */
public XuluServer() throws RemoteException {
    this(/* configLogging */ true, /* isInternalServer */ false, /* port */ 0);
}
378    
379     /**
380     * @param configLogging
381     * enable or disable logging
382     * @param isInternalServer
383     * set to true if you want to make method calls purely local.
384     * This is for performance reasons (see
385     * {@link UnicastRemoteObject#getClientHost()}
386     * @param port
387     * port the server is bound to (0 for default)
388     * @throws RemoteException
389     */
390     public XuluServer(boolean configLogging, boolean isInternalServer, int port)
391     throws RemoteException {
392     super();
393     loggingEnabled = configLogging;
394     this.isInternalServer = isInternalServer;
395     initConfiguration();
396     //the executor is used for multithreading
397     executor = Executors.newCachedThreadPool();
398     spmdTaskInstances = new HashMap<String, SPMDTask[]>(10);
399     if (port != 0)
400     this.registryPort = port;
401     bind();
402    
403     initializeMultiCasting();
404     serverName = "unknown server";
405     try {
406     if (isInternalServer)
407     serverName = "internal Server";
408     else
409     serverName = InetAddress.getLocalHost().getHostName();
410     } catch (UnknownHostException e) {
411     // TODO Auto-generated catch block
412     e.printStackTrace();
413     }
414     if (port != 0)
415     serverName = serverName + " (port " + registryPort + ")";
416     runBenchmark();
417     }
418    
/**
 * Creates an internal server, i.e. a server that runs inside the
 * Xulu-Client. Such a server is given the client's data server directly
 * (direct access - no TCP/IP).
 *
 * @param configLogging
 *            enable or disable the configuration of the log4j mode
 * @param clientDataServer
 *            a local client for faster access
 * @throws RemoteException
 *             if the server could not be exported or bound
 */
public XuluServer(boolean configLogging, ClientDataServer clientDataServer)
        throws RemoteException {
    this(configLogging, /* isInternalServer */ true, /* port */ 0);
    localSPMDClient = clientDataServer;
}
434    
/**
 * Creates a non-internal server with logging configuration enabled.
 *
 * @param port
 *            the port to which the server is bound (0 keeps the
 *            configured/default port)
 * @throws RemoteException
 *             if the server could not be exported or bound
 */
public XuluServer(int port) throws RemoteException {
    this(/* configLogging */ true, /* isInternalServer */ false, port);
}
443    
444     /**
445     * Bind the remote object's stub in the registry. Creates a registry if no
446     * running registry is found.
447     */
448     private void bind() throws RemoteException {
449     String name = bindingName;
450     Helper.bind(name, this, registryPort);
451     }
452    
453     private boolean checkConnected(String callingHost) {
454    
455     if (callingHost.equals(connectedClient))
456     return true;
457     LOG
458     .warn(callingHost
459     + " has tried to execute a critical operation, but was not connected "
460     + ". Actual connected client: " + connectedClient);
461     return false;
462     }
463    
464     /**
465     * Only a connected client has access to all functionality. When connecting
466     * or reconnecting, all data of the server and the corresponding
467     * {@link PartitionDataServer} is reset!
468     *
469     * @see appl.parallel.ComputingResource#connect()
470     */
471     public synchronized boolean connect() throws RemoteException {
472     /**
473     * determine the calling host. Internal Server requires special
474     * handling: notice that this happens very often in this class, but can
475     * not externalized due to the behavior of #getClientHost();
476     */
477     String callingHost = "error";
478     try {
479     callingHost = isInternalServer ? "localhost" : getClientHost();
480     } catch (ServerNotActiveException e) {
481     e.printStackTrace();
482     }
483     /** ********************************************************************** */
484    
485     if (connectedClient == null || callingHost.equals(connectedClient)) {
486     if (connectedClient == null)
487     LOG.info("Client " + callingHost + " connected!");
488     else
489     LOG.info("Client " + connectedClient + " reconnected! ");
490     // reset data
491     reset();
492     connectedClient = callingHost;
493     // initalize event system & chainsaw
494     initEventSystem();
495     initChainsaw(connectedClient);
496     return true;
497     } else if (connectedClient != null) {
498     LOG.warn(callingHost + " has tried to "
499     + "connect to server, but was not connected, "
500     + "because this server is in use by " + connectedClient);
501     }
502    
503     return false;
504     }
505    
506     /*
507     * (non-Javadoc)
508     *
509     * @see appl.parallel.server.ComputingResource#createDataServer()
510     */
511     public PartitionDataServer createDataServer(String[] IPs)
512     throws RemoteException {
513     /**
514     * determine the calling host. Internal Server requires special
515     * handling: notice that this happens very often in this class, but can
516     * not externalized due to the behavior of #getClientHost();
517     */
518     String callingHost = "localhost";
519     try {
520     callingHost = isInternalServer ? "localhost" : getClientHost();
521     } catch (ServerNotActiveException e) {
522     e.printStackTrace();
523     }
524     /** ********************************************************************** */
525     if (checkConnected(callingHost)) {
526     if (isInternalServer)
527     dataManager = new PartitionDataManager(IPs,
528     remoteEventReceiver, getRegistryPort(), localSPMDClient);
529     else
530     dataManager = new PartitionDataManager(IPs,
531     remoteEventReceiver, getRegistryPort());
532     return dataManager;
533     }
534     return null;
535     }
536    
537     /* (non-Javadoc)
538     * @see appl.parallel.server.ComputingResource#disconnect()
539     */
540     public synchronized void disconnect() throws RemoteException {
541     /**
542     * determine the calling host. Internal Server requires special
543     * handling: notice that this happens very often in this class, but can
544     * not externalized due to the behavior of #getClientHost();
545     */
546     String callingHost = "localhost";
547     try {
548     callingHost = isInternalServer ? "localhost" : getClientHost();
549     } catch (ServerNotActiveException e) {
550     e.printStackTrace();
551     }
552     /** ********************************************************************** */
553     if (checkConnected(callingHost)) {
554     reset();
555     LOG.info("Client " + connectedClient
556     + " successfully disconnected from Server");
557     }
558     }
559    
560     /**
561     * @return a rating for the machine as discovered by the benchmark
562     */
563     public int getRating() {
564     return machineRating;
565     }
566    
567     /**
568     * @return the registry port the server is bound to
569     */
570     public int getRegistryPort() {
571     return registryPort;
572     }
573    
574     /*
575     * (non-Javadoc)
576     *
577     * @see appl.parallel.server.XuluServer#getResourceInformation()
578     */
579     public ComputingResourceProperties getResourceInformation()
580     throws RemoteException {
581     return new XuluServerProperties(this, serverName);
582     }
583    
584     /**
585     * @return the time in s the server is running
586     */
587     int getUptime() {
588     return (int) (System.currentTimeMillis() - starttime);
589     }
590    
591     private void initChainsaw(String connectedClient) {
592     if (chainsaw && connectedClient != null) {
593     chainsawAppender = new SocketAppender(connectedClient, 4445);
594     BasicConfigurator.resetConfiguration();
595     BasicConfigurator.configure(chainsawAppender);
596     }
597     }
598    
599     /**
600     * inits task priority, ports and loglevel
601     */
602     private void initConfiguration() {
603     XuluConfig config = XuluConfig.getXuluConfig();
604     // a lower priority will run the server more in the background
605     taskPriority = config.getIntProperty("XuluServer.priority");
606    
607     switch (taskPriority) {
608     default:
609     taskPriority = Thread.NORM_PRIORITY;
610     break;
611     case 1:
612     taskPriority = Thread.MIN_PRIORITY;
613     break;
614     case 2:
615     taskPriority = Thread.NORM_PRIORITY - 1;
616     break;
617     case 4:
618     taskPriority = Thread.NORM_PRIORITY + 1;
619     break;
620     case 5:
621     taskPriority = Thread.MAX_PRIORITY;
622     break;
623     }
624    
625     availableProcessors = Runtime.getRuntime().availableProcessors();
626     // default: try to use one thread per processor
627     toUseThreads = availableProcessors;
628     // check if there is an entry in the XuluConfig saying how many threads
629     // should be created.
630     // (more threads than processors may be useful for the use of
631     // hyperthreading)
632     String configProz = config.getProperty("XuluServer.useThreads");
633     if ((configProz != null) && !configProz.equals("max")) {
634     toUseThreads = config.getIntProperty("XuluServer.useThreads");
635     // error checking:
636     if (toUseThreads < 1 || toUseThreads > 128){
637     toUseThreads = availableProcessors;
638     System.out
639     .println("Error: Number of threads must be between 1 and 128!");
640     }
641     }
642     // init correspondig arrays
643     resultCalls = new Future[toUseThreads];
644    
645     int port = config.getIntProperty("XuluServer.registryport");
646     if (port != 0)
647     registryPort = port;
648    
649     useCodeDownloading = config
650     .getBooleanProperty("XuluServer.useCodeDownloading");
651     if (useCodeDownloading)
652     System.out.println("Code downloading is enabled");
653     else
654     System.out.println("Code downloading is disabled");
655    
656     int multicastport = config.getIntProperty("XuluServer.multicastport");
657     if (multicastport != 0)
658     multicastPort = multicastport;
659     String IP = config.getProperty("XuluServer.multicastgroup");
660     if (IP != null)
661     multiCastIP = IP;
662     try {
663     multiCastGroup = InetAddress.getByName(multiCastIP);
664     } catch (UnknownHostException e) {
665     e.printStackTrace();
666     }
667     if (loggingEnabled) {
668    
669     String mode = config.getProperty("XuluServer.log4j.mode");
670     if (mode != null)
671     log4jConfig(mode);
672     }
673     String level = config.getProperty("XuluServer.log4j.logLevel");
674     if (level != null)
675     log4jLevel(level.toLowerCase());
676     else
677     log4jLevel("info");
678     }
679    
680     /**
681     * Connects to the RemoteEventReceiver at the calling client
682     */
683     private void initEventSystem() throws RemoteException {
684     String lookupName = "rmi://" + connectedClient + "/RemoteEventReceiver";
685     try {
686     int delay = XuluConfig.getXuluConfig().getIntProperty(
687     "XuluServer.eventDelay");
688     System.out.println("looking up " + lookupName);
689     CommEventSink eventSink = (CommEventSink) Naming.lookup(lookupName);
690     if (remoteEventReceiver != null)
691     remoteEventReceiver.stopService();
692     remoteEventReceiver = new RemoteEventProxy(eventSink, delay);
693     remoteEventReceiver.startService();
694     } catch (MalformedURLException e) {
695     e.printStackTrace();
696     } catch (NotBoundException e) {
697     LOG.warn("Could not find the remote event receiver at "
698     + lookupName, e);
699     System.out.println(e.getMessage());
700     remoteEventReceiver = new RemoteEventProxy(null, 0);
701     }
702     }
703    
704     /**
705     * inits the multicast unit
706     */
707     private void initializeMultiCasting() {
708     try {
709     MulticastSocket s = new MulticastSocket(multicastPort);
710     s.joinGroup(multiCastGroup);
711     // send hello message
712     String IP = XuluConfig.getXuluConfig().getProperty(
713     "XuluServer.multicastgroup");
714     DatagramPacket answer = new DatagramPacket(helloFromServerMessage
715     .getBytes(), helloFromServerMessage.length(), InetAddress
716     .getByName(IP), multicastPort);
717     s.send(answer);
718     // start thread that waits on client messages
719     multicastThread = new ServerMulticastReceiver(s);
720     multicastThread.start();
721     } catch (IOException e) {
722     // TODO Auto-generated catch block
723     LOG.warn("Could not initalize multicasting! Reason was "
724     + e.getMessage(), e);
725    
726     }
727     }
728    
729     /**
730     * @return whether this server is available or in use by another client
731     * @see appl.parallel.ComputingResource#isAvailable()
732     */
733     public boolean isAvailable() throws RemoteException {
734     /**
735     * determine the calling host. Internal Server requires special
736     * handling: notice that this happens very often in this class, but can
737     * not externalized due to the behavior of #getClientHost();
738     */
739     String callingHost = "localhost";
740     try {
741     callingHost = isInternalServer ? "localhost" : getClientHost();
742     } catch (ServerNotActiveException e) {
743     e.printStackTrace();
744     }
745     /** ********************************************************************** */
746     if (callingHost.equals(connectedClient))
747     return true;
748    
749     if (connectedClient == null)
750     return true;
751     return false;
752     }
753    
754     /**
755     * @return if a client is connected
756     */
757     public boolean isClientConnected() {
758     return connected;
759    
760     }
761    
762     /**
763     * Loads a {@link SPMDTask} from the specified URL and returns an array of
764     * instances of this class. There are so many instances, as there are
765     * available processors on this machine!. This class also makes sure that
766     * this happens only one time per connection (else this would of course be a
767     * performance killer in fine grained parallelization).
768     *
769     * @param location
770     * the URL of the rootDirectory/networkLocation
771     * @param spmdTaskClassName
772     * the name of the SPMDTask for which the newest class should be
773     * found
774     * @return the instance of the class or <code>null</code> if loading fails
775     */
776     private SPMDTask[] loadSPMDTasksfromURL(String location,
777     String spmdTaskClassName) {
778    
779     SPMDTask[] spmdClassInstances = (SPMDTask[]) spmdTaskInstances
780     .get(spmdTaskClassName);
781    
782     // if already loaded for this connection simply return the instances
783     if (spmdClassInstances != null)
784     return (SPMDTask[]) spmdClassInstances;
785     spmdClassInstances = new SPMDTask[toUseThreads];
786     Class spmdClass = null;
787     if (useCodeDownloading) {
788     try {
789     // lookup client and make sure that the newest class of the task
790     // is instantiated. Will avoid loading older cached or local
791     // filesystem version
792     URL[] locations = { new URL("http://" + location + "/") };
793     PreferredClassLoader classLoader = new PreferredClassLoader(
794     locations, Thread.currentThread()
795     .getContextClassLoader(), null, false);
796     LOG.debug("Try to load " + spmdTaskClassName + " from "
797     + location);
798     spmdClass = classLoader.loadClass(spmdTaskClassName);
799     } catch (Exception e) {
800     LOG.error("Could not load class for " + spmdTaskClassName
801     + " from " + location, e);
802     e.printStackTrace();
803     }
804     }
805     // if retrieval failed for some reason or code downloading disabled
806     // then use the local class
807     try {
808     if (spmdClass == null) {
809     spmdClass = Class.forName(spmdTaskClassName);
810     }
811    
812     // instantiate the classes
813     for (int i = 0; i < spmdClassInstances.length; i++) {
814     Object task = spmdClass.newInstance();
815     if (!(task instanceof SPMDTask)) {
816     LOG.fatal(spmdTaskClassName
817     + " was not an instance of SPMDTask!");
818     throw new UnsupportedOperationException(spmdTaskClassName
819     + " was not an instance of SPMDTask!");
820     }
821     spmdClassInstances[i] = (SPMDTask) task;
822     }
823    
824     spmdTaskInstances.put(spmdTaskClassName, spmdClassInstances);
825     return spmdClassInstances;
826    
827     } catch (ClassNotFoundException e) {
828     // TODO Auto-generated catch block
829     e.printStackTrace();
830     } catch (InstantiationException e) {
831     // TODO Auto-generated catch block
832     e.printStackTrace();
833     } catch (IllegalAccessException e) {
834     // TODO Auto-generated catch block
835     e.printStackTrace();
836     }
837     throw new UnsupportedOperationException("Loading of class "
838     + spmdTaskClassName
839     + " failed on serverside! Execution canceled");
840     }
841    
842     /*
843     * (non-Javadoc)
844     *
845     * @see appl.parallel.server.XuluServer#ping(java.lang.Object[])
846     */
847     public Object ping(Object... o) throws RemoteException {
848     System.out.println("received ping");
849     return o;
850     }
851    
852     /**
853     * deletes all client-specific data data, so that a newly connected client
854     * can use a clean system
855     */
856     private void reset() {
857     if (dataManager != null)
858     dataManager.stop();
859     dataManager = null;
860     connectedClient = null;
861     spmdTaskInstances.clear();
862     }
863    
864     /**
865     * runs the benchmark defined in XuluServer.benchmarkclass
866     */
867     private void runBenchmark() {
868     // see if the benchmark should be run
869    
870     boolean runbench = XuluConfig.getXuluConfig().getBooleanProperty(
871     "XuluServer.runbench");
872     if (runbench) {
873     String classname = XuluConfig.getXuluConfig().getProperty(
874     "XuluServer.benchmarkclass");
875     if (classname != null) {
876     try {
877     // getBenchmarkClass
878     Benchmark bench = (Benchmark) Class.forName(classname)
879     .newInstance();
880     machineRating = bench.bench();
881     System.out
882     .println("Successfully run benchmark. Machine Rating is "
883     + machineRating);
884     } catch (Exception e) {
885     System.err
886     .println("Could not load Benchmarkclass "
887     + classname
888     + ". Setting rating to 0 (which means average). Reason was: "
889     + e.getMessage());
890     machineRating = 0;
891     }
892     } else
893     System.out.println("Benchmark disabled");
894     }
895     }
896    
897     /**
898     * Every time {@link #runSPMDModelTask(String, int, Object[])} is called the
899     * newest class is loaded from the client. For performance reasons this will
900     * happen only one time per connection.
901     *
902     * @see appl.parallel.server.SPMDResource#runSPMDModelTask(String, int,
903     * Object[])
904     */
905     @SuppressWarnings("unchecked")
906     public Object[] runSPMDModelTask(String spmdTaskName, int referenceID,
907     Object... parameters) throws RemoteException {
908     /**
909     * determine the calling host. Internal Server requires special
910     * handling: notice that this happens very often in this class, but can
911     * not externalized due to the behavior of #getClientHost();
912     */
913     String callingHost = "localhost";
914     try {
915     callingHost = isInternalServer ? "localhost" : getClientHost();
916     } catch (ServerNotActiveException e) {
917     e.printStackTrace();
918     }
919     /** ********************************************************************** */
920     Thread.currentThread().setPriority(taskPriority);
921     String clientIP = "<unknown Client>";
922     long time = System.nanoTime();
923    
924     clientIP = callingHost;
925     if (!clientIP.equals(connectedClient)) {
926     LOG.warn(clientIP
927     + " has tried to run a Task, but was not connected. "
928     + "This server is in use by " + connectedClient);
929     throw new RemoteException(
930     "Run of SPMDTask failed! This server is in use by "
931     + connectedClient);
932     }
933     if (LOG.isDebugEnabled())
934     LOG.debug("received task from " + clientIP
935     + ". Starting evaluation now...");
936     // if no datamanager is found: exit
937     if (dataManager == null)
938     throw new UnsupportedOperationException(
939     "No DataManager found. Create one first using 'createDataManager' on this Server");
940    
941     // lookup newest version of the class from the server at the url
942     // (or from local disk if remote class loading is disabled)
943     SPMDTask[] tasks = loadSPMDTasksfromURL(clientIP, spmdTaskName);
944    
945     // The number of threads allowed for execution can be different
946     // from the number of user assigned threads
947     // because tasks may not support multithreadeding
948     int allowedThreads = 1;
949     if (tasks[0].supportsMultiThreading())
950     allowedThreads = toUseThreads;
951    
952     Object results[] = new Object[allowedThreads];
953    
954     // the following loop is only executed the first time a task is used!
955     if (!tasks[0].isInitialized()) {
956     if (LOG.isInfoEnabled())
957     LOG.info("Trying to use " + allowedThreads
958     + " threads for execution of "
959     + tasks[0].getClass().getName());
960     // calculate splitting for multiple threads:
961     int splitMapPos = dataManager.getInfo(referenceID).getSplitMapPos();
962     SplitMap splitMap = dataManager.getInfo(referenceID).getSplitMap();
963     SplitMap1DVertical map1DVertical = new SplitMap1DVertical(splitMap
964     .getPartitionCalculationBounds(splitMapPos).width, splitMap
965     .getPartitionCalculationBounds(splitMapPos).height, 0,
966     allowedThreads, NeighborhoodBoxingMode.outBoxing);
967    
968     // create controllers and init the tasks
969     for (int i = 0; i < allowedThreads; i++) {
970     AdvancedSPMDServerController serverController = new AdvancedSPMDServerController(
971     dataManager, referenceID);
972     // set first thread as master thread
973     serverController.setMasterThread(i == 0);
974     // now assign the calculation area. Because we have assumed that
975     // the (Thread)-calculation
976     // area begins at (0,0) we must move it relative to the real
977     // local calculation area
978     Rectangle calculationBounds = map1DVertical
979     .getPartitionCalculationBounds(i);
980     int absolutX = (int) (serverController.getLocalCalcMinX() + calculationBounds
981     .getMinX());
982     int absolutY = (int) (serverController.getLocalCalcMinY() + calculationBounds
983     .getMinY());
984     calculationBounds.setLocation(absolutX, absolutY);
985     serverController.setLocalCalculationBounds(calculationBounds);
986     tasks[i].setSPMDServerController(serverController);
987     tasks[i].initialize();
988     }
989     }
990     if (allowedThreads == 1) // only for saving resources: no new thread
991     { // if only one thread is used
992     Object executionResult = tasks[0].run(parameters);
993     results = new Object[] { executionResult };
994     } else {
995     // for more than 1 processor execute the multiple tasks in separate
996     // threads
997     for (int i = 0; i < allowedThreads; i++) {
998     resultCalls[i] = executor.submit(new SimpleCallThread(tasks[i],
999     parameters));
1000     }
1001     // wait for the Threads to finish
1002     for (int i = 0; i < allowedThreads; i++) {
1003     try {
1004     results[i] = resultCalls[i].get();
1005     } catch (InterruptedException e) {
1006     // TODO Auto-generated catch block
1007     e.printStackTrace();
1008     } catch (ExecutionException e) {
1009     // TODO Auto-generated catch block
1010     e.printStackTrace();
1011     }
1012     }
1013     }
1014     // generate a event for the execution
1015     if (remoteEventReceiver.isTimeMonitoringEnabled())
1016     remoteEventReceiver.fireRemoteEvent(new TimeEvent((System
1017     .nanoTime() - time), serverName, "DataServer",
1018     CommType.REMOTE_EXECUTION));
1019     return results;
1020     }
1021    
1022     /**
1023     * This method frees all resources.<br>
1024     * Notice that this method is not remotely accessible!
1025     */
1026     public void stopServer() {
1027     unbind();
1028     if (dataManager != null)
1029     dataManager.stop();
1030     dataManager = null;
1031     if (multicastThread != null)
1032     multicastThread.stopThread();
1033     }
1034     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26