1 |
package appl.parallel.server; |
2 |
|
3 |
import java.awt.Rectangle; |
4 |
import java.net.InetAddress; |
5 |
import java.net.UnknownHostException; |
6 |
import java.rmi.Naming; |
7 |
import java.rmi.NotBoundException; |
8 |
import java.rmi.RemoteException; |
9 |
import java.rmi.registry.LocateRegistry; |
10 |
import java.rmi.registry.Registry; |
11 |
import java.rmi.server.UnicastRemoteObject; |
12 |
import java.util.HashMap; |
13 |
import java.util.Hashtable; |
14 |
import java.util.Vector; |
15 |
import java.util.concurrent.ExecutionException; |
16 |
import java.util.concurrent.ExecutorService; |
17 |
import java.util.concurrent.Executors; |
18 |
import java.util.concurrent.Future; |
19 |
|
20 |
import org.apache.log4j.LogManager; |
21 |
import org.apache.log4j.Logger; |
22 |
|
23 |
import appl.parallel.client.ClientDataServer; |
24 |
import appl.parallel.client.DataServer; |
25 |
import appl.parallel.data.PartitionDataHandler; |
26 |
import appl.parallel.event.CommEvent.CommType; |
27 |
import appl.parallel.event.CommEventSink; |
28 |
import appl.parallel.event.TimeEvent; |
29 |
import appl.parallel.event.TransferEvent; |
30 |
import appl.parallel.services.RemoteEventProxy; |
31 |
import appl.parallel.spmd.MultiDataInfo; |
32 |
import appl.parallel.spmd.MultiDataPartitionObject; |
33 |
import appl.parallel.spmd.SPMDClientController; |
34 |
import appl.parallel.spmd.split.DataPartition; |
35 |
import appl.parallel.spmd.split.PartitionInfo; |
36 |
import appl.parallel.spmd.split.SinglePartitionInfo; |
37 |
import appl.parallel.spmd.split.SplitMap; |
38 |
import appl.parallel.thread.DataServerThread; |
39 |
import de.appl.data.LoadingException; |
40 |
|
41 |
/** |
42 |
* Manages the all data for the {@link XuluServer}. This includes retrieval of |
43 |
* partitions from connected resources like the {@link DataServer} or, as part |
44 |
* of the update of neighborhood regions, from other |
45 |
* {@link XuluServer XuluServers}. |
46 |
* |
47 |
* @author Dominik Appl |
48 |
*/ |
49 |
public class PartitionDataManager extends UnicastRemoteObject implements |
50 |
PartitionDataServer { |
51 |
|
52 |
/** |
53 |
* The Thread is used for updating the partitions neighborhood region from |
54 |
* the other servers. |
55 |
* |
56 |
* @author Dominik Appl |
57 |
*/ |
58 |
class UpdateThread extends DataServerThread { |
59 |
|
60 |
private final DataPartition thisPartition; |
61 |
|
62 |
private final Rectangle updateBounds; |
63 |
|
64 |
private final int id; |
65 |
|
66 |
private final String hostname; |
67 |
|
68 |
public UpdateThread(PartitionDataServer dataServer, |
69 |
String ipOrHostname, DataPartition thisPartition, |
70 |
Rectangle updateBounds, int id, CommEventSink sink) { |
71 |
super(dataServer, null, null, CommType.REMOTE_UPDATE, sink); |
72 |
hostname = ipOrHostname; |
73 |
// TODO Auto-generated constructor stub |
74 |
this.thisPartition = thisPartition; |
75 |
this.updateBounds = updateBounds; |
76 |
this.id = id; |
77 |
} |
78 |
|
79 |
/* |
80 |
* (non-Javadoc) |
81 |
* |
82 |
* @see appl.parallel.thread.ExecutionThread#fireTimeEvents(long, |
83 |
* java.lang.Object) |
84 |
*/ |
85 |
@Override |
86 |
protected void fireTimeEvents(long execTime, Object result) { |
87 |
try { |
88 |
eventSink.fireRemoteEvent(new TimeEvent(execTime, |
89 |
dataServerName, hostname, commType)); |
90 |
} catch (RemoteException e) { |
91 |
e.printStackTrace(); |
92 |
} |
93 |
} |
94 |
|
95 |
/* |
96 |
* (non-Javadoc) |
97 |
* |
98 |
* @see appl.parallel.thread.ExecutionThread#fireTransferEvent(java.lang.Object) |
99 |
*/ |
100 |
@Override |
101 |
protected void fireTransferEvent(Object result) { |
102 |
try { |
103 |
eventSink.fireRemoteEvent(new TransferEvent(dataServerName, |
104 |
hostname, commType, new Object[] { result })); |
105 |
} catch (RemoteException e) { |
106 |
e.printStackTrace(); |
107 |
} |
108 |
} |
109 |
|
110 |
/* |
111 |
* (non-Javadoc) |
112 |
* |
113 |
* @see appl.parallel.thread.ExecutionThread#run() |
114 |
*/ |
115 |
@Override |
116 |
protected Object run() throws Exception { |
117 |
try { |
118 |
DataPartition updateData = getServer().getPartition(id, |
119 |
updateBounds); |
120 |
// apply update: |
121 |
this.thisPartition.setPartition(updateData, updateBounds); |
122 |
return updateData; |
123 |
} catch (Exception e) { |
124 |
LOG.error("Error while retrieving partition " + updateBounds |
125 |
+ "from PartitionDataServer with ip " + hostname); |
126 |
} |
127 |
return null; |
128 |
} |
129 |
} |
130 |
|
131 |
private final String bindingName = "PartitionDataServer"; |
132 |
|
133 |
private final Logger LOG = LogManager.getLogger(this.getClass().getName()); |
134 |
|
135 |
/** |
136 |
* all partition data. The key is the id. Is an Hashtable instead of |
137 |
* HashMasp for thread-safty. |
138 |
*/ |
139 |
private Hashtable<Integer, DataPartition> data = new Hashtable<Integer, DataPartition>( |
140 |
15); |
141 |
|
142 |
/** |
143 |
* the names of the data objects |
144 |
*/ |
145 |
private HashMap<String, Integer> NameToIDMapping = new HashMap<String, Integer>( |
146 |
15); |
147 |
|
148 |
private HashMap<Integer, SinglePartitionInfo> infos = new HashMap<Integer, SinglePartitionInfo>( |
149 |
10); |
150 |
|
151 |
private HashMap<String, Object> baseParameters = new HashMap<String, Object>( |
152 |
10); |
153 |
|
154 |
private final PartitionDataServer[] dataServers; |
155 |
|
156 |
private final ExecutorService executor; |
157 |
|
158 |
private final String[] IPs; |
159 |
|
160 |
private boolean dataServersInitialized = false; |
161 |
|
162 |
private HashMap<String, MultiDataInfo> multiDataInfos = new HashMap<String, MultiDataInfo>( |
163 |
10); |
164 |
|
165 |
private final String dataServerName; |
166 |
|
167 |
private final CommEventSink remoteEventReceiver; |
168 |
|
169 |
private String[] hostnames; |
170 |
|
171 |
private ClientDataServer localSPMDClient; |
172 |
|
173 |
private final int registryPort; |
174 |
|
175 |
/** |
176 |
* Creates and inits a new PartitionDataserver. |
177 |
* |
178 |
* @param IPs |
179 |
* the IPs of participating Servers for neighborhood updates(may |
180 |
* include port specification) |
181 |
* @param remoteEventReceiver |
182 |
* generated events are forwarded to the receiver |
183 |
* @param registryPort |
184 |
* port of the registry to which the {@link PartitionDataManager} |
185 |
* should be bound |
186 |
* @throws RemoteException |
187 |
* if a connection to a participating server fails |
188 |
*/ |
189 |
public PartitionDataManager(String[] IPs, |
190 |
CommEventSink remoteEventReceiver, int registryPort) |
191 |
throws RemoteException { |
192 |
// create NameToID and IdToInfo-Mapping |
193 |
this.IPs = IPs; |
194 |
this.remoteEventReceiver = remoteEventReceiver; |
195 |
this.registryPort = registryPort; |
196 |
|
197 |
dataServers = new PartitionDataServer[IPs.length]; |
198 |
startTheServer(); |
199 |
|
200 |
// start Thread service |
201 |
executor = Executors.newCachedThreadPool(); |
202 |
String name = "unknown PartitionDataServer"; |
203 |
try { |
204 |
name = InetAddress.getLocalHost().getHostName(); |
205 |
} catch (UnknownHostException e) { |
206 |
// TODO Auto-generated catch block |
207 |
e.printStackTrace(); |
208 |
} |
209 |
dataServerName = name; |
210 |
// try to get the hostnames |
211 |
hostnames = new String[IPs.length]; |
212 |
for (int i = 0; i < IPs.length; i++) { |
213 |
// try { |
214 |
hostnames[i] = IPs[i]; |
215 |
// hostnames[i] = InetAddress.getByName(IPs[i]).getHostName(); |
216 |
// } catch (UnknownHostException e) { |
217 |
// LOG.warn("Exception while retrivieving hostname of address: " + |
218 |
// IPs[i]); |
219 |
// } |
220 |
} |
221 |
} |
222 |
|
223 |
/** |
224 |
* Same as {@link #PartitionDataManager(String[], CommEventSink, int)}, but uses |
225 |
* a local SPMDClient. <br> |
226 |
* This is used when a Server is running inside the Xulu-Client. It has |
227 |
* performance advantages (direct access - no TCP/IP) |
228 |
* |
229 |
* @param IPs |
230 |
* the IPs of participating Servers for neighborhood updates(may |
231 |
* include port specification) |
232 |
* @param remoteEventReceiver |
233 |
* generated events are forwarded to the receiver |
234 |
* @param registryport |
235 |
* port of the registry to which the {@link PartitionDataManager} |
236 |
* should be bound |
237 |
* @param localSPMDClient |
238 |
* a {@link ClientDataServer} for fast local access |
239 |
* @throws RemoteException |
240 |
*/ |
241 |
public PartitionDataManager(String[] IPs, |
242 |
RemoteEventProxy remoteEventReceiver, int registryport, |
243 |
ClientDataServer localSPMDClient) throws RemoteException { |
244 |
this(IPs, remoteEventReceiver, registryport); |
245 |
this.localSPMDClient = localSPMDClient; |
246 |
} |
247 |
|
248 |
/* |
249 |
* (non-Javadoc) |
250 |
* |
251 |
* @see appl.parallel.client.DataServer#addData(appl.parallel.spmd.split.DataPartition) |
252 |
*/ |
253 |
public void addData(DataPartition partition) throws RemoteException { |
254 |
if (partition == null) { |
255 |
LOG.warn("To add partition was null - no data added"); |
256 |
return; |
257 |
} |
258 |
data.put(partition.getRootID(), partition); |
259 |
|
260 |
} |
261 |
|
262 |
/* |
263 |
* (non-Javadoc) |
264 |
* |
265 |
* @see appl.parallel.server.PartitionDataServer#addMultiDataInfos(java.util.HashMap) |
266 |
*/ |
267 |
public void addMultiDataInfos( |
268 |
HashMap<String, MultiDataInfo> newMultiDataInfos) |
269 |
throws RemoteException { |
270 |
this.multiDataInfos.putAll(newMultiDataInfos); |
271 |
} |
272 |
|
273 |
/* |
274 |
* (non-Javadoc) |
275 |
* |
276 |
* @see appl.parallel.server.PartitionDataServer#addPartitionInfos(java.util.Vector) |
277 |
*/ |
278 |
public void addPartitionInfos( |
279 |
Vector<SinglePartitionInfo> singlePartitionInfos) |
280 |
throws RemoteException { |
281 |
for (SinglePartitionInfo singlePartitionInfo : singlePartitionInfos) { |
282 |
NameToIDMapping.put(singlePartitionInfo.getBaseResourceName(), |
283 |
singlePartitionInfo.getBaseResourceID()); |
284 |
infos.put(singlePartitionInfo.getBaseResourceID(), |
285 |
singlePartitionInfo); |
286 |
} |
287 |
} |
288 |
|
289 |
/** |
290 |
* @param name |
291 |
* @param idx |
292 |
*/ |
293 |
public void destroyMultiPartition(String name, int idx) { |
294 |
|
295 |
} |
296 |
|
297 |
/** |
298 |
* returns the baseParameter by name |
299 |
* |
300 |
* @param name |
301 |
* the name of the parameter |
302 |
* @return the parameter |
303 |
*/ |
304 |
public Object getBaseParameter(String name) { |
305 |
return baseParameters.get(name); |
306 |
} |
307 |
|
308 |
/** |
309 |
* Returns a a (remote) PartitionDataServer for an IP-Address. Looks up the |
310 |
* PartitionDataServer in the registry |
311 |
*/ |
312 |
private PartitionDataServer getConnection(String ip) { |
313 |
PartitionDataServer server; |
314 |
// lookup server |
315 |
try { |
316 |
server = (PartitionDataServer) Naming.lookup("rmi://" + ip + "/" |
317 |
+ bindingName); |
318 |
} catch (Exception e) { |
319 |
LOG.error("bindig of " + ip + " failed" + e.getMessage(), e); |
320 |
e.printStackTrace(); |
321 |
return null; |
322 |
} |
323 |
return server; |
324 |
} |
325 |
|
326 |
/** |
327 |
* Returns the {@link DataPartition} associated with the given ID |
328 |
* |
329 |
* @param id |
330 |
* the id of the data |
331 |
* @return the data |
332 |
*/ |
333 |
public synchronized DataPartition getData(int id) throws RemoteException { |
334 |
// check if in hashtable already |
335 |
DataPartition partition = data.get(id); |
336 |
if (partition != null) |
337 |
return partition; |
338 |
// if not in hashtable: |
339 |
// load the data and put it into hash |
340 |
// find the element with the ID: |
341 |
SinglePartitionInfo info = infos.get(id); |
342 |
|
343 |
if (info == null) { |
344 |
LOG.warn("No partition with id " + id + " found!"); |
345 |
return null; |
346 |
} |
347 |
// retrieve data from client or other location |
348 |
try { |
349 |
// if there is a local server try to get the local partition |
350 |
PartitionDataHandler partitionDataHandler = info |
351 |
.getPartitionDataHandler(); |
352 |
if (localSPMDClient != null) |
353 |
partitionDataHandler.setSPMDClient(localSPMDClient); |
354 |
partition = partitionDataHandler.load(); |
355 |
} catch (LoadingException e) { |
356 |
// TODO Auto-generated catch block |
357 |
LOG.error("Loading of partition with id '" + id |
358 |
+ "' failed. Null returned!"); |
359 |
e.printStackTrace(); |
360 |
return null; |
361 |
} |
362 |
if (partition == null) { |
363 |
LOG.error("Loading of partition with id '" + id |
364 |
+ "' failed. Null returned!"); |
365 |
return null; |
366 |
} |
367 |
|
368 |
// enter data in partition table |
369 |
data.put(id, partition); |
370 |
return partition; |
371 |
} |
372 |
|
373 |
/** |
374 |
* Gets a partition by name |
375 |
* |
376 |
* @param name |
377 |
* the name of the resource (probably given by the programmer) |
378 |
* @return the partition or null if not found |
379 |
*/ |
380 |
public DataPartition getData(String name) { |
381 |
Integer id = NameToIDMapping.get(name); |
382 |
if (id == null) { |
383 |
LOG.warn("No data for name " + name + " found! Returning null"); |
384 |
return null; |
385 |
} |
386 |
try { |
387 |
return getData(id); |
388 |
} catch (RemoteException e) { |
389 |
// should never be reached (because the method call is local |
390 |
e.printStackTrace(); |
391 |
} |
392 |
return null; |
393 |
} |
394 |
|
395 |
/** |
396 |
* Returns the partition info with the specified id |
397 |
* |
398 |
* @param id |
399 |
* the id of the partition |
400 |
* @return the info |
401 |
*/ |
402 |
public PartitionInfo getInfo(int id) { |
403 |
return infos.get(id); |
404 |
} |
405 |
|
406 |
/** |
407 |
* Gets a partition of a multi-data element. NOTE THAT ALL PARTITIONS OF THE |
408 |
* MULTIGRID WILL BE LOADED (watch out for loading performance!). |
409 |
* |
410 |
* @param name |
411 |
* the name of the resource (probably given by the programmer) |
412 |
* @return the partition |
413 |
* @see SPMDClientController#addToMultiDataSplitControl(Object[], String) |
414 |
*/ |
415 |
public DataPartition[] getMultiData(String name) { |
416 |
// fill an array with the partitions |
417 |
DataPartition[] partitions = new DataPartition[multiDataInfos.get(name) |
418 |
.getCount()]; |
419 |
for (int i = 0; i < partitions.length; i++) { |
420 |
partitions[i] = getMultiData(name, i); |
421 |
} |
422 |
return partitions; |
423 |
} |
424 |
|
425 |
/** |
426 |
* Gets a partition of a multi-data element. Only the resource at the given |
427 |
* position will be loaded. |
428 |
* |
429 |
* @param name |
430 |
* the name of the resource (probably given by the programmer) |
431 |
* @param pos |
432 |
* the index of the requested element |
433 |
* @return the partition |
434 |
* @see SPMDClientController#addToMultiDataSplitControl(Object[], String) |
435 |
*/ |
436 |
public DataPartition getMultiData(String name, int pos) { |
437 |
try { |
438 |
return getData(multiDataInfos.get(name).getMultiID(pos)); |
439 |
} catch (RemoteException e) { |
440 |
// TODO Auto-generated catch block |
441 |
e.printStackTrace(); |
442 |
} |
443 |
return null; |
444 |
} |
445 |
|
446 |
/* |
447 |
* (non-Javadoc) |
448 |
* |
449 |
* @see appl.parallel.server.PartitionDataServer#getMultiDataInfo(java.lang.String) |
450 |
*/ |
451 |
public MultiDataInfo getMultiDataInfo(String name) throws RemoteException { |
452 |
return multiDataInfos.get(name); |
453 |
} |
454 |
|
455 |
/* |
456 |
* (non-Javadoc) |
457 |
* |
458 |
* @see appl.parallel.server.PartitionDataServer#getMultiDataObject(java.lang.String) |
459 |
*/ |
460 |
public MultiDataPartitionObject getMultiDataObject(String name) |
461 |
throws RemoteException { |
462 |
return new MultiDataPartitionObject(multiDataInfos.get(name), this); |
463 |
} |
464 |
|
465 |
/* |
466 |
* (non-Javadoc) |
467 |
* |
468 |
* @see appl.parallel.server.PartitionDataServer#getPartition(int, |
469 |
* java.awt.Rectangle) |
470 |
*/ |
471 |
public synchronized DataPartition getPartition(int id, Rectangle bounds) |
472 |
throws RemoteException { |
473 |
DataPartition thisPartition = (DataPartition) data.get(id); |
474 |
if (thisPartition == null && LOG.isDebugEnabled()) { |
475 |
LOG.error("Partition with id " + id + " and bounds " + bounds |
476 |
+ "was requested but not found on DataManager"); |
477 |
return null; |
478 |
} |
479 |
|
480 |
if (LOG.isDebugEnabled()) |
481 |
LOG.debug("Data retrieved from DataManager: ID:" + id |
482 |
+ " partition: " + bounds); |
483 |
return thisPartition.getPartition(bounds); |
484 |
} |
485 |
|
486 |
/* |
487 |
* (non-Javadoc) |
488 |
* |
489 |
* @see appl.parallel.server.PartitionDataServer#getPartitionInfo(int) |
490 |
*/ |
491 |
public SinglePartitionInfo getPartitionInfo(int rootID) |
492 |
throws RemoteException { |
493 |
return infos.get(rootID); |
494 |
} |
495 |
|
496 |
/** |
497 |
* other dataservers may not be initialized on creation of this object. |
498 |
* Therefore the dataServers must be looked up later |
499 |
*/ |
500 |
private void initializeDataServers() { |
501 |
if (dataServersInitialized == true) |
502 |
return; |
503 |
for (int i = 0; i < dataServers.length; i++) { |
504 |
dataServers[i] = getConnection(IPs[i]); |
505 |
} |
506 |
dataServersInitialized = true; |
507 |
} |
508 |
|
509 |
/* |
510 |
* (non-Javadoc) |
511 |
* |
512 |
* @see appl.parallel.client.DataServer#removeData(int) |
513 |
*/ |
514 |
public void removeData(int id) { |
515 |
data.remove(id); |
516 |
|
517 |
} |
518 |
|
519 |
/** |
520 |
* Removes the partition from the database |
521 |
* |
522 |
* @param name |
523 |
* the name of the partition to remove |
524 |
*/ |
525 |
public void removeData(String name) { |
526 |
data.remove(NameToIDMapping.get(name)); |
527 |
} |
528 |
|
529 |
/** |
530 |
* Removes the partition from the database |
531 |
* |
532 |
* @param partition |
533 |
* the Partition to remove |
534 |
*/ |
535 |
public void removePartition(DataPartition partition) { |
536 |
data.remove(partition.getRootID()); |
537 |
} |
538 |
|
539 |
/** |
540 |
* binds the server to the registry |
541 |
*/ |
542 |
private void startTheServer() throws RemoteException { |
543 |
Registry registry = LocateRegistry.getRegistry(registryPort); |
544 |
registry.rebind(bindingName, (PartitionDataServer) this); |
545 |
LOG.info("PartitionDataServer bound to port " + registryPort |
546 |
+ " and running...."); |
547 |
} |
548 |
|
549 |
/** |
550 |
* unbinds the server from the registry, removes all data and runs the |
551 |
* garbage collector |
552 |
*/ |
553 |
public void stop() { |
554 |
data.clear(); |
555 |
NameToIDMapping.clear(); |
556 |
infos.clear(); |
557 |
|
558 |
Registry registry; |
559 |
try { |
560 |
registry = LocateRegistry.getRegistry(); |
561 |
registry.unbind(bindingName); |
562 |
LOG.debug("Sucessfully removed " + bindingName + " from reggie"); |
563 |
} catch (RemoteException e) { |
564 |
LOG.warn("tried to unbind " + bindingName |
565 |
+ " from reggie, but an exception occured: " |
566 |
+ e.getMessage()); |
567 |
} catch (NotBoundException e) { |
568 |
LOG.warn("tried to unbind " + bindingName |
569 |
+ " from reggie, but the name was not bound " |
570 |
+ e.getMessage()); |
571 |
} |
572 |
System.gc(); |
573 |
} |
574 |
|
575 |
/** |
576 |
* @param rootID |
577 |
*/ |
578 |
public void unloadToSource(int rootID) throws RemoteException { |
579 |
SinglePartitionInfo info = infos.get(rootID); |
580 |
if (info == null) { |
581 |
LOG.error("Could not unload partition with id " + rootID |
582 |
+ ". The partitioninfo was not found!"); |
583 |
return; |
584 |
} |
585 |
// start unloading |
586 |
info.getPartitionDataHandler().setBasePartition(this.getData(rootID)); |
587 |
info.getPartitionDataHandler().unload(); |
588 |
// // remove data from memory and run garbage collection |
589 |
// data.remove(rootID); |
590 |
// System.gc(); |
591 |
} |
592 |
|
593 |
/* |
594 |
* (non-Javadoc) |
595 |
* |
596 |
* @see appl.parallel.server.PartitionDataServer#updateBaseParameter(java.lang.Object[], |
597 |
* java.lang.String[]) |
598 |
*/ |
599 |
public void updateBaseParameter(HashMap<String, Object> newParameters) |
600 |
throws RemoteException { |
601 |
// update the mapping. Old values are replaced automaticly |
602 |
baseParameters.putAll(newParameters); |
603 |
} |
604 |
|
605 |
/** |
606 |
* Updates the specified partition. For this the the neighborhood partitions |
607 |
* are updated from remote DataServers using the {@link SplitMap} of the |
608 |
* {@link SinglePartitionInfo}. |
609 |
* |
610 |
* @param id |
611 |
*/ |
612 |
public void updateFromNeighbors(int id) { |
613 |
initializeDataServers(); |
614 |
// get the data to be updated: |
615 |
DataPartition thisPartition = data.get(id); |
616 |
PartitionInfo info = infos.get(id); |
617 |
if (thisPartition == null || info == null) { |
618 |
LOG.error("Update with id " + id |
619 |
+ " failed, because no data was found with this id"); |
620 |
return; |
621 |
} |
622 |
|
623 |
// get some other needed infos |
624 |
SplitMap map = info.getSplitMap(); |
625 |
int mapPosition = info.getSplitMapPos(); |
626 |
// check if something to do |
627 |
if (map.getNeighborhoodRange() == 0) { |
628 |
LOG.debug("Update has nothing to do (neighborhoodrange is 0)"); |
629 |
return; |
630 |
} |
631 |
|
632 |
// calculate (for each neighbor) the needed partitions and get |
633 |
// the updates (implemented using Threads) |
634 |
int[] neighbors = map.getNeighborsForPosition(mapPosition); |
635 |
Future futures[] = new Future[neighbors.length]; |
636 |
for (int j = 0; j < neighbors.length; j++) { |
637 |
int neighborIdx = neighbors[j]; |
638 |
// get the overlap of the neighborhood area of the current partion |
639 |
// with the calcultation area of the remote partition |
640 |
Rectangle updateBounds = map.getPartitionNeighborhoodBounds( |
641 |
mapPosition).intersection( |
642 |
map.getPartitionCalculationBounds(neighborIdx)); |
643 |
// Get the partition from remote server: |
644 |
DataPartition updateData; |
645 |
futures[j] = executor.submit(new UpdateThread( |
646 |
dataServers[neighborIdx], hostnames[neighborIdx], |
647 |
thisPartition, updateBounds, id, remoteEventReceiver)); |
648 |
} |
649 |
|
650 |
for (Future future : futures) { |
651 |
try { |
652 |
future.get(); |
653 |
} catch (InterruptedException e) { |
654 |
e.printStackTrace(); |
655 |
} catch (ExecutionException e) { |
656 |
LOG.error("The Remote update of a partition failed", e); |
657 |
} |
658 |
} |
659 |
|
660 |
} |
661 |
|
662 |
/** |
663 |
* Does the same as {@link #updateFromNeighbors(int)} |
664 |
* |
665 |
* @param name |
666 |
* the name of the resource |
667 |
*/ |
668 |
public void updateFromNeighbors(String name) { |
669 |
// get by ID |
670 |
updateFromNeighbors(NameToIDMapping.get(name)); |
671 |
} |
672 |
|
673 |
/* |
674 |
* (non-Javadoc) |
675 |
* |
676 |
* @see appl.parallel.server.PartitionDataServer#setPartition(int, |
677 |
* appl.parallel.spmd.split.DataPartition, java.awt.Rectangle) |
678 |
*/ |
679 |
public synchronized void updatePartition(int id, DataPartition partition, |
680 |
Rectangle bounds) throws RemoteException { |
681 |
DataPartition thisPartition = (DataPartition) data.get(id); |
682 |
if (thisPartition == null && LOG.isDebugEnabled()) { |
683 |
LOG |
684 |
.error("Partition of data with id " + id + " and bounds " |
685 |
+ bounds |
686 |
+ "should be set but was not found in DataManager"); |
687 |
return; |
688 |
} |
689 |
thisPartition.setPartition(partition, bounds); |
690 |
if (LOG.isDebugEnabled()) |
691 |
LOG.debug("Partition of data set on DataManager with id " + id |
692 |
+ " and partition bounds: " + bounds); |
693 |
|
694 |
} |
695 |
|
696 |
} |