1 |
package appl.parallel.server; |
2 |
|
3 |
import java.awt.Rectangle; |
4 |
import java.io.IOException; |
5 |
import java.net.DatagramPacket; |
6 |
import java.net.InetAddress; |
7 |
import java.net.MalformedURLException; |
8 |
import java.net.MulticastSocket; |
9 |
import java.net.URL; |
10 |
import java.net.UnknownHostException; |
11 |
import java.rmi.MarshalledObject; |
12 |
import java.rmi.Naming; |
13 |
import java.rmi.NotBoundException; |
14 |
import java.rmi.RMISecurityManager; |
15 |
import java.rmi.Remote; |
16 |
import java.rmi.RemoteException; |
17 |
import java.rmi.activation.Activatable; |
18 |
import java.rmi.activation.ActivationID; |
19 |
import java.rmi.server.ServerNotActiveException; |
20 |
import java.rmi.server.UnicastRemoteObject; |
21 |
import java.util.HashMap; |
22 |
import java.util.Vector; |
23 |
import java.util.concurrent.Callable; |
24 |
import java.util.concurrent.ExecutionException; |
25 |
import java.util.concurrent.ExecutorService; |
26 |
import java.util.concurrent.Executors; |
27 |
import java.util.concurrent.Future; |
28 |
|
29 |
import net.jini.loader.pref.PreferredClassLoader; |
30 |
|
31 |
import org.apache.log4j.BasicConfigurator; |
32 |
import org.apache.log4j.Level; |
33 |
import org.apache.log4j.LogManager; |
34 |
import org.apache.log4j.Logger; |
35 |
import org.apache.log4j.PropertyConfigurator; |
36 |
import org.apache.log4j.net.SocketAppender; |
37 |
|
38 |
import appl.ext.XuluConfig; |
39 |
import appl.parallel.ComputingResourceProperties; |
40 |
import appl.parallel.client.DataServer; |
41 |
import appl.parallel.client.ClientDataServer; |
42 |
import appl.parallel.event.CommEventSink; |
43 |
import appl.parallel.event.TimeEvent; |
44 |
import appl.parallel.event.CommEvent.CommType; |
45 |
import appl.parallel.services.RemoteEventProxy; |
46 |
import appl.parallel.spmd.AdvancedSPMDServerController; |
47 |
import appl.parallel.spmd.AdvancedSPMDServerInterface; |
48 |
import appl.parallel.spmd.SPMDServerController; |
49 |
import appl.parallel.spmd.SPMDServerInterface; |
50 |
import appl.parallel.spmd.SPMDTask; |
51 |
import appl.parallel.spmd.split.SinglePartitionInfo; |
52 |
import appl.parallel.spmd.split.SplitMap; |
53 |
import appl.parallel.spmd.split.SplitMap1DVertical; |
54 |
import appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode; |
55 |
import appl.parallel.thread.OneMethodThread; |
56 |
import appl.parallel.util.Helper; |
57 |
import appl.util.benchmark.Benchmark; |
58 |
|
59 |
/** |
60 |
* The Server manages the remote execution of program code. It may be started |
61 |
* using the {@link #main(String[])} method.<br> |
62 |
* The Server reads the following properties from {@link XuluConfig} - the |
63 |
* values given should be the defaults.<br> |
64 |
* <table border="1"> |
65 |
* <tr> |
66 |
* <th>Property</th> |
67 |
* <th>Default</th> |
68 |
* <th>Desc</th> |
69 |
* </tr> |
70 |
* <tr> |
71 |
* <td>XuluServer.useCodeDownloading</td> |
72 |
* <td>false</td> |
73 |
* <td>if true, the server tries to download task class files from a |
74 |
* HTTP-server at the client-url</td> |
75 |
* </tr> |
76 |
* <tr> |
77 |
* <td>XuluServer.registryPort</td> |
78 |
* <td>1099</td> |
79 |
* <td>The port at which a new registry is created, if no running registry was |
80 |
* found</td> |
81 |
* </tr> |
82 |
* <tr> |
83 |
* <td>XuluServer.multicastgroup</td> |
84 |
* <td>239.1.1.1</td> |
85 |
* <td>The default multicast group</td> |
86 |
* </tr> |
87 |
* <tr> |
88 |
* <td> XuluServer.multicastport</td> |
89 |
* <td>10000</td> |
90 |
* <td>The default multicast port</td> |
91 |
* </tr> |
92 |
* <tr> |
93 |
* <td>XuluServer.eventDelay</td> |
94 |
* <td>50</td> |
95 |
* <td>Events generated on server side may be delayed to collect multiple |
96 |
* Events and send them at once (this saves communication time).Time is given in |
97 |
* milliseconds</td> |
98 |
* </tr> |
99 |
* <tr> |
100 |
* <td>XuluServer.log4j.logLevel</td> |
101 |
* <td>info</td> |
102 |
* <td>the level can be '<i>debug</i>','<i>info</i>','<i>warn</i>','<i>error</i>' |
103 |
* or '<i>fatal</i>' and can be overridden by commandline - Parameters<br> |
104 |
* <br> |
105 |
* </td> |
106 |
* </tr> |
107 |
* <td>XuluServer.log4j.mode</td> |
108 |
* <td>console</td> |
109 |
* <td>the mode can be '<i>file</i>','<i>chainsaw</i>' or '<i>console</i>'. |
110 |
* File means that the configurationfile xuluserverlog4j.cfg is read. <br> |
111 |
* <i>'chainsaw'</i> means that the server tries to configure the logger each |
112 |
* time a new client for a chainsaw instance running at the client ip |
113 |
* (standardports) <br> |
114 |
* </td> |
115 |
* </tr> |
116 |
* <td>XuluServer.benchmarkclass</td> |
117 |
* <td>appl.util.benchmark.SimpleBenchmark</td> |
118 |
* <td>The Benchmark to get a rating for this machine. Notice that you can |
119 |
* override the rating with the -rating parameter</td> |
120 |
* </tr> |
121 |
* <tr> |
122 |
* <td>XuluServer.useThreads</td> |
123 |
* <td>max</td> |
124 |
* <td>The number of threads to be used by the server for tasks that support |
125 |
* multi-threading. Per default one thread is created for every available |
126 |
* Processors (value='max'). More threads than processors may be useful for |
127 |
* testing tasks which support multihreading or for processors which support |
128 |
* hyperthreading.</td> |
129 |
* </tr> |
130 |
* <tr> |
131 |
* <td>XuluServer.runbench</td> |
132 |
* <td>true</td> |
133 |
* <td>the benchmark is run automatically</td> |
134 |
* </tr> |
135 |
* </table> <br> |
136 |
* <b>Multicasting:</b> <br> |
137 |
* If multicasting does not work, try to disable your firewall or configure it |
138 |
* to allow multicast UDP packages. Notice also, that multicasting may not work |
139 |
* in virtual machines like VMWare or MS Virtual PC 2004/7. <br> |
140 |
* <br> |
141 |
* <b>Assumptions</b> |
142 |
* Only one client is connected to the server (and can run Tasks) <br> |
143 |
*<br> |
144 |
* See {@link #main(String[])} for details about command line parameters |
145 |
* @see ServerMulticastReceiver |
146 |
* @author Dominik Appl |
147 |
*/ |
148 |
public class XuluServer extends UnicastRemoteObject implements SPMDResource { |
149 |
|
150 |
/** |
151 |
* This Thread is used when multiple threads should be executed simultaneous during multithreading |
152 |
* |
153 |
* @author Dominik Appl |
154 |
*/ |
155 |
class SimpleCallThread implements Callable { |
156 |
|
157 |
private final SPMDTask task; |
158 |
|
159 |
private final Object[] parameters; |
160 |
|
161 |
/** |
162 |
* @param task the task to be executed |
163 |
* @param parameters the parameters for the run method of the task |
164 |
*/ |
165 |
public SimpleCallThread(SPMDTask task, Object... parameters) { |
166 |
this.task = task; |
167 |
this.parameters = parameters; |
168 |
} |
169 |
|
170 |
/* |
171 |
* (non-Javadoc) |
172 |
* |
173 |
* @see java.util.concurrent.Callable#call() |
174 |
*/ |
175 |
public Object call() throws Exception { |
176 |
return task.run(parameters); |
177 |
} |
178 |
} |
179 |
|
180 |
private static final Logger LOG = LogManager |
181 |
.getLogger("appl.parallel.server.XuluServer"); |
182 |
|
183 |
private static String bindingName = "XuluServer"; |
184 |
|
185 |
private static final String helloFromServerMessage = "Hello Xulu from XuluServer"; |
186 |
|
187 |
private static String log4jfilename = "xuluserverlog4j.cfg"; |
188 |
|
189 |
private static boolean chainsaw = false; |
190 |
|
191 |
private static SocketAppender chainsawAppender; |
192 |
|
193 |
/** |
194 |
* @param mode the mode of the execution |
195 |
*/ |
196 |
private static void log4jConfig(String mode) { |
197 |
if (mode.equals("chainsaw")) { |
198 |
BasicConfigurator.resetConfiguration(); |
199 |
chainsawAppender = new SocketAppender(); |
200 |
BasicConfigurator.configure(chainsawAppender); |
201 |
System.out.println("Server configured for chainsaw."); |
202 |
chainsaw = true; |
203 |
} else if (mode.equals("file")) { |
204 |
BasicConfigurator.resetConfiguration(); |
205 |
PropertyConfigurator.configure(log4jfilename); |
206 |
} else if (mode.equals("console")) { |
207 |
BasicConfigurator.resetConfiguration(); |
208 |
BasicConfigurator.configure(); |
209 |
} else { |
210 |
System.err.println("Invalid log4j-Config parameter found: " + mode); |
211 |
printInfoMessage(); |
212 |
System.exit(0); |
213 |
} |
214 |
} |
215 |
|
216 |
/** |
217 |
* configures the log4j level |
218 |
* |
219 |
* @see #main(String[]) |
220 |
*/ |
221 |
private static void log4jLevel(String level) { |
222 |
if (level.equals("info")) { |
223 |
LOG.setLevel(Level.INFO); |
224 |
} else if (level.equals("debug")) { |
225 |
LOG.setLevel(Level.DEBUG); |
226 |
} else if (level.equals("error")) { |
227 |
LOG.setLevel(Level.ERROR); |
228 |
} else if (level.equals("fatal")) { |
229 |
LOG.setLevel(Level.FATAL); |
230 |
} else if (level.equals("warn")) { |
231 |
LOG.setLevel(Level.WARN); |
232 |
} else if (level.equals("off")) { |
233 |
LOG.setLevel(Level.OFF); |
234 |
} else { |
235 |
System.err.println("Invalid log4j-Level found: " + level); |
236 |
printInfoMessage(); |
237 |
System.exit(0); |
238 |
} |
239 |
System.out.println("Setting logLevel to " + level); |
240 |
} |
241 |
|
242 |
/** |
243 |
* Starts the server. |
244 |
* |
245 |
* @param args |
246 |
* The following parameters are valid: <br> |
247 |
* <br> |
248 |
* -loglevel:<code>LEVEL</code> - where <code>LEVEL</code> is '<i>debug</i>','<i>info</i>' |
249 |
* (DEFAULT),'<i>warn</i>','<i>error</i>' or '<i>fatal</i>' <br> |
250 |
* <br> |
251 |
* -logconfig:<code>CONFIG</code> - where <code>CONFIG</code> is '<i>file</i>','<i>chainsaw</i>' or '<i>console</i>' |
252 |
* <li>'chainsaw' means that the server tries to configure the logger each time a |
253 |
* new client for a chainsaw instance running at the client ip |
254 |
* (standardports) </li><br> |
255 |
* <li>'<i>file</i>' means that the file '<i>xuluserverlog4j.cfg</i>' |
256 |
* in the home-directory is used for configuration <br> </li> |
257 |
* <li>'<i>console</i>' |
258 |
* means of course, that the console is used for output <br> </li> |
259 |
* <br> |
260 |
* -port:PORTNUMBER - binds the server to the specified port (default 1099) |
261 |
* |
262 |
|
263 |
*/ |
264 |
public static void main(String[] args) { |
265 |
// parse arguments |
266 |
int serverPort = 0; |
267 |
for (String arg : args) { |
268 |
if (arg.toLowerCase().startsWith("-loglevel:")) |
269 |
log4jLevel(arg.substring(10).toLowerCase()); |
270 |
else if (arg.toLowerCase().startsWith("-logconfig:")) |
271 |
log4jConfig(arg.substring(11).toLowerCase()); |
272 |
else if (arg.toLowerCase().startsWith("-port:")) |
273 |
serverPort = Integer.valueOf(arg.substring(6)); |
274 |
else { |
275 |
System.err.println("Invalid Argument found: " + arg); |
276 |
printInfoMessage(); |
277 |
System.exit(0); |
278 |
} |
279 |
} |
280 |
|
281 |
if (System.getSecurityManager() == null) { |
282 |
System.setSecurityManager(new RMISecurityManager()); |
283 |
} |
284 |
Remote server; |
285 |
try { |
286 |
server = new XuluServer(serverPort); |
287 |
System.out.println("Xulu Server is up and running!"); |
288 |
} catch (RemoteException e) { |
289 |
System.err.println("Xulu Server could not start!"); |
290 |
e.printStackTrace(); |
291 |
} |
292 |
} |
293 |
|
294 |
private static void printInfoMessage() { |
295 |
System.out |
296 |
.println("The following arguments are valid: \n" |
297 |
+ "-loglevel:<LEVEL> \n" |
298 |
+ " where <LEVEL> is 'off','debug','info' (DEFAULT),'warn','error' or 'fatal'\n\n" |
299 |
+ "-logconfig:<CONFIG> \n" |
300 |
+ " where <CONFIG> is 'file','chainsaw' or 'console'(DEFAULT) \n" |
301 |
+ " 'chainsaw' means that the server tries to configure the logger each\n" |
302 |
+ " time a new client for a chainsaw instance running at the client ip\n" |
303 |
+ " (standardports) \n" |
304 |
+ " 'file' means that the file 'xuluserverlog4j.cfg' in the home-directory " |
305 |
+ " is used for configuration \n" |
306 |
+ " 'console' means of course, that the console is used for output" |
307 |
+ "-port:<portnumber> \n" |
308 |
+ " The Server is bound to the specified port"); |
309 |
} |
310 |
|
311 |
private static void unbind() { |
312 |
Helper.unbind(bindingName); |
313 |
} |
314 |
|
315 |
/** |
316 |
* The {@link Class} objects of the currently active connection. Every time |
317 |
* {@link #runSPMDModelTask(String, int, Object[])} is called the |
318 |
* newest class is loaded from the client. For performance reasons this will |
319 |
* happen only one time per connection. In this map the current active |
320 |
* classes are stored. |
321 |
*/ |
322 |
private HashMap<String, SPMDTask[]> spmdTaskInstances; |
323 |
|
324 |
/* standard is should be 1099 */ |
325 |
private int registryPort = 1099; |
326 |
|
327 |
private int multicastPort; |
328 |
|
329 |
private String multiCastIP; |
330 |
|
331 |
ServerMulticastReceiver multicastThread; |
332 |
|
333 |
InetAddress multiCastGroup; |
334 |
|
335 |
// says whether a client is connected |
336 |
private boolean connected = false; |
337 |
|
338 |
private long starttime = System.currentTimeMillis(); |
339 |
|
340 |
private String connectedClient; |
341 |
|
342 |
private PartitionDataManager dataManager; |
343 |
|
344 |
private RemoteEventProxy remoteEventReceiver; |
345 |
|
346 |
// User specified name of the Server or hostname |
347 |
private String serverName; |
348 |
|
349 |
private int machineRating = 0; |
350 |
|
351 |
private final boolean loggingEnabled; |
352 |
|
353 |
private int taskPriority = Thread.NORM_PRIORITY; |
354 |
|
355 |
private int availableProcessors = 1; |
356 |
|
357 |
private int toUseThreads = 1; |
358 |
|
359 |
private final boolean isInternalServer; |
360 |
|
361 |
private ClientDataServer localSPMDClient; |
362 |
|
363 |
private boolean useCodeDownloading; |
364 |
|
365 |
private ExecutorService executor; |
366 |
|
367 |
// used for execution results |
368 |
private Future[] resultCalls; |
369 |
|
370 |
/** |
371 |
* Creates a new instance at the standard port |
372 |
* |
373 |
* @throws RemoteException |
374 |
*/ |
375 |
public XuluServer() throws RemoteException { |
376 |
this(true, false, 0); |
377 |
} |
378 |
|
379 |
/** |
380 |
* @param configLogging |
381 |
* enable or disable logging |
382 |
* @param isInternalServer |
383 |
* set to true if you want to make method calls purely local. |
384 |
* This is for performance reasons (see |
385 |
* {@link UnicastRemoteObject#getClientHost()} |
386 |
* @param port |
387 |
* port the server is bound to (0 for default) |
388 |
* @throws RemoteException |
389 |
*/ |
390 |
public XuluServer(boolean configLogging, boolean isInternalServer, int port) |
391 |
throws RemoteException { |
392 |
super(); |
393 |
loggingEnabled = configLogging; |
394 |
this.isInternalServer = isInternalServer; |
395 |
initConfiguration(); |
396 |
//the executor is used for multithreading |
397 |
executor = Executors.newCachedThreadPool(); |
398 |
spmdTaskInstances = new HashMap<String, SPMDTask[]>(10); |
399 |
if (port != 0) |
400 |
this.registryPort = port; |
401 |
bind(); |
402 |
|
403 |
initializeMultiCasting(); |
404 |
serverName = "unknown server"; |
405 |
try { |
406 |
if (isInternalServer) |
407 |
serverName = "internal Server"; |
408 |
else |
409 |
serverName = InetAddress.getLocalHost().getHostName(); |
410 |
} catch (UnknownHostException e) { |
411 |
// TODO Auto-generated catch block |
412 |
e.printStackTrace(); |
413 |
} |
414 |
if (port != 0) |
415 |
serverName = serverName + " (port " + registryPort + ")"; |
416 |
runBenchmark(); |
417 |
} |
418 |
|
419 |
/** |
420 |
* <br> |
421 |
* This is used when a Server is running inside the Xulu-Client. It has |
422 |
* performance advantages (direct access - no TCP/IP) |
423 |
* |
424 |
* @param configLogging |
425 |
* enable or disable logging |
426 |
* @param clientDataServer a local client for faster access |
427 |
* @throws RemoteException |
428 |
*/ |
429 |
public XuluServer(boolean configLogging, ClientDataServer clientDataServer) |
430 |
throws RemoteException { |
431 |
this(configLogging, true, 0); |
432 |
this.localSPMDClient = clientDataServer; |
433 |
} |
434 |
|
435 |
/** |
436 |
* @param port |
437 |
* the port to which the Server is bound |
438 |
* @throws RemoteException |
439 |
*/ |
440 |
public XuluServer(int port) throws RemoteException { |
441 |
this(true, false, port); |
442 |
} |
443 |
|
444 |
/** |
445 |
* Bind the remote object's stub in the registry. Creates a registry if no |
446 |
* running registry is found. |
447 |
*/ |
448 |
private void bind() throws RemoteException { |
449 |
String name = bindingName; |
450 |
Helper.bind(name, this, registryPort); |
451 |
} |
452 |
|
453 |
private boolean checkConnected(String callingHost) { |
454 |
|
455 |
if (callingHost.equals(connectedClient)) |
456 |
return true; |
457 |
LOG |
458 |
.warn(callingHost |
459 |
+ " has tried to execute a critical operation, but was not connected " |
460 |
+ ". Actual connected client: " + connectedClient); |
461 |
return false; |
462 |
} |
463 |
|
464 |
/** |
465 |
* Only a connected client has access to all functionality. When connecting |
466 |
* or reconnecting, all data of the server and the corresponding |
467 |
* {@link PartitionDataServer} is reset! |
468 |
* |
469 |
* @see appl.parallel.ComputingResource#connect() |
470 |
*/ |
471 |
public synchronized boolean connect() throws RemoteException { |
472 |
/** |
473 |
* determine the calling host. Internal Server requires special |
474 |
* handling: notice that this happens very often in this class, but can |
475 |
* not externalized due to the behavior of #getClientHost(); |
476 |
*/ |
477 |
String callingHost = "error"; |
478 |
try { |
479 |
callingHost = isInternalServer ? "localhost" : getClientHost(); |
480 |
} catch (ServerNotActiveException e) { |
481 |
e.printStackTrace(); |
482 |
} |
483 |
/** ********************************************************************** */ |
484 |
|
485 |
if (connectedClient == null || callingHost.equals(connectedClient)) { |
486 |
if (connectedClient == null) |
487 |
LOG.info("Client " + callingHost + " connected!"); |
488 |
else |
489 |
LOG.info("Client " + connectedClient + " reconnected! "); |
490 |
// reset data |
491 |
reset(); |
492 |
connectedClient = callingHost; |
493 |
// initalize event system & chainsaw |
494 |
initEventSystem(); |
495 |
initChainsaw(connectedClient); |
496 |
return true; |
497 |
} else if (connectedClient != null) { |
498 |
LOG.warn(callingHost + " has tried to " |
499 |
+ "connect to server, but was not connected, " |
500 |
+ "because this server is in use by " + connectedClient); |
501 |
} |
502 |
|
503 |
return false; |
504 |
} |
505 |
|
506 |
/* |
507 |
* (non-Javadoc) |
508 |
* |
509 |
* @see appl.parallel.server.ComputingResource#createDataServer() |
510 |
*/ |
511 |
public PartitionDataServer createDataServer(String[] IPs) |
512 |
throws RemoteException { |
513 |
/** |
514 |
* determine the calling host. Internal Server requires special |
515 |
* handling: notice that this happens very often in this class, but can |
516 |
* not externalized due to the behavior of #getClientHost(); |
517 |
*/ |
518 |
String callingHost = "localhost"; |
519 |
try { |
520 |
callingHost = isInternalServer ? "localhost" : getClientHost(); |
521 |
} catch (ServerNotActiveException e) { |
522 |
e.printStackTrace(); |
523 |
} |
524 |
/** ********************************************************************** */ |
525 |
if (checkConnected(callingHost)) { |
526 |
if (isInternalServer) |
527 |
dataManager = new PartitionDataManager(IPs, |
528 |
remoteEventReceiver, getRegistryPort(), localSPMDClient); |
529 |
else |
530 |
dataManager = new PartitionDataManager(IPs, |
531 |
remoteEventReceiver, getRegistryPort()); |
532 |
return dataManager; |
533 |
} |
534 |
return null; |
535 |
} |
536 |
|
537 |
/* (non-Javadoc) |
538 |
* @see appl.parallel.server.ComputingResource#disconnect() |
539 |
*/ |
540 |
public synchronized void disconnect() throws RemoteException { |
541 |
/** |
542 |
* determine the calling host. Internal Server requires special |
543 |
* handling: notice that this happens very often in this class, but can |
544 |
* not externalized due to the behavior of #getClientHost(); |
545 |
*/ |
546 |
String callingHost = "localhost"; |
547 |
try { |
548 |
callingHost = isInternalServer ? "localhost" : getClientHost(); |
549 |
} catch (ServerNotActiveException e) { |
550 |
e.printStackTrace(); |
551 |
} |
552 |
/** ********************************************************************** */ |
553 |
if (checkConnected(callingHost)) { |
554 |
reset(); |
555 |
LOG.info("Client " + connectedClient |
556 |
+ " successfully disconnected from Server"); |
557 |
} |
558 |
} |
559 |
|
560 |
/** |
561 |
* @return a rating for the machine as discovered by the benchmark |
562 |
*/ |
563 |
public int getRating() { |
564 |
return machineRating; |
565 |
} |
566 |
|
567 |
/** |
568 |
* @return the registry port the server is bound to |
569 |
*/ |
570 |
public int getRegistryPort() { |
571 |
return registryPort; |
572 |
} |
573 |
|
574 |
/* |
575 |
* (non-Javadoc) |
576 |
* |
577 |
* @see appl.parallel.server.XuluServer#getResourceInformation() |
578 |
*/ |
579 |
public ComputingResourceProperties getResourceInformation() |
580 |
throws RemoteException { |
581 |
return new XuluServerProperties(this, serverName); |
582 |
} |
583 |
|
584 |
/** |
585 |
* @return the time in s the server is running |
586 |
*/ |
587 |
int getUptime() { |
588 |
return (int) (System.currentTimeMillis() - starttime); |
589 |
} |
590 |
|
591 |
private void initChainsaw(String connectedClient) { |
592 |
if (chainsaw && connectedClient != null) { |
593 |
chainsawAppender = new SocketAppender(connectedClient, 4445); |
594 |
BasicConfigurator.resetConfiguration(); |
595 |
BasicConfigurator.configure(chainsawAppender); |
596 |
} |
597 |
} |
598 |
|
599 |
/** |
600 |
* inits task priority, ports and loglevel |
601 |
*/ |
602 |
private void initConfiguration() { |
603 |
XuluConfig config = XuluConfig.getXuluConfig(); |
604 |
// a lower priority will run the server more in the background |
605 |
taskPriority = config.getIntProperty("XuluServer.priority"); |
606 |
|
607 |
switch (taskPriority) { |
608 |
default: |
609 |
taskPriority = Thread.NORM_PRIORITY; |
610 |
break; |
611 |
case 1: |
612 |
taskPriority = Thread.MIN_PRIORITY; |
613 |
break; |
614 |
case 2: |
615 |
taskPriority = Thread.NORM_PRIORITY - 1; |
616 |
break; |
617 |
case 4: |
618 |
taskPriority = Thread.NORM_PRIORITY + 1; |
619 |
break; |
620 |
case 5: |
621 |
taskPriority = Thread.MAX_PRIORITY; |
622 |
break; |
623 |
} |
624 |
|
625 |
availableProcessors = Runtime.getRuntime().availableProcessors(); |
626 |
// default: try to use one thread per processor |
627 |
toUseThreads = availableProcessors; |
628 |
// check if there is an entry in the XuluConfig saying how many threads |
629 |
// should be created. |
630 |
// (more threads than processors may be useful for the use of |
631 |
// hyperthreading) |
632 |
String configProz = config.getProperty("XuluServer.useThreads"); |
633 |
if ((configProz != null) && !configProz.equals("max")) { |
634 |
toUseThreads = config.getIntProperty("XuluServer.useThreads"); |
635 |
// error checking: |
636 |
if (toUseThreads < 1 || toUseThreads > 128){ |
637 |
toUseThreads = availableProcessors; |
638 |
System.out |
639 |
.println("Error: Number of threads must be between 1 and 128!"); |
640 |
} |
641 |
} |
642 |
// init correspondig arrays |
643 |
resultCalls = new Future[toUseThreads]; |
644 |
|
645 |
int port = config.getIntProperty("XuluServer.registryport"); |
646 |
if (port != 0) |
647 |
registryPort = port; |
648 |
|
649 |
useCodeDownloading = config |
650 |
.getBooleanProperty("XuluServer.useCodeDownloading"); |
651 |
if (useCodeDownloading) |
652 |
System.out.println("Code downloading is enabled"); |
653 |
else |
654 |
System.out.println("Code downloading is disabled"); |
655 |
|
656 |
int multicastport = config.getIntProperty("XuluServer.multicastport"); |
657 |
if (multicastport != 0) |
658 |
multicastPort = multicastport; |
659 |
String IP = config.getProperty("XuluServer.multicastgroup"); |
660 |
if (IP != null) |
661 |
multiCastIP = IP; |
662 |
try { |
663 |
multiCastGroup = InetAddress.getByName(multiCastIP); |
664 |
} catch (UnknownHostException e) { |
665 |
e.printStackTrace(); |
666 |
} |
667 |
if (loggingEnabled) { |
668 |
|
669 |
String mode = config.getProperty("XuluServer.log4j.mode"); |
670 |
if (mode != null) |
671 |
log4jConfig(mode); |
672 |
} |
673 |
String level = config.getProperty("XuluServer.log4j.logLevel"); |
674 |
if (level != null) |
675 |
log4jLevel(level.toLowerCase()); |
676 |
else |
677 |
log4jLevel("info"); |
678 |
} |
679 |
|
680 |
/** |
681 |
* Connects to the RemoteEventReceiver at the calling client |
682 |
*/ |
683 |
private void initEventSystem() throws RemoteException { |
684 |
String lookupName = "rmi://" + connectedClient + "/RemoteEventReceiver"; |
685 |
try { |
686 |
int delay = XuluConfig.getXuluConfig().getIntProperty( |
687 |
"XuluServer.eventDelay"); |
688 |
System.out.println("looking up " + lookupName); |
689 |
CommEventSink eventSink = (CommEventSink) Naming.lookup(lookupName); |
690 |
if (remoteEventReceiver != null) |
691 |
remoteEventReceiver.stopService(); |
692 |
remoteEventReceiver = new RemoteEventProxy(eventSink, delay); |
693 |
remoteEventReceiver.startService(); |
694 |
} catch (MalformedURLException e) { |
695 |
e.printStackTrace(); |
696 |
} catch (NotBoundException e) { |
697 |
LOG.warn("Could not find the remote event receiver at " |
698 |
+ lookupName, e); |
699 |
System.out.println(e.getMessage()); |
700 |
remoteEventReceiver = new RemoteEventProxy(null, 0); |
701 |
} |
702 |
} |
703 |
|
704 |
/** |
705 |
* inits the multicast unit |
706 |
*/ |
707 |
private void initializeMultiCasting() { |
708 |
try { |
709 |
MulticastSocket s = new MulticastSocket(multicastPort); |
710 |
s.joinGroup(multiCastGroup); |
711 |
// send hello message |
712 |
String IP = XuluConfig.getXuluConfig().getProperty( |
713 |
"XuluServer.multicastgroup"); |
714 |
DatagramPacket answer = new DatagramPacket(helloFromServerMessage |
715 |
.getBytes(), helloFromServerMessage.length(), InetAddress |
716 |
.getByName(IP), multicastPort); |
717 |
s.send(answer); |
718 |
// start thread that waits on client messages |
719 |
multicastThread = new ServerMulticastReceiver(s); |
720 |
multicastThread.start(); |
721 |
} catch (IOException e) { |
722 |
// TODO Auto-generated catch block |
723 |
LOG.warn("Could not initalize multicasting! Reason was " |
724 |
+ e.getMessage(), e); |
725 |
|
726 |
} |
727 |
} |
728 |
|
729 |
/** |
730 |
* @return whether this server is available or in use by another client |
731 |
* @see appl.parallel.ComputingResource#isAvailable() |
732 |
*/ |
733 |
public boolean isAvailable() throws RemoteException { |
734 |
/** |
735 |
* determine the calling host. Internal Server requires special |
736 |
* handling: notice that this happens very often in this class, but can |
737 |
* not externalized due to the behavior of #getClientHost(); |
738 |
*/ |
739 |
String callingHost = "localhost"; |
740 |
try { |
741 |
callingHost = isInternalServer ? "localhost" : getClientHost(); |
742 |
} catch (ServerNotActiveException e) { |
743 |
e.printStackTrace(); |
744 |
} |
745 |
/** ********************************************************************** */ |
746 |
if (callingHost.equals(connectedClient)) |
747 |
return true; |
748 |
|
749 |
if (connectedClient == null) |
750 |
return true; |
751 |
return false; |
752 |
} |
753 |
|
754 |
/** |
755 |
* @return if a client is connected |
756 |
*/ |
757 |
public boolean isClientConnected() { |
758 |
return connected; |
759 |
|
760 |
} |
761 |
|
762 |
/** |
763 |
* Loads a {@link SPMDTask} from the specified URL and returns an array of |
764 |
* instances of this class. There are so many instances, as there are |
765 |
* available processors on this machine!. This class also makes sure that |
766 |
* this happens only one time per connection (else this would of course be a |
767 |
* performance killer in fine grained parallelization). |
768 |
* |
769 |
* @param location |
770 |
* the URL of the rootDirectory/networkLocation |
771 |
* @param spmdTaskClassName |
772 |
* the name of the SPMDTask for which the newest class should be |
773 |
* found |
774 |
* @return the instance of the class or <code>null</code> if loading fails |
775 |
*/ |
776 |
private SPMDTask[] loadSPMDTasksfromURL(String location, |
777 |
String spmdTaskClassName) { |
778 |
|
779 |
SPMDTask[] spmdClassInstances = (SPMDTask[]) spmdTaskInstances |
780 |
.get(spmdTaskClassName); |
781 |
|
782 |
// if already loaded for this connection simply return the instances |
783 |
if (spmdClassInstances != null) |
784 |
return (SPMDTask[]) spmdClassInstances; |
785 |
spmdClassInstances = new SPMDTask[toUseThreads]; |
786 |
Class spmdClass = null; |
787 |
if (useCodeDownloading) { |
788 |
try { |
789 |
// lookup client and make sure that the newest class of the task |
790 |
// is instantiated. Will avoid loading older cached or local |
791 |
// filesystem version |
792 |
URL[] locations = { new URL("http://" + location + "/") }; |
793 |
PreferredClassLoader classLoader = new PreferredClassLoader( |
794 |
locations, Thread.currentThread() |
795 |
.getContextClassLoader(), null, false); |
796 |
LOG.debug("Try to load " + spmdTaskClassName + " from " |
797 |
+ location); |
798 |
spmdClass = classLoader.loadClass(spmdTaskClassName); |
799 |
} catch (Exception e) { |
800 |
LOG.error("Could not load class for " + spmdTaskClassName |
801 |
+ " from " + location, e); |
802 |
e.printStackTrace(); |
803 |
} |
804 |
} |
805 |
// if retrieval failed for some reason or code downloading disabled |
806 |
// then use the local class |
807 |
try { |
808 |
if (spmdClass == null) { |
809 |
spmdClass = Class.forName(spmdTaskClassName); |
810 |
} |
811 |
|
812 |
// instantiate the classes |
813 |
for (int i = 0; i < spmdClassInstances.length; i++) { |
814 |
Object task = spmdClass.newInstance(); |
815 |
if (!(task instanceof SPMDTask)) { |
816 |
LOG.fatal(spmdTaskClassName |
817 |
+ " was not an instance of SPMDTask!"); |
818 |
throw new UnsupportedOperationException(spmdTaskClassName |
819 |
+ " was not an instance of SPMDTask!"); |
820 |
} |
821 |
spmdClassInstances[i] = (SPMDTask) task; |
822 |
} |
823 |
|
824 |
spmdTaskInstances.put(spmdTaskClassName, spmdClassInstances); |
825 |
return spmdClassInstances; |
826 |
|
827 |
} catch (ClassNotFoundException e) { |
828 |
// TODO Auto-generated catch block |
829 |
e.printStackTrace(); |
830 |
} catch (InstantiationException e) { |
831 |
// TODO Auto-generated catch block |
832 |
e.printStackTrace(); |
833 |
} catch (IllegalAccessException e) { |
834 |
// TODO Auto-generated catch block |
835 |
e.printStackTrace(); |
836 |
} |
837 |
throw new UnsupportedOperationException("Loading of class " |
838 |
+ spmdTaskClassName |
839 |
+ " failed on serverside! Execution canceled"); |
840 |
} |
841 |
|
842 |
/* |
843 |
* (non-Javadoc) |
844 |
* |
845 |
* @see appl.parallel.server.XuluServer#ping(java.lang.Object[]) |
846 |
*/ |
847 |
public Object ping(Object... o) throws RemoteException { |
848 |
System.out.println("received ping"); |
849 |
return o; |
850 |
} |
851 |
|
852 |
/** |
853 |
* deletes all client-specific data data, so that a newly connected client |
854 |
* can use a clean system |
855 |
*/ |
856 |
private void reset() { |
857 |
if (dataManager != null) |
858 |
dataManager.stop(); |
859 |
dataManager = null; |
860 |
connectedClient = null; |
861 |
spmdTaskInstances.clear(); |
862 |
} |
863 |
|
864 |
/** |
865 |
* runs the benchmark defined in XuluServer.benchmarkclass |
866 |
*/ |
867 |
private void runBenchmark() { |
868 |
// see if the benchmark should be run |
869 |
|
870 |
boolean runbench = XuluConfig.getXuluConfig().getBooleanProperty( |
871 |
"XuluServer.runbench"); |
872 |
if (runbench) { |
873 |
String classname = XuluConfig.getXuluConfig().getProperty( |
874 |
"XuluServer.benchmarkclass"); |
875 |
if (classname != null) { |
876 |
try { |
877 |
// getBenchmarkClass |
878 |
Benchmark bench = (Benchmark) Class.forName(classname) |
879 |
.newInstance(); |
880 |
machineRating = bench.bench(); |
881 |
System.out |
882 |
.println("Successfully run benchmark. Machine Rating is " |
883 |
+ machineRating); |
884 |
} catch (Exception e) { |
885 |
System.err |
886 |
.println("Could not load Benchmarkclass " |
887 |
+ classname |
888 |
+ ". Setting rating to 0 (which means average). Reason was: " |
889 |
+ e.getMessage()); |
890 |
machineRating = 0; |
891 |
} |
892 |
} else |
893 |
System.out.println("Benchmark disabled"); |
894 |
} |
895 |
} |
896 |
|
897 |
/** |
898 |
* Every time {@link #runSPMDModelTask(String, int, Object[])} is called the |
899 |
* newest class is loaded from the client. For performance reasons this will |
900 |
* happen only one time per connection. |
901 |
* |
902 |
* @see appl.parallel.server.SPMDResource#runSPMDModelTask(String, int, |
903 |
* Object[]) |
904 |
*/ |
905 |
@SuppressWarnings("unchecked") |
906 |
public Object[] runSPMDModelTask(String spmdTaskName, int referenceID, |
907 |
Object... parameters) throws RemoteException { |
908 |
/** |
909 |
* determine the calling host. Internal Server requires special |
910 |
* handling: notice that this happens very often in this class, but can |
911 |
* not externalized due to the behavior of #getClientHost(); |
912 |
*/ |
913 |
String callingHost = "localhost"; |
914 |
try { |
915 |
callingHost = isInternalServer ? "localhost" : getClientHost(); |
916 |
} catch (ServerNotActiveException e) { |
917 |
e.printStackTrace(); |
918 |
} |
919 |
/** ********************************************************************** */ |
920 |
Thread.currentThread().setPriority(taskPriority); |
921 |
String clientIP = "<unknown Client>"; |
922 |
long time = System.nanoTime(); |
923 |
|
924 |
clientIP = callingHost; |
925 |
if (!clientIP.equals(connectedClient)) { |
926 |
LOG.warn(clientIP |
927 |
+ " has tried to run a Task, but was not connected. " |
928 |
+ "This server is in use by " + connectedClient); |
929 |
throw new RemoteException( |
930 |
"Run of SPMDTask failed! This server is in use by " |
931 |
+ connectedClient); |
932 |
} |
933 |
if (LOG.isDebugEnabled()) |
934 |
LOG.debug("received task from " + clientIP |
935 |
+ ". Starting evaluation now..."); |
936 |
// if no datamanager is found: exit |
937 |
if (dataManager == null) |
938 |
throw new UnsupportedOperationException( |
939 |
"No DataManager found. Create one first using 'createDataManager' on this Server"); |
940 |
|
941 |
// lookup newest version of the class from the server at the url |
942 |
// (or from local disk if remote class loading is disabled) |
943 |
SPMDTask[] tasks = loadSPMDTasksfromURL(clientIP, spmdTaskName); |
944 |
|
945 |
// The number of threads allowed for execution can be different |
946 |
// from the number of user assigned threads |
947 |
// because tasks may not support multithreadeding |
948 |
int allowedThreads = 1; |
949 |
if (tasks[0].supportsMultiThreading()) |
950 |
allowedThreads = toUseThreads; |
951 |
|
952 |
Object results[] = new Object[allowedThreads]; |
953 |
|
954 |
// the following loop is only executed the first time a task is used! |
955 |
if (!tasks[0].isInitialized()) { |
956 |
if (LOG.isInfoEnabled()) |
957 |
LOG.info("Trying to use " + allowedThreads |
958 |
+ " threads for execution of " |
959 |
+ tasks[0].getClass().getName()); |
960 |
// calculate splitting for multiple threads: |
961 |
int splitMapPos = dataManager.getInfo(referenceID).getSplitMapPos(); |
962 |
SplitMap splitMap = dataManager.getInfo(referenceID).getSplitMap(); |
963 |
SplitMap1DVertical map1DVertical = new SplitMap1DVertical(splitMap |
964 |
.getPartitionCalculationBounds(splitMapPos).width, splitMap |
965 |
.getPartitionCalculationBounds(splitMapPos).height, 0, |
966 |
allowedThreads, NeighborhoodBoxingMode.outBoxing); |
967 |
|
968 |
// create controllers and init the tasks |
969 |
for (int i = 0; i < allowedThreads; i++) { |
970 |
AdvancedSPMDServerController serverController = new AdvancedSPMDServerController( |
971 |
dataManager, referenceID); |
972 |
// set first thread as master thread |
973 |
serverController.setMasterThread(i == 0); |
974 |
// now assign the calculation area. Because we have assumed that |
975 |
// the (Thread)-calculation |
976 |
// area begins at (0,0) we must move it relative to the real |
977 |
// local calculation area |
978 |
Rectangle calculationBounds = map1DVertical |
979 |
.getPartitionCalculationBounds(i); |
980 |
int absolutX = (int) (serverController.getLocalCalcMinX() + calculationBounds |
981 |
.getMinX()); |
982 |
int absolutY = (int) (serverController.getLocalCalcMinY() + calculationBounds |
983 |
.getMinY()); |
984 |
calculationBounds.setLocation(absolutX, absolutY); |
985 |
serverController.setLocalCalculationBounds(calculationBounds); |
986 |
tasks[i].setSPMDServerController(serverController); |
987 |
tasks[i].initialize(); |
988 |
} |
989 |
} |
990 |
if (allowedThreads == 1) // only for saving resources: no new thread |
991 |
{ // if only one thread is used |
992 |
Object executionResult = tasks[0].run(parameters); |
993 |
results = new Object[] { executionResult }; |
994 |
} else { |
995 |
// for more than 1 processor execute the multiple tasks in separate |
996 |
// threads |
997 |
for (int i = 0; i < allowedThreads; i++) { |
998 |
resultCalls[i] = executor.submit(new SimpleCallThread(tasks[i], |
999 |
parameters)); |
1000 |
} |
1001 |
// wait for the Threads to finish |
1002 |
for (int i = 0; i < allowedThreads; i++) { |
1003 |
try { |
1004 |
results[i] = resultCalls[i].get(); |
1005 |
} catch (InterruptedException e) { |
1006 |
// TODO Auto-generated catch block |
1007 |
e.printStackTrace(); |
1008 |
} catch (ExecutionException e) { |
1009 |
// TODO Auto-generated catch block |
1010 |
e.printStackTrace(); |
1011 |
} |
1012 |
} |
1013 |
} |
1014 |
// generate a event for the execution |
1015 |
if (remoteEventReceiver.isTimeMonitoringEnabled()) |
1016 |
remoteEventReceiver.fireRemoteEvent(new TimeEvent((System |
1017 |
.nanoTime() - time), serverName, "DataServer", |
1018 |
CommType.REMOTE_EXECUTION)); |
1019 |
return results; |
1020 |
} |
1021 |
|
1022 |
/** |
1023 |
* This method frees all resources.<br> |
1024 |
* Notice that this method is not remotely accessible! |
1025 |
*/ |
1026 |
public void stopServer() { |
1027 |
unbind(); |
1028 |
if (dataManager != null) |
1029 |
dataManager.stop(); |
1030 |
dataManager = null; |
1031 |
if (multicastThread != null) |
1032 |
multicastThread.stopThread(); |
1033 |
} |
1034 |
} |