/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/server/XuluServer.java
ViewVC logotype

Annotation of /branches/1.8-gt2-2.6/src/appl/parallel/server/XuluServer.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 60 - (hide annotations)
Sun Oct 4 16:54:52 2009 UTC (15 years, 3 months ago) by alfonx
File size: 33553 byte(s)
* organized imports
1 mojays 2 package appl.parallel.server;
2    
3     import java.awt.Rectangle;
4     import java.io.IOException;
5     import java.net.DatagramPacket;
6     import java.net.InetAddress;
7     import java.net.MalformedURLException;
8     import java.net.MulticastSocket;
9     import java.net.URL;
10     import java.net.UnknownHostException;
11     import java.rmi.Naming;
12     import java.rmi.NotBoundException;
13     import java.rmi.RMISecurityManager;
14     import java.rmi.Remote;
15     import java.rmi.RemoteException;
16     import java.rmi.server.ServerNotActiveException;
17     import java.rmi.server.UnicastRemoteObject;
18     import java.util.HashMap;
19     import java.util.concurrent.Callable;
20     import java.util.concurrent.ExecutionException;
21     import java.util.concurrent.ExecutorService;
22     import java.util.concurrent.Executors;
23     import java.util.concurrent.Future;
24    
25     import net.jini.loader.pref.PreferredClassLoader;
26    
27     import org.apache.log4j.BasicConfigurator;
28     import org.apache.log4j.Level;
29     import org.apache.log4j.LogManager;
30     import org.apache.log4j.Logger;
31     import org.apache.log4j.PropertyConfigurator;
32     import org.apache.log4j.net.SocketAppender;
33    
34     import appl.ext.XuluConfig;
35     import appl.parallel.ComputingResourceProperties;
36     import appl.parallel.client.ClientDataServer;
37     import appl.parallel.event.CommEventSink;
38     import appl.parallel.event.TimeEvent;
39     import appl.parallel.event.CommEvent.CommType;
40     import appl.parallel.services.RemoteEventProxy;
41     import appl.parallel.spmd.AdvancedSPMDServerController;
42     import appl.parallel.spmd.SPMDTask;
43     import appl.parallel.spmd.split.SplitMap;
44     import appl.parallel.spmd.split.SplitMap1DVertical;
45     import appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode;
46     import appl.parallel.util.Helper;
47     import appl.util.benchmark.Benchmark;
48    
49     /**
50     * The Server manages the remote execution of program code. It may be started
51     * using the {@link #main(String[])} method.<br>
52     * The Server reads the following properties from {@link XuluConfig} - the
53     * values given should be the defaults.<br>
54     * <table border="1">
55     * <tr>
56     * <th>Property</th>
57     * <th>Default</th>
58     * <th>Desc</th>
59     * </tr>
60     * <tr>
61     * <td>XuluServer.useCodeDownloading</td>
62     * <td>false</td>
63     * <td>if true, the server tries to download task class files from a
64     * HTTP-server at the client-url</td>
65     * </tr>
66     * <tr>
67     * <td>XuluServer.registryPort</td>
68     * <td>1099</td>
69     * <td>The port at which a new registry is created, if no running registry was
70     * found</td>
71     * </tr>
72     * <tr>
73     * <td>XuluServer.multicastgroup</td>
74     * <td>239.1.1.1</td>
75     * <td>The default multicast group</td>
76     * </tr>
77     * <tr>
78     * <td> XuluServer.multicastport</td>
79     * <td>10000</td>
80     * <td>The default multicast port</td>
81     * </tr>
82     * <tr>
83     * <td>XuluServer.eventDelay</td>
84     * <td>50</td>
85     * <td>Events generated on server side may be delayed to collect multiple
86     * Events and send them at once (this saves communication time). Time is given in
87     * milliseconds</td>
88     * </tr>
89     * <tr>
90     * <td>XuluServer.log4j.logLevel</td>
91     * <td>info</td>
92     * <td>the level can be '<i>debug</i>','<i>info</i>','<i>warn</i>','<i>error</i>'
93     * or '<i>fatal</i>' and can be overridden by commandline - Parameters<br>
94     * <br>
95     * </td>
96     * </tr>
97     * <tr><td>XuluServer.log4j.mode</td>
98     * <td>console</td>
99     * <td>the mode can be '<i>file</i>','<i>chainsaw</i>' or '<i>console</i>'.
100     * File means that the configuration file xuluserverlog4j.cfg is read. <br>
101     * <i>'chainsaw'</i> means that, each time a new client connects, the server
102     * tries to configure the logger for a chainsaw instance running at the client ip
103     * (standard ports) <br>
104     * </td>
105     * </tr>
106     * <tr><td>XuluServer.benchmarkclass</td>
107     * <td>appl.util.benchmark.SimpleBenchmark</td>
108     * <td>The Benchmark to get a rating for this machine. Notice that you can
109     * override the rating with the -rating parameter</td>
110     * </tr>
111     * <tr>
112     * <td>XuluServer.useThreads</td>
113     * <td>max</td>
114     * <td>The number of threads to be used by the server for tasks that support
115     * multi-threading. Per default one thread is created for every available
116     * Processors (value='max'). More threads than processors may be useful for
117     * testing tasks which support multithreading or for processors which support
118     * hyperthreading.</td>
119     * </tr>
120     * <tr>
121     * <td>XuluServer.runbench</td>
122     * <td>true</td>
123     * <td>the benchmark is run automatically</td>
124     * </tr>
125     * </table> <br>
126     * <b>Multicasting:</b> <br>
127     * If multicasting does not work, try to disable your firewall or configure it
128     * to allow multicast UDP packages. Notice also, that multicasting may not work
129     * in virtual machines like VMWare or MS Virtual PC 2004/7. <br>
130     * <br>
131     * <b>Assumptions</b>
132     * Only one client is connected to the server (and can run Tasks) <br>
133     *<br>
134     * See {@link #main(String[])} for details about command line parameters
135     * @see ServerMulticastReceiver
136     * @author Dominik Appl
137     */
138     public class XuluServer extends UnicastRemoteObject implements SPMDResource {
139    
140     /**
141     * This Thread is used when multiple threads should be executed simultaneous during multithreading
142     *
143     * @author Dominik Appl
144     */
145     class SimpleCallThread implements Callable {
146    
147     private final SPMDTask task;
148    
149     private final Object[] parameters;
150    
151     /**
152     * @param task the task to be executed
153     * @param parameters the parameters for the run method of the task
154     */
155     public SimpleCallThread(SPMDTask task, Object... parameters) {
156     this.task = task;
157     this.parameters = parameters;
158     }
159    
160     /*
161     * (non-Javadoc)
162     *
163     * @see java.util.concurrent.Callable#call()
164     */
165     public Object call() throws Exception {
166     return task.run(parameters);
167     }
168     }
169    
170     private static final Logger LOG = LogManager
171     .getLogger("appl.parallel.server.XuluServer");
172    
173     private static String bindingName = "XuluServer";
174    
175     private static final String helloFromServerMessage = "Hello Xulu from XuluServer";
176    
177     private static String log4jfilename = "xuluserverlog4j.cfg";
178    
179     private static boolean chainsaw = false;
180    
181     private static SocketAppender chainsawAppender;
182    
183     /**
184     * @param mode the mode of the execution
185     */
186     private static void log4jConfig(String mode) {
187     if (mode.equals("chainsaw")) {
188     BasicConfigurator.resetConfiguration();
189     chainsawAppender = new SocketAppender();
190     BasicConfigurator.configure(chainsawAppender);
191     System.out.println("Server configured for chainsaw.");
192     chainsaw = true;
193     } else if (mode.equals("file")) {
194     BasicConfigurator.resetConfiguration();
195     PropertyConfigurator.configure(log4jfilename);
196     } else if (mode.equals("console")) {
197     BasicConfigurator.resetConfiguration();
198     BasicConfigurator.configure();
199     } else {
200     System.err.println("Invalid log4j-Config parameter found: " + mode);
201     printInfoMessage();
202     System.exit(0);
203     }
204     }
205    
206     /**
207     * configures the log4j level
208     *
209     * @see #main(String[])
210     */
211     private static void log4jLevel(String level) {
212     if (level.equals("info")) {
213     LOG.setLevel(Level.INFO);
214     } else if (level.equals("debug")) {
215     LOG.setLevel(Level.DEBUG);
216     } else if (level.equals("error")) {
217     LOG.setLevel(Level.ERROR);
218     } else if (level.equals("fatal")) {
219     LOG.setLevel(Level.FATAL);
220     } else if (level.equals("warn")) {
221     LOG.setLevel(Level.WARN);
222     } else if (level.equals("off")) {
223     LOG.setLevel(Level.OFF);
224     } else {
225     System.err.println("Invalid log4j-Level found: " + level);
226     printInfoMessage();
227     System.exit(0);
228     }
229     System.out.println("Setting logLevel to " + level);
230     }
231    
232     /**
233     * Starts the server.
234     *
235     * @param args
236     * The following parameters are valid: <br>
237     * <br>
238     * -loglevel:<code>LEVEL</code> - where <code>LEVEL</code> is '<i>debug</i>','<i>info</i>'
239     * (DEFAULT),'<i>warn</i>','<i>error</i>' or '<i>fatal</i>' <br>
240     * <br>
241     * -logconfig:<code>CONFIG</code> - where <code>CONFIG</code> is '<i>file</i>','<i>chainsaw</i>' or '<i>console</i>'
242     * <li>'chainsaw' means that the server tries to configure the logger each time a
243     * new client for a chainsaw instance running at the client ip
244     * (standardports) </li><br>
245     * <li>'<i>file</i>' means that the file '<i>xuluserverlog4j.cfg</i>'
246     * in the home-directory is used for configuration <br> </li>
247     * <li>'<i>console</i>'
248     * means of course, that the console is used for output <br> </li>
249     * <br>
250     * -port:PORTNUMBER - binds the server to the specified port (default 1099)
251     *
252    
253     */
254     public static void main(String[] args) {
255     // parse arguments
256     int serverPort = 0;
257     for (String arg : args) {
258     if (arg.toLowerCase().startsWith("-loglevel:"))
259     log4jLevel(arg.substring(10).toLowerCase());
260     else if (arg.toLowerCase().startsWith("-logconfig:"))
261     log4jConfig(arg.substring(11).toLowerCase());
262     else if (arg.toLowerCase().startsWith("-port:"))
263     serverPort = Integer.valueOf(arg.substring(6));
264     else {
265     System.err.println("Invalid Argument found: " + arg);
266     printInfoMessage();
267     System.exit(0);
268     }
269     }
270    
271     if (System.getSecurityManager() == null) {
272     System.setSecurityManager(new RMISecurityManager());
273     }
274     Remote server;
275     try {
276     server = new XuluServer(serverPort);
277     System.out.println("Xulu Server is up and running!");
278     } catch (RemoteException e) {
279     System.err.println("Xulu Server could not start!");
280     e.printStackTrace();
281     }
282     }
283    
284     private static void printInfoMessage() {
285     System.out
286     .println("The following arguments are valid: \n"
287     + "-loglevel:<LEVEL> \n"
288     + " where <LEVEL> is 'off','debug','info' (DEFAULT),'warn','error' or 'fatal'\n\n"
289     + "-logconfig:<CONFIG> \n"
290     + " where <CONFIG> is 'file','chainsaw' or 'console'(DEFAULT) \n"
291     + " 'chainsaw' means that the server tries to configure the logger each\n"
292     + " time a new client for a chainsaw instance running at the client ip\n"
293     + " (standardports) \n"
294     + " 'file' means that the file 'xuluserverlog4j.cfg' in the home-directory "
295     + " is used for configuration \n"
296     + " 'console' means of course, that the console is used for output"
297     + "-port:<portnumber> \n"
298     + " The Server is bound to the specified port");
299     }
300    
301     private static void unbind() {
302     Helper.unbind(bindingName);
303     }
304    
305     /**
306     * The {@link Class} objects of the currently active connection. Every time
307     * {@link #runSPMDModelTask(String, int, Object[])} is called the
308     * newest class is loaded from the client. For performance reasons this will
309     * happen only one time per connection. In this map the current active
310     * classes are stored.
311     */
312     private HashMap<String, SPMDTask[]> spmdTaskInstances;
313    
314     /* standard is should be 1099 */
315     private int registryPort = 1099;
316    
317     private int multicastPort;
318    
319     private String multiCastIP;
320    
321     ServerMulticastReceiver multicastThread;
322    
323     InetAddress multiCastGroup;
324    
325     // says whether a client is connected
326     private boolean connected = false;
327    
328     private long starttime = System.currentTimeMillis();
329    
330     private String connectedClient;
331    
332     private PartitionDataManager dataManager;
333    
334     private RemoteEventProxy remoteEventReceiver;
335    
336     // User specified name of the Server or hostname
337     private String serverName;
338    
339     private int machineRating = 0;
340    
341     private final boolean loggingEnabled;
342    
343     private int taskPriority = Thread.NORM_PRIORITY;
344    
345     private int availableProcessors = 1;
346    
347     private int toUseThreads = 1;
348    
349     private final boolean isInternalServer;
350    
351     private ClientDataServer localSPMDClient;
352    
353     private boolean useCodeDownloading;
354    
355     private ExecutorService executor;
356    
357     // used for execution results
358     private Future[] resultCalls;
359    
/**
 * Creates a new, non-internal instance with logging configuration enabled
 * that is bound to the default port.
 *
 * @throws RemoteException
 *             if the server could not be created
 */
public XuluServer() throws RemoteException {
    this(/* configLogging */ true, /* isInternalServer */ false, /* port */ 0);
}
368    
369     /**
370     * @param configLogging
371     * enable or disable logging
372     * @param isInternalServer
373     * set to true if you want to make method calls purely local.
374     * This is for performance reasons (see
375     * {@link UnicastRemoteObject#getClientHost()}
376     * @param port
377     * port the server is bound to (0 for default)
378     * @throws RemoteException
379     */
380     public XuluServer(boolean configLogging, boolean isInternalServer, int port)
381     throws RemoteException {
382     super();
383     loggingEnabled = configLogging;
384     this.isInternalServer = isInternalServer;
385     initConfiguration();
386     //the executor is used for multithreading
387     executor = Executors.newCachedThreadPool();
388     spmdTaskInstances = new HashMap<String, SPMDTask[]>(10);
389     if (port != 0)
390     this.registryPort = port;
391     bind();
392    
393     initializeMultiCasting();
394     serverName = "unknown server";
395     try {
396     if (isInternalServer)
397     serverName = "internal Server";
398     else
399     serverName = InetAddress.getLocalHost().getHostName();
400     } catch (UnknownHostException e) {
401     // TODO Auto-generated catch block
402     e.printStackTrace();
403     }
404     if (port != 0)
405     serverName = serverName + " (port " + registryPort + ")";
406     runBenchmark();
407     }
408    
/**
 * Creates an internal server, i.e. a server running inside the Xulu-Client.
 * This has performance advantages (direct access - no TCP/IP). The server
 * is bound to the default port.
 *
 * @param configLogging
 *            enable or disable logging
 * @param clientDataServer
 *            a local client for faster access
 * @throws RemoteException
 *             if the server could not be created
 */
public XuluServer(boolean configLogging, ClientDataServer clientDataServer)
        throws RemoteException {
    this(configLogging, /* isInternalServer */ true, /* port */ 0);
    localSPMDClient = clientDataServer;
}
424    
/**
 * Creates a non-internal instance with logging configuration enabled.
 *
 * @param port
 *            the port to which the server is bound (0 for default)
 * @throws RemoteException
 *             if the server could not be created
 */
public XuluServer(int port) throws RemoteException {
    this(/* configLogging */ true, /* isInternalServer */ false, port);
}
433    
434     /**
435     * Bind the remote object's stub in the registry. Creates a registry if no
436     * running registry is found.
437     */
438     private void bind() throws RemoteException {
439     String name = bindingName;
440     Helper.bind(name, this, registryPort);
441     }
442    
443     private boolean checkConnected(String callingHost) {
444    
445     if (callingHost.equals(connectedClient))
446     return true;
447     LOG
448     .warn(callingHost
449     + " has tried to execute a critical operation, but was not connected "
450     + ". Actual connected client: " + connectedClient);
451     return false;
452     }
453    
454     /**
455     * Only a connected client has access to all functionality. When connecting
456     * or reconnecting, all data of the server and the corresponding
457     * {@link PartitionDataServer} is reset!
458     *
459     * @see appl.parallel.ComputingResource#connect()
460     */
461     public synchronized boolean connect() throws RemoteException {
462     /**
463     * determine the calling host. Internal Server requires special
464     * handling: notice that this happens very often in this class, but can
465     * not externalized due to the behavior of #getClientHost();
466     */
467     String callingHost = "error";
468     try {
469     callingHost = isInternalServer ? "localhost" : getClientHost();
470     } catch (ServerNotActiveException e) {
471     e.printStackTrace();
472     }
473     /** ********************************************************************** */
474    
475     if (connectedClient == null || callingHost.equals(connectedClient)) {
476     if (connectedClient == null)
477     LOG.info("Client " + callingHost + " connected!");
478     else
479     LOG.info("Client " + connectedClient + " reconnected! ");
480     // reset data
481     reset();
482     connectedClient = callingHost;
483     // initalize event system & chainsaw
484     initEventSystem();
485     initChainsaw(connectedClient);
486     return true;
487     } else if (connectedClient != null) {
488     LOG.warn(callingHost + " has tried to "
489     + "connect to server, but was not connected, "
490     + "because this server is in use by " + connectedClient);
491     }
492    
493     return false;
494     }
495    
496     /*
497     * (non-Javadoc)
498     *
499     * @see appl.parallel.server.ComputingResource#createDataServer()
500     */
501     public PartitionDataServer createDataServer(String[] IPs)
502     throws RemoteException {
503     /**
504     * determine the calling host. Internal Server requires special
505     * handling: notice that this happens very often in this class, but can
506     * not externalized due to the behavior of #getClientHost();
507     */
508     String callingHost = "localhost";
509     try {
510     callingHost = isInternalServer ? "localhost" : getClientHost();
511     } catch (ServerNotActiveException e) {
512     e.printStackTrace();
513     }
514     /** ********************************************************************** */
515     if (checkConnected(callingHost)) {
516     if (isInternalServer)
517     dataManager = new PartitionDataManager(IPs,
518     remoteEventReceiver, getRegistryPort(), localSPMDClient);
519     else
520     dataManager = new PartitionDataManager(IPs,
521     remoteEventReceiver, getRegistryPort());
522     return dataManager;
523     }
524     return null;
525     }
526    
527     /* (non-Javadoc)
528     * @see appl.parallel.server.ComputingResource#disconnect()
529     */
530     public synchronized void disconnect() throws RemoteException {
531     /**
532     * determine the calling host. Internal Server requires special
533     * handling: notice that this happens very often in this class, but can
534     * not externalized due to the behavior of #getClientHost();
535     */
536     String callingHost = "localhost";
537     try {
538     callingHost = isInternalServer ? "localhost" : getClientHost();
539     } catch (ServerNotActiveException e) {
540     e.printStackTrace();
541     }
542     /** ********************************************************************** */
543     if (checkConnected(callingHost)) {
544     reset();
545     LOG.info("Client " + connectedClient
546     + " successfully disconnected from Server");
547     }
548     }
549    
550     /**
551     * @return a rating for the machine as discovered by the benchmark
552     */
553     public int getRating() {
554     return machineRating;
555     }
556    
557     /**
558     * @return the registry port the server is bound to
559     */
560     public int getRegistryPort() {
561     return registryPort;
562     }
563    
564     /*
565     * (non-Javadoc)
566     *
567     * @see appl.parallel.server.XuluServer#getResourceInformation()
568     */
569     public ComputingResourceProperties getResourceInformation()
570     throws RemoteException {
571     return new XuluServerProperties(this, serverName);
572     }
573    
574     /**
575     * @return the time in s the server is running
576     */
577     int getUptime() {
578     return (int) (System.currentTimeMillis() - starttime);
579     }
580    
581     private void initChainsaw(String connectedClient) {
582     if (chainsaw && connectedClient != null) {
583     chainsawAppender = new SocketAppender(connectedClient, 4445);
584     BasicConfigurator.resetConfiguration();
585     BasicConfigurator.configure(chainsawAppender);
586     }
587     }
588    
589     /**
590     * inits task priority, ports and loglevel
591     */
592     private void initConfiguration() {
593     XuluConfig config = XuluConfig.getXuluConfig();
594     // a lower priority will run the server more in the background
595     taskPriority = config.getIntProperty("XuluServer.priority");
596    
597     switch (taskPriority) {
598     default:
599     taskPriority = Thread.NORM_PRIORITY;
600     break;
601     case 1:
602     taskPriority = Thread.MIN_PRIORITY;
603     break;
604     case 2:
605     taskPriority = Thread.NORM_PRIORITY - 1;
606     break;
607     case 4:
608     taskPriority = Thread.NORM_PRIORITY + 1;
609     break;
610     case 5:
611     taskPriority = Thread.MAX_PRIORITY;
612     break;
613     }
614    
615     availableProcessors = Runtime.getRuntime().availableProcessors();
616     // default: try to use one thread per processor
617     toUseThreads = availableProcessors;
618     // check if there is an entry in the XuluConfig saying how many threads
619     // should be created.
620     // (more threads than processors may be useful for the use of
621     // hyperthreading)
622     String configProz = config.getProperty("XuluServer.useThreads");
623     if ((configProz != null) && !configProz.equals("max")) {
624     toUseThreads = config.getIntProperty("XuluServer.useThreads");
625     // error checking:
626     if (toUseThreads < 1 || toUseThreads > 128){
627     toUseThreads = availableProcessors;
628     System.out
629     .println("Error: Number of threads must be between 1 and 128!");
630     }
631     }
632     // init correspondig arrays
633     resultCalls = new Future[toUseThreads];
634    
635     int port = config.getIntProperty("XuluServer.registryport");
636     if (port != 0)
637     registryPort = port;
638    
639     useCodeDownloading = config
640     .getBooleanProperty("XuluServer.useCodeDownloading");
641     if (useCodeDownloading)
642     System.out.println("Code downloading is enabled");
643     else
644     System.out.println("Code downloading is disabled");
645    
646     int multicastport = config.getIntProperty("XuluServer.multicastport");
647     if (multicastport != 0)
648     multicastPort = multicastport;
649     String IP = config.getProperty("XuluServer.multicastgroup");
650     if (IP != null)
651     multiCastIP = IP;
652     try {
653     multiCastGroup = InetAddress.getByName(multiCastIP);
654     } catch (UnknownHostException e) {
655     e.printStackTrace();
656     }
657     if (loggingEnabled) {
658    
659     String mode = config.getProperty("XuluServer.log4j.mode");
660     if (mode != null)
661     log4jConfig(mode);
662     }
663     String level = config.getProperty("XuluServer.log4j.logLevel");
664     if (level != null)
665     log4jLevel(level.toLowerCase());
666     else
667     log4jLevel("info");
668     }
669    
670     /**
671     * Connects to the RemoteEventReceiver at the calling client
672     */
673     private void initEventSystem() throws RemoteException {
674     String lookupName = "rmi://" + connectedClient + "/RemoteEventReceiver";
675     try {
676     int delay = XuluConfig.getXuluConfig().getIntProperty(
677     "XuluServer.eventDelay");
678     System.out.println("looking up " + lookupName);
679     CommEventSink eventSink = (CommEventSink) Naming.lookup(lookupName);
680     if (remoteEventReceiver != null)
681     remoteEventReceiver.stopService();
682     remoteEventReceiver = new RemoteEventProxy(eventSink, delay);
683     remoteEventReceiver.startService();
684     } catch (MalformedURLException e) {
685     e.printStackTrace();
686     } catch (NotBoundException e) {
687     LOG.warn("Could not find the remote event receiver at "
688     + lookupName, e);
689     System.out.println(e.getMessage());
690     remoteEventReceiver = new RemoteEventProxy(null, 0);
691     }
692     }
693    
694     /**
695     * inits the multicast unit
696     */
697     private void initializeMultiCasting() {
698     try {
699     MulticastSocket s = new MulticastSocket(multicastPort);
700     s.joinGroup(multiCastGroup);
701     // send hello message
702     String IP = XuluConfig.getXuluConfig().getProperty(
703     "XuluServer.multicastgroup");
704     DatagramPacket answer = new DatagramPacket(helloFromServerMessage
705     .getBytes(), helloFromServerMessage.length(), InetAddress
706     .getByName(IP), multicastPort);
707     s.send(answer);
708     // start thread that waits on client messages
709     multicastThread = new ServerMulticastReceiver(s);
710     multicastThread.start();
711     } catch (IOException e) {
712     // TODO Auto-generated catch block
713     LOG.warn("Could not initalize multicasting! Reason was "
714     + e.getMessage(), e);
715    
716     }
717     }
718    
719     /**
720     * @return whether this server is available or in use by another client
721     * @see appl.parallel.ComputingResource#isAvailable()
722     */
723     public boolean isAvailable() throws RemoteException {
724     /**
725     * determine the calling host. Internal Server requires special
726     * handling: notice that this happens very often in this class, but can
727     * not externalized due to the behavior of #getClientHost();
728     */
729     String callingHost = "localhost";
730     try {
731     callingHost = isInternalServer ? "localhost" : getClientHost();
732     } catch (ServerNotActiveException e) {
733     e.printStackTrace();
734     }
735     /** ********************************************************************** */
736     if (callingHost.equals(connectedClient))
737     return true;
738    
739     if (connectedClient == null)
740     return true;
741     return false;
742     }
743    
744     /**
745     * @return if a client is connected
746     */
747     public boolean isClientConnected() {
748     return connected;
749    
750     }
751    
752     /**
753     * Loads a {@link SPMDTask} from the specified URL and returns an array of
754     * instances of this class. There are so many instances, as there are
755     * available processors on this machine!. This class also makes sure that
756     * this happens only one time per connection (else this would of course be a
757     * performance killer in fine grained parallelization).
758     *
759     * @param location
760     * the URL of the rootDirectory/networkLocation
761     * @param spmdTaskClassName
762     * the name of the SPMDTask for which the newest class should be
763     * found
764     * @return the instance of the class or <code>null</code> if loading fails
765     */
766     private SPMDTask[] loadSPMDTasksfromURL(String location,
767     String spmdTaskClassName) {
768    
769     SPMDTask[] spmdClassInstances = (SPMDTask[]) spmdTaskInstances
770     .get(spmdTaskClassName);
771    
772     // if already loaded for this connection simply return the instances
773     if (spmdClassInstances != null)
774     return (SPMDTask[]) spmdClassInstances;
775     spmdClassInstances = new SPMDTask[toUseThreads];
776     Class spmdClass = null;
777     if (useCodeDownloading) {
778     try {
779     // lookup client and make sure that the newest class of the task
780     // is instantiated. Will avoid loading older cached or local
781     // filesystem version
782     URL[] locations = { new URL("http://" + location + "/") };
783     PreferredClassLoader classLoader = new PreferredClassLoader(
784     locations, Thread.currentThread()
785     .getContextClassLoader(), null, false);
786     LOG.debug("Try to load " + spmdTaskClassName + " from "
787     + location);
788     spmdClass = classLoader.loadClass(spmdTaskClassName);
789     } catch (Exception e) {
790     LOG.error("Could not load class for " + spmdTaskClassName
791     + " from " + location, e);
792     e.printStackTrace();
793     }
794     }
795     // if retrieval failed for some reason or code downloading disabled
796     // then use the local class
797     try {
798     if (spmdClass == null) {
799     spmdClass = Class.forName(spmdTaskClassName);
800     }
801    
802     // instantiate the classes
803     for (int i = 0; i < spmdClassInstances.length; i++) {
804     Object task = spmdClass.newInstance();
805     if (!(task instanceof SPMDTask)) {
806     LOG.fatal(spmdTaskClassName
807     + " was not an instance of SPMDTask!");
808     throw new UnsupportedOperationException(spmdTaskClassName
809     + " was not an instance of SPMDTask!");
810     }
811     spmdClassInstances[i] = (SPMDTask) task;
812     }
813    
814     spmdTaskInstances.put(spmdTaskClassName, spmdClassInstances);
815     return spmdClassInstances;
816    
817     } catch (ClassNotFoundException e) {
818     // TODO Auto-generated catch block
819     e.printStackTrace();
820     } catch (InstantiationException e) {
821     // TODO Auto-generated catch block
822     e.printStackTrace();
823     } catch (IllegalAccessException e) {
824     // TODO Auto-generated catch block
825     e.printStackTrace();
826     }
827     throw new UnsupportedOperationException("Loading of class "
828     + spmdTaskClassName
829     + " failed on serverside! Execution canceled");
830     }
831    
832     /*
833     * (non-Javadoc)
834     *
835     * @see appl.parallel.server.XuluServer#ping(java.lang.Object[])
836     */
837     public Object ping(Object... o) throws RemoteException {
838     System.out.println("received ping");
839     return o;
840     }
841    
842     /**
843     * deletes all client-specific data data, so that a newly connected client
844     * can use a clean system
845     */
846     private void reset() {
847     if (dataManager != null)
848     dataManager.stop();
849     dataManager = null;
850     connectedClient = null;
851     spmdTaskInstances.clear();
852     }
853    
854     /**
855     * runs the benchmark defined in XuluServer.benchmarkclass
856     */
857     private void runBenchmark() {
858     // see if the benchmark should be run
859    
860     boolean runbench = XuluConfig.getXuluConfig().getBooleanProperty(
861     "XuluServer.runbench");
862     if (runbench) {
863     String classname = XuluConfig.getXuluConfig().getProperty(
864     "XuluServer.benchmarkclass");
865     if (classname != null) {
866     try {
867     // getBenchmarkClass
868     Benchmark bench = (Benchmark) Class.forName(classname)
869     .newInstance();
870     machineRating = bench.bench();
871     System.out
872     .println("Successfully run benchmark. Machine Rating is "
873     + machineRating);
874     } catch (Exception e) {
875     System.err
876     .println("Could not load Benchmarkclass "
877     + classname
878     + ". Setting rating to 0 (which means average). Reason was: "
879     + e.getMessage());
880     machineRating = 0;
881     }
882     } else
883     System.out.println("Benchmark disabled");
884     }
885     }
886    
887     /**
888     * Every time {@link #runSPMDModelTask(String, int, Object[])} is called the
889     * newest class is loaded from the client. For performance reasons this will
890     * happen only one time per connection.
891     *
892     * @see appl.parallel.server.SPMDResource#runSPMDModelTask(String, int,
893     * Object[])
894     */
895     @SuppressWarnings("unchecked")
896     public Object[] runSPMDModelTask(String spmdTaskName, int referenceID,
897     Object... parameters) throws RemoteException {
898     /**
899     * determine the calling host. Internal Server requires special
900     * handling: notice that this happens very often in this class, but can
901     * not externalized due to the behavior of #getClientHost();
902     */
903     String callingHost = "localhost";
904     try {
905     callingHost = isInternalServer ? "localhost" : getClientHost();
906     } catch (ServerNotActiveException e) {
907     e.printStackTrace();
908     }
909     /** ********************************************************************** */
910     Thread.currentThread().setPriority(taskPriority);
911     String clientIP = "<unknown Client>";
912     long time = System.nanoTime();
913    
914     clientIP = callingHost;
915     if (!clientIP.equals(connectedClient)) {
916     LOG.warn(clientIP
917     + " has tried to run a Task, but was not connected. "
918     + "This server is in use by " + connectedClient);
919     throw new RemoteException(
920     "Run of SPMDTask failed! This server is in use by "
921     + connectedClient);
922     }
923     if (LOG.isDebugEnabled())
924     LOG.debug("received task from " + clientIP
925     + ". Starting evaluation now...");
926     // if no datamanager is found: exit
927     if (dataManager == null)
928     throw new UnsupportedOperationException(
929     "No DataManager found. Create one first using 'createDataManager' on this Server");
930    
931     // lookup newest version of the class from the server at the url
932     // (or from local disk if remote class loading is disabled)
933     SPMDTask[] tasks = loadSPMDTasksfromURL(clientIP, spmdTaskName);
934    
935     // The number of threads allowed for execution can be different
936     // from the number of user assigned threads
937     // because tasks may not support multithreadeding
938     int allowedThreads = 1;
939     if (tasks[0].supportsMultiThreading())
940     allowedThreads = toUseThreads;
941    
942     Object results[] = new Object[allowedThreads];
943    
944     // the following loop is only executed the first time a task is used!
945     if (!tasks[0].isInitialized()) {
946     if (LOG.isInfoEnabled())
947     LOG.info("Trying to use " + allowedThreads
948     + " threads for execution of "
949     + tasks[0].getClass().getName());
950     // calculate splitting for multiple threads:
951     int splitMapPos = dataManager.getInfo(referenceID).getSplitMapPos();
952     SplitMap splitMap = dataManager.getInfo(referenceID).getSplitMap();
953     SplitMap1DVertical map1DVertical = new SplitMap1DVertical(splitMap
954     .getPartitionCalculationBounds(splitMapPos).width, splitMap
955     .getPartitionCalculationBounds(splitMapPos).height, 0,
956     allowedThreads, NeighborhoodBoxingMode.outBoxing);
957    
958     // create controllers and init the tasks
959     for (int i = 0; i < allowedThreads; i++) {
960     AdvancedSPMDServerController serverController = new AdvancedSPMDServerController(
961     dataManager, referenceID);
962     // set first thread as master thread
963     serverController.setMasterThread(i == 0);
964     // now assign the calculation area. Because we have assumed that
965     // the (Thread)-calculation
966     // area begins at (0,0) we must move it relative to the real
967     // local calculation area
968     Rectangle calculationBounds = map1DVertical
969     .getPartitionCalculationBounds(i);
970     int absolutX = (int) (serverController.getLocalCalcMinX() + calculationBounds
971     .getMinX());
972     int absolutY = (int) (serverController.getLocalCalcMinY() + calculationBounds
973     .getMinY());
974     calculationBounds.setLocation(absolutX, absolutY);
975     serverController.setLocalCalculationBounds(calculationBounds);
976     tasks[i].setSPMDServerController(serverController);
977     tasks[i].initialize();
978     }
979     }
980     if (allowedThreads == 1) // only for saving resources: no new thread
981     { // if only one thread is used
982     Object executionResult = tasks[0].run(parameters);
983     results = new Object[] { executionResult };
984     } else {
985     // for more than 1 processor execute the multiple tasks in separate
986     // threads
987     for (int i = 0; i < allowedThreads; i++) {
988     resultCalls[i] = executor.submit(new SimpleCallThread(tasks[i],
989     parameters));
990     }
991     // wait for the Threads to finish
992     for (int i = 0; i < allowedThreads; i++) {
993     try {
994     results[i] = resultCalls[i].get();
995     } catch (InterruptedException e) {
996     // TODO Auto-generated catch block
997     e.printStackTrace();
998     } catch (ExecutionException e) {
999     // TODO Auto-generated catch block
1000     e.printStackTrace();
1001     }
1002     }
1003     }
1004     // generate a event for the execution
1005     if (remoteEventReceiver.isTimeMonitoringEnabled())
1006     remoteEventReceiver.fireRemoteEvent(new TimeEvent((System
1007     .nanoTime() - time), serverName, "DataServer",
1008     CommType.REMOTE_EXECUTION));
1009     return results;
1010     }
1011    
1012     /**
1013     * This method frees all resources.<br>
1014     * Notice that this method is not remotely accessible!
1015     */
1016     public void stopServer() {
1017     unbind();
1018     if (dataManager != null)
1019     dataManager.stop();
1020     dataManager = null;
1021     if (multicastThread != null)
1022     multicastThread.stopThread();
1023     }
1024     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26