/[xulu]/trunk/src/appl/parallel/spmd/SPMDClientController.java
ViewVC logotype

Contents of /trunk/src/appl/parallel/spmd/SPMDClientController.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2 - (show annotations)
Wed Feb 25 11:54:01 2009 UTC (15 years, 9 months ago) by mojays
File size: 31515 byte(s)
First Commit, corresponds to Revision 1008 of Wikisquare-SVN 
1 package appl.parallel.spmd;
2
3 import java.net.InetAddress;
4 import java.rmi.RemoteException;
5 import java.util.HashMap;
6 import java.util.Vector;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.ExecutionException;
9 import java.util.concurrent.ExecutorService;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.TimeUnit;
13
14 import org.apache.log4j.LogManager;
15 import org.apache.log4j.Logger;
16
17 import edu.bonn.xulu.plugin.data.grid.MultiGrid;
18 import appl.data.DataLoader;
19 import appl.ext.XuluConfig;
20 import appl.parallel.ComputingResource;
21 import appl.parallel.ComputingResourceContainer;
22 import appl.parallel.ComputingResourceProperties;
23 import appl.parallel.client.RemoteEventHandler;
24 import appl.parallel.client.ClientDataServer;
25 import appl.parallel.data.PartitionDataHandler;
26 import appl.parallel.data.PartitionHandlerFactory;
27 import appl.parallel.data.XuluClientLoader;
28 import appl.parallel.event.CommEvent;
29 import appl.parallel.event.CommEventSink;
30 import appl.parallel.event.TimeEvent;
31 import appl.parallel.event.TimeMonitor;
32 import appl.parallel.event.TransferEvent;
33 import appl.parallel.event.TransferMonitor;
34 import appl.parallel.event.CommEvent.CommType;
35 import appl.parallel.server.PartitionDataServer;
36 import appl.parallel.server.SPMDResource;
37 import appl.parallel.spmd.split.AbstractSplitMap;
38 import appl.parallel.spmd.split.DataPartition;
39 import appl.parallel.spmd.split.SinglePartitionInfo;
40 import appl.parallel.spmd.split.SplitMap;
41 import appl.parallel.spmd.split.SplittableResource;
42 import appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode;
43 import appl.parallel.thread.ComputingResourceThread;
44 import appl.parallel.thread.DataServerThread;
45 import appl.parallel.model.AbstractParallelStepModel;
46
47 /**
48 * This class controls all the parallelization action on the client side and is
49 * the counterpart to {@link SPMDServerController}. It is accessed by the model
50 * developer by retrieving the {@link SPMDClientInterface} from the
51 * {@link AbstractParallelStepModel}.
52 *
53 * @author Dominik Appl
54 */
55 public class SPMDClientController implements SPMDClientInterface {
56
57 /**
58 * There are two states: <br>
59 * <br>
60 * <b>STATE.INIT</b> is the initializing state. In this state it is
61 * allowed:<br>
62 * to add resources to split control<br>
63 * change neighborhodRange/boxing modes/reference resource<br>
64 * <br>
65 * All other methods are disabled and will throw
66 * {@link UnsupportedOperationException}<br>
67 * <br>
68 * <b>STATE.RUN</b> is the running stage. All methods of {@link STATE#INIT}
69 * are disabled and will throw {@link UnsupportedOperationException}. This
70 * mode is automatically set by the first call to
71 * {@link SPMDClientController#runSPMDModelTask(SPMDTask, Object...)}
72 */
73 public enum STATE {
74 INIT, RUN
75 }
76
77 private STATE state = STATE.INIT;
78
79 private final String splitMapClassPropertyName = "Parallel.splitmapforclass";
80
81 private final Logger LOG = LogManager.getLogger(this.getClass().getName());
82
83 /**
84 * participating servers
85 */
86 private final SPMDResource[] servers;
87
88 private final ComputingResourceProperties[] serverInfos;
89
90 /**
91 * stores the {@link PartitionDataServer DataServers} of the resources. They
92 * are retrieved, after all resources are submitted to split control.
93 */
94 private final PartitionDataServer[] dataServers;;
95
96 /**
97 * number of partitions (== no of participating resources)
98 */
99 private final int noOfPartitions;
100
101 /**
102 * contains the {@link SinglePartitionInfo} of the
103 * {@link SplittableResource splitted resources} for each server
104 */
105 private final Vector<SinglePartitionInfo>[] singlePartitionInfos;
106
107 private final Vector<SinglePartitionInfo>[] toTransferPartitionInfos;
108
109 private HashMap<String, Object> toTransferBaseParameters;
110
111 private final HashMap<String, MultiDataInfo> multiDataInfos = new HashMap<String, MultiDataInfo>(
112 10);
113
114 private final HashMap<String, MultiDataInfo> toTransferMultiDataObjects = new HashMap<String, MultiDataInfo>(
115 10);
116
117 /** The IP-Addresses of the resources */
118 private String[] IPs;
119
120 /***************************************************************************
121 * This Vector contains all IDs of the resources, which are currently under
122 * splitControl
123 *
124 * @see #addToSplitControl(SplittableResource, String)
125 * @see #mergePartition(SplittableResource)
126 **************************************************************************/
127 private Vector<Integer> listOfAllActivelyControlledData = new Vector<Integer>();
128
129 /**
130 * @see AbstractSplitMap.NeighborhoodBoxingMode
131 */
132 private NeighborhoodBoxingMode boxingMode = AbstractSplitMap.NeighborhoodBoxingMode.inBoxing;
133
134 /** current neighborhood range, default is 0 */
135 private int neighborhoodRange = 0;
136
137 private final ClientDataServer spmdClient;
138
139 /**
140 * Local calculation bounds on server side are calculated using the
141 * reference resource This is the ID of that resource
142 */
143 protected int referenceResourceID = -1;
144
145 /** Thread execution pool */
146 private final ExecutorService executor;
147
148 /** true if there is data which has to be transfered to the servers */
149 private boolean dataToTransfer = false;
150
151 private SplitMap splitMap;
152
153 private final CommEventSink eventProxy;
154
155 private final double[] weights;
156
157 /**
158 * Creates a new Client controller.
159 *
160 * @param computingResources
161 * the resources which are used by this controller
162 * @param spmdClient
163 * the spmd client responsible for the data retrieval
164 * @param weights
165 * the weights for the distribution over the computing resources
166 * (values with a sum of 1) or null (distribution will be
167 * average)
168 * @param eventProxy
169 * a {@link RemoteEventHandler} for eventHandling
170 */
171 @SuppressWarnings("unchecked")
172 public SPMDClientController(
173 Vector<ComputingResourceContainer> computingResources,
174 double[] weights, ClientDataServer spmdClient,
175 CommEventSink eventProxy) {
176 if (weights == null)
177 weights = averageWeights(computingResources.size());
178 this.weights = weights;
179
180 this.eventProxy = eventProxy;
181 this.noOfPartitions = computingResources.size();
182 if (noOfPartitions == 0)
183 throw new UnsupportedOperationException(
184 "No Computing Ressources found");
185
186 this.spmdClient = spmdClient;
187 dataServers = new PartitionDataServer[noOfPartitions];
188 serverInfos = new ComputingResourceProperties[noOfPartitions];
189 servers = new SPMDResource[noOfPartitions];
190 for (int i = 0; i < computingResources.size(); i++) {
191 serverInfos[i] = computingResources.get(i).getInformation();
192 servers[i] = (SPMDResource) computingResources.get(i).getResource();
193
194 }
195 // extract the IPs
196 IPs = new String[noOfPartitions];
197 for (int i = 0; i < IPs.length; i++) {
198 IPs[i] = serverInfos[i].getIP();
199 IPs[i] = serverInfos[i].getPort() == null ? IPs[i] : IPs[i] + ":"
200 + serverInfos[i].getPort();
201
202 if (IPs[i] == null)
203 LOG
204 .fatal("The IP-Information of a computingRessource was NULL. "
205 + "This will result in failure of the computation!");
206 }
207
208 // initialize the singlePartitionInfos and tasks. Notice that the array
209 // singlePartitionInfos is final.
210 // Each vector of infos is permanently associated with one server.
211 this.singlePartitionInfos = new Vector[noOfPartitions];
212 this.toTransferPartitionInfos = new Vector[noOfPartitions];
213 this.toTransferBaseParameters = new HashMap<String, Object>();
214 for (int i = 0; i < noOfPartitions; i++) {
215 singlePartitionInfos[i] = new Vector<SinglePartitionInfo>();
216 toTransferPartitionInfos[i] = new Vector<SinglePartitionInfo>();
217 }
218
219 // initialize executor
220 executor = Executors.newCachedThreadPool();
221
222 // connect to all participating servers
223 connectAll();
224 // initialize DataServers (must be after connect)
225 for (int i = 0; i < noOfPartitions; i++)
226 try {
227 dataServers[i] = servers[i].createDataServer(IPs);
228 } catch (RemoteException e) {
229 LOG.fatal(e);
230 e.printStackTrace();
231 }
232 }
233
234 private double[] averageWeights(int noResources) {
235 // create weighted rating with the same weight
236 double[] weights = new double[noResources];
237 for (int i = 0; i < weights.length; i++) {
238 weights[i] = 1 / noResources;
239 }
240 return weights;
241 }
242
243 /*
244 * (non-Javadoc)
245 *
246 * @see appl.parallel.spmd.SPMDClientInterface#addBaseParameter(java.lang.Object,
247 * java.lang.String)
248 */
249 public void addBaseParameter(Object parameter, String parameterName) {
250 toTransferBaseParameters.put(parameterName, parameter);
251 dataToTransfer = true;
252 }
253
254 /*
255 * (non-Javadoc)
256 *
257 * @see appl.parallel.spmd.SPMDClientInterface#addBaseParameters(java.lang.Object[],
258 * java.lang.String[])
259 */
260 public void addBaseParameters(Object[] parameters, String[] parameterNames) {
261 for (int i = 0; i < parameterNames.length; i++) {
262 addBaseParameter(parameters[i], parameterNames[i]);
263 }
264 }
265
266 /*
267 * (non-Javadoc)
268 *
269 * @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(java.lang.Object[],
270 * java.lang.String)
271 */
272 public MultiDataObject addToMultiDataSplitControl(
273 Object splittableResources[], String name) {
274 if (splittableResources.length == 0) {
275 throw new UnsupportedOperationException(
276 "There must be at least one resource to create a MultiData Element");
277 }
278 // Check if adding is allowed:
279 checkState(STATE.INIT);
280 SplittableResource[] resources = checkSplittableArray(splittableResources);
281
282 // add each element to splitcontrol. The constant gives each element a
283 // unique name and identifies it on the server side as belonging to a multisplit
284
285 MultiDataInfo multi = new MultiDataInfo(new int[0], name);
286 for (int i = 0; i < resources.length; i++) {
287 multi.addElement(resources[i].getRootID());
288 addToSplitControl(resources[i], MultiDataInfo.getNameWithIdx(i,
289 name));
290 }
291 toTransferMultiDataObjects.put(name, multi);
292 multiDataInfos.put(name, multi);
293 dataToTransfer = true;
294 return new MultiDataObject(multi, spmdClient);
295 }
296
297 /*
298 * (non-Javadoc)
299 *
300 * @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(edu.bonn.xulu.plugin.data.grid.MultiGrid,
301 * java.lang.String)
302 */
303 public MultiDataObject addToMultiDataSplitControl(MultiGrid multiGrid,
304 String name) {
305 MultiDataObject dataObject = addToMultiDataSplitControl(multiGrid
306 .toArray(), name);
307 dataObject.setManagedGrid(multiGrid);
308 return dataObject;
309 }
310
311 /*
312 * (non-Javadoc)
313 *
314 * @see appl.parallel.spmd.SPMDClientInterface#addToSplitControl(java.lang.Object,
315 * java.lang.String)
316 */
317 public void addToSplitControl(Object splittableResource, String name) {
318
319 SplittableResource resource = checkSplittable(splittableResource);
320 // Check if adding is allowed:
321 checkState(STATE.INIT);
322 // if first call, than this is the reference for now - a map will be
323 // generated
324 if (referenceResourceID == -1)
325 setReferenceResource(resource);
326
327 // add the data to the SPMDClient for server retrieval
328 spmdClient.addData(resource);
329 // add the the resource to the controlled resources
330 listOfAllActivelyControlledData.add(resource.getRootID());
331 // make the singlePartitionInfos for each participating server and store
332 // the
333 // info for later use
334
335 SplitMap map = getSplitMap();
336 // create a partition info for each Server (only the
337 // splitMapPosition differs)
338 for (int i = 0; i < singlePartitionInfos.length; i++) {
339 PartitionDataHandler loader = PartitionHandlerFactory.newInstance(
340 resource.getRootID(), spmdClient,
341 map.getPartitionBounds(i), map
342 .getPartitionCalculationBounds(i));
343 SinglePartitionInfo info = new SinglePartitionInfo(resource
344 .getRootID(), name, loader, map, i);
345 singlePartitionInfos[i].add(info);
346 toTransferPartitionInfos[i].add(info);
347 dataToTransfer = true;
348 }
349 }
350
351 /**
352 * Checks if the given object is an instance of {@link SplittableResource}
353 * and gives it back as splittable.
354 *
355 * @param splittableResource
356 * @throws UnsupportedOperationException
357 * if not instance of {@link SplittableResource}
358 * @return the object as {@link SplittableResource}
359 */
360 protected SplittableResource checkSplittable(Object splittableResource) {
361 if (!(splittableResource instanceof SplittableResource))
362 throw new UnsupportedOperationException(
363 "Operation failed: the argument for 'addToSplitControl' \n "
364 + "must be an instance of SplittableResource! (like e.g. SplittableLLProxyGrid).\n "
365 + "You can add non splittable Objects as Parameters of the SPMDTasks! For arrays"
366 + "of SplittableResources use addToMultiSplitControl");
367 return (SplittableResource) splittableResource;
368 }
369
370 /**
371 * checks if every object of the array is an instance of
372 * {@link SplittableResource} and gives it back as splittable
373 *
374 * @param splittableResources
375 * an array of (hopefully)
376 * {@link SplittableResource SplittableResources}
377 * @throws UnsupportedOperationException
378 * if not all elements are instances of
379 * {@link SplittableResource} or if the splitHeights or
380 * SplitWidths do not match
381 * @return the object as {@link SplittableResource} array
382 */
383 protected SplittableResource[] checkSplittableArray(
384 Object[] splittableResources) {
385 SplittableResource[] res = new SplittableResource[splittableResources.length];
386 for (int i = 0; i < splittableResources.length; i++) {
387 if (!(splittableResources[i] instanceof SplittableResource))
388 throw new UnsupportedOperationException(
389 "Operation failed: the argument must be an instance "
390 + "of SplittableResource! (like e.g. SplittableLLProxyGrid).\n "
391 + "You can add non splittable Objects as Parameters of the SPMDTasks!");
392 res[i] = (SplittableResource) splittableResources[i];
393 }
394 // check if the splitLengths match
395 if (res.length == 0)
396 return res;
397 int splitHeight = res[0].getSplitHeight();
398 int splitWidth = res[0].getSplitWidth();
399 for (SplittableResource resource : res) {
400 if (resource.getSplitHeight() != splitHeight
401 || resource.getSplitWidth() != splitWidth)
402 throw new UnsupportedOperationException(
403 "Operation Failed: Splitvalues (height/width) of the array elements do not match!");
404 }
405 return res;
406 }
407
408 /**
409 * Throws a {@link UnsupportedOperationException} if the state does not
410 * match the required state
411 *
412 * @param requiredState
413 * the required state
414 */
415 private void checkState(STATE requiredState) {
416 if (requiredState != state)
417 throw new UnsupportedOperationException(
418 "This Operation is not available: " + "You are in state"
419 + state + " but you should be in state "
420 + requiredState + ". See documentation of "
421 + state.getClass().getName()
422 + " for more information ");
423 }
424
425 /**
426 * disconnects from all servers
427 */
428 public void close() {
429 disconnectAll();
430 }
431
432 /**
433 * connects to all servers
434 */
435 @SuppressWarnings("unchecked")
436 private void connectAll() {
437 checkState(STATE.INIT);
438 long l = System.currentTimeMillis();
439 Future[] futureResults = new Future[servers.length];
440 for (int i = 0; i < servers.length; i++) {
441 futureResults[i] = executor.submit(new ComputingResourceThread(
442 servers[i], serverInfos[i], null, CommType.CONNECT,
443 eventProxy, true) {
444 public Object run() throws Exception {
445 getServer().connect();
446 return null;
447 }
448 });
449 }
450 try {
451 // wait for threads to finish
452 for (Future future : futureResults) {
453 future.get();
454 }
455 } catch (Exception e) {
456 // TODO Auto-generated catch block
457 e.printStackTrace();
458 }
459 System.out.println("Connect time: " + (System.currentTimeMillis() - l)
460 + " ms");
461 }
462
463 /**
464 * disconnect from all servers
465 */
466 private void disconnectAll() {
467 Future[] futureResults = new Future[servers.length];
468 for (int i = 0; i < servers.length; i++) {
469 futureResults[i] = executor.submit(new ComputingResourceThread(
470 servers[i], serverInfos[i], null, CommType.DISCONNECT,
471 eventProxy, true) {
472 public Object run() throws Exception {
473 getServer().disconnect();
474 return null;
475 }
476 });
477 }
478 try {
479 // wait threads to finish
480 for (Future future : futureResults) {
481 future.get();
482 }
483 } catch (Exception e) {
484 // TODO Auto-generated catch block
485 e.printStackTrace();
486 }
487
488 }
489
490 /**
491 * @return the actual splitmap. The referenceResource must be set before
492 * call!
493 */
494 private SplitMap getSplitMap() {
495 if (referenceResourceID == -1) {
496 LOG.error("NO MAP CREATED YET!");
497 return null;
498 } else
499 return splitMap;
500 }
501
502 /**
503 * @return the current state
504 */
505 public STATE getState() {
506 return state;
507 }
508
509 /**
510 * Creates a {@link SplitMap} for the specified resource. Use
511 * {@link #getSplitMap()} for retrieval.
512 *
513 * @param splittable
514 * the resource, for which the {@link SplitMap} is created
515 */
516 private void makeSplitMap(SplittableResource splittable) {
517 SplitMap map = null;
518
519 // get splitMap implementation for this splittable from XuluConfig
520 String classname = XuluConfig.getXuluConfig().getProperty(
521 splitMapClassPropertyName + "."
522 + splittable.getClass().getSimpleName());
523 // if no entry was found lookup default splitter
524 if (classname == null)
525 classname = XuluConfig.getXuluConfig().getProperty(
526 splitMapClassPropertyName + "." + "default");
527 try {
528 map = (SplitMap) Class.forName(classname).newInstance();
529 map.setParameters(splittable, neighborhoodRange, noOfPartitions,
530 boxingMode);
531 map.setWeights(weights);
532 map.makeMap();
533 } catch (Exception e) {
534 String error = "Could not create Splitmap from classname : '"
535 + classname + "' out of property '"
536 + splitMapClassPropertyName + "'. Nested errormessage is :"
537 + e.getMessage();
538 LOG.fatal(error, e);
539 throw new UnsupportedOperationException(error);
540
541 }
542 this.splitMap = map;
543
544 }
545
546 /*
547 * (non-Javadoc)
548 *
549 * @see appl.parallel.spmd.SPMDClientInterface#mergeAllPartitions()
550 */
551 @SuppressWarnings("unchecked")
552 public synchronized void mergeAllPartitions() {
553 // clone the list first, because mergePartition(int) removes elements
554 // from the list, which causes
555 // problems with the for-each loop
556 Vector<Integer> activeIDs = (Vector<Integer>) listOfAllActivelyControlledData
557 .clone();
558 // Vector<Future> futures = new Vector<Future>();
559 // for (Integer id : activeIDs) {
560 // futures.add(executor.submit(new DataServerThread(null, id) {
561 // public Object call() throws Exception {
562 // mergePartition(getIntArgument());
563 // return null;
564 // }
565 // }));
566 // }
567 // // wait on tasks to finish
568 // for (Future future : futures) {
569 // try {
570 // future.get();
571 // } catch (Exception e) {
572 // e.printStackTrace();
573 // }
574 //
575 // }
576 for (Integer id : activeIDs) {
577 mergePartition((int) id);
578 }
579 }
580
581 /*
582 * (non-Javadoc)
583 *
584 * @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject)
585 */
586 public synchronized void mergeMultiData(MultiDataObject multidata) {
587 // the problem is that perhaps on serverside grids were added, but not
588 // on client side. It is assumed that all servers have created the
589 // same number of grids with the same names (which is assured by
590 // multiDataInfo)
591
592 // first lookup the local multiData info and the info of any
593 // (here the first) dataserver multiInfo - as i said: all should be the
594 // same
595 MultiDataInfo localInfo = multidata.getMultiInfo();
596 String name = multidata.getName();
597 MultiDataInfo remoteInfo;
598 try {
599 // all remote infos should be the same, so it is enough to retrieve
600 // the first one
601 remoteInfo = dataServers[0].getMultiDataInfo(name);
602 if (localInfo == null || remoteInfo == null) {
603 LOG.error("Could not lookup MultidataInfo with name " + name
604 + ". localInfo was " + localInfo + " remote info was "
605 + remoteInfo);
606 return;
607 }
608 // merge the ids which are there on both sides:
609 int i = 0;
610 for (; i < localInfo.getCount(); i++)
611 mergePartition(localInfo.getMultiID(i));
612 // merge the new partitions
613 for (; i < remoteInfo.getCount(); i++) {
614 // get the new ID (the same as on server side)
615 int newID = remoteInfo.getMultiID(i);
616 // first create the new data element out of the first element in
617 // the list:
618 multidata.addElement(newID);
619 SplittableResource newResource = (SplittableResource) multidata
620 .getElement(multidata.getCount() - 1);
621 spmdClient.addData(newResource);
622 // now merge
623 mergePartition(newID);
624 }
625 } catch (RemoteException e) {
626 LOG.error("ERROR while trying to merge multi data: "
627 + e.getMessage(), e);
628 }
629 }
630
631 /*
632 * (non-Javadoc)
633 *
634 * @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject,
635 * int)
636 */
637 public void mergeMultiData(MultiDataObject multidata, int idx) {
638 // the problem is that perhaps on serverside grids were added, but not
639 // on client side. It is assumed that all servers have created the
640 // same number of grids with the same names (which is assured by
641 // multiDataInfo)
642
643 // first lookup the local multiData info and the info of any
644 // (here the first) dataserver multiInfo - as i said: all should be the
645 // same
646 MultiDataInfo localInfo = multidata.getMultiInfo();
647 String name = multidata.getName();
648 MultiDataInfo remoteInfo;
649 try {
650 // all remote infos should be the same, so it is enough to retrieve
651 // the first one
652 remoteInfo = dataServers[0].getMultiDataInfo(name);
653 if (localInfo == null || remoteInfo == null) {
654 LOG.error("Could not lookup MultidataInfo with name " + name
655 + ". localInfo was " + localInfo + " remote info was "
656 + remoteInfo);
657 return;
658 }
659 // merge the grid with the given index
660 // for a local idx:
661 if (idx < localInfo.getCount())
662 mergePartition(localInfo.getMultiID(idx));
663 // else create a new grid
664 else {
665 // get the new ID (the same as on server side)
666 int newID = remoteInfo.getMultiID(idx);
667 if (newID == 0)
668 throw new UnsupportedOperationException(
669 "For the requested index (" + idx
670 + ") was no grid found on the servers");
671 // first create the new data element out of the first element in
672 // the list:
673 multidata.addElement(newID);
674 SplittableResource newResource = (SplittableResource) multidata
675 .getElement(multidata.getCount() - 1);
676 spmdClient.addData(newResource);
677 // now merge the serverdata into the new clientgrid
678 mergePartition(newID);
679 }
680 } catch (RemoteException e) {
681 LOG.error("ERROR while trying to merge multi data: "
682 + e.getMessage(), e);
683 }
684 }
685
686 /*
687 * (non-Javadoc)
688 *
689 * @see appl.parallel.spmd.SPMDClientInterface#mergePartition(int)
690 */
691 public synchronized void mergePartition(int rootID) {
692 checkState(STATE.RUN);
693 Future[] futures = new Future[noOfPartitions];
694 for (int i = 0; i < noOfPartitions; i++)
695 futures[i] = executor.submit(new DataServerThread(dataServers[i],
696 serverInfos[i], rootID, CommType.CLIENT_MERGE, eventProxy) {
697 public Object run() throws Exception {
698 getServer().unloadToSource(getIntArgument());
699 return null;
700 }
701 });
702 // wait on tasks to finish:
703 try {
704 // wait threads to finish
705 for (Future future : futures) {
706 future.get();
707 }
708 } catch (Exception e) {
709 // TODO Auto-generated catch block
710 e.printStackTrace();
711 }
712 listOfAllActivelyControlledData.remove((Integer) rootID);
713 }
714
715 /*
716 * (non-Javadoc)
717 *
718 * @see appl.parallel.spmd.SPMDClientInterface#mergePartition(java.lang.Object)
719 */
720 public void mergePartition(Object splittableResource) {
721 checkSplittable(splittableResource);
722 checkState(STATE.RUN);
723 mergePartition(((SplittableResource) splittableResource).getRootID());
724 }
725
726 /*
727 * (non-Javadoc)
728 *
729 * @see appl.parallel.spmd.SPMDClientInterface#runSPMDModelTask(appl.parallel.spmd.SPMDTask,
730 * java.lang.Object[])
731 */
732 @SuppressWarnings("unchecked")
733 public Object[] runSPMDModelTask(final SPMDTask task, Object... parameters)
734 throws Throwable {
735 state = STATE.RUN;
736 // transfer needed data
737 if (dataToTransfer) {
738 transferDataToServers();
739 }
740 // submit task and arguments to the resources
741 // each task is calculated in its own thread;
742 // create tasks
743 parameters = (parameters == null) ? new Object[0] : parameters;
744 try {
745 Future futureResults[] = new Future[servers.length];
746 for (int i = 0; i < servers.length; i++) {
747 futureResults[i] = executor.submit(new ComputingResourceThread(
748 servers[i], serverInfos[i], new Object[] { task,
749 referenceResourceID, parameters },
750 CommType.CLIENT_EXECUTION, eventProxy) {
751 public Object run() throws Exception {
752 SPMDResource server = (SPMDResource) getServer();
753 return server.runSPMDModelTask(task.getClass()
754 .getName(),
755 (Integer) getObjectArrayArgument()[1],
756 (Object[]) getObjectArrayArgument()[2]);
757 }
758 });
759 }
760
761 Vector<Object> results = new Vector<Object>(15);
762
763 // aim of the next loop is to get single Object-array containing all
764 // results!
765 // Notice that every server result is a result array.
766 // This is because serverexecution can happen in multiple threads
767 // (when multiple processors are available) each serverthread will
768 // produce its own result
769 for (int i = 0; i < noOfPartitions; i++) {
770 // wait for threads finished and collect results
771 Object[] result = (Object[]) futureResults[i].get();
772 for (int j = 0; j < result.length; j++)
773 results.add(result[j]);
774 }
775 return results.toArray();
776
777 } catch (ExecutionException e) {
778 LOG.fatal("Error while trying to execute task "
779 + task.getClass().getName() + ": " + e.getMessage(), e);
780 throw e.getCause();
781 } catch (Exception e) {
782 LOG.error("Error while trying to execute task "
783 + task.getClass().getName() + ": " + e.getMessage(), e);
784 e.printStackTrace();
785 }
786 return null;
787
788 }
789
790 /*
791 * (non-Javadoc)
792 *
793 * @see appl.parallel.spmd.SPMDClientInterface#setBoxingMode(appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode)
794 */
795 public void setBoxingMode(AbstractSplitMap.NeighborhoodBoxingMode boxingMode) {
796 checkState(STATE.INIT);
797 this.boxingMode = boxingMode;
798 }
799
800 /*
801 * (non-Javadoc)
802 *
803 * @see appl.parallel.spmd.SPMDClientInterface#setNeighborhoodRange(int)
804 */
805 public void setNeighborhoodRange(int neighborhoodRange) {
806 checkState(STATE.INIT);
807 this.neighborhoodRange = neighborhoodRange;
808 }
809
810 /*
811 * (non-Javadoc)
812 *
813 * @see appl.parallel.spmd.SPMDClientInterface#setReferenceResource(java.lang.Object)
814 */
815 public void setReferenceResource(Object splittableResource) {
816 checkSplittable(splittableResource);
817 SplittableResource res = (SplittableResource) splittableResource;
818 checkState(STATE.INIT);
819 if (res instanceof SplittableResource) {
820 referenceResourceID = res.getRootID();
821 makeSplitMap(res);
822 } else
823 LOG
824 .warn("Set reference failed: given resource was not an instance of SplittableResource!");
825
826 }
827
828 /**
829 * transfers all collected data to the servers. (The data is collected to
830 * save communication time)
831 */
832 private void transferDataToServers() {
833 long time = System.currentTimeMillis();
834 Vector<Future> futures = new Vector<Future>();
835 if (toTransferPartitionInfos[0].size() > 0)
836 for (int i = 0; i < servers.length; i++) {
837 futures.add(executor.submit(new DataServerThread(
838 dataServers[i], serverInfos[i],
839 toTransferPartitionInfos[i],
840 CommType.TRANSFER_METADATA, eventProxy, true) {
841 public Object run() throws Exception {
842 getServer()
843 .addPartitionInfos(
844 (Vector<SinglePartitionInfo>) getObjectArgument());
845 return null;
846 }
847 }));
848 }
849 if (toTransferMultiDataObjects.size() > 0)
850 for (int i = 0; i < servers.length; i++) {
851 futures.add(executor.submit(new DataServerThread(
852 dataServers[i], serverInfos[i],
853 toTransferMultiDataObjects, CommType.TRANSFER_METADATA,
854 eventProxy, true) {
855 public Object run() throws Exception {
856 getServer()
857 .addMultiDataInfos(
858 (HashMap<String, MultiDataInfo>) getObjectArgument());
859 return null;
860 }
861 }));
862 }
863 if (toTransferBaseParameters.size() > 0)
864 for (int i = 0; i < servers.length; i++) {
865 futures.add(executor.submit(new DataServerThread(
866 dataServers[i], serverInfos[i],
867 toTransferBaseParameters, CommType.TRANSFER_PARAMETERS,
868 eventProxy) {
869 public Object run() throws Exception {
870 getServer().updateBaseParameter(
871 (HashMap) getObjectArgument());
872 return null;
873 }
874 }));
875 }
876 // wait on threads to finish
877 for (Future future : futures) {
878 try {
879 future.get();
880 } catch (InterruptedException e) {
881 LOG.error(e);
882 e.printStackTrace();
883 } catch (ExecutionException e) {
884 LOG.error(e);
885 e.printStackTrace();
886 }
887 }
888 dataToTransfer = false;
889 toTransferBaseParameters.clear();
890 toTransferBaseParameters.clear();
891 toTransferMultiDataObjects.clear();
892 if (LOG.isDebugEnabled())
893 LOG.debug("Transfered Data to clients in "
894 + (System.currentTimeMillis() - time) / 1000000 + " ms");
895 }
896
897 /*
898 * (non-Javadoc)
899 *
900 * @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(java.lang.Object)
901 */
902 public void updateNeighborhood(Object splittableResource) {
903 checkSplittable(splittableResource);
904 SplittableResource res = (SplittableResource) splittableResource;
905 updateNeighborhood(res.getRootID());
906 }
907
908 /*
909 * (non-Javadoc)
910 *
911 * @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(appl.parallel.spmd.MultiDataObject,
912 * int)
913 */
914 public void updateNeighborhood(MultiDataObject multiDataObject, int index) {
915 this.updateNeighborhood(multiDataObject.getMultiInfo()
916 .getMultiID(index));
917 }
918
919 /**
920 * updates the neighborhood of the splittable with the given id on the
921 * servers
922 */
923 private void updateNeighborhood(int rootID) {
924 checkState(STATE.RUN);
925 Future[] futureResults = new Future[servers.length];
926 for (int i = 0; i < servers.length; i++) {
927 futureResults[i] = executor.submit(new DataServerThread(
928 dataServers[i], serverInfos[i], rootID,
929 CommType.CLIENT_UPDATE, eventProxy) {
930 public Object run() throws Exception {
931 getServer().updateFromNeighbors(getIntArgument());
932 return null;
933 }
934 });
935 }
936 try {
937 // wait threads to finish
938 for (Future future : futureResults) {
939 future.get();
940 }
941 } catch (Exception e) {
942 // TODO Auto-generated catch block
943 e.printStackTrace();
944 }
945 }
946 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26