/[xulu]/trunk/src/appl/parallel/spmd/SPMDClientController.java
ViewVC logotype

Annotation of /trunk/src/appl/parallel/spmd/SPMDClientController.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 78 - (hide annotations)
Wed Feb 10 16:43:46 2010 UTC (14 years, 10 months ago) by alfonx
File size: 31036 byte(s)
Merged branch 1.8-gt2-2.6 to trunk. Now the trunk is based on GeoTools 2.6.1 and schmitzm-2.0.x
1 mojays 2 package appl.parallel.spmd;
2    
3     import java.rmi.RemoteException;
4     import java.util.HashMap;
5     import java.util.Vector;
6     import java.util.concurrent.ExecutionException;
7     import java.util.concurrent.ExecutorService;
8     import java.util.concurrent.Executors;
9     import java.util.concurrent.Future;
10    
11     import org.apache.log4j.LogManager;
12     import org.apache.log4j.Logger;
13    
14     import appl.ext.XuluConfig;
15     import appl.parallel.ComputingResourceContainer;
16     import appl.parallel.ComputingResourceProperties;
17 alfonx 78 import appl.parallel.client.ClientDataServer;
18 mojays 2 import appl.parallel.client.RemoteEventHandler;
19     import appl.parallel.data.PartitionDataHandler;
20     import appl.parallel.data.PartitionHandlerFactory;
21     import appl.parallel.event.CommEventSink;
22     import appl.parallel.event.CommEvent.CommType;
23 alfonx 78 import appl.parallel.model.AbstractParallelStepModel;
24 mojays 2 import appl.parallel.server.PartitionDataServer;
25     import appl.parallel.server.SPMDResource;
26     import appl.parallel.spmd.split.AbstractSplitMap;
27     import appl.parallel.spmd.split.SinglePartitionInfo;
28     import appl.parallel.spmd.split.SplitMap;
29     import appl.parallel.spmd.split.SplittableResource;
30     import appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode;
31     import appl.parallel.thread.ComputingResourceThread;
32     import appl.parallel.thread.DataServerThread;
33 alfonx 78 import edu.bonn.xulu.plugin.data.grid.MultiGrid;
34 mojays 2
35     /**
36     * This class controls all the parallelization action on the client side and is
37     * the counterpart to {@link SPMDServerController}. It is accessed by the model
38     * developer by retrieving the {@link SPMDClientInterface} from the
39     * {@link AbstractParallelStepModel}.
40     *
41     * @author Dominik Appl
42     */
43     public class SPMDClientController implements SPMDClientInterface {
44    
45     /**
46     * There are two states: <br>
47     * <br>
48     * <b>STATE.INIT</b> is the initializing state. In this state it is
49     * allowed:<br>
50     * to add resources to split control<br>
51     * change neighborhodRange/boxing modes/reference resource<br>
52     * <br>
53     * All other methods are disabled and will throw
54     * {@link UnsupportedOperationException}<br>
55     * <br>
56     * <b>STATE.RUN</b> is the running stage. All methods of {@link STATE#INIT}
57     * are disabled and will throw {@link UnsupportedOperationException}. This
58     * mode is automatically set by the first call to
59     * {@link SPMDClientController#runSPMDModelTask(SPMDTask, Object...)}
60     */
61     public enum STATE {
62     INIT, RUN
63     }
64    
65     private STATE state = STATE.INIT;
66    
67     private final String splitMapClassPropertyName = "Parallel.splitmapforclass";
68    
69     private final Logger LOG = LogManager.getLogger(this.getClass().getName());
70    
71     /**
72     * participating servers
73     */
74     private final SPMDResource[] servers;
75    
76     private final ComputingResourceProperties[] serverInfos;
77    
78     /**
79     * stores the {@link PartitionDataServer DataServers} of the resources. They
80     * are retrieved, after all resources are submitted to split control.
81     */
82     private final PartitionDataServer[] dataServers;;
83    
84     /**
85     * number of partitions (== no of participating resources)
86     */
87     private final int noOfPartitions;
88    
89     /**
90     * contains the {@link SinglePartitionInfo} of the
91     * {@link SplittableResource splitted resources} for each server
92     */
93     private final Vector<SinglePartitionInfo>[] singlePartitionInfos;
94    
95     private final Vector<SinglePartitionInfo>[] toTransferPartitionInfos;
96    
97     private HashMap<String, Object> toTransferBaseParameters;
98    
99     private final HashMap<String, MultiDataInfo> multiDataInfos = new HashMap<String, MultiDataInfo>(
100     10);
101    
102     private final HashMap<String, MultiDataInfo> toTransferMultiDataObjects = new HashMap<String, MultiDataInfo>(
103     10);
104    
105     /** The IP-Addresses of the resources */
106     private String[] IPs;
107    
108     /***************************************************************************
109     * This Vector contains all IDs of the resources, which are currently under
110     * splitControl
111     *
112     * @see #addToSplitControl(SplittableResource, String)
113     * @see #mergePartition(SplittableResource)
114     **************************************************************************/
115     private Vector<Integer> listOfAllActivelyControlledData = new Vector<Integer>();
116    
117     /**
118     * @see AbstractSplitMap.NeighborhoodBoxingMode
119     */
120     private NeighborhoodBoxingMode boxingMode = AbstractSplitMap.NeighborhoodBoxingMode.inBoxing;
121    
122     /** current neighborhood range, default is 0 */
123     private int neighborhoodRange = 0;
124    
125     private final ClientDataServer spmdClient;
126    
127     /**
128     * Local calculation bounds on server side are calculated using the
129     * reference resource This is the ID of that resource
130     */
131     protected int referenceResourceID = -1;
132    
133     /** Thread execution pool */
134     private final ExecutorService executor;
135    
136     /** true if there is data which has to be transfered to the servers */
137     private boolean dataToTransfer = false;
138    
139     private SplitMap splitMap;
140    
141     private final CommEventSink eventProxy;
142    
143     private final double[] weights;
144    
/**
 * Creates a new client controller, connects to all given resources and
 * creates one data server per resource.
 *
 * @param computingResources
 *            the resources which are used by this controller
 * @param weights
 *            the weights for the distribution over the computing resources
 *            (values with a sum of 1) or null (distribution will be
 *            average)
 * @param spmdClient
 *            the spmd client responsible for the data retrieval
 * @param eventProxy
 *            a {@link RemoteEventHandler} for eventHandling
 * @throws UnsupportedOperationException
 *             if no computing resource was given
 */
@SuppressWarnings("unchecked")
public SPMDClientController(
        Vector<ComputingResourceContainer> computingResources,
        double[] weights, ClientDataServer spmdClient,
        CommEventSink eventProxy) {
    if (weights == null)
        weights = averageWeights(computingResources.size());
    this.weights = weights;

    this.eventProxy = eventProxy;
    this.noOfPartitions = computingResources.size();
    if (noOfPartitions == 0)
        throw new UnsupportedOperationException(
                "No Computing Ressources found");

    this.spmdClient = spmdClient;
    dataServers = new PartitionDataServer[noOfPartitions];
    serverInfos = new ComputingResourceProperties[noOfPartitions];
    servers = new SPMDResource[noOfPartitions];
    for (int i = 0; i < computingResources.size(); i++) {
        serverInfos[i] = computingResources.get(i).getInformation();
        servers[i] = (SPMDResource) computingResources.get(i).getResource();
    }

    // extract the IPs
    IPs = new String[noOfPartitions];
    for (int i = 0; i < IPs.length; i++) {
        String ip = serverInfos[i].getIP();
        // Check the raw IP BEFORE the port is appended: once ":port" is
        // concatenated a missing IP becomes the string "null:port" and
        // could no longer be detected.
        if (ip == null)
            LOG
                    .fatal("The IP-Information of a computingRessource was NULL. "
                            + "This will result in failure of the computation!");
        IPs[i] = serverInfos[i].getPort() == null ? ip : ip + ":"
                + serverInfos[i].getPort();
    }

    // initialize the singlePartitionInfos and tasks. Notice that the array
    // singlePartitionInfos is final.
    // Each vector of infos is permanently associated with one server.
    this.singlePartitionInfos = new Vector[noOfPartitions];
    this.toTransferPartitionInfos = new Vector[noOfPartitions];
    this.toTransferBaseParameters = new HashMap<String, Object>();
    for (int i = 0; i < noOfPartitions; i++) {
        singlePartitionInfos[i] = new Vector<SinglePartitionInfo>();
        toTransferPartitionInfos[i] = new Vector<SinglePartitionInfo>();
    }

    // initialize executor
    executor = Executors.newCachedThreadPool();

    // connect to all participating servers
    connectAll();
    // initialize DataServers (must be after connect)
    for (int i = 0; i < noOfPartitions; i++)
        try {
            dataServers[i] = servers[i].createDataServer(IPs);
        } catch (RemoteException e) {
            // log message AND stack trace through the logger instead of
            // printing the trace to stderr
            LOG.fatal("Could not create the data server on resource "
                    + IPs[i] + ": " + e.getMessage(), e);
        }
}
221    
222     private double[] averageWeights(int noResources) {
223     // create weighted rating with the same weight
224     double[] weights = new double[noResources];
225     for (int i = 0; i < weights.length; i++) {
226     weights[i] = 1 / noResources;
227     }
228     return weights;
229     }
230    
231     /*
232     * (non-Javadoc)
233     *
234     * @see appl.parallel.spmd.SPMDClientInterface#addBaseParameter(java.lang.Object,
235     * java.lang.String)
236     */
237     public void addBaseParameter(Object parameter, String parameterName) {
238     toTransferBaseParameters.put(parameterName, parameter);
239     dataToTransfer = true;
240     }
241    
242     /*
243     * (non-Javadoc)
244     *
245     * @see appl.parallel.spmd.SPMDClientInterface#addBaseParameters(java.lang.Object[],
246     * java.lang.String[])
247     */
248     public void addBaseParameters(Object[] parameters, String[] parameterNames) {
249     for (int i = 0; i < parameterNames.length; i++) {
250     addBaseParameter(parameters[i], parameterNames[i]);
251     }
252     }
253    
254     /*
255     * (non-Javadoc)
256     *
257     * @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(java.lang.Object[],
258     * java.lang.String)
259     */
260     public MultiDataObject addToMultiDataSplitControl(
261     Object splittableResources[], String name) {
262     if (splittableResources.length == 0) {
263     throw new UnsupportedOperationException(
264     "There must be at least one resource to create a MultiData Element");
265     }
266     // Check if adding is allowed:
267     checkState(STATE.INIT);
268     SplittableResource[] resources = checkSplittableArray(splittableResources);
269    
270     // add each element to splitcontrol. The constant gives each element a
271     // unique name and identifies it on the server side as belonging to a multisplit
272    
273     MultiDataInfo multi = new MultiDataInfo(new int[0], name);
274     for (int i = 0; i < resources.length; i++) {
275     multi.addElement(resources[i].getRootID());
276     addToSplitControl(resources[i], MultiDataInfo.getNameWithIdx(i,
277     name));
278     }
279     toTransferMultiDataObjects.put(name, multi);
280     multiDataInfos.put(name, multi);
281     dataToTransfer = true;
282     return new MultiDataObject(multi, spmdClient);
283     }
284    
285     /*
286     * (non-Javadoc)
287     *
288     * @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(edu.bonn.xulu.plugin.data.grid.MultiGrid,
289     * java.lang.String)
290     */
291     public MultiDataObject addToMultiDataSplitControl(MultiGrid multiGrid,
292     String name) {
293     MultiDataObject dataObject = addToMultiDataSplitControl(multiGrid
294     .toArray(), name);
295     dataObject.setManagedGrid(multiGrid);
296     return dataObject;
297     }
298    
299     /*
300     * (non-Javadoc)
301     *
302     * @see appl.parallel.spmd.SPMDClientInterface#addToSplitControl(java.lang.Object,
303     * java.lang.String)
304     */
305     public void addToSplitControl(Object splittableResource, String name) {
306    
307     SplittableResource resource = checkSplittable(splittableResource);
308     // Check if adding is allowed:
309     checkState(STATE.INIT);
310     // if first call, than this is the reference for now - a map will be
311     // generated
312     if (referenceResourceID == -1)
313     setReferenceResource(resource);
314    
315     // add the data to the SPMDClient for server retrieval
316     spmdClient.addData(resource);
317     // add the the resource to the controlled resources
318     listOfAllActivelyControlledData.add(resource.getRootID());
319     // make the singlePartitionInfos for each participating server and store
320     // the
321     // info for later use
322    
323     SplitMap map = getSplitMap();
324     // create a partition info for each Server (only the
325     // splitMapPosition differs)
326     for (int i = 0; i < singlePartitionInfos.length; i++) {
327     PartitionDataHandler loader = PartitionHandlerFactory.newInstance(
328     resource.getRootID(), spmdClient,
329     map.getPartitionBounds(i), map
330     .getPartitionCalculationBounds(i));
331     SinglePartitionInfo info = new SinglePartitionInfo(resource
332     .getRootID(), name, loader, map, i);
333     singlePartitionInfos[i].add(info);
334     toTransferPartitionInfos[i].add(info);
335     dataToTransfer = true;
336     }
337     }
338    
339     /**
340     * Checks if the given object is an instance of {@link SplittableResource}
341     * and gives it back as splittable.
342     *
343     * @param splittableResource
344     * @throws UnsupportedOperationException
345     * if not instance of {@link SplittableResource}
346     * @return the object as {@link SplittableResource}
347     */
348     protected SplittableResource checkSplittable(Object splittableResource) {
349     if (!(splittableResource instanceof SplittableResource))
350     throw new UnsupportedOperationException(
351     "Operation failed: the argument for 'addToSplitControl' \n "
352     + "must be an instance of SplittableResource! (like e.g. SplittableLLProxyGrid).\n "
353     + "You can add non splittable Objects as Parameters of the SPMDTasks! For arrays"
354     + "of SplittableResources use addToMultiSplitControl");
355     return (SplittableResource) splittableResource;
356     }
357    
358     /**
359     * checks if every object of the array is an instance of
360     * {@link SplittableResource} and gives it back as splittable
361     *
362     * @param splittableResources
363     * an array of (hopefully)
364     * {@link SplittableResource SplittableResources}
365     * @throws UnsupportedOperationException
366     * if not all elements are instances of
367     * {@link SplittableResource} or if the splitHeights or
368     * SplitWidths do not match
369     * @return the object as {@link SplittableResource} array
370     */
371     protected SplittableResource[] checkSplittableArray(
372     Object[] splittableResources) {
373     SplittableResource[] res = new SplittableResource[splittableResources.length];
374     for (int i = 0; i < splittableResources.length; i++) {
375     if (!(splittableResources[i] instanceof SplittableResource))
376     throw new UnsupportedOperationException(
377     "Operation failed: the argument must be an instance "
378     + "of SplittableResource! (like e.g. SplittableLLProxyGrid).\n "
379     + "You can add non splittable Objects as Parameters of the SPMDTasks!");
380     res[i] = (SplittableResource) splittableResources[i];
381     }
382     // check if the splitLengths match
383     if (res.length == 0)
384     return res;
385     int splitHeight = res[0].getSplitHeight();
386     int splitWidth = res[0].getSplitWidth();
387     for (SplittableResource resource : res) {
388     if (resource.getSplitHeight() != splitHeight
389     || resource.getSplitWidth() != splitWidth)
390     throw new UnsupportedOperationException(
391     "Operation Failed: Splitvalues (height/width) of the array elements do not match!");
392     }
393     return res;
394     }
395    
396     /**
397     * Throws a {@link UnsupportedOperationException} if the state does not
398     * match the required state
399     *
400     * @param requiredState
401     * the required state
402     */
403     private void checkState(STATE requiredState) {
404     if (requiredState != state)
405     throw new UnsupportedOperationException(
406     "This Operation is not available: " + "You are in state"
407     + state + " but you should be in state "
408     + requiredState + ". See documentation of "
409     + state.getClass().getName()
410     + " for more information ");
411     }
412    
413     /**
414     * disconnects from all servers
415     */
416     public void close() {
417     disconnectAll();
418     }
419    
420     /**
421     * connects to all servers
422     */
423     @SuppressWarnings("unchecked")
424     private void connectAll() {
425     checkState(STATE.INIT);
426     long l = System.currentTimeMillis();
427     Future[] futureResults = new Future[servers.length];
428     for (int i = 0; i < servers.length; i++) {
429     futureResults[i] = executor.submit(new ComputingResourceThread(
430     servers[i], serverInfos[i], null, CommType.CONNECT,
431     eventProxy, true) {
432     public Object run() throws Exception {
433     getServer().connect();
434     return null;
435     }
436     });
437     }
438     try {
439     // wait for threads to finish
440     for (Future future : futureResults) {
441     future.get();
442     }
443     } catch (Exception e) {
444     // TODO Auto-generated catch block
445     e.printStackTrace();
446     }
447     System.out.println("Connect time: " + (System.currentTimeMillis() - l)
448     + " ms");
449     }
450    
451     /**
452     * disconnect from all servers
453     */
454     private void disconnectAll() {
455     Future[] futureResults = new Future[servers.length];
456     for (int i = 0; i < servers.length; i++) {
457     futureResults[i] = executor.submit(new ComputingResourceThread(
458     servers[i], serverInfos[i], null, CommType.DISCONNECT,
459     eventProxy, true) {
460     public Object run() throws Exception {
461     getServer().disconnect();
462     return null;
463     }
464     });
465     }
466     try {
467     // wait threads to finish
468     for (Future future : futureResults) {
469     future.get();
470     }
471     } catch (Exception e) {
472     // TODO Auto-generated catch block
473     e.printStackTrace();
474     }
475    
476     }
477    
478     /**
479     * @return the actual splitmap. The referenceResource must be set before
480     * call!
481     */
482     private SplitMap getSplitMap() {
483     if (referenceResourceID == -1) {
484     LOG.error("NO MAP CREATED YET!");
485     return null;
486     } else
487     return splitMap;
488     }
489    
490     /**
491     * @return the current state
492     */
493     public STATE getState() {
494     return state;
495     }
496    
497     /**
498     * Creates a {@link SplitMap} for the specified resource. Use
499     * {@link #getSplitMap()} for retrieval.
500     *
501     * @param splittable
502     * the resource, for which the {@link SplitMap} is created
503     */
504     private void makeSplitMap(SplittableResource splittable) {
505     SplitMap map = null;
506    
507     // get splitMap implementation for this splittable from XuluConfig
508     String classname = XuluConfig.getXuluConfig().getProperty(
509     splitMapClassPropertyName + "."
510     + splittable.getClass().getSimpleName());
511     // if no entry was found lookup default splitter
512     if (classname == null)
513     classname = XuluConfig.getXuluConfig().getProperty(
514     splitMapClassPropertyName + "." + "default");
515     try {
516     map = (SplitMap) Class.forName(classname).newInstance();
517     map.setParameters(splittable, neighborhoodRange, noOfPartitions,
518     boxingMode);
519     map.setWeights(weights);
520     map.makeMap();
521     } catch (Exception e) {
522     String error = "Could not create Splitmap from classname : '"
523     + classname + "' out of property '"
524     + splitMapClassPropertyName + "'. Nested errormessage is :"
525     + e.getMessage();
526     LOG.fatal(error, e);
527     throw new UnsupportedOperationException(error);
528    
529     }
530     this.splitMap = map;
531    
532     }
533    
534     /*
535     * (non-Javadoc)
536     *
537     * @see appl.parallel.spmd.SPMDClientInterface#mergeAllPartitions()
538     */
539     @SuppressWarnings("unchecked")
540     public synchronized void mergeAllPartitions() {
541     // clone the list first, because mergePartition(int) removes elements
542     // from the list, which causes
543     // problems with the for-each loop
544     Vector<Integer> activeIDs = (Vector<Integer>) listOfAllActivelyControlledData
545     .clone();
546     // Vector<Future> futures = new Vector<Future>();
547     // for (Integer id : activeIDs) {
548     // futures.add(executor.submit(new DataServerThread(null, id) {
549     // public Object call() throws Exception {
550     // mergePartition(getIntArgument());
551     // return null;
552     // }
553     // }));
554     // }
555     // // wait on tasks to finish
556     // for (Future future : futures) {
557     // try {
558     // future.get();
559     // } catch (Exception e) {
560     // e.printStackTrace();
561     // }
562     //
563     // }
564     for (Integer id : activeIDs) {
565     mergePartition((int) id);
566     }
567     }
568    
569     /*
570     * (non-Javadoc)
571     *
572     * @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject)
573     */
574     public synchronized void mergeMultiData(MultiDataObject multidata) {
575     // the problem is that perhaps on serverside grids were added, but not
576     // on client side. It is assumed that all servers have created the
577     // same number of grids with the same names (which is assured by
578     // multiDataInfo)
579    
580     // first lookup the local multiData info and the info of any
581     // (here the first) dataserver multiInfo - as i said: all should be the
582     // same
583     MultiDataInfo localInfo = multidata.getMultiInfo();
584     String name = multidata.getName();
585     MultiDataInfo remoteInfo;
586     try {
587     // all remote infos should be the same, so it is enough to retrieve
588     // the first one
589     remoteInfo = dataServers[0].getMultiDataInfo(name);
590     if (localInfo == null || remoteInfo == null) {
591     LOG.error("Could not lookup MultidataInfo with name " + name
592     + ". localInfo was " + localInfo + " remote info was "
593     + remoteInfo);
594     return;
595     }
596     // merge the ids which are there on both sides:
597     int i = 0;
598     for (; i < localInfo.getCount(); i++)
599     mergePartition(localInfo.getMultiID(i));
600     // merge the new partitions
601     for (; i < remoteInfo.getCount(); i++) {
602     // get the new ID (the same as on server side)
603     int newID = remoteInfo.getMultiID(i);
604     // first create the new data element out of the first element in
605     // the list:
606     multidata.addElement(newID);
607     SplittableResource newResource = (SplittableResource) multidata
608     .getElement(multidata.getCount() - 1);
609     spmdClient.addData(newResource);
610     // now merge
611     mergePartition(newID);
612     }
613     } catch (RemoteException e) {
614     LOG.error("ERROR while trying to merge multi data: "
615     + e.getMessage(), e);
616     }
617     }
618    
619     /*
620     * (non-Javadoc)
621     *
622     * @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject,
623     * int)
624     */
625     public void mergeMultiData(MultiDataObject multidata, int idx) {
626     // the problem is that perhaps on serverside grids were added, but not
627     // on client side. It is assumed that all servers have created the
628     // same number of grids with the same names (which is assured by
629     // multiDataInfo)
630    
631     // first lookup the local multiData info and the info of any
632     // (here the first) dataserver multiInfo - as i said: all should be the
633     // same
634     MultiDataInfo localInfo = multidata.getMultiInfo();
635     String name = multidata.getName();
636     MultiDataInfo remoteInfo;
637     try {
638     // all remote infos should be the same, so it is enough to retrieve
639     // the first one
640     remoteInfo = dataServers[0].getMultiDataInfo(name);
641     if (localInfo == null || remoteInfo == null) {
642     LOG.error("Could not lookup MultidataInfo with name " + name
643     + ". localInfo was " + localInfo + " remote info was "
644     + remoteInfo);
645     return;
646     }
647     // merge the grid with the given index
648     // for a local idx:
649     if (idx < localInfo.getCount())
650     mergePartition(localInfo.getMultiID(idx));
651     // else create a new grid
652     else {
653     // get the new ID (the same as on server side)
654     int newID = remoteInfo.getMultiID(idx);
655     if (newID == 0)
656     throw new UnsupportedOperationException(
657     "For the requested index (" + idx
658     + ") was no grid found on the servers");
659     // first create the new data element out of the first element in
660     // the list:
661     multidata.addElement(newID);
662     SplittableResource newResource = (SplittableResource) multidata
663     .getElement(multidata.getCount() - 1);
664     spmdClient.addData(newResource);
665     // now merge the serverdata into the new clientgrid
666     mergePartition(newID);
667     }
668     } catch (RemoteException e) {
669     LOG.error("ERROR while trying to merge multi data: "
670     + e.getMessage(), e);
671     }
672     }
673    
674     /*
675     * (non-Javadoc)
676     *
677     * @see appl.parallel.spmd.SPMDClientInterface#mergePartition(int)
678     */
679     public synchronized void mergePartition(int rootID) {
680     checkState(STATE.RUN);
681     Future[] futures = new Future[noOfPartitions];
682     for (int i = 0; i < noOfPartitions; i++)
683     futures[i] = executor.submit(new DataServerThread(dataServers[i],
684     serverInfos[i], rootID, CommType.CLIENT_MERGE, eventProxy) {
685     public Object run() throws Exception {
686     getServer().unloadToSource(getIntArgument());
687     return null;
688     }
689     });
690     // wait on tasks to finish:
691     try {
692     // wait threads to finish
693     for (Future future : futures) {
694     future.get();
695     }
696     } catch (Exception e) {
697     // TODO Auto-generated catch block
698     e.printStackTrace();
699     }
700     listOfAllActivelyControlledData.remove((Integer) rootID);
701     }
702    
703     /*
704     * (non-Javadoc)
705     *
706     * @see appl.parallel.spmd.SPMDClientInterface#mergePartition(java.lang.Object)
707     */
708     public void mergePartition(Object splittableResource) {
709     checkSplittable(splittableResource);
710     checkState(STATE.RUN);
711     mergePartition(((SplittableResource) splittableResource).getRootID());
712     }
713    
714     /*
715     * (non-Javadoc)
716     *
717     * @see appl.parallel.spmd.SPMDClientInterface#runSPMDModelTask(appl.parallel.spmd.SPMDTask,
718     * java.lang.Object[])
719     */
720     @SuppressWarnings("unchecked")
721     public Object[] runSPMDModelTask(final SPMDTask task, Object... parameters)
722     throws Throwable {
723     state = STATE.RUN;
724     // transfer needed data
725     if (dataToTransfer) {
726     transferDataToServers();
727     }
728     // submit task and arguments to the resources
729     // each task is calculated in its own thread;
730     // create tasks
731     parameters = (parameters == null) ? new Object[0] : parameters;
732     try {
733     Future futureResults[] = new Future[servers.length];
734     for (int i = 0; i < servers.length; i++) {
735     futureResults[i] = executor.submit(new ComputingResourceThread(
736     servers[i], serverInfos[i], new Object[] { task,
737     referenceResourceID, parameters },
738     CommType.CLIENT_EXECUTION, eventProxy) {
739     public Object run() throws Exception {
740     SPMDResource server = (SPMDResource) getServer();
741     return server.runSPMDModelTask(task.getClass()
742     .getName(),
743     (Integer) getObjectArrayArgument()[1],
744     (Object[]) getObjectArrayArgument()[2]);
745     }
746     });
747     }
748    
749     Vector<Object> results = new Vector<Object>(15);
750    
751     // aim of the next loop is to get single Object-array containing all
752     // results!
753     // Notice that every server result is a result array.
754     // This is because serverexecution can happen in multiple threads
755     // (when multiple processors are available) each serverthread will
756     // produce its own result
757     for (int i = 0; i < noOfPartitions; i++) {
758     // wait for threads finished and collect results
759     Object[] result = (Object[]) futureResults[i].get();
760     for (int j = 0; j < result.length; j++)
761     results.add(result[j]);
762     }
763     return results.toArray();
764    
765     } catch (ExecutionException e) {
766     LOG.fatal("Error while trying to execute task "
767     + task.getClass().getName() + ": " + e.getMessage(), e);
768     throw e.getCause();
769     } catch (Exception e) {
770     LOG.error("Error while trying to execute task "
771     + task.getClass().getName() + ": " + e.getMessage(), e);
772     e.printStackTrace();
773     }
774     return null;
775    
776     }
777    
778     /*
779     * (non-Javadoc)
780     *
781     * @see appl.parallel.spmd.SPMDClientInterface#setBoxingMode(appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode)
782     */
783     public void setBoxingMode(AbstractSplitMap.NeighborhoodBoxingMode boxingMode) {
784     checkState(STATE.INIT);
785     this.boxingMode = boxingMode;
786     }
787    
788     /*
789     * (non-Javadoc)
790     *
791     * @see appl.parallel.spmd.SPMDClientInterface#setNeighborhoodRange(int)
792     */
793     public void setNeighborhoodRange(int neighborhoodRange) {
794     checkState(STATE.INIT);
795     this.neighborhoodRange = neighborhoodRange;
796     }
797    
798     /*
799     * (non-Javadoc)
800     *
801     * @see appl.parallel.spmd.SPMDClientInterface#setReferenceResource(java.lang.Object)
802     */
803     public void setReferenceResource(Object splittableResource) {
804     checkSplittable(splittableResource);
805     SplittableResource res = (SplittableResource) splittableResource;
806     checkState(STATE.INIT);
807     if (res instanceof SplittableResource) {
808     referenceResourceID = res.getRootID();
809     makeSplitMap(res);
810     } else
811     LOG
812     .warn("Set reference failed: given resource was not an instance of SplittableResource!");
813    
814     }
815    
816     /**
817     * transfers all collected data to the servers. (The data is collected to
818     * save communication time)
819     */
820     private void transferDataToServers() {
821     long time = System.currentTimeMillis();
822     Vector<Future> futures = new Vector<Future>();
823     if (toTransferPartitionInfos[0].size() > 0)
824     for (int i = 0; i < servers.length; i++) {
825     futures.add(executor.submit(new DataServerThread(
826     dataServers[i], serverInfos[i],
827     toTransferPartitionInfos[i],
828     CommType.TRANSFER_METADATA, eventProxy, true) {
829     public Object run() throws Exception {
830     getServer()
831     .addPartitionInfos(
832     (Vector<SinglePartitionInfo>) getObjectArgument());
833     return null;
834     }
835     }));
836     }
837     if (toTransferMultiDataObjects.size() > 0)
838     for (int i = 0; i < servers.length; i++) {
839     futures.add(executor.submit(new DataServerThread(
840     dataServers[i], serverInfos[i],
841     toTransferMultiDataObjects, CommType.TRANSFER_METADATA,
842     eventProxy, true) {
843     public Object run() throws Exception {
844     getServer()
845     .addMultiDataInfos(
846     (HashMap<String, MultiDataInfo>) getObjectArgument());
847     return null;
848     }
849     }));
850     }
851     if (toTransferBaseParameters.size() > 0)
852     for (int i = 0; i < servers.length; i++) {
853     futures.add(executor.submit(new DataServerThread(
854     dataServers[i], serverInfos[i],
855     toTransferBaseParameters, CommType.TRANSFER_PARAMETERS,
856     eventProxy) {
857     public Object run() throws Exception {
858     getServer().updateBaseParameter(
859     (HashMap) getObjectArgument());
860     return null;
861     }
862     }));
863     }
864     // wait on threads to finish
865     for (Future future : futures) {
866     try {
867     future.get();
868     } catch (InterruptedException e) {
869     LOG.error(e);
870     e.printStackTrace();
871     } catch (ExecutionException e) {
872     LOG.error(e);
873     e.printStackTrace();
874     }
875     }
876     dataToTransfer = false;
877     toTransferBaseParameters.clear();
878     toTransferBaseParameters.clear();
879     toTransferMultiDataObjects.clear();
880     if (LOG.isDebugEnabled())
881     LOG.debug("Transfered Data to clients in "
882     + (System.currentTimeMillis() - time) / 1000000 + " ms");
883     }
884    
885     /*
886     * (non-Javadoc)
887     *
888     * @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(java.lang.Object)
889     */
890     public void updateNeighborhood(Object splittableResource) {
891     checkSplittable(splittableResource);
892     SplittableResource res = (SplittableResource) splittableResource;
893     updateNeighborhood(res.getRootID());
894     }
895    
896     /*
897     * (non-Javadoc)
898     *
899     * @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(appl.parallel.spmd.MultiDataObject,
900     * int)
901     */
902     public void updateNeighborhood(MultiDataObject multiDataObject, int index) {
903     this.updateNeighborhood(multiDataObject.getMultiInfo()
904     .getMultiID(index));
905     }
906    
907     /**
908     * updates the neighborhood of the splittable with the given id on the
909     * servers
910     */
911     private void updateNeighborhood(int rootID) {
912     checkState(STATE.RUN);
913     Future[] futureResults = new Future[servers.length];
914     for (int i = 0; i < servers.length; i++) {
915     futureResults[i] = executor.submit(new DataServerThread(
916     dataServers[i], serverInfos[i], rootID,
917     CommType.CLIENT_UPDATE, eventProxy) {
918     public Object run() throws Exception {
919     getServer().updateFromNeighbors(getIntArgument());
920     return null;
921     }
922     });
923     }
924     try {
925     // wait threads to finish
926     for (Future future : futureResults) {
927     future.get();
928     }
929     } catch (Exception e) {
930     // TODO Auto-generated catch block
931     e.printStackTrace();
932     }
933     }
934     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26