/[xulu]/trunk/src/appl/parallel/spmd/SPMDClientController.java
ViewVC logotype

Contents of /trunk/src/appl/parallel/spmd/SPMDClientController.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 114 - (show annotations)
Mon Jul 11 11:31:25 2011 UTC (13 years, 5 months ago) by mojays
File size: 31036 byte(s)
SCHMITZM library updated to current version (2.6-SNAPSHOT)
Added gt-xsd-filter.jar, gt-xsd-gml2.jar, picocontainer.jar and xsd.jar from Geotools 2.6.5
1 package appl.parallel.spmd;
2
3 import java.rmi.RemoteException;
4 import java.util.HashMap;
5 import java.util.Vector;
6 import java.util.concurrent.ExecutionException;
7 import java.util.concurrent.ExecutorService;
8 import java.util.concurrent.Executors;
9 import java.util.concurrent.Future;
10
11 import org.apache.log4j.LogManager;
12 import org.apache.log4j.Logger;
13
14 import appl.ext.XuluConfig;
15 import appl.parallel.ComputingResourceContainer;
16 import appl.parallel.ComputingResourceProperties;
17 import appl.parallel.client.ClientDataServer;
18 import appl.parallel.client.RemoteEventHandler;
19 import appl.parallel.data.PartitionDataHandler;
20 import appl.parallel.data.PartitionHandlerFactory;
21 import appl.parallel.event.CommEvent.CommType;
22 import appl.parallel.event.CommEventSink;
23 import appl.parallel.model.AbstractParallelStepModel;
24 import appl.parallel.server.PartitionDataServer;
25 import appl.parallel.server.SPMDResource;
26 import appl.parallel.spmd.split.AbstractSplitMap;
27 import appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode;
28 import appl.parallel.spmd.split.SinglePartitionInfo;
29 import appl.parallel.spmd.split.SplitMap;
30 import appl.parallel.spmd.split.SplittableResource;
31 import appl.parallel.thread.ComputingResourceThread;
32 import appl.parallel.thread.DataServerThread;
33 import edu.bonn.xulu.plugin.data.grid.MultiGrid;
34
35 /**
36 * This class controls all the parallelization action on the client side and is
37 * the counterpart to {@link SPMDServerController}. It is accessed by the model
38 * developer by retrieving the {@link SPMDClientInterface} from the
39 * {@link AbstractParallelStepModel}.
40 *
41 * @author Dominik Appl
42 */
43 public class SPMDClientController implements SPMDClientInterface {
44
/**
 * Lifecycle states of the controller.
 * <p>
 * <b>{@link #INIT}</b> is the initializing state. It is the state in which
 * resources are added to split control and in which the neighborhood
 * range, the boxing mode and the reference resource are changed. Methods
 * that require {@link #RUN} throw {@link UnsupportedOperationException}
 * while the controller is still initializing.
 * <p>
 * <b>{@link #RUN}</b> is the running state. Methods that require
 * {@link #INIT} throw {@link UnsupportedOperationException} once this
 * state is reached. The controller enters it when
 * {@link SPMDClientController#runSPMDModelTask(SPMDTask, Object...)} is
 * called.
 */
public enum STATE {
    /** Initializing state: configuration and split control set-up. */
    INIT,
    /** Running state: tasks are executed, partitions merged and updated. */
    RUN
}
64
65 private STATE state = STATE.INIT;
66
67 private final String splitMapClassPropertyName = "Parallel.splitmapforclass";
68
69 private final Logger LOG = LogManager.getLogger(this.getClass().getName());
70
71 /**
72 * participating servers
73 */
74 private final SPMDResource[] servers;
75
76 private final ComputingResourceProperties[] serverInfos;
77
78 /**
79 * stores the {@link PartitionDataServer DataServers} of the resources. They
80 * are retrieved, after all resources are submitted to split control.
81 */
82 private final PartitionDataServer[] dataServers;;
83
84 /**
85 * number of partitions (== no of participating resources)
86 */
87 private final int noOfPartitions;
88
89 /**
90 * contains the {@link SinglePartitionInfo} of the
91 * {@link SplittableResource splitted resources} for each server
92 */
93 private final Vector<SinglePartitionInfo>[] singlePartitionInfos;
94
95 private final Vector<SinglePartitionInfo>[] toTransferPartitionInfos;
96
97 private HashMap<String, Object> toTransferBaseParameters;
98
99 private final HashMap<String, MultiDataInfo> multiDataInfos = new HashMap<String, MultiDataInfo>(
100 10);
101
102 private final HashMap<String, MultiDataInfo> toTransferMultiDataObjects = new HashMap<String, MultiDataInfo>(
103 10);
104
105 /** The IP-Addresses of the resources */
106 private String[] IPs;
107
108 /***************************************************************************
109 * This Vector contains all IDs of the resources, which are currently under
110 * splitControl
111 *
112 * @see #addToSplitControl(SplittableResource, String)
113 * @see #mergePartition(SplittableResource)
114 **************************************************************************/
115 private Vector<Integer> listOfAllActivelyControlledData = new Vector<Integer>();
116
117 /**
118 * @see AbstractSplitMap.NeighborhoodBoxingMode
119 */
120 private NeighborhoodBoxingMode boxingMode = AbstractSplitMap.NeighborhoodBoxingMode.inBoxing;
121
122 /** current neighborhood range, default is 0 */
123 private int neighborhoodRange = 0;
124
125 private final ClientDataServer spmdClient;
126
127 /**
128 * Local calculation bounds on server side are calculated using the
129 * reference resource This is the ID of that resource
130 */
131 protected int referenceResourceID = -1;
132
133 /** Thread execution pool */
134 private final ExecutorService executor;
135
136 /** true if there is data which has to be transfered to the servers */
137 private boolean dataToTransfer = false;
138
139 private SplitMap splitMap;
140
141 private final CommEventSink eventProxy;
142
143 private final double[] weights;
144
/**
 * Creates a new client controller, connects to all given computing
 * resources and creates one {@link PartitionDataServer} on each of them.
 *
 * @param computingResources
 *            the resources which are used by this controller; must
 *            contain at least one element
 * @param weights
 *            the weights for the distribution over the computing resources
 *            (values with a sum of 1) or null (distribution will be
 *            average)
 * @param spmdClient
 *            the spmd client responsible for the data retrieval
 * @param eventProxy
 *            a {@link RemoteEventHandler} for eventHandling
 * @throws UnsupportedOperationException
 *             if no computing resource was given
 */
@SuppressWarnings("unchecked")
public SPMDClientController(
        Vector<ComputingResourceContainer> computingResources,
        double[] weights, ClientDataServer spmdClient,
        CommEventSink eventProxy) {
    if (weights == null)
        weights = averageWeights(computingResources.size());
    this.weights = weights;

    this.eventProxy = eventProxy;
    this.noOfPartitions = computingResources.size();
    if (noOfPartitions == 0)
        throw new UnsupportedOperationException(
                "No Computing Ressources found");

    this.spmdClient = spmdClient;
    dataServers = new PartitionDataServer[noOfPartitions];
    serverInfos = new ComputingResourceProperties[noOfPartitions];
    servers = new SPMDResource[noOfPartitions];
    IPs = new String[noOfPartitions];
    for (int i = 0; i < noOfPartitions; i++) {
        ComputingResourceContainer container = computingResources.get(i);
        serverInfos[i] = container.getInformation();
        servers[i] = (SPMDResource) container.getResource();

        // extract the IP. The null check has to happen on the raw IP:
        // once ":port" is appended the string can never be null again.
        IPs[i] = serverInfos[i].getIP();
        if (IPs[i] == null)
            LOG
                    .fatal("The IP-Information of a computingRessource was NULL. "
                            + "This will result in failure of the computation!");
        if (serverInfos[i].getPort() != null)
            IPs[i] = IPs[i] + ":" + serverInfos[i].getPort();
    }

    // initialize the singlePartitionInfos and tasks. Notice that the array
    // singlePartitionInfos is final.
    // Each vector of infos is permanently associated with one server.
    this.singlePartitionInfos = new Vector[noOfPartitions];
    this.toTransferPartitionInfos = new Vector[noOfPartitions];
    this.toTransferBaseParameters = new HashMap<String, Object>();
    for (int i = 0; i < noOfPartitions; i++) {
        singlePartitionInfos[i] = new Vector<SinglePartitionInfo>();
        toTransferPartitionInfos[i] = new Vector<SinglePartitionInfo>();
    }

    // initialize executor
    executor = Executors.newCachedThreadPool();

    // connect to all participating servers
    connectAll();
    // initialize DataServers (must be after connect)
    for (int i = 0; i < noOfPartitions; i++) {
        try {
            dataServers[i] = servers[i].createDataServer(IPs);
        } catch (RemoteException e) {
            // keep going so the remaining servers are still initialized;
            // the failure is logged together with its stack trace
            LOG.fatal("Could not create the data server on resource "
                    + IPs[i], e);
        }
    }
}
221
222 private double[] averageWeights(int noResources) {
223 // create weighted rating with the same weight
224 double[] weights = new double[noResources];
225 for (int i = 0; i < weights.length; i++) {
226 weights[i] = 1 / noResources;
227 }
228 return weights;
229 }
230
231 /*
232 * (non-Javadoc)
233 *
234 * @see appl.parallel.spmd.SPMDClientInterface#addBaseParameter(java.lang.Object,
235 * java.lang.String)
236 */
237 public void addBaseParameter(Object parameter, String parameterName) {
238 toTransferBaseParameters.put(parameterName, parameter);
239 dataToTransfer = true;
240 }
241
242 /*
243 * (non-Javadoc)
244 *
245 * @see appl.parallel.spmd.SPMDClientInterface#addBaseParameters(java.lang.Object[],
246 * java.lang.String[])
247 */
248 public void addBaseParameters(Object[] parameters, String[] parameterNames) {
249 for (int i = 0; i < parameterNames.length; i++) {
250 addBaseParameter(parameters[i], parameterNames[i]);
251 }
252 }
253
254 /*
255 * (non-Javadoc)
256 *
257 * @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(java.lang.Object[],
258 * java.lang.String)
259 */
260 public MultiDataObject addToMultiDataSplitControl(
261 Object splittableResources[], String name) {
262 if (splittableResources.length == 0) {
263 throw new UnsupportedOperationException(
264 "There must be at least one resource to create a MultiData Element");
265 }
266 // Check if adding is allowed:
267 checkState(STATE.INIT);
268 SplittableResource[] resources = checkSplittableArray(splittableResources);
269
270 // add each element to splitcontrol. The constant gives each element a
271 // unique name and identifies it on the server side as belonging to a multisplit
272
273 MultiDataInfo multi = new MultiDataInfo(new int[0], name);
274 for (int i = 0; i < resources.length; i++) {
275 multi.addElement(resources[i].getRootID());
276 addToSplitControl(resources[i], MultiDataInfo.getNameWithIdx(i,
277 name));
278 }
279 toTransferMultiDataObjects.put(name, multi);
280 multiDataInfos.put(name, multi);
281 dataToTransfer = true;
282 return new MultiDataObject(multi, spmdClient);
283 }
284
285 /*
286 * (non-Javadoc)
287 *
288 * @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(edu.bonn.xulu.plugin.data.grid.MultiGrid,
289 * java.lang.String)
290 */
291 public MultiDataObject addToMultiDataSplitControl(MultiGrid multiGrid,
292 String name) {
293 MultiDataObject dataObject = addToMultiDataSplitControl(multiGrid
294 .toArray(), name);
295 dataObject.setManagedGrid(multiGrid);
296 return dataObject;
297 }
298
299 /*
300 * (non-Javadoc)
301 *
302 * @see appl.parallel.spmd.SPMDClientInterface#addToSplitControl(java.lang.Object,
303 * java.lang.String)
304 */
305 public void addToSplitControl(Object splittableResource, String name) {
306
307 SplittableResource resource = checkSplittable(splittableResource);
308 // Check if adding is allowed:
309 checkState(STATE.INIT);
310 // if first call, than this is the reference for now - a map will be
311 // generated
312 if (referenceResourceID == -1)
313 setReferenceResource(resource);
314
315 // add the data to the SPMDClient for server retrieval
316 spmdClient.addData(resource);
317 // add the the resource to the controlled resources
318 listOfAllActivelyControlledData.add(resource.getRootID());
319 // make the singlePartitionInfos for each participating server and store
320 // the
321 // info for later use
322
323 SplitMap map = getSplitMap();
324 // create a partition info for each Server (only the
325 // splitMapPosition differs)
326 for (int i = 0; i < singlePartitionInfos.length; i++) {
327 PartitionDataHandler loader = PartitionHandlerFactory.newInstance(
328 resource.getRootID(), spmdClient,
329 map.getPartitionBounds(i), map
330 .getPartitionCalculationBounds(i));
331 SinglePartitionInfo info = new SinglePartitionInfo(resource
332 .getRootID(), name, loader, map, i);
333 singlePartitionInfos[i].add(info);
334 toTransferPartitionInfos[i].add(info);
335 dataToTransfer = true;
336 }
337 }
338
339 /**
340 * Checks if the given object is an instance of {@link SplittableResource}
341 * and gives it back as splittable.
342 *
343 * @param splittableResource
344 * @throws UnsupportedOperationException
345 * if not instance of {@link SplittableResource}
346 * @return the object as {@link SplittableResource}
347 */
348 protected SplittableResource checkSplittable(Object splittableResource) {
349 if (!(splittableResource instanceof SplittableResource))
350 throw new UnsupportedOperationException(
351 "Operation failed: the argument for 'addToSplitControl' \n "
352 + "must be an instance of SplittableResource! (like e.g. SplittableLLProxyGrid).\n "
353 + "You can add non splittable Objects as Parameters of the SPMDTasks! For arrays"
354 + "of SplittableResources use addToMultiSplitControl");
355 return (SplittableResource) splittableResource;
356 }
357
358 /**
359 * checks if every object of the array is an instance of
360 * {@link SplittableResource} and gives it back as splittable
361 *
362 * @param splittableResources
363 * an array of (hopefully)
364 * {@link SplittableResource SplittableResources}
365 * @throws UnsupportedOperationException
366 * if not all elements are instances of
367 * {@link SplittableResource} or if the splitHeights or
368 * SplitWidths do not match
369 * @return the object as {@link SplittableResource} array
370 */
371 protected SplittableResource[] checkSplittableArray(
372 Object[] splittableResources) {
373 SplittableResource[] res = new SplittableResource[splittableResources.length];
374 for (int i = 0; i < splittableResources.length; i++) {
375 if (!(splittableResources[i] instanceof SplittableResource))
376 throw new UnsupportedOperationException(
377 "Operation failed: the argument must be an instance "
378 + "of SplittableResource! (like e.g. SplittableLLProxyGrid).\n "
379 + "You can add non splittable Objects as Parameters of the SPMDTasks!");
380 res[i] = (SplittableResource) splittableResources[i];
381 }
382 // check if the splitLengths match
383 if (res.length == 0)
384 return res;
385 int splitHeight = res[0].getSplitHeight();
386 int splitWidth = res[0].getSplitWidth();
387 for (SplittableResource resource : res) {
388 if (resource.getSplitHeight() != splitHeight
389 || resource.getSplitWidth() != splitWidth)
390 throw new UnsupportedOperationException(
391 "Operation Failed: Splitvalues (height/width) of the array elements do not match!");
392 }
393 return res;
394 }
395
396 /**
397 * Throws a {@link UnsupportedOperationException} if the state does not
398 * match the required state
399 *
400 * @param requiredState
401 * the required state
402 */
403 private void checkState(STATE requiredState) {
404 if (requiredState != state)
405 throw new UnsupportedOperationException(
406 "This Operation is not available: " + "You are in state"
407 + state + " but you should be in state "
408 + requiredState + ". See documentation of "
409 + state.getClass().getName()
410 + " for more information ");
411 }
412
413 /**
414 * disconnects from all servers
415 */
416 public void close() {
417 disconnectAll();
418 }
419
420 /**
421 * connects to all servers
422 */
423 @SuppressWarnings("unchecked")
424 private void connectAll() {
425 checkState(STATE.INIT);
426 long l = System.currentTimeMillis();
427 Future[] futureResults = new Future[servers.length];
428 for (int i = 0; i < servers.length; i++) {
429 futureResults[i] = executor.submit(new ComputingResourceThread(
430 servers[i], serverInfos[i], null, CommType.CONNECT,
431 eventProxy, true) {
432 public Object run() throws Exception {
433 getServer().connect();
434 return null;
435 }
436 });
437 }
438 try {
439 // wait for threads to finish
440 for (Future future : futureResults) {
441 future.get();
442 }
443 } catch (Exception e) {
444 // TODO Auto-generated catch block
445 e.printStackTrace();
446 }
447 System.out.println("Connect time: " + (System.currentTimeMillis() - l)
448 + " ms");
449 }
450
451 /**
452 * disconnect from all servers
453 */
454 private void disconnectAll() {
455 Future[] futureResults = new Future[servers.length];
456 for (int i = 0; i < servers.length; i++) {
457 futureResults[i] = executor.submit(new ComputingResourceThread(
458 servers[i], serverInfos[i], null, CommType.DISCONNECT,
459 eventProxy, true) {
460 public Object run() throws Exception {
461 getServer().disconnect();
462 return null;
463 }
464 });
465 }
466 try {
467 // wait threads to finish
468 for (Future future : futureResults) {
469 future.get();
470 }
471 } catch (Exception e) {
472 // TODO Auto-generated catch block
473 e.printStackTrace();
474 }
475
476 }
477
478 /**
479 * @return the actual splitmap. The referenceResource must be set before
480 * call!
481 */
482 private SplitMap getSplitMap() {
483 if (referenceResourceID == -1) {
484 LOG.error("NO MAP CREATED YET!");
485 return null;
486 } else
487 return splitMap;
488 }
489
490 /**
491 * @return the current state
492 */
493 public STATE getState() {
494 return state;
495 }
496
497 /**
498 * Creates a {@link SplitMap} for the specified resource. Use
499 * {@link #getSplitMap()} for retrieval.
500 *
501 * @param splittable
502 * the resource, for which the {@link SplitMap} is created
503 */
504 private void makeSplitMap(SplittableResource splittable) {
505 SplitMap map = null;
506
507 // get splitMap implementation for this splittable from XuluConfig
508 String classname = XuluConfig.getXuluConfig().getProperty(
509 splitMapClassPropertyName + "."
510 + splittable.getClass().getSimpleName());
511 // if no entry was found lookup default splitter
512 if (classname == null)
513 classname = XuluConfig.getXuluConfig().getProperty(
514 splitMapClassPropertyName + "." + "default");
515 try {
516 map = (SplitMap) Class.forName(classname).newInstance();
517 map.setParameters(splittable, neighborhoodRange, noOfPartitions,
518 boxingMode);
519 map.setWeights(weights);
520 map.makeMap();
521 } catch (Exception e) {
522 String error = "Could not create Splitmap from classname : '"
523 + classname + "' out of property '"
524 + splitMapClassPropertyName + "'. Nested errormessage is :"
525 + e.getMessage();
526 LOG.fatal(error, e);
527 throw new UnsupportedOperationException(error);
528
529 }
530 this.splitMap = map;
531
532 }
533
534 /*
535 * (non-Javadoc)
536 *
537 * @see appl.parallel.spmd.SPMDClientInterface#mergeAllPartitions()
538 */
539 @SuppressWarnings("unchecked")
540 public synchronized void mergeAllPartitions() {
541 // clone the list first, because mergePartition(int) removes elements
542 // from the list, which causes
543 // problems with the for-each loop
544 Vector<Integer> activeIDs = (Vector<Integer>) listOfAllActivelyControlledData
545 .clone();
546 // Vector<Future> futures = new Vector<Future>();
547 // for (Integer id : activeIDs) {
548 // futures.add(executor.submit(new DataServerThread(null, id) {
549 // public Object call() throws Exception {
550 // mergePartition(getIntArgument());
551 // return null;
552 // }
553 // }));
554 // }
555 // // wait on tasks to finish
556 // for (Future future : futures) {
557 // try {
558 // future.get();
559 // } catch (Exception e) {
560 // e.printStackTrace();
561 // }
562 //
563 // }
564 for (Integer id : activeIDs) {
565 mergePartition((int) id);
566 }
567 }
568
569 /*
570 * (non-Javadoc)
571 *
572 * @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject)
573 */
574 public synchronized void mergeMultiData(MultiDataObject multidata) {
575 // the problem is that perhaps on serverside grids were added, but not
576 // on client side. It is assumed that all servers have created the
577 // same number of grids with the same names (which is assured by
578 // multiDataInfo)
579
580 // first lookup the local multiData info and the info of any
581 // (here the first) dataserver multiInfo - as i said: all should be the
582 // same
583 MultiDataInfo localInfo = multidata.getMultiInfo();
584 String name = multidata.getName();
585 MultiDataInfo remoteInfo;
586 try {
587 // all remote infos should be the same, so it is enough to retrieve
588 // the first one
589 remoteInfo = dataServers[0].getMultiDataInfo(name);
590 if (localInfo == null || remoteInfo == null) {
591 LOG.error("Could not lookup MultidataInfo with name " + name
592 + ". localInfo was " + localInfo + " remote info was "
593 + remoteInfo);
594 return;
595 }
596 // merge the ids which are there on both sides:
597 int i = 0;
598 for (; i < localInfo.getCount(); i++)
599 mergePartition(localInfo.getMultiID(i));
600 // merge the new partitions
601 for (; i < remoteInfo.getCount(); i++) {
602 // get the new ID (the same as on server side)
603 int newID = remoteInfo.getMultiID(i);
604 // first create the new data element out of the first element in
605 // the list:
606 multidata.addElement(newID);
607 SplittableResource newResource = (SplittableResource) multidata
608 .getElement(multidata.getCount() - 1);
609 spmdClient.addData(newResource);
610 // now merge
611 mergePartition(newID);
612 }
613 } catch (RemoteException e) {
614 LOG.error("ERROR while trying to merge multi data: "
615 + e.getMessage(), e);
616 }
617 }
618
619 /*
620 * (non-Javadoc)
621 *
622 * @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject,
623 * int)
624 */
625 public void mergeMultiData(MultiDataObject multidata, int idx) {
626 // the problem is that perhaps on serverside grids were added, but not
627 // on client side. It is assumed that all servers have created the
628 // same number of grids with the same names (which is assured by
629 // multiDataInfo)
630
631 // first lookup the local multiData info and the info of any
632 // (here the first) dataserver multiInfo - as i said: all should be the
633 // same
634 MultiDataInfo localInfo = multidata.getMultiInfo();
635 String name = multidata.getName();
636 MultiDataInfo remoteInfo;
637 try {
638 // all remote infos should be the same, so it is enough to retrieve
639 // the first one
640 remoteInfo = dataServers[0].getMultiDataInfo(name);
641 if (localInfo == null || remoteInfo == null) {
642 LOG.error("Could not lookup MultidataInfo with name " + name
643 + ". localInfo was " + localInfo + " remote info was "
644 + remoteInfo);
645 return;
646 }
647 // merge the grid with the given index
648 // for a local idx:
649 if (idx < localInfo.getCount())
650 mergePartition(localInfo.getMultiID(idx));
651 // else create a new grid
652 else {
653 // get the new ID (the same as on server side)
654 int newID = remoteInfo.getMultiID(idx);
655 if (newID == 0)
656 throw new UnsupportedOperationException(
657 "For the requested index (" + idx
658 + ") was no grid found on the servers");
659 // first create the new data element out of the first element in
660 // the list:
661 multidata.addElement(newID);
662 SplittableResource newResource = (SplittableResource) multidata
663 .getElement(multidata.getCount() - 1);
664 spmdClient.addData(newResource);
665 // now merge the serverdata into the new clientgrid
666 mergePartition(newID);
667 }
668 } catch (RemoteException e) {
669 LOG.error("ERROR while trying to merge multi data: "
670 + e.getMessage(), e);
671 }
672 }
673
674 /*
675 * (non-Javadoc)
676 *
677 * @see appl.parallel.spmd.SPMDClientInterface#mergePartition(int)
678 */
679 public synchronized void mergePartition(int rootID) {
680 checkState(STATE.RUN);
681 Future[] futures = new Future[noOfPartitions];
682 for (int i = 0; i < noOfPartitions; i++)
683 futures[i] = executor.submit(new DataServerThread(dataServers[i],
684 serverInfos[i], rootID, CommType.CLIENT_MERGE, eventProxy) {
685 public Object run() throws Exception {
686 getServer().unloadToSource(getIntArgument());
687 return null;
688 }
689 });
690 // wait on tasks to finish:
691 try {
692 // wait threads to finish
693 for (Future future : futures) {
694 future.get();
695 }
696 } catch (Exception e) {
697 // TODO Auto-generated catch block
698 e.printStackTrace();
699 }
700 listOfAllActivelyControlledData.remove((Integer) rootID);
701 }
702
703 /*
704 * (non-Javadoc)
705 *
706 * @see appl.parallel.spmd.SPMDClientInterface#mergePartition(java.lang.Object)
707 */
708 public void mergePartition(Object splittableResource) {
709 checkSplittable(splittableResource);
710 checkState(STATE.RUN);
711 mergePartition(((SplittableResource) splittableResource).getRootID());
712 }
713
714 /*
715 * (non-Javadoc)
716 *
717 * @see appl.parallel.spmd.SPMDClientInterface#runSPMDModelTask(appl.parallel.spmd.SPMDTask,
718 * java.lang.Object[])
719 */
720 @SuppressWarnings("unchecked")
721 public Object[] runSPMDModelTask(final SPMDTask task, Object... parameters)
722 throws Throwable {
723 state = STATE.RUN;
724 // transfer needed data
725 if (dataToTransfer) {
726 transferDataToServers();
727 }
728 // submit task and arguments to the resources
729 // each task is calculated in its own thread;
730 // create tasks
731 parameters = (parameters == null) ? new Object[0] : parameters;
732 try {
733 Future futureResults[] = new Future[servers.length];
734 for (int i = 0; i < servers.length; i++) {
735 futureResults[i] = executor.submit(new ComputingResourceThread(
736 servers[i], serverInfos[i], new Object[] { task,
737 referenceResourceID, parameters },
738 CommType.CLIENT_EXECUTION, eventProxy) {
739 public Object run() throws Exception {
740 SPMDResource server = (SPMDResource) getServer();
741 return server.runSPMDModelTask(task.getClass()
742 .getName(),
743 (Integer) getObjectArrayArgument()[1],
744 (Object[]) getObjectArrayArgument()[2]);
745 }
746 });
747 }
748
749 Vector<Object> results = new Vector<Object>(15);
750
751 // aim of the next loop is to get single Object-array containing all
752 // results!
753 // Notice that every server result is a result array.
754 // This is because serverexecution can happen in multiple threads
755 // (when multiple processors are available) each serverthread will
756 // produce its own result
757 for (int i = 0; i < noOfPartitions; i++) {
758 // wait for threads finished and collect results
759 Object[] result = (Object[]) futureResults[i].get();
760 for (int j = 0; j < result.length; j++)
761 results.add(result[j]);
762 }
763 return results.toArray();
764
765 } catch (ExecutionException e) {
766 LOG.fatal("Error while trying to execute task "
767 + task.getClass().getName() + ": " + e.getMessage(), e);
768 throw e.getCause();
769 } catch (Exception e) {
770 LOG.error("Error while trying to execute task "
771 + task.getClass().getName() + ": " + e.getMessage(), e);
772 e.printStackTrace();
773 }
774 return null;
775
776 }
777
778 /*
779 * (non-Javadoc)
780 *
781 * @see appl.parallel.spmd.SPMDClientInterface#setBoxingMode(appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode)
782 */
783 public void setBoxingMode(AbstractSplitMap.NeighborhoodBoxingMode boxingMode) {
784 checkState(STATE.INIT);
785 this.boxingMode = boxingMode;
786 }
787
788 /*
789 * (non-Javadoc)
790 *
791 * @see appl.parallel.spmd.SPMDClientInterface#setNeighborhoodRange(int)
792 */
793 public void setNeighborhoodRange(int neighborhoodRange) {
794 checkState(STATE.INIT);
795 this.neighborhoodRange = neighborhoodRange;
796 }
797
798 /*
799 * (non-Javadoc)
800 *
801 * @see appl.parallel.spmd.SPMDClientInterface#setReferenceResource(java.lang.Object)
802 */
803 public void setReferenceResource(Object splittableResource) {
804 checkSplittable(splittableResource);
805 SplittableResource res = (SplittableResource) splittableResource;
806 checkState(STATE.INIT);
807 if (res instanceof SplittableResource) {
808 referenceResourceID = res.getRootID();
809 makeSplitMap(res);
810 } else
811 LOG
812 .warn("Set reference failed: given resource was not an instance of SplittableResource!");
813
814 }
815
816 /**
817 * transfers all collected data to the servers. (The data is collected to
818 * save communication time)
819 */
820 private void transferDataToServers() {
821 long time = System.currentTimeMillis();
822 Vector<Future> futures = new Vector<Future>();
823 if (toTransferPartitionInfos[0].size() > 0)
824 for (int i = 0; i < servers.length; i++) {
825 futures.add(executor.submit(new DataServerThread(
826 dataServers[i], serverInfos[i],
827 toTransferPartitionInfos[i],
828 CommType.TRANSFER_METADATA, eventProxy, true) {
829 public Object run() throws Exception {
830 getServer()
831 .addPartitionInfos(
832 (Vector<SinglePartitionInfo>) getObjectArgument());
833 return null;
834 }
835 }));
836 }
837 if (toTransferMultiDataObjects.size() > 0)
838 for (int i = 0; i < servers.length; i++) {
839 futures.add(executor.submit(new DataServerThread(
840 dataServers[i], serverInfos[i],
841 toTransferMultiDataObjects, CommType.TRANSFER_METADATA,
842 eventProxy, true) {
843 public Object run() throws Exception {
844 getServer()
845 .addMultiDataInfos(
846 (HashMap<String, MultiDataInfo>) getObjectArgument());
847 return null;
848 }
849 }));
850 }
851 if (toTransferBaseParameters.size() > 0)
852 for (int i = 0; i < servers.length; i++) {
853 futures.add(executor.submit(new DataServerThread(
854 dataServers[i], serverInfos[i],
855 toTransferBaseParameters, CommType.TRANSFER_PARAMETERS,
856 eventProxy) {
857 public Object run() throws Exception {
858 getServer().updateBaseParameter(
859 (HashMap) getObjectArgument());
860 return null;
861 }
862 }));
863 }
864 // wait on threads to finish
865 for (Future future : futures) {
866 try {
867 future.get();
868 } catch (InterruptedException e) {
869 LOG.error(e);
870 e.printStackTrace();
871 } catch (ExecutionException e) {
872 LOG.error(e);
873 e.printStackTrace();
874 }
875 }
876 dataToTransfer = false;
877 toTransferBaseParameters.clear();
878 toTransferBaseParameters.clear();
879 toTransferMultiDataObjects.clear();
880 if (LOG.isDebugEnabled())
881 LOG.debug("Transfered Data to clients in "
882 + (System.currentTimeMillis() - time) / 1000000 + " ms");
883 }
884
885 /*
886 * (non-Javadoc)
887 *
888 * @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(java.lang.Object)
889 */
890 public void updateNeighborhood(Object splittableResource) {
891 checkSplittable(splittableResource);
892 SplittableResource res = (SplittableResource) splittableResource;
893 updateNeighborhood(res.getRootID());
894 }
895
896 /*
897 * (non-Javadoc)
898 *
899 * @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(appl.parallel.spmd.MultiDataObject,
900 * int)
901 */
902 public void updateNeighborhood(MultiDataObject multiDataObject, int index) {
903 this.updateNeighborhood(multiDataObject.getMultiInfo()
904 .getMultiID(index));
905 }
906
907 /**
908 * updates the neighborhood of the splittable with the given id on the
909 * servers
910 */
911 private void updateNeighborhood(int rootID) {
912 checkState(STATE.RUN);
913 Future[] futureResults = new Future[servers.length];
914 for (int i = 0; i < servers.length; i++) {
915 futureResults[i] = executor.submit(new DataServerThread(
916 dataServers[i], serverInfos[i], rootID,
917 CommType.CLIENT_UPDATE, eventProxy) {
918 public Object run() throws Exception {
919 getServer().updateFromNeighbors(getIntArgument());
920 return null;
921 }
922 });
923 }
924 try {
925 // wait threads to finish
926 for (Future future : futureResults) {
927 future.get();
928 }
929 } catch (Exception e) {
930 // TODO Auto-generated catch block
931 e.printStackTrace();
932 }
933 }
934 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26