/[xulu]/trunk/src/appl/parallel/spmd/SPMDClientController.java
ViewVC logotype

Annotation of /trunk/src/appl/parallel/spmd/SPMDClientController.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2 - (hide annotations)
Wed Feb 25 11:54:01 2009 UTC (15 years, 9 months ago) by mojays
File size: 31515 byte(s)
First Commit, corresponds to Revision 1008 of Wikisquare-SVN 
1 mojays 2 package appl.parallel.spmd;
2    
3     import java.net.InetAddress;
4     import java.rmi.RemoteException;
5     import java.util.HashMap;
6     import java.util.Vector;
7     import java.util.concurrent.Callable;
8     import java.util.concurrent.ExecutionException;
9     import java.util.concurrent.ExecutorService;
10     import java.util.concurrent.Executors;
11     import java.util.concurrent.Future;
12     import java.util.concurrent.TimeUnit;
13    
14     import org.apache.log4j.LogManager;
15     import org.apache.log4j.Logger;
16    
17     import edu.bonn.xulu.plugin.data.grid.MultiGrid;
18     import appl.data.DataLoader;
19     import appl.ext.XuluConfig;
20     import appl.parallel.ComputingResource;
21     import appl.parallel.ComputingResourceContainer;
22     import appl.parallel.ComputingResourceProperties;
23     import appl.parallel.client.RemoteEventHandler;
24     import appl.parallel.client.ClientDataServer;
25     import appl.parallel.data.PartitionDataHandler;
26     import appl.parallel.data.PartitionHandlerFactory;
27     import appl.parallel.data.XuluClientLoader;
28     import appl.parallel.event.CommEvent;
29     import appl.parallel.event.CommEventSink;
30     import appl.parallel.event.TimeEvent;
31     import appl.parallel.event.TimeMonitor;
32     import appl.parallel.event.TransferEvent;
33     import appl.parallel.event.TransferMonitor;
34     import appl.parallel.event.CommEvent.CommType;
35     import appl.parallel.server.PartitionDataServer;
36     import appl.parallel.server.SPMDResource;
37     import appl.parallel.spmd.split.AbstractSplitMap;
38     import appl.parallel.spmd.split.DataPartition;
39     import appl.parallel.spmd.split.SinglePartitionInfo;
40     import appl.parallel.spmd.split.SplitMap;
41     import appl.parallel.spmd.split.SplittableResource;
42     import appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode;
43     import appl.parallel.thread.ComputingResourceThread;
44     import appl.parallel.thread.DataServerThread;
45     import appl.parallel.model.AbstractParallelStepModel;
46    
47     /**
48     * This class controls all the parallelization action on the client side and is
49     * the counterpart to {@link SPMDServerController}. It is accessed by the model
50     * developer by retrieving the {@link SPMDClientInterface} from the
51     * {@link AbstractParallelStepModel}.
52     *
53     * @author Dominik Appl
54     */
55     public class SPMDClientController implements SPMDClientInterface {
56    
57     /**
58     * There are two states: <br>
59     * <br>
60     * <b>STATE.INIT</b> is the initializing state. In this state it is
61     * allowed:<br>
62     * to add resources to split control<br>
63     * change neighborhodRange/boxing modes/reference resource<br>
64     * <br>
65     * All other methods are disabled and will throw
66     * {@link UnsupportedOperationException}<br>
67     * <br>
68     * <b>STATE.RUN</b> is the running stage. All methods of {@link STATE#INIT}
69     * are disabled and will throw {@link UnsupportedOperationException}. This
70     * mode is automatically set by the first call to
71     * {@link SPMDClientController#runSPMDModelTask(SPMDTask, Object...)}
72     */
73     public enum STATE {
74     INIT, RUN
75     }
76    
77     private STATE state = STATE.INIT;
78    
79     private final String splitMapClassPropertyName = "Parallel.splitmapforclass";
80    
81     private final Logger LOG = LogManager.getLogger(this.getClass().getName());
82    
83     /**
84     * participating servers
85     */
86     private final SPMDResource[] servers;
87    
88     private final ComputingResourceProperties[] serverInfos;
89    
90     /**
91     * stores the {@link PartitionDataServer DataServers} of the resources. They
92     * are retrieved, after all resources are submitted to split control.
93     */
94     private final PartitionDataServer[] dataServers;;
95    
96     /**
97     * number of partitions (== no of participating resources)
98     */
99     private final int noOfPartitions;
100    
101     /**
102     * contains the {@link SinglePartitionInfo} of the
103     * {@link SplittableResource splitted resources} for each server
104     */
105     private final Vector<SinglePartitionInfo>[] singlePartitionInfos;
106    
107     private final Vector<SinglePartitionInfo>[] toTransferPartitionInfos;
108    
109     private HashMap<String, Object> toTransferBaseParameters;
110    
111     private final HashMap<String, MultiDataInfo> multiDataInfos = new HashMap<String, MultiDataInfo>(
112     10);
113    
114     private final HashMap<String, MultiDataInfo> toTransferMultiDataObjects = new HashMap<String, MultiDataInfo>(
115     10);
116    
117     /** The IP-Addresses of the resources */
118     private String[] IPs;
119    
120     /***************************************************************************
121     * This Vector contains all IDs of the resources, which are currently under
122     * splitControl
123     *
124     * @see #addToSplitControl(SplittableResource, String)
125     * @see #mergePartition(SplittableResource)
126     **************************************************************************/
127     private Vector<Integer> listOfAllActivelyControlledData = new Vector<Integer>();
128    
129     /**
130     * @see AbstractSplitMap.NeighborhoodBoxingMode
131     */
132     private NeighborhoodBoxingMode boxingMode = AbstractSplitMap.NeighborhoodBoxingMode.inBoxing;
133    
134     /** current neighborhood range, default is 0 */
135     private int neighborhoodRange = 0;
136    
137     private final ClientDataServer spmdClient;
138    
139     /**
140     * Local calculation bounds on server side are calculated using the
141     * reference resource This is the ID of that resource
142     */
143     protected int referenceResourceID = -1;
144    
145     /** Thread execution pool */
146     private final ExecutorService executor;
147    
148     /** true if there is data which has to be transfered to the servers */
149     private boolean dataToTransfer = false;
150    
151     private SplitMap splitMap;
152    
153     private final CommEventSink eventProxy;
154    
155     private final double[] weights;
156    
157     /**
158     * Creates a new Client controller.
159     *
160     * @param computingResources
161     * the resources which are used by this controller
162     * @param spmdClient
163     * the spmd client responsible for the data retrieval
164     * @param weights
165     * the weights for the distribution over the computing resources
166     * (values with a sum of 1) or null (distribution will be
167     * average)
168     * @param eventProxy
169     * a {@link RemoteEventHandler} for eventHandling
170     */
171     @SuppressWarnings("unchecked")
172     public SPMDClientController(
173     Vector<ComputingResourceContainer> computingResources,
174     double[] weights, ClientDataServer spmdClient,
175     CommEventSink eventProxy) {
176     if (weights == null)
177     weights = averageWeights(computingResources.size());
178     this.weights = weights;
179    
180     this.eventProxy = eventProxy;
181     this.noOfPartitions = computingResources.size();
182     if (noOfPartitions == 0)
183     throw new UnsupportedOperationException(
184     "No Computing Ressources found");
185    
186     this.spmdClient = spmdClient;
187     dataServers = new PartitionDataServer[noOfPartitions];
188     serverInfos = new ComputingResourceProperties[noOfPartitions];
189     servers = new SPMDResource[noOfPartitions];
190     for (int i = 0; i < computingResources.size(); i++) {
191     serverInfos[i] = computingResources.get(i).getInformation();
192     servers[i] = (SPMDResource) computingResources.get(i).getResource();
193    
194     }
195     // extract the IPs
196     IPs = new String[noOfPartitions];
197     for (int i = 0; i < IPs.length; i++) {
198     IPs[i] = serverInfos[i].getIP();
199     IPs[i] = serverInfos[i].getPort() == null ? IPs[i] : IPs[i] + ":"
200     + serverInfos[i].getPort();
201    
202     if (IPs[i] == null)
203     LOG
204     .fatal("The IP-Information of a computingRessource was NULL. "
205     + "This will result in failure of the computation!");
206     }
207    
208     // initialize the singlePartitionInfos and tasks. Notice that the array
209     // singlePartitionInfos is final.
210     // Each vector of infos is permanently associated with one server.
211     this.singlePartitionInfos = new Vector[noOfPartitions];
212     this.toTransferPartitionInfos = new Vector[noOfPartitions];
213     this.toTransferBaseParameters = new HashMap<String, Object>();
214     for (int i = 0; i < noOfPartitions; i++) {
215     singlePartitionInfos[i] = new Vector<SinglePartitionInfo>();
216     toTransferPartitionInfos[i] = new Vector<SinglePartitionInfo>();
217     }
218    
219     // initialize executor
220     executor = Executors.newCachedThreadPool();
221    
222     // connect to all participating servers
223     connectAll();
224     // initialize DataServers (must be after connect)
225     for (int i = 0; i < noOfPartitions; i++)
226     try {
227     dataServers[i] = servers[i].createDataServer(IPs);
228     } catch (RemoteException e) {
229     LOG.fatal(e);
230     e.printStackTrace();
231     }
232     }
233    
234     private double[] averageWeights(int noResources) {
235     // create weighted rating with the same weight
236     double[] weights = new double[noResources];
237     for (int i = 0; i < weights.length; i++) {
238     weights[i] = 1 / noResources;
239     }
240     return weights;
241     }
242    
243     /*
244     * (non-Javadoc)
245     *
246     * @see appl.parallel.spmd.SPMDClientInterface#addBaseParameter(java.lang.Object,
247     * java.lang.String)
248     */
249     public void addBaseParameter(Object parameter, String parameterName) {
250     toTransferBaseParameters.put(parameterName, parameter);
251     dataToTransfer = true;
252     }
253    
254     /*
255     * (non-Javadoc)
256     *
257     * @see appl.parallel.spmd.SPMDClientInterface#addBaseParameters(java.lang.Object[],
258     * java.lang.String[])
259     */
260     public void addBaseParameters(Object[] parameters, String[] parameterNames) {
261     for (int i = 0; i < parameterNames.length; i++) {
262     addBaseParameter(parameters[i], parameterNames[i]);
263     }
264     }
265    
266     /*
267     * (non-Javadoc)
268     *
269     * @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(java.lang.Object[],
270     * java.lang.String)
271     */
272     public MultiDataObject addToMultiDataSplitControl(
273     Object splittableResources[], String name) {
274     if (splittableResources.length == 0) {
275     throw new UnsupportedOperationException(
276     "There must be at least one resource to create a MultiData Element");
277     }
278     // Check if adding is allowed:
279     checkState(STATE.INIT);
280     SplittableResource[] resources = checkSplittableArray(splittableResources);
281    
282     // add each element to splitcontrol. The constant gives each element a
283     // unique name and identifies it on the server side as belonging to a multisplit
284    
285     MultiDataInfo multi = new MultiDataInfo(new int[0], name);
286     for (int i = 0; i < resources.length; i++) {
287     multi.addElement(resources[i].getRootID());
288     addToSplitControl(resources[i], MultiDataInfo.getNameWithIdx(i,
289     name));
290     }
291     toTransferMultiDataObjects.put(name, multi);
292     multiDataInfos.put(name, multi);
293     dataToTransfer = true;
294     return new MultiDataObject(multi, spmdClient);
295     }
296    
297     /*
298     * (non-Javadoc)
299     *
300     * @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(edu.bonn.xulu.plugin.data.grid.MultiGrid,
301     * java.lang.String)
302     */
303     public MultiDataObject addToMultiDataSplitControl(MultiGrid multiGrid,
304     String name) {
305     MultiDataObject dataObject = addToMultiDataSplitControl(multiGrid
306     .toArray(), name);
307     dataObject.setManagedGrid(multiGrid);
308     return dataObject;
309     }
310    
311     /*
312     * (non-Javadoc)
313     *
314     * @see appl.parallel.spmd.SPMDClientInterface#addToSplitControl(java.lang.Object,
315     * java.lang.String)
316     */
317     public void addToSplitControl(Object splittableResource, String name) {
318    
319     SplittableResource resource = checkSplittable(splittableResource);
320     // Check if adding is allowed:
321     checkState(STATE.INIT);
322     // if first call, than this is the reference for now - a map will be
323     // generated
324     if (referenceResourceID == -1)
325     setReferenceResource(resource);
326    
327     // add the data to the SPMDClient for server retrieval
328     spmdClient.addData(resource);
329     // add the the resource to the controlled resources
330     listOfAllActivelyControlledData.add(resource.getRootID());
331     // make the singlePartitionInfos for each participating server and store
332     // the
333     // info for later use
334    
335     SplitMap map = getSplitMap();
336     // create a partition info for each Server (only the
337     // splitMapPosition differs)
338     for (int i = 0; i < singlePartitionInfos.length; i++) {
339     PartitionDataHandler loader = PartitionHandlerFactory.newInstance(
340     resource.getRootID(), spmdClient,
341     map.getPartitionBounds(i), map
342     .getPartitionCalculationBounds(i));
343     SinglePartitionInfo info = new SinglePartitionInfo(resource
344     .getRootID(), name, loader, map, i);
345     singlePartitionInfos[i].add(info);
346     toTransferPartitionInfos[i].add(info);
347     dataToTransfer = true;
348     }
349     }
350    
351     /**
352     * Checks if the given object is an instance of {@link SplittableResource}
353     * and gives it back as splittable.
354     *
355     * @param splittableResource
356     * @throws UnsupportedOperationException
357     * if not instance of {@link SplittableResource}
358     * @return the object as {@link SplittableResource}
359     */
360     protected SplittableResource checkSplittable(Object splittableResource) {
361     if (!(splittableResource instanceof SplittableResource))
362     throw new UnsupportedOperationException(
363     "Operation failed: the argument for 'addToSplitControl' \n "
364     + "must be an instance of SplittableResource! (like e.g. SplittableLLProxyGrid).\n "
365     + "You can add non splittable Objects as Parameters of the SPMDTasks! For arrays"
366     + "of SplittableResources use addToMultiSplitControl");
367     return (SplittableResource) splittableResource;
368     }
369    
370     /**
371     * checks if every object of the array is an instance of
372     * {@link SplittableResource} and gives it back as splittable
373     *
374     * @param splittableResources
375     * an array of (hopefully)
376     * {@link SplittableResource SplittableResources}
377     * @throws UnsupportedOperationException
378     * if not all elements are instances of
379     * {@link SplittableResource} or if the splitHeights or
380     * SplitWidths do not match
381     * @return the object as {@link SplittableResource} array
382     */
383     protected SplittableResource[] checkSplittableArray(
384     Object[] splittableResources) {
385     SplittableResource[] res = new SplittableResource[splittableResources.length];
386     for (int i = 0; i < splittableResources.length; i++) {
387     if (!(splittableResources[i] instanceof SplittableResource))
388     throw new UnsupportedOperationException(
389     "Operation failed: the argument must be an instance "
390     + "of SplittableResource! (like e.g. SplittableLLProxyGrid).\n "
391     + "You can add non splittable Objects as Parameters of the SPMDTasks!");
392     res[i] = (SplittableResource) splittableResources[i];
393     }
394     // check if the splitLengths match
395     if (res.length == 0)
396     return res;
397     int splitHeight = res[0].getSplitHeight();
398     int splitWidth = res[0].getSplitWidth();
399     for (SplittableResource resource : res) {
400     if (resource.getSplitHeight() != splitHeight
401     || resource.getSplitWidth() != splitWidth)
402     throw new UnsupportedOperationException(
403     "Operation Failed: Splitvalues (height/width) of the array elements do not match!");
404     }
405     return res;
406     }
407    
408     /**
409     * Throws a {@link UnsupportedOperationException} if the state does not
410     * match the required state
411     *
412     * @param requiredState
413     * the required state
414     */
415     private void checkState(STATE requiredState) {
416     if (requiredState != state)
417     throw new UnsupportedOperationException(
418     "This Operation is not available: " + "You are in state"
419     + state + " but you should be in state "
420     + requiredState + ". See documentation of "
421     + state.getClass().getName()
422     + " for more information ");
423     }
424    
425     /**
426     * disconnects from all servers
427     */
428     public void close() {
429     disconnectAll();
430     }
431    
432     /**
433     * connects to all servers
434     */
435     @SuppressWarnings("unchecked")
436     private void connectAll() {
437     checkState(STATE.INIT);
438     long l = System.currentTimeMillis();
439     Future[] futureResults = new Future[servers.length];
440     for (int i = 0; i < servers.length; i++) {
441     futureResults[i] = executor.submit(new ComputingResourceThread(
442     servers[i], serverInfos[i], null, CommType.CONNECT,
443     eventProxy, true) {
444     public Object run() throws Exception {
445     getServer().connect();
446     return null;
447     }
448     });
449     }
450     try {
451     // wait for threads to finish
452     for (Future future : futureResults) {
453     future.get();
454     }
455     } catch (Exception e) {
456     // TODO Auto-generated catch block
457     e.printStackTrace();
458     }
459     System.out.println("Connect time: " + (System.currentTimeMillis() - l)
460     + " ms");
461     }
462    
463     /**
464     * disconnect from all servers
465     */
466     private void disconnectAll() {
467     Future[] futureResults = new Future[servers.length];
468     for (int i = 0; i < servers.length; i++) {
469     futureResults[i] = executor.submit(new ComputingResourceThread(
470     servers[i], serverInfos[i], null, CommType.DISCONNECT,
471     eventProxy, true) {
472     public Object run() throws Exception {
473     getServer().disconnect();
474     return null;
475     }
476     });
477     }
478     try {
479     // wait threads to finish
480     for (Future future : futureResults) {
481     future.get();
482     }
483     } catch (Exception e) {
484     // TODO Auto-generated catch block
485     e.printStackTrace();
486     }
487    
488     }
489    
490     /**
491     * @return the actual splitmap. The referenceResource must be set before
492     * call!
493     */
494     private SplitMap getSplitMap() {
495     if (referenceResourceID == -1) {
496     LOG.error("NO MAP CREATED YET!");
497     return null;
498     } else
499     return splitMap;
500     }
501    
502     /**
503     * @return the current state
504     */
505     public STATE getState() {
506     return state;
507     }
508    
509     /**
510     * Creates a {@link SplitMap} for the specified resource. Use
511     * {@link #getSplitMap()} for retrieval.
512     *
513     * @param splittable
514     * the resource, for which the {@link SplitMap} is created
515     */
516     private void makeSplitMap(SplittableResource splittable) {
517     SplitMap map = null;
518    
519     // get splitMap implementation for this splittable from XuluConfig
520     String classname = XuluConfig.getXuluConfig().getProperty(
521     splitMapClassPropertyName + "."
522     + splittable.getClass().getSimpleName());
523     // if no entry was found lookup default splitter
524     if (classname == null)
525     classname = XuluConfig.getXuluConfig().getProperty(
526     splitMapClassPropertyName + "." + "default");
527     try {
528     map = (SplitMap) Class.forName(classname).newInstance();
529     map.setParameters(splittable, neighborhoodRange, noOfPartitions,
530     boxingMode);
531     map.setWeights(weights);
532     map.makeMap();
533     } catch (Exception e) {
534     String error = "Could not create Splitmap from classname : '"
535     + classname + "' out of property '"
536     + splitMapClassPropertyName + "'. Nested errormessage is :"
537     + e.getMessage();
538     LOG.fatal(error, e);
539     throw new UnsupportedOperationException(error);
540    
541     }
542     this.splitMap = map;
543    
544     }
545    
546     /*
547     * (non-Javadoc)
548     *
549     * @see appl.parallel.spmd.SPMDClientInterface#mergeAllPartitions()
550     */
551     @SuppressWarnings("unchecked")
552     public synchronized void mergeAllPartitions() {
553     // clone the list first, because mergePartition(int) removes elements
554     // from the list, which causes
555     // problems with the for-each loop
556     Vector<Integer> activeIDs = (Vector<Integer>) listOfAllActivelyControlledData
557     .clone();
558     // Vector<Future> futures = new Vector<Future>();
559     // for (Integer id : activeIDs) {
560     // futures.add(executor.submit(new DataServerThread(null, id) {
561     // public Object call() throws Exception {
562     // mergePartition(getIntArgument());
563     // return null;
564     // }
565     // }));
566     // }
567     // // wait on tasks to finish
568     // for (Future future : futures) {
569     // try {
570     // future.get();
571     // } catch (Exception e) {
572     // e.printStackTrace();
573     // }
574     //
575     // }
576     for (Integer id : activeIDs) {
577     mergePartition((int) id);
578     }
579     }
580    
581     /*
582     * (non-Javadoc)
583     *
584     * @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject)
585     */
586     public synchronized void mergeMultiData(MultiDataObject multidata) {
587     // the problem is that perhaps on serverside grids were added, but not
588     // on client side. It is assumed that all servers have created the
589     // same number of grids with the same names (which is assured by
590     // multiDataInfo)
591    
592     // first lookup the local multiData info and the info of any
593     // (here the first) dataserver multiInfo - as i said: all should be the
594     // same
595     MultiDataInfo localInfo = multidata.getMultiInfo();
596     String name = multidata.getName();
597     MultiDataInfo remoteInfo;
598     try {
599     // all remote infos should be the same, so it is enough to retrieve
600     // the first one
601     remoteInfo = dataServers[0].getMultiDataInfo(name);
602     if (localInfo == null || remoteInfo == null) {
603     LOG.error("Could not lookup MultidataInfo with name " + name
604     + ". localInfo was " + localInfo + " remote info was "
605     + remoteInfo);
606     return;
607     }
608     // merge the ids which are there on both sides:
609     int i = 0;
610     for (; i < localInfo.getCount(); i++)
611     mergePartition(localInfo.getMultiID(i));
612     // merge the new partitions
613     for (; i < remoteInfo.getCount(); i++) {
614     // get the new ID (the same as on server side)
615     int newID = remoteInfo.getMultiID(i);
616     // first create the new data element out of the first element in
617     // the list:
618     multidata.addElement(newID);
619     SplittableResource newResource = (SplittableResource) multidata
620     .getElement(multidata.getCount() - 1);
621     spmdClient.addData(newResource);
622     // now merge
623     mergePartition(newID);
624     }
625     } catch (RemoteException e) {
626     LOG.error("ERROR while trying to merge multi data: "
627     + e.getMessage(), e);
628     }
629     }
630    
631     /*
632     * (non-Javadoc)
633     *
634     * @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject,
635     * int)
636     */
637     public void mergeMultiData(MultiDataObject multidata, int idx) {
638     // the problem is that perhaps on serverside grids were added, but not
639     // on client side. It is assumed that all servers have created the
640     // same number of grids with the same names (which is assured by
641     // multiDataInfo)
642    
643     // first lookup the local multiData info and the info of any
644     // (here the first) dataserver multiInfo - as i said: all should be the
645     // same
646     MultiDataInfo localInfo = multidata.getMultiInfo();
647     String name = multidata.getName();
648     MultiDataInfo remoteInfo;
649     try {
650     // all remote infos should be the same, so it is enough to retrieve
651     // the first one
652     remoteInfo = dataServers[0].getMultiDataInfo(name);
653     if (localInfo == null || remoteInfo == null) {
654     LOG.error("Could not lookup MultidataInfo with name " + name
655     + ". localInfo was " + localInfo + " remote info was "
656     + remoteInfo);
657     return;
658     }
659     // merge the grid with the given index
660     // for a local idx:
661     if (idx < localInfo.getCount())
662     mergePartition(localInfo.getMultiID(idx));
663     // else create a new grid
664     else {
665     // get the new ID (the same as on server side)
666     int newID = remoteInfo.getMultiID(idx);
667     if (newID == 0)
668     throw new UnsupportedOperationException(
669     "For the requested index (" + idx
670     + ") was no grid found on the servers");
671     // first create the new data element out of the first element in
672     // the list:
673     multidata.addElement(newID);
674     SplittableResource newResource = (SplittableResource) multidata
675     .getElement(multidata.getCount() - 1);
676     spmdClient.addData(newResource);
677     // now merge the serverdata into the new clientgrid
678     mergePartition(newID);
679     }
680     } catch (RemoteException e) {
681     LOG.error("ERROR while trying to merge multi data: "
682     + e.getMessage(), e);
683     }
684     }
685    
686     /*
687     * (non-Javadoc)
688     *
689     * @see appl.parallel.spmd.SPMDClientInterface#mergePartition(int)
690     */
691     public synchronized void mergePartition(int rootID) {
692     checkState(STATE.RUN);
693     Future[] futures = new Future[noOfPartitions];
694     for (int i = 0; i < noOfPartitions; i++)
695     futures[i] = executor.submit(new DataServerThread(dataServers[i],
696     serverInfos[i], rootID, CommType.CLIENT_MERGE, eventProxy) {
697     public Object run() throws Exception {
698     getServer().unloadToSource(getIntArgument());
699     return null;
700     }
701     });
702     // wait on tasks to finish:
703     try {
704     // wait threads to finish
705     for (Future future : futures) {
706     future.get();
707     }
708     } catch (Exception e) {
709     // TODO Auto-generated catch block
710     e.printStackTrace();
711     }
712     listOfAllActivelyControlledData.remove((Integer) rootID);
713     }
714    
715     /*
716     * (non-Javadoc)
717     *
718     * @see appl.parallel.spmd.SPMDClientInterface#mergePartition(java.lang.Object)
719     */
720     public void mergePartition(Object splittableResource) {
721     checkSplittable(splittableResource);
722     checkState(STATE.RUN);
723     mergePartition(((SplittableResource) splittableResource).getRootID());
724     }
725    
726     /*
727     * (non-Javadoc)
728     *
729     * @see appl.parallel.spmd.SPMDClientInterface#runSPMDModelTask(appl.parallel.spmd.SPMDTask,
730     * java.lang.Object[])
731     */
732     @SuppressWarnings("unchecked")
733     public Object[] runSPMDModelTask(final SPMDTask task, Object... parameters)
734     throws Throwable {
735     state = STATE.RUN;
736     // transfer needed data
737     if (dataToTransfer) {
738     transferDataToServers();
739     }
740     // submit task and arguments to the resources
741     // each task is calculated in its own thread;
742     // create tasks
743     parameters = (parameters == null) ? new Object[0] : parameters;
744     try {
745     Future futureResults[] = new Future[servers.length];
746     for (int i = 0; i < servers.length; i++) {
747     futureResults[i] = executor.submit(new ComputingResourceThread(
748     servers[i], serverInfos[i], new Object[] { task,
749     referenceResourceID, parameters },
750     CommType.CLIENT_EXECUTION, eventProxy) {
751     public Object run() throws Exception {
752     SPMDResource server = (SPMDResource) getServer();
753     return server.runSPMDModelTask(task.getClass()
754     .getName(),
755     (Integer) getObjectArrayArgument()[1],
756     (Object[]) getObjectArrayArgument()[2]);
757     }
758     });
759     }
760    
761     Vector<Object> results = new Vector<Object>(15);
762    
763     // aim of the next loop is to get single Object-array containing all
764     // results!
765     // Notice that every server result is a result array.
766     // This is because serverexecution can happen in multiple threads
767     // (when multiple processors are available) each serverthread will
768     // produce its own result
769     for (int i = 0; i < noOfPartitions; i++) {
770     // wait for threads finished and collect results
771     Object[] result = (Object[]) futureResults[i].get();
772     for (int j = 0; j < result.length; j++)
773     results.add(result[j]);
774     }
775     return results.toArray();
776    
777     } catch (ExecutionException e) {
778     LOG.fatal("Error while trying to execute task "
779     + task.getClass().getName() + ": " + e.getMessage(), e);
780     throw e.getCause();
781     } catch (Exception e) {
782     LOG.error("Error while trying to execute task "
783     + task.getClass().getName() + ": " + e.getMessage(), e);
784     e.printStackTrace();
785     }
786     return null;
787    
788     }
789    
790     /*
791     * (non-Javadoc)
792     *
793     * @see appl.parallel.spmd.SPMDClientInterface#setBoxingMode(appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode)
794     */
795     public void setBoxingMode(AbstractSplitMap.NeighborhoodBoxingMode boxingMode) {
796     checkState(STATE.INIT);
797     this.boxingMode = boxingMode;
798     }
799    
800     /*
801     * (non-Javadoc)
802     *
803     * @see appl.parallel.spmd.SPMDClientInterface#setNeighborhoodRange(int)
804     */
805     public void setNeighborhoodRange(int neighborhoodRange) {
806     checkState(STATE.INIT);
807     this.neighborhoodRange = neighborhoodRange;
808     }
809    
810     /*
811     * (non-Javadoc)
812     *
813     * @see appl.parallel.spmd.SPMDClientInterface#setReferenceResource(java.lang.Object)
814     */
815     public void setReferenceResource(Object splittableResource) {
816     checkSplittable(splittableResource);
817     SplittableResource res = (SplittableResource) splittableResource;
818     checkState(STATE.INIT);
819     if (res instanceof SplittableResource) {
820     referenceResourceID = res.getRootID();
821     makeSplitMap(res);
822     } else
823     LOG
824     .warn("Set reference failed: given resource was not an instance of SplittableResource!");
825    
826     }
827    
828     /**
829     * transfers all collected data to the servers. (The data is collected to
830     * save communication time)
831     */
832     private void transferDataToServers() {
833     long time = System.currentTimeMillis();
834     Vector<Future> futures = new Vector<Future>();
835     if (toTransferPartitionInfos[0].size() > 0)
836     for (int i = 0; i < servers.length; i++) {
837     futures.add(executor.submit(new DataServerThread(
838     dataServers[i], serverInfos[i],
839     toTransferPartitionInfos[i],
840     CommType.TRANSFER_METADATA, eventProxy, true) {
841     public Object run() throws Exception {
842     getServer()
843     .addPartitionInfos(
844     (Vector<SinglePartitionInfo>) getObjectArgument());
845     return null;
846     }
847     }));
848     }
849     if (toTransferMultiDataObjects.size() > 0)
850     for (int i = 0; i < servers.length; i++) {
851     futures.add(executor.submit(new DataServerThread(
852     dataServers[i], serverInfos[i],
853     toTransferMultiDataObjects, CommType.TRANSFER_METADATA,
854     eventProxy, true) {
855     public Object run() throws Exception {
856     getServer()
857     .addMultiDataInfos(
858     (HashMap<String, MultiDataInfo>) getObjectArgument());
859     return null;
860     }
861     }));
862     }
863     if (toTransferBaseParameters.size() > 0)
864     for (int i = 0; i < servers.length; i++) {
865     futures.add(executor.submit(new DataServerThread(
866     dataServers[i], serverInfos[i],
867     toTransferBaseParameters, CommType.TRANSFER_PARAMETERS,
868     eventProxy) {
869     public Object run() throws Exception {
870     getServer().updateBaseParameter(
871     (HashMap) getObjectArgument());
872     return null;
873     }
874     }));
875     }
876     // wait on threads to finish
877     for (Future future : futures) {
878     try {
879     future.get();
880     } catch (InterruptedException e) {
881     LOG.error(e);
882     e.printStackTrace();
883     } catch (ExecutionException e) {
884     LOG.error(e);
885     e.printStackTrace();
886     }
887     }
888     dataToTransfer = false;
889     toTransferBaseParameters.clear();
890     toTransferBaseParameters.clear();
891     toTransferMultiDataObjects.clear();
892     if (LOG.isDebugEnabled())
893     LOG.debug("Transfered Data to clients in "
894     + (System.currentTimeMillis() - time) / 1000000 + " ms");
895     }
896    
897     /*
898     * (non-Javadoc)
899     *
900     * @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(java.lang.Object)
901     */
902     public void updateNeighborhood(Object splittableResource) {
903     checkSplittable(splittableResource);
904     SplittableResource res = (SplittableResource) splittableResource;
905     updateNeighborhood(res.getRootID());
906     }
907    
908     /*
909     * (non-Javadoc)
910     *
911     * @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(appl.parallel.spmd.MultiDataObject,
912     * int)
913     */
914     public void updateNeighborhood(MultiDataObject multiDataObject, int index) {
915     this.updateNeighborhood(multiDataObject.getMultiInfo()
916     .getMultiID(index));
917     }
918    
919     /**
920     * updates the neighborhood of the splittable with the given id on the
921     * servers
922     */
923     private void updateNeighborhood(int rootID) {
924     checkState(STATE.RUN);
925     Future[] futureResults = new Future[servers.length];
926     for (int i = 0; i < servers.length; i++) {
927     futureResults[i] = executor.submit(new DataServerThread(
928     dataServers[i], serverInfos[i], rootID,
929     CommType.CLIENT_UPDATE, eventProxy) {
930     public Object run() throws Exception {
931     getServer().updateFromNeighbors(getIntArgument());
932     return null;
933     }
934     });
935     }
936     try {
937     // wait threads to finish
938     for (Future future : futureResults) {
939     future.get();
940     }
941     } catch (Exception e) {
942     // TODO Auto-generated catch block
943     e.printStackTrace();
944     }
945     }
946     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26