1 |
package appl.parallel.server; |
2 |
|
3 |
import java.awt.Rectangle; |
4 |
import java.io.IOException; |
5 |
import java.net.DatagramPacket; |
6 |
import java.net.InetAddress; |
7 |
import java.net.MalformedURLException; |
8 |
import java.net.MulticastSocket; |
9 |
import java.net.URL; |
10 |
import java.net.UnknownHostException; |
11 |
import java.rmi.Naming; |
12 |
import java.rmi.NotBoundException; |
13 |
import java.rmi.RMISecurityManager; |
14 |
import java.rmi.Remote; |
15 |
import java.rmi.RemoteException; |
16 |
import java.rmi.server.ServerNotActiveException; |
17 |
import java.rmi.server.UnicastRemoteObject; |
18 |
import java.util.HashMap; |
19 |
import java.util.concurrent.Callable; |
20 |
import java.util.concurrent.ExecutionException; |
21 |
import java.util.concurrent.ExecutorService; |
22 |
import java.util.concurrent.Executors; |
23 |
import java.util.concurrent.Future; |
24 |
|
25 |
import net.jini.loader.pref.PreferredClassLoader; |
26 |
|
27 |
import org.apache.log4j.BasicConfigurator; |
28 |
import org.apache.log4j.Level; |
29 |
import org.apache.log4j.LogManager; |
30 |
import org.apache.log4j.Logger; |
31 |
import org.apache.log4j.PropertyConfigurator; |
32 |
import org.apache.log4j.net.SocketAppender; |
33 |
|
34 |
import appl.ext.XuluConfig; |
35 |
import appl.parallel.ComputingResourceProperties; |
36 |
import appl.parallel.client.ClientDataServer; |
37 |
import appl.parallel.event.CommEventSink; |
38 |
import appl.parallel.event.TimeEvent; |
39 |
import appl.parallel.event.CommEvent.CommType; |
40 |
import appl.parallel.services.RemoteEventProxy; |
41 |
import appl.parallel.spmd.AdvancedSPMDServerController; |
42 |
import appl.parallel.spmd.SPMDTask; |
43 |
import appl.parallel.spmd.split.SplitMap; |
44 |
import appl.parallel.spmd.split.SplitMap1DVertical; |
45 |
import appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode; |
46 |
import appl.parallel.util.Helper; |
47 |
import appl.util.benchmark.Benchmark; |
48 |
|
49 |
/** |
50 |
* The Server manages the remote execution of program code. It may be started |
51 |
* using the {@link #main(String[])} method.<br> |
52 |
* The Server reads the following properties from {@link XuluConfig} - the |
53 |
* values given should be the defaults.<br> |
54 |
* <table border="1"> |
55 |
* <tr> |
56 |
* <th>Property</th> |
57 |
* <th>Default</th> |
58 |
* <th>Desc</th> |
59 |
* </tr> |
60 |
* <tr> |
61 |
* <td>XuluServer.useCodeDownloading</td> |
62 |
* <td>false</td> |
63 |
* <td>if true, the server tries to download task class files from a |
64 |
* HTTP-server at the client-url</td> |
65 |
* </tr> |
66 |
* <tr> |
67 |
* <td>XuluServer.registryPort</td> |
68 |
* <td>1099</td> |
69 |
* <td>The port at which a new registry is created, if no running registry was |
70 |
* found</td> |
71 |
* </tr> |
72 |
* <tr> |
73 |
* <td>XuluServer.multicastgroup</td> |
74 |
* <td>239.1.1.1</td> |
75 |
* <td>The default multicast group</td> |
76 |
* </tr> |
77 |
* <tr> |
78 |
* <td> XuluServer.multicastport</td> |
79 |
* <td>10000</td> |
80 |
* <td>The default multicast port</td> |
81 |
* </tr> |
82 |
* <tr> |
83 |
* <td>XuluServer.eventDelay</td> |
84 |
* <td>50</td> |
85 |
* <td>Events generated on server side may be delayed to collect multiple |
86 |
* Events and send them at once (this saves communication time).Time is given in |
87 |
* milliseconds</td> |
88 |
* </tr> |
89 |
* <tr> |
90 |
* <td>XuluServer.log4j.logLevel</td> |
91 |
* <td>info</td> |
92 |
* <td>the level can be '<i>debug</i>','<i>info</i>','<i>warn</i>','<i>error</i>' |
93 |
* or '<i>fatal</i>' and can be overridden by commandline - Parameters<br> |
94 |
* <br> |
95 |
* </td> |
96 |
* </tr> |
97 |
* <td>XuluServer.log4j.mode</td> |
98 |
* <td>console</td> |
99 |
* <td>the mode can be '<i>file</i>','<i>chainsaw</i>' or '<i>console</i>'. |
100 |
* File means that the configurationfile xuluserverlog4j.cfg is read. <br> |
101 |
* <i>'chainsaw'</i> means that the server tries to configure the logger each |
102 |
* time a new client for a chainsaw instance running at the client ip |
103 |
* (standardports) <br> |
104 |
* </td> |
105 |
* </tr> |
106 |
* <td>XuluServer.benchmarkclass</td> |
107 |
* <td>appl.util.benchmark.SimpleBenchmark</td> |
108 |
* <td>The Benchmark to get a rating for this machine. Notice that you can |
109 |
* override the rating with the -rating parameter</td> |
110 |
* </tr> |
111 |
* <tr> |
112 |
* <td>XuluServer.useThreads</td> |
113 |
* <td>max</td> |
114 |
* <td>The number of threads to be used by the server for tasks that support |
115 |
* multi-threading. Per default one thread is created for every available |
116 |
* Processors (value='max'). More threads than processors may be useful for |
117 |
* testing tasks which support multihreading or for processors which support |
118 |
* hyperthreading.</td> |
119 |
* </tr> |
120 |
* <tr> |
121 |
* <td>XuluServer.runbench</td> |
122 |
* <td>true</td> |
123 |
* <td>the benchmark is run automatically</td> |
124 |
* </tr> |
125 |
* </table> <br> |
126 |
* <b>Multicasting:</b> <br> |
127 |
* If multicasting does not work, try to disable your firewall or configure it |
128 |
* to allow multicast UDP packages. Notice also, that multicasting may not work |
129 |
* in virtual machines like VMWare or MS Virtual PC 2004/7. <br> |
130 |
* <br> |
131 |
* <b>Assumptions</b> |
132 |
* Only one client is connected to the server (and can run Tasks) <br> |
133 |
*<br> |
134 |
* See {@link #main(String[])} for details about command line parameters |
135 |
* @see ServerMulticastReceiver |
136 |
* @author Dominik Appl |
137 |
*/ |
138 |
public class XuluServer extends UnicastRemoteObject implements SPMDResource { |
139 |
|
140 |
/** |
141 |
* This Thread is used when multiple threads should be executed simultaneous during multithreading |
142 |
* |
143 |
* @author Dominik Appl |
144 |
*/ |
145 |
class SimpleCallThread implements Callable { |
146 |
|
147 |
private final SPMDTask task; |
148 |
|
149 |
private final Object[] parameters; |
150 |
|
151 |
/** |
152 |
* @param task the task to be executed |
153 |
* @param parameters the parameters for the run method of the task |
154 |
*/ |
155 |
public SimpleCallThread(SPMDTask task, Object... parameters) { |
156 |
this.task = task; |
157 |
this.parameters = parameters; |
158 |
} |
159 |
|
160 |
/* |
161 |
* (non-Javadoc) |
162 |
* |
163 |
* @see java.util.concurrent.Callable#call() |
164 |
*/ |
165 |
public Object call() throws Exception { |
166 |
return task.run(parameters); |
167 |
} |
168 |
} |
169 |
|
170 |
private static final Logger LOG = LogManager |
171 |
.getLogger("appl.parallel.server.XuluServer"); |
172 |
|
173 |
private static String bindingName = "XuluServer"; |
174 |
|
175 |
private static final String helloFromServerMessage = "Hello Xulu from XuluServer"; |
176 |
|
177 |
private static String log4jfilename = "xuluserverlog4j.cfg"; |
178 |
|
179 |
private static boolean chainsaw = false; |
180 |
|
181 |
private static SocketAppender chainsawAppender; |
182 |
|
183 |
/** |
184 |
* @param mode the mode of the execution |
185 |
*/ |
186 |
private static void log4jConfig(String mode) { |
187 |
if (mode.equals("chainsaw")) { |
188 |
BasicConfigurator.resetConfiguration(); |
189 |
chainsawAppender = new SocketAppender(); |
190 |
BasicConfigurator.configure(chainsawAppender); |
191 |
System.out.println("Server configured for chainsaw."); |
192 |
chainsaw = true; |
193 |
} else if (mode.equals("file")) { |
194 |
BasicConfigurator.resetConfiguration(); |
195 |
PropertyConfigurator.configure(log4jfilename); |
196 |
} else if (mode.equals("console")) { |
197 |
BasicConfigurator.resetConfiguration(); |
198 |
BasicConfigurator.configure(); |
199 |
} else { |
200 |
System.err.println("Invalid log4j-Config parameter found: " + mode); |
201 |
printInfoMessage(); |
202 |
System.exit(0); |
203 |
} |
204 |
} |
205 |
|
206 |
/** |
207 |
* configures the log4j level |
208 |
* |
209 |
* @see #main(String[]) |
210 |
*/ |
211 |
private static void log4jLevel(String level) { |
212 |
if (level.equals("info")) { |
213 |
LOG.setLevel(Level.INFO); |
214 |
} else if (level.equals("debug")) { |
215 |
LOG.setLevel(Level.DEBUG); |
216 |
} else if (level.equals("error")) { |
217 |
LOG.setLevel(Level.ERROR); |
218 |
} else if (level.equals("fatal")) { |
219 |
LOG.setLevel(Level.FATAL); |
220 |
} else if (level.equals("warn")) { |
221 |
LOG.setLevel(Level.WARN); |
222 |
} else if (level.equals("off")) { |
223 |
LOG.setLevel(Level.OFF); |
224 |
} else { |
225 |
System.err.println("Invalid log4j-Level found: " + level); |
226 |
printInfoMessage(); |
227 |
System.exit(0); |
228 |
} |
229 |
System.out.println("Setting logLevel to " + level); |
230 |
} |
231 |
|
232 |
/** |
233 |
* Starts the server. |
234 |
* |
235 |
* @param args |
236 |
* The following parameters are valid: <br> |
237 |
* <br> |
238 |
* -loglevel:<code>LEVEL</code> - where <code>LEVEL</code> is '<i>debug</i>','<i>info</i>' |
239 |
* (DEFAULT),'<i>warn</i>','<i>error</i>' or '<i>fatal</i>' <br> |
240 |
* <br> |
241 |
* -logconfig:<code>CONFIG</code> - where <code>CONFIG</code> is '<i>file</i>','<i>chainsaw</i>' or '<i>console</i>' |
242 |
* <li>'chainsaw' means that the server tries to configure the logger each time a |
243 |
* new client for a chainsaw instance running at the client ip |
244 |
* (standardports) </li><br> |
245 |
* <li>'<i>file</i>' means that the file '<i>xuluserverlog4j.cfg</i>' |
246 |
* in the home-directory is used for configuration <br> </li> |
247 |
* <li>'<i>console</i>' |
248 |
* means of course, that the console is used for output <br> </li> |
249 |
* <br> |
250 |
* -port:PORTNUMBER - binds the server to the specified port (default 1099) |
251 |
* |
252 |
|
253 |
*/ |
254 |
public static void main(String[] args) { |
255 |
// parse arguments |
256 |
int serverPort = 0; |
257 |
for (String arg : args) { |
258 |
if (arg.toLowerCase().startsWith("-loglevel:")) |
259 |
log4jLevel(arg.substring(10).toLowerCase()); |
260 |
else if (arg.toLowerCase().startsWith("-logconfig:")) |
261 |
log4jConfig(arg.substring(11).toLowerCase()); |
262 |
else if (arg.toLowerCase().startsWith("-port:")) |
263 |
serverPort = Integer.valueOf(arg.substring(6)); |
264 |
else { |
265 |
System.err.println("Invalid Argument found: " + arg); |
266 |
printInfoMessage(); |
267 |
System.exit(0); |
268 |
} |
269 |
} |
270 |
|
271 |
if (System.getSecurityManager() == null) { |
272 |
System.setSecurityManager(new RMISecurityManager()); |
273 |
} |
274 |
Remote server; |
275 |
try { |
276 |
server = new XuluServer(serverPort); |
277 |
System.out.println("Xulu Server is up and running!"); |
278 |
} catch (RemoteException e) { |
279 |
System.err.println("Xulu Server could not start!"); |
280 |
e.printStackTrace(); |
281 |
} |
282 |
} |
283 |
|
284 |
private static void printInfoMessage() { |
285 |
System.out |
286 |
.println("The following arguments are valid: \n" |
287 |
+ "-loglevel:<LEVEL> \n" |
288 |
+ " where <LEVEL> is 'off','debug','info' (DEFAULT),'warn','error' or 'fatal'\n\n" |
289 |
+ "-logconfig:<CONFIG> \n" |
290 |
+ " where <CONFIG> is 'file','chainsaw' or 'console'(DEFAULT) \n" |
291 |
+ " 'chainsaw' means that the server tries to configure the logger each\n" |
292 |
+ " time a new client for a chainsaw instance running at the client ip\n" |
293 |
+ " (standardports) \n" |
294 |
+ " 'file' means that the file 'xuluserverlog4j.cfg' in the home-directory " |
295 |
+ " is used for configuration \n" |
296 |
+ " 'console' means of course, that the console is used for output" |
297 |
+ "-port:<portnumber> \n" |
298 |
+ " The Server is bound to the specified port"); |
299 |
} |
300 |
|
301 |
private static void unbind() { |
302 |
Helper.unbind(bindingName); |
303 |
} |
304 |
|
305 |
/** |
306 |
* The {@link Class} objects of the currently active connection. Every time |
307 |
* {@link #runSPMDModelTask(String, int, Object[])} is called the |
308 |
* newest class is loaded from the client. For performance reasons this will |
309 |
* happen only one time per connection. In this map the current active |
310 |
* classes are stored. |
311 |
*/ |
312 |
private HashMap<String, SPMDTask[]> spmdTaskInstances; |
313 |
|
314 |
/* standard is should be 1099 */ |
315 |
private int registryPort = 1099; |
316 |
|
317 |
private int multicastPort; |
318 |
|
319 |
private String multiCastIP; |
320 |
|
321 |
ServerMulticastReceiver multicastThread; |
322 |
|
323 |
InetAddress multiCastGroup; |
324 |
|
325 |
// says whether a client is connected |
326 |
private boolean connected = false; |
327 |
|
328 |
private long starttime = System.currentTimeMillis(); |
329 |
|
330 |
private String connectedClient; |
331 |
|
332 |
private PartitionDataManager dataManager; |
333 |
|
334 |
private RemoteEventProxy remoteEventReceiver; |
335 |
|
336 |
// User specified name of the Server or hostname |
337 |
private String serverName; |
338 |
|
339 |
private int machineRating = 0; |
340 |
|
341 |
private final boolean loggingEnabled; |
342 |
|
343 |
private int taskPriority = Thread.NORM_PRIORITY; |
344 |
|
345 |
private int availableProcessors = 1; |
346 |
|
347 |
private int toUseThreads = 1; |
348 |
|
349 |
private final boolean isInternalServer; |
350 |
|
351 |
private ClientDataServer localSPMDClient; |
352 |
|
353 |
private boolean useCodeDownloading; |
354 |
|
355 |
private ExecutorService executor; |
356 |
|
357 |
// used for execution results |
358 |
private Future[] resultCalls; |
359 |
|
360 |
/** |
361 |
* Creates a new instance at the standard port |
362 |
* |
363 |
* @throws RemoteException |
364 |
*/ |
365 |
public XuluServer() throws RemoteException { |
366 |
this(true, false, 0); |
367 |
} |
368 |
|
369 |
/** |
370 |
* @param configLogging |
371 |
* enable or disable logging |
372 |
* @param isInternalServer |
373 |
* set to true if you want to make method calls purely local. |
374 |
* This is for performance reasons (see |
375 |
* {@link UnicastRemoteObject#getClientHost()} |
376 |
* @param port |
377 |
* port the server is bound to (0 for default) |
378 |
* @throws RemoteException |
379 |
*/ |
380 |
public XuluServer(boolean configLogging, boolean isInternalServer, int port) |
381 |
throws RemoteException { |
382 |
super(); |
383 |
loggingEnabled = configLogging; |
384 |
this.isInternalServer = isInternalServer; |
385 |
initConfiguration(); |
386 |
//the executor is used for multithreading |
387 |
executor = Executors.newCachedThreadPool(); |
388 |
spmdTaskInstances = new HashMap<String, SPMDTask[]>(10); |
389 |
if (port != 0) |
390 |
this.registryPort = port; |
391 |
bind(); |
392 |
|
393 |
initializeMultiCasting(); |
394 |
serverName = "unknown server"; |
395 |
try { |
396 |
if (isInternalServer) |
397 |
serverName = "internal Server"; |
398 |
else |
399 |
serverName = InetAddress.getLocalHost().getHostName(); |
400 |
} catch (UnknownHostException e) { |
401 |
// TODO Auto-generated catch block |
402 |
e.printStackTrace(); |
403 |
} |
404 |
if (port != 0) |
405 |
serverName = serverName + " (port " + registryPort + ")"; |
406 |
runBenchmark(); |
407 |
} |
408 |
|
409 |
/** |
410 |
* <br> |
411 |
* This is used when a Server is running inside the Xulu-Client. It has |
412 |
* performance advantages (direct access - no TCP/IP) |
413 |
* |
414 |
* @param configLogging |
415 |
* enable or disable logging |
416 |
* @param clientDataServer a local client for faster access |
417 |
* @throws RemoteException |
418 |
*/ |
419 |
public XuluServer(boolean configLogging, ClientDataServer clientDataServer) |
420 |
throws RemoteException { |
421 |
this(configLogging, true, 0); |
422 |
this.localSPMDClient = clientDataServer; |
423 |
} |
424 |
|
425 |
/** |
426 |
* @param port |
427 |
* the port to which the Server is bound |
428 |
* @throws RemoteException |
429 |
*/ |
430 |
public XuluServer(int port) throws RemoteException { |
431 |
this(true, false, port); |
432 |
} |
433 |
|
434 |
/** |
435 |
* Bind the remote object's stub in the registry. Creates a registry if no |
436 |
* running registry is found. |
437 |
*/ |
438 |
private void bind() throws RemoteException { |
439 |
String name = bindingName; |
440 |
Helper.bind(name, this, registryPort); |
441 |
} |
442 |
|
443 |
private boolean checkConnected(String callingHost) { |
444 |
|
445 |
if (callingHost.equals(connectedClient)) |
446 |
return true; |
447 |
LOG |
448 |
.warn(callingHost |
449 |
+ " has tried to execute a critical operation, but was not connected " |
450 |
+ ". Actual connected client: " + connectedClient); |
451 |
return false; |
452 |
} |
453 |
|
454 |
/** |
455 |
* Only a connected client has access to all functionality. When connecting |
456 |
* or reconnecting, all data of the server and the corresponding |
457 |
* {@link PartitionDataServer} is reset! |
458 |
* |
459 |
* @see appl.parallel.ComputingResource#connect() |
460 |
*/ |
461 |
public synchronized boolean connect() throws RemoteException { |
462 |
/** |
463 |
* determine the calling host. Internal Server requires special |
464 |
* handling: notice that this happens very often in this class, but can |
465 |
* not externalized due to the behavior of #getClientHost(); |
466 |
*/ |
467 |
String callingHost = "error"; |
468 |
try { |
469 |
callingHost = isInternalServer ? "localhost" : getClientHost(); |
470 |
} catch (ServerNotActiveException e) { |
471 |
e.printStackTrace(); |
472 |
} |
473 |
/** ********************************************************************** */ |
474 |
|
475 |
if (connectedClient == null || callingHost.equals(connectedClient)) { |
476 |
if (connectedClient == null) |
477 |
LOG.info("Client " + callingHost + " connected!"); |
478 |
else |
479 |
LOG.info("Client " + connectedClient + " reconnected! "); |
480 |
// reset data |
481 |
reset(); |
482 |
connectedClient = callingHost; |
483 |
// initalize event system & chainsaw |
484 |
initEventSystem(); |
485 |
initChainsaw(connectedClient); |
486 |
return true; |
487 |
} else if (connectedClient != null) { |
488 |
LOG.warn(callingHost + " has tried to " |
489 |
+ "connect to server, but was not connected, " |
490 |
+ "because this server is in use by " + connectedClient); |
491 |
} |
492 |
|
493 |
return false; |
494 |
} |
495 |
|
496 |
/* |
497 |
* (non-Javadoc) |
498 |
* |
499 |
* @see appl.parallel.server.ComputingResource#createDataServer() |
500 |
*/ |
501 |
public PartitionDataServer createDataServer(String[] IPs) |
502 |
throws RemoteException { |
503 |
/** |
504 |
* determine the calling host. Internal Server requires special |
505 |
* handling: notice that this happens very often in this class, but can |
506 |
* not externalized due to the behavior of #getClientHost(); |
507 |
*/ |
508 |
String callingHost = "localhost"; |
509 |
try { |
510 |
callingHost = isInternalServer ? "localhost" : getClientHost(); |
511 |
} catch (ServerNotActiveException e) { |
512 |
e.printStackTrace(); |
513 |
} |
514 |
/** ********************************************************************** */ |
515 |
if (checkConnected(callingHost)) { |
516 |
if (isInternalServer) |
517 |
dataManager = new PartitionDataManager(IPs, |
518 |
remoteEventReceiver, getRegistryPort(), localSPMDClient); |
519 |
else |
520 |
dataManager = new PartitionDataManager(IPs, |
521 |
remoteEventReceiver, getRegistryPort()); |
522 |
return dataManager; |
523 |
} |
524 |
return null; |
525 |
} |
526 |
|
527 |
/* (non-Javadoc) |
528 |
* @see appl.parallel.server.ComputingResource#disconnect() |
529 |
*/ |
530 |
public synchronized void disconnect() throws RemoteException { |
531 |
/** |
532 |
* determine the calling host. Internal Server requires special |
533 |
* handling: notice that this happens very often in this class, but can |
534 |
* not externalized due to the behavior of #getClientHost(); |
535 |
*/ |
536 |
String callingHost = "localhost"; |
537 |
try { |
538 |
callingHost = isInternalServer ? "localhost" : getClientHost(); |
539 |
} catch (ServerNotActiveException e) { |
540 |
e.printStackTrace(); |
541 |
} |
542 |
/** ********************************************************************** */ |
543 |
if (checkConnected(callingHost)) { |
544 |
reset(); |
545 |
LOG.info("Client " + connectedClient |
546 |
+ " successfully disconnected from Server"); |
547 |
} |
548 |
} |
549 |
|
550 |
/** |
551 |
* @return a rating for the machine as discovered by the benchmark |
552 |
*/ |
553 |
public int getRating() { |
554 |
return machineRating; |
555 |
} |
556 |
|
557 |
/** |
558 |
* @return the registry port the server is bound to |
559 |
*/ |
560 |
public int getRegistryPort() { |
561 |
return registryPort; |
562 |
} |
563 |
|
564 |
/* |
565 |
* (non-Javadoc) |
566 |
* |
567 |
* @see appl.parallel.server.XuluServer#getResourceInformation() |
568 |
*/ |
569 |
public ComputingResourceProperties getResourceInformation() |
570 |
throws RemoteException { |
571 |
return new XuluServerProperties(this, serverName); |
572 |
} |
573 |
|
574 |
/** |
575 |
* @return the time in s the server is running |
576 |
*/ |
577 |
int getUptime() { |
578 |
return (int) (System.currentTimeMillis() - starttime); |
579 |
} |
580 |
|
581 |
private void initChainsaw(String connectedClient) { |
582 |
if (chainsaw && connectedClient != null) { |
583 |
chainsawAppender = new SocketAppender(connectedClient, 4445); |
584 |
BasicConfigurator.resetConfiguration(); |
585 |
BasicConfigurator.configure(chainsawAppender); |
586 |
} |
587 |
} |
588 |
|
589 |
/** |
590 |
* inits task priority, ports and loglevel |
591 |
*/ |
592 |
private void initConfiguration() { |
593 |
XuluConfig config = XuluConfig.getXuluConfig(); |
594 |
// a lower priority will run the server more in the background |
595 |
taskPriority = config.getIntProperty("XuluServer.priority"); |
596 |
|
597 |
switch (taskPriority) { |
598 |
default: |
599 |
taskPriority = Thread.NORM_PRIORITY; |
600 |
break; |
601 |
case 1: |
602 |
taskPriority = Thread.MIN_PRIORITY; |
603 |
break; |
604 |
case 2: |
605 |
taskPriority = Thread.NORM_PRIORITY - 1; |
606 |
break; |
607 |
case 4: |
608 |
taskPriority = Thread.NORM_PRIORITY + 1; |
609 |
break; |
610 |
case 5: |
611 |
taskPriority = Thread.MAX_PRIORITY; |
612 |
break; |
613 |
} |
614 |
|
615 |
availableProcessors = Runtime.getRuntime().availableProcessors(); |
616 |
// default: try to use one thread per processor |
617 |
toUseThreads = availableProcessors; |
618 |
// check if there is an entry in the XuluConfig saying how many threads |
619 |
// should be created. |
620 |
// (more threads than processors may be useful for the use of |
621 |
// hyperthreading) |
622 |
String configProz = config.getProperty("XuluServer.useThreads"); |
623 |
if ((configProz != null) && !configProz.equals("max")) { |
624 |
toUseThreads = config.getIntProperty("XuluServer.useThreads"); |
625 |
// error checking: |
626 |
if (toUseThreads < 1 || toUseThreads > 128){ |
627 |
toUseThreads = availableProcessors; |
628 |
System.out |
629 |
.println("Error: Number of threads must be between 1 and 128!"); |
630 |
} |
631 |
} |
632 |
// init correspondig arrays |
633 |
resultCalls = new Future[toUseThreads]; |
634 |
|
635 |
int port = config.getIntProperty("XuluServer.registryport"); |
636 |
if (port != 0) |
637 |
registryPort = port; |
638 |
|
639 |
useCodeDownloading = config |
640 |
.getBooleanProperty("XuluServer.useCodeDownloading"); |
641 |
if (useCodeDownloading) |
642 |
System.out.println("Code downloading is enabled"); |
643 |
else |
644 |
System.out.println("Code downloading is disabled"); |
645 |
|
646 |
int multicastport = config.getIntProperty("XuluServer.multicastport"); |
647 |
if (multicastport != 0) |
648 |
multicastPort = multicastport; |
649 |
String IP = config.getProperty("XuluServer.multicastgroup"); |
650 |
if (IP != null) |
651 |
multiCastIP = IP; |
652 |
try { |
653 |
multiCastGroup = InetAddress.getByName(multiCastIP); |
654 |
} catch (UnknownHostException e) { |
655 |
e.printStackTrace(); |
656 |
} |
657 |
if (loggingEnabled) { |
658 |
|
659 |
String mode = config.getProperty("XuluServer.log4j.mode"); |
660 |
if (mode != null) |
661 |
log4jConfig(mode); |
662 |
} |
663 |
String level = config.getProperty("XuluServer.log4j.logLevel"); |
664 |
if (level != null) |
665 |
log4jLevel(level.toLowerCase()); |
666 |
else |
667 |
log4jLevel("info"); |
668 |
} |
669 |
|
670 |
/** |
671 |
* Connects to the RemoteEventReceiver at the calling client |
672 |
*/ |
673 |
private void initEventSystem() throws RemoteException { |
674 |
String lookupName = "rmi://" + connectedClient + "/RemoteEventReceiver"; |
675 |
try { |
676 |
int delay = XuluConfig.getXuluConfig().getIntProperty( |
677 |
"XuluServer.eventDelay"); |
678 |
System.out.println("looking up " + lookupName); |
679 |
CommEventSink eventSink = (CommEventSink) Naming.lookup(lookupName); |
680 |
if (remoteEventReceiver != null) |
681 |
remoteEventReceiver.stopService(); |
682 |
remoteEventReceiver = new RemoteEventProxy(eventSink, delay); |
683 |
remoteEventReceiver.startService(); |
684 |
} catch (MalformedURLException e) { |
685 |
e.printStackTrace(); |
686 |
} catch (NotBoundException e) { |
687 |
LOG.warn("Could not find the remote event receiver at " |
688 |
+ lookupName, e); |
689 |
System.out.println(e.getMessage()); |
690 |
remoteEventReceiver = new RemoteEventProxy(null, 0); |
691 |
} |
692 |
} |
693 |
|
694 |
/** |
695 |
* inits the multicast unit |
696 |
*/ |
697 |
private void initializeMultiCasting() { |
698 |
try { |
699 |
MulticastSocket s = new MulticastSocket(multicastPort); |
700 |
s.joinGroup(multiCastGroup); |
701 |
// send hello message |
702 |
String IP = XuluConfig.getXuluConfig().getProperty( |
703 |
"XuluServer.multicastgroup"); |
704 |
DatagramPacket answer = new DatagramPacket(helloFromServerMessage |
705 |
.getBytes(), helloFromServerMessage.length(), InetAddress |
706 |
.getByName(IP), multicastPort); |
707 |
s.send(answer); |
708 |
// start thread that waits on client messages |
709 |
multicastThread = new ServerMulticastReceiver(s); |
710 |
multicastThread.start(); |
711 |
} catch (IOException e) { |
712 |
// TODO Auto-generated catch block |
713 |
LOG.warn("Could not initalize multicasting! Reason was " |
714 |
+ e.getMessage(), e); |
715 |
|
716 |
} |
717 |
} |
718 |
|
719 |
/** |
720 |
* @return whether this server is available or in use by another client |
721 |
* @see appl.parallel.ComputingResource#isAvailable() |
722 |
*/ |
723 |
public boolean isAvailable() throws RemoteException { |
724 |
/** |
725 |
* determine the calling host. Internal Server requires special |
726 |
* handling: notice that this happens very often in this class, but can |
727 |
* not externalized due to the behavior of #getClientHost(); |
728 |
*/ |
729 |
String callingHost = "localhost"; |
730 |
try { |
731 |
callingHost = isInternalServer ? "localhost" : getClientHost(); |
732 |
} catch (ServerNotActiveException e) { |
733 |
e.printStackTrace(); |
734 |
} |
735 |
/** ********************************************************************** */ |
736 |
if (callingHost.equals(connectedClient)) |
737 |
return true; |
738 |
|
739 |
if (connectedClient == null) |
740 |
return true; |
741 |
return false; |
742 |
} |
743 |
|
744 |
/** |
745 |
* @return if a client is connected |
746 |
*/ |
747 |
public boolean isClientConnected() { |
748 |
return connected; |
749 |
|
750 |
} |
751 |
|
752 |
/** |
753 |
* Loads a {@link SPMDTask} from the specified URL and returns an array of |
754 |
* instances of this class. There are so many instances, as there are |
755 |
* available processors on this machine!. This class also makes sure that |
756 |
* this happens only one time per connection (else this would of course be a |
757 |
* performance killer in fine grained parallelization). |
758 |
* |
759 |
* @param location |
760 |
* the URL of the rootDirectory/networkLocation |
761 |
* @param spmdTaskClassName |
762 |
* the name of the SPMDTask for which the newest class should be |
763 |
* found |
764 |
* @return the instance of the class or <code>null</code> if loading fails |
765 |
*/ |
766 |
private SPMDTask[] loadSPMDTasksfromURL(String location, |
767 |
String spmdTaskClassName) { |
768 |
|
769 |
SPMDTask[] spmdClassInstances = (SPMDTask[]) spmdTaskInstances |
770 |
.get(spmdTaskClassName); |
771 |
|
772 |
// if already loaded for this connection simply return the instances |
773 |
if (spmdClassInstances != null) |
774 |
return (SPMDTask[]) spmdClassInstances; |
775 |
spmdClassInstances = new SPMDTask[toUseThreads]; |
776 |
Class spmdClass = null; |
777 |
if (useCodeDownloading) { |
778 |
try { |
779 |
// lookup client and make sure that the newest class of the task |
780 |
// is instantiated. Will avoid loading older cached or local |
781 |
// filesystem version |
782 |
URL[] locations = { new URL("http://" + location + "/") }; |
783 |
PreferredClassLoader classLoader = new PreferredClassLoader( |
784 |
locations, Thread.currentThread() |
785 |
.getContextClassLoader(), null, false); |
786 |
LOG.debug("Try to load " + spmdTaskClassName + " from " |
787 |
+ location); |
788 |
spmdClass = classLoader.loadClass(spmdTaskClassName); |
789 |
} catch (Exception e) { |
790 |
LOG.error("Could not load class for " + spmdTaskClassName |
791 |
+ " from " + location, e); |
792 |
e.printStackTrace(); |
793 |
} |
794 |
} |
795 |
// if retrieval failed for some reason or code downloading disabled |
796 |
// then use the local class |
797 |
try { |
798 |
if (spmdClass == null) { |
799 |
spmdClass = Class.forName(spmdTaskClassName); |
800 |
} |
801 |
|
802 |
// instantiate the classes |
803 |
for (int i = 0; i < spmdClassInstances.length; i++) { |
804 |
Object task = spmdClass.newInstance(); |
805 |
if (!(task instanceof SPMDTask)) { |
806 |
LOG.fatal(spmdTaskClassName |
807 |
+ " was not an instance of SPMDTask!"); |
808 |
throw new UnsupportedOperationException(spmdTaskClassName |
809 |
+ " was not an instance of SPMDTask!"); |
810 |
} |
811 |
spmdClassInstances[i] = (SPMDTask) task; |
812 |
} |
813 |
|
814 |
spmdTaskInstances.put(spmdTaskClassName, spmdClassInstances); |
815 |
return spmdClassInstances; |
816 |
|
817 |
} catch (ClassNotFoundException e) { |
818 |
// TODO Auto-generated catch block |
819 |
e.printStackTrace(); |
820 |
} catch (InstantiationException e) { |
821 |
// TODO Auto-generated catch block |
822 |
e.printStackTrace(); |
823 |
} catch (IllegalAccessException e) { |
824 |
// TODO Auto-generated catch block |
825 |
e.printStackTrace(); |
826 |
} |
827 |
throw new UnsupportedOperationException("Loading of class " |
828 |
+ spmdTaskClassName |
829 |
+ " failed on serverside! Execution canceled"); |
830 |
} |
831 |
|
832 |
/* |
833 |
* (non-Javadoc) |
834 |
* |
835 |
* @see appl.parallel.server.XuluServer#ping(java.lang.Object[]) |
836 |
*/ |
837 |
public Object ping(Object... o) throws RemoteException { |
838 |
System.out.println("received ping"); |
839 |
return o; |
840 |
} |
841 |
|
842 |
/** |
843 |
* deletes all client-specific data data, so that a newly connected client |
844 |
* can use a clean system |
845 |
*/ |
846 |
private void reset() { |
847 |
if (dataManager != null) |
848 |
dataManager.stop(); |
849 |
dataManager = null; |
850 |
connectedClient = null; |
851 |
spmdTaskInstances.clear(); |
852 |
} |
853 |
|
854 |
/** |
855 |
* runs the benchmark defined in XuluServer.benchmarkclass |
856 |
*/ |
857 |
private void runBenchmark() { |
858 |
// see if the benchmark should be run |
859 |
|
860 |
boolean runbench = XuluConfig.getXuluConfig().getBooleanProperty( |
861 |
"XuluServer.runbench"); |
862 |
if (runbench) { |
863 |
String classname = XuluConfig.getXuluConfig().getProperty( |
864 |
"XuluServer.benchmarkclass"); |
865 |
if (classname != null) { |
866 |
try { |
867 |
// getBenchmarkClass |
868 |
Benchmark bench = (Benchmark) Class.forName(classname) |
869 |
.newInstance(); |
870 |
machineRating = bench.bench(); |
871 |
System.out |
872 |
.println("Successfully run benchmark. Machine Rating is " |
873 |
+ machineRating); |
874 |
} catch (Exception e) { |
875 |
System.err |
876 |
.println("Could not load Benchmarkclass " |
877 |
+ classname |
878 |
+ ". Setting rating to 0 (which means average). Reason was: " |
879 |
+ e.getMessage()); |
880 |
machineRating = 0; |
881 |
} |
882 |
} else |
883 |
System.out.println("Benchmark disabled"); |
884 |
} |
885 |
} |
886 |
|
887 |
/** |
888 |
* Every time {@link #runSPMDModelTask(String, int, Object[])} is called the |
889 |
* newest class is loaded from the client. For performance reasons this will |
890 |
* happen only one time per connection. |
891 |
* |
892 |
* @see appl.parallel.server.SPMDResource#runSPMDModelTask(String, int, |
893 |
* Object[]) |
894 |
*/ |
895 |
@SuppressWarnings("unchecked") |
896 |
public Object[] runSPMDModelTask(String spmdTaskName, int referenceID, |
897 |
Object... parameters) throws RemoteException { |
898 |
/** |
899 |
* determine the calling host. Internal Server requires special |
900 |
* handling: notice that this happens very often in this class, but can |
901 |
* not externalized due to the behavior of #getClientHost(); |
902 |
*/ |
903 |
String callingHost = "localhost"; |
904 |
try { |
905 |
callingHost = isInternalServer ? "localhost" : getClientHost(); |
906 |
} catch (ServerNotActiveException e) { |
907 |
e.printStackTrace(); |
908 |
} |
909 |
/** ********************************************************************** */ |
910 |
Thread.currentThread().setPriority(taskPriority); |
911 |
String clientIP = "<unknown Client>"; |
912 |
long time = System.nanoTime(); |
913 |
|
914 |
clientIP = callingHost; |
915 |
if (!clientIP.equals(connectedClient)) { |
916 |
LOG.warn(clientIP |
917 |
+ " has tried to run a Task, but was not connected. " |
918 |
+ "This server is in use by " + connectedClient); |
919 |
throw new RemoteException( |
920 |
"Run of SPMDTask failed! This server is in use by " |
921 |
+ connectedClient); |
922 |
} |
923 |
if (LOG.isDebugEnabled()) |
924 |
LOG.debug("received task from " + clientIP |
925 |
+ ". Starting evaluation now..."); |
926 |
// if no datamanager is found: exit |
927 |
if (dataManager == null) |
928 |
throw new UnsupportedOperationException( |
929 |
"No DataManager found. Create one first using 'createDataManager' on this Server"); |
930 |
|
931 |
// lookup newest version of the class from the server at the url |
932 |
// (or from local disk if remote class loading is disabled) |
933 |
SPMDTask[] tasks = loadSPMDTasksfromURL(clientIP, spmdTaskName); |
934 |
|
935 |
// The number of threads allowed for execution can be different |
936 |
// from the number of user assigned threads |
937 |
// because tasks may not support multithreadeding |
938 |
int allowedThreads = 1; |
939 |
if (tasks[0].supportsMultiThreading()) |
940 |
allowedThreads = toUseThreads; |
941 |
|
942 |
Object results[] = new Object[allowedThreads]; |
943 |
|
944 |
// the following loop is only executed the first time a task is used! |
945 |
if (!tasks[0].isInitialized()) { |
946 |
if (LOG.isInfoEnabled()) |
947 |
LOG.info("Trying to use " + allowedThreads |
948 |
+ " threads for execution of " |
949 |
+ tasks[0].getClass().getName()); |
950 |
// calculate splitting for multiple threads: |
951 |
int splitMapPos = dataManager.getInfo(referenceID).getSplitMapPos(); |
952 |
SplitMap splitMap = dataManager.getInfo(referenceID).getSplitMap(); |
953 |
SplitMap1DVertical map1DVertical = new SplitMap1DVertical(splitMap |
954 |
.getPartitionCalculationBounds(splitMapPos).width, splitMap |
955 |
.getPartitionCalculationBounds(splitMapPos).height, 0, |
956 |
allowedThreads, NeighborhoodBoxingMode.outBoxing); |
957 |
|
958 |
// create controllers and init the tasks |
959 |
for (int i = 0; i < allowedThreads; i++) { |
960 |
AdvancedSPMDServerController serverController = new AdvancedSPMDServerController( |
961 |
dataManager, referenceID); |
962 |
// set first thread as master thread |
963 |
serverController.setMasterThread(i == 0); |
964 |
// now assign the calculation area. Because we have assumed that |
965 |
// the (Thread)-calculation |
966 |
// area begins at (0,0) we must move it relative to the real |
967 |
// local calculation area |
968 |
Rectangle calculationBounds = map1DVertical |
969 |
.getPartitionCalculationBounds(i); |
970 |
int absolutX = (int) (serverController.getLocalCalcMinX() + calculationBounds |
971 |
.getMinX()); |
972 |
int absolutY = (int) (serverController.getLocalCalcMinY() + calculationBounds |
973 |
.getMinY()); |
974 |
calculationBounds.setLocation(absolutX, absolutY); |
975 |
serverController.setLocalCalculationBounds(calculationBounds); |
976 |
tasks[i].setSPMDServerController(serverController); |
977 |
tasks[i].initialize(); |
978 |
} |
979 |
} |
980 |
if (allowedThreads == 1) // only for saving resources: no new thread |
981 |
{ // if only one thread is used |
982 |
Object executionResult = tasks[0].run(parameters); |
983 |
results = new Object[] { executionResult }; |
984 |
} else { |
985 |
// for more than 1 processor execute the multiple tasks in separate |
986 |
// threads |
987 |
for (int i = 0; i < allowedThreads; i++) { |
988 |
resultCalls[i] = executor.submit(new SimpleCallThread(tasks[i], |
989 |
parameters)); |
990 |
} |
991 |
// wait for the Threads to finish |
992 |
for (int i = 0; i < allowedThreads; i++) { |
993 |
try { |
994 |
results[i] = resultCalls[i].get(); |
995 |
} catch (InterruptedException e) { |
996 |
// TODO Auto-generated catch block |
997 |
e.printStackTrace(); |
998 |
} catch (ExecutionException e) { |
999 |
// TODO Auto-generated catch block |
1000 |
e.printStackTrace(); |
1001 |
} |
1002 |
} |
1003 |
} |
1004 |
// generate a event for the execution |
1005 |
if (remoteEventReceiver.isTimeMonitoringEnabled()) |
1006 |
remoteEventReceiver.fireRemoteEvent(new TimeEvent((System |
1007 |
.nanoTime() - time), serverName, "DataServer", |
1008 |
CommType.REMOTE_EXECUTION)); |
1009 |
return results; |
1010 |
} |
1011 |
|
1012 |
/** |
1013 |
* This method frees all resources.<br> |
1014 |
* Notice that this method is not remotely accessible! |
1015 |
*/ |
1016 |
public void stopServer() { |
1017 |
unbind(); |
1018 |
if (dataManager != null) |
1019 |
dataManager.stop(); |
1020 |
dataManager = null; |
1021 |
if (multicastThread != null) |
1022 |
multicastThread.stopThread(); |
1023 |
} |
1024 |
} |