1 |
package appl.parallel.spmd; |
2 |
|
3 |
import java.rmi.RemoteException; |
4 |
import java.util.HashMap; |
5 |
import java.util.Vector; |
6 |
import java.util.concurrent.ExecutionException; |
7 |
import java.util.concurrent.ExecutorService; |
8 |
import java.util.concurrent.Executors; |
9 |
import java.util.concurrent.Future; |
10 |
|
11 |
import org.apache.log4j.LogManager; |
12 |
import org.apache.log4j.Logger; |
13 |
|
14 |
import appl.ext.XuluConfig; |
15 |
import appl.parallel.ComputingResourceContainer; |
16 |
import appl.parallel.ComputingResourceProperties; |
17 |
import appl.parallel.client.ClientDataServer; |
18 |
import appl.parallel.client.RemoteEventHandler; |
19 |
import appl.parallel.data.PartitionDataHandler; |
20 |
import appl.parallel.data.PartitionHandlerFactory; |
21 |
import appl.parallel.event.CommEventSink; |
22 |
import appl.parallel.event.CommEvent.CommType; |
23 |
import appl.parallel.model.AbstractParallelStepModel; |
24 |
import appl.parallel.server.PartitionDataServer; |
25 |
import appl.parallel.server.SPMDResource; |
26 |
import appl.parallel.spmd.split.AbstractSplitMap; |
27 |
import appl.parallel.spmd.split.SinglePartitionInfo; |
28 |
import appl.parallel.spmd.split.SplitMap; |
29 |
import appl.parallel.spmd.split.SplittableResource; |
30 |
import appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode; |
31 |
import appl.parallel.thread.ComputingResourceThread; |
32 |
import appl.parallel.thread.DataServerThread; |
33 |
import edu.bonn.xulu.plugin.data.grid.MultiGrid; |
34 |
|
35 |
/** |
36 |
* This class controls all the parallelization action on the client side and is |
37 |
* the counterpart to {@link SPMDServerController}. It is accessed by the model |
38 |
* developer by retrieving the {@link SPMDClientInterface} from the |
39 |
* {@link AbstractParallelStepModel}. |
40 |
* |
41 |
* @author Dominik Appl |
42 |
*/ |
43 |
public class SPMDClientController implements SPMDClientInterface { |
44 |
|
45 |
/**
 * Lifecycle state of the controller.
 * <p>
 * <b>{@link #INIT}</b> is the initializing state. While in it, resources may
 * be added to split control and the neighborhood range, the boxing mode and
 * the reference resource may be changed. Methods that require the other
 * state throw {@link UnsupportedOperationException}.
 * <p>
 * <b>{@link #RUN}</b> is the running state. The methods reserved for
 * {@link #INIT} are disabled and throw
 * {@link UnsupportedOperationException}. This state is entered automatically
 * by the first call to
 * {@link SPMDClientController#runSPMDModelTask(SPMDTask, Object...)}.
 */
public enum STATE {
    /** Initializing state: the split setup may still be changed. */
    INIT,
    /** Running state: tasks are executed, the split setup is frozen. */
    RUN
}
64 |
|
65 |
private STATE state = STATE.INIT; |
66 |
|
67 |
private final String splitMapClassPropertyName = "Parallel.splitmapforclass"; |
68 |
|
69 |
private final Logger LOG = LogManager.getLogger(this.getClass().getName()); |
70 |
|
71 |
/** |
72 |
* participating servers |
73 |
*/ |
74 |
private final SPMDResource[] servers; |
75 |
|
76 |
private final ComputingResourceProperties[] serverInfos; |
77 |
|
78 |
/** |
79 |
* stores the {@link PartitionDataServer DataServers} of the resources. They |
80 |
* are retrieved, after all resources are submitted to split control. |
81 |
*/ |
82 |
private final PartitionDataServer[] dataServers;; |
83 |
|
84 |
/** |
85 |
* number of partitions (== no of participating resources) |
86 |
*/ |
87 |
private final int noOfPartitions; |
88 |
|
89 |
/** |
90 |
* contains the {@link SinglePartitionInfo} of the |
91 |
* {@link SplittableResource splitted resources} for each server |
92 |
*/ |
93 |
private final Vector<SinglePartitionInfo>[] singlePartitionInfos; |
94 |
|
95 |
private final Vector<SinglePartitionInfo>[] toTransferPartitionInfos; |
96 |
|
97 |
private HashMap<String, Object> toTransferBaseParameters; |
98 |
|
99 |
private final HashMap<String, MultiDataInfo> multiDataInfos = new HashMap<String, MultiDataInfo>( |
100 |
10); |
101 |
|
102 |
private final HashMap<String, MultiDataInfo> toTransferMultiDataObjects = new HashMap<String, MultiDataInfo>( |
103 |
10); |
104 |
|
105 |
/** The IP-Addresses of the resources */ |
106 |
private String[] IPs; |
107 |
|
108 |
/*************************************************************************** |
109 |
* This Vector contains all IDs of the resources, which are currently under |
110 |
* splitControl |
111 |
* |
112 |
* @see #addToSplitControl(SplittableResource, String) |
113 |
* @see #mergePartition(SplittableResource) |
114 |
**************************************************************************/ |
115 |
private Vector<Integer> listOfAllActivelyControlledData = new Vector<Integer>(); |
116 |
|
117 |
/** |
118 |
* @see AbstractSplitMap.NeighborhoodBoxingMode |
119 |
*/ |
120 |
private NeighborhoodBoxingMode boxingMode = AbstractSplitMap.NeighborhoodBoxingMode.inBoxing; |
121 |
|
122 |
/** current neighborhood range, default is 0 */ |
123 |
private int neighborhoodRange = 0; |
124 |
|
125 |
private final ClientDataServer spmdClient; |
126 |
|
127 |
/** |
128 |
* Local calculation bounds on server side are calculated using the |
129 |
* reference resource This is the ID of that resource |
130 |
*/ |
131 |
protected int referenceResourceID = -1; |
132 |
|
133 |
/** Thread execution pool */ |
134 |
private final ExecutorService executor; |
135 |
|
136 |
/** true if there is data which has to be transfered to the servers */ |
137 |
private boolean dataToTransfer = false; |
138 |
|
139 |
private SplitMap splitMap; |
140 |
|
141 |
private final CommEventSink eventProxy; |
142 |
|
143 |
private final double[] weights; |
144 |
|
145 |
/** |
146 |
* Creates a new Client controller. |
147 |
* |
148 |
* @param computingResources |
149 |
* the resources which are used by this controller |
150 |
* @param spmdClient |
151 |
* the spmd client responsible for the data retrieval |
152 |
* @param weights |
153 |
* the weights for the distribution over the computing resources |
154 |
* (values with a sum of 1) or null (distribution will be |
155 |
* average) |
156 |
* @param eventProxy |
157 |
* a {@link RemoteEventHandler} for eventHandling |
158 |
*/ |
159 |
@SuppressWarnings("unchecked") |
160 |
public SPMDClientController( |
161 |
Vector<ComputingResourceContainer> computingResources, |
162 |
double[] weights, ClientDataServer spmdClient, |
163 |
CommEventSink eventProxy) { |
164 |
if (weights == null) |
165 |
weights = averageWeights(computingResources.size()); |
166 |
this.weights = weights; |
167 |
|
168 |
this.eventProxy = eventProxy; |
169 |
this.noOfPartitions = computingResources.size(); |
170 |
if (noOfPartitions == 0) |
171 |
throw new UnsupportedOperationException( |
172 |
"No Computing Ressources found"); |
173 |
|
174 |
this.spmdClient = spmdClient; |
175 |
dataServers = new PartitionDataServer[noOfPartitions]; |
176 |
serverInfos = new ComputingResourceProperties[noOfPartitions]; |
177 |
servers = new SPMDResource[noOfPartitions]; |
178 |
for (int i = 0; i < computingResources.size(); i++) { |
179 |
serverInfos[i] = computingResources.get(i).getInformation(); |
180 |
servers[i] = (SPMDResource) computingResources.get(i).getResource(); |
181 |
|
182 |
} |
183 |
// extract the IPs |
184 |
IPs = new String[noOfPartitions]; |
185 |
for (int i = 0; i < IPs.length; i++) { |
186 |
IPs[i] = serverInfos[i].getIP(); |
187 |
IPs[i] = serverInfos[i].getPort() == null ? IPs[i] : IPs[i] + ":" |
188 |
+ serverInfos[i].getPort(); |
189 |
|
190 |
if (IPs[i] == null) |
191 |
LOG |
192 |
.fatal("The IP-Information of a computingRessource was NULL. " |
193 |
+ "This will result in failure of the computation!"); |
194 |
} |
195 |
|
196 |
// initialize the singlePartitionInfos and tasks. Notice that the array |
197 |
// singlePartitionInfos is final. |
198 |
// Each vector of infos is permanently associated with one server. |
199 |
this.singlePartitionInfos = new Vector[noOfPartitions]; |
200 |
this.toTransferPartitionInfos = new Vector[noOfPartitions]; |
201 |
this.toTransferBaseParameters = new HashMap<String, Object>(); |
202 |
for (int i = 0; i < noOfPartitions; i++) { |
203 |
singlePartitionInfos[i] = new Vector<SinglePartitionInfo>(); |
204 |
toTransferPartitionInfos[i] = new Vector<SinglePartitionInfo>(); |
205 |
} |
206 |
|
207 |
// initialize executor |
208 |
executor = Executors.newCachedThreadPool(); |
209 |
|
210 |
// connect to all participating servers |
211 |
connectAll(); |
212 |
// initialize DataServers (must be after connect) |
213 |
for (int i = 0; i < noOfPartitions; i++) |
214 |
try { |
215 |
dataServers[i] = servers[i].createDataServer(IPs); |
216 |
} catch (RemoteException e) { |
217 |
LOG.fatal(e); |
218 |
e.printStackTrace(); |
219 |
} |
220 |
} |
221 |
|
222 |
private double[] averageWeights(int noResources) { |
223 |
// create weighted rating with the same weight |
224 |
double[] weights = new double[noResources]; |
225 |
for (int i = 0; i < weights.length; i++) { |
226 |
weights[i] = 1 / noResources; |
227 |
} |
228 |
return weights; |
229 |
} |
230 |
|
231 |
/* |
232 |
* (non-Javadoc) |
233 |
* |
234 |
* @see appl.parallel.spmd.SPMDClientInterface#addBaseParameter(java.lang.Object, |
235 |
* java.lang.String) |
236 |
*/ |
237 |
public void addBaseParameter(Object parameter, String parameterName) { |
238 |
toTransferBaseParameters.put(parameterName, parameter); |
239 |
dataToTransfer = true; |
240 |
} |
241 |
|
242 |
/* |
243 |
* (non-Javadoc) |
244 |
* |
245 |
* @see appl.parallel.spmd.SPMDClientInterface#addBaseParameters(java.lang.Object[], |
246 |
* java.lang.String[]) |
247 |
*/ |
248 |
public void addBaseParameters(Object[] parameters, String[] parameterNames) { |
249 |
for (int i = 0; i < parameterNames.length; i++) { |
250 |
addBaseParameter(parameters[i], parameterNames[i]); |
251 |
} |
252 |
} |
253 |
|
254 |
/* |
255 |
* (non-Javadoc) |
256 |
* |
257 |
* @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(java.lang.Object[], |
258 |
* java.lang.String) |
259 |
*/ |
260 |
public MultiDataObject addToMultiDataSplitControl( |
261 |
Object splittableResources[], String name) { |
262 |
if (splittableResources.length == 0) { |
263 |
throw new UnsupportedOperationException( |
264 |
"There must be at least one resource to create a MultiData Element"); |
265 |
} |
266 |
// Check if adding is allowed: |
267 |
checkState(STATE.INIT); |
268 |
SplittableResource[] resources = checkSplittableArray(splittableResources); |
269 |
|
270 |
// add each element to splitcontrol. The constant gives each element a |
271 |
// unique name and identifies it on the server side as belonging to a multisplit |
272 |
|
273 |
MultiDataInfo multi = new MultiDataInfo(new int[0], name); |
274 |
for (int i = 0; i < resources.length; i++) { |
275 |
multi.addElement(resources[i].getRootID()); |
276 |
addToSplitControl(resources[i], MultiDataInfo.getNameWithIdx(i, |
277 |
name)); |
278 |
} |
279 |
toTransferMultiDataObjects.put(name, multi); |
280 |
multiDataInfos.put(name, multi); |
281 |
dataToTransfer = true; |
282 |
return new MultiDataObject(multi, spmdClient); |
283 |
} |
284 |
|
285 |
/* |
286 |
* (non-Javadoc) |
287 |
* |
288 |
* @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(edu.bonn.xulu.plugin.data.grid.MultiGrid, |
289 |
* java.lang.String) |
290 |
*/ |
291 |
public MultiDataObject addToMultiDataSplitControl(MultiGrid multiGrid, |
292 |
String name) { |
293 |
MultiDataObject dataObject = addToMultiDataSplitControl(multiGrid |
294 |
.toArray(), name); |
295 |
dataObject.setManagedGrid(multiGrid); |
296 |
return dataObject; |
297 |
} |
298 |
|
299 |
/* |
300 |
* (non-Javadoc) |
301 |
* |
302 |
* @see appl.parallel.spmd.SPMDClientInterface#addToSplitControl(java.lang.Object, |
303 |
* java.lang.String) |
304 |
*/ |
305 |
public void addToSplitControl(Object splittableResource, String name) { |
306 |
|
307 |
SplittableResource resource = checkSplittable(splittableResource); |
308 |
// Check if adding is allowed: |
309 |
checkState(STATE.INIT); |
310 |
// if first call, than this is the reference for now - a map will be |
311 |
// generated |
312 |
if (referenceResourceID == -1) |
313 |
setReferenceResource(resource); |
314 |
|
315 |
// add the data to the SPMDClient for server retrieval |
316 |
spmdClient.addData(resource); |
317 |
// add the the resource to the controlled resources |
318 |
listOfAllActivelyControlledData.add(resource.getRootID()); |
319 |
// make the singlePartitionInfos for each participating server and store |
320 |
// the |
321 |
// info for later use |
322 |
|
323 |
SplitMap map = getSplitMap(); |
324 |
// create a partition info for each Server (only the |
325 |
// splitMapPosition differs) |
326 |
for (int i = 0; i < singlePartitionInfos.length; i++) { |
327 |
PartitionDataHandler loader = PartitionHandlerFactory.newInstance( |
328 |
resource.getRootID(), spmdClient, |
329 |
map.getPartitionBounds(i), map |
330 |
.getPartitionCalculationBounds(i)); |
331 |
SinglePartitionInfo info = new SinglePartitionInfo(resource |
332 |
.getRootID(), name, loader, map, i); |
333 |
singlePartitionInfos[i].add(info); |
334 |
toTransferPartitionInfos[i].add(info); |
335 |
dataToTransfer = true; |
336 |
} |
337 |
} |
338 |
|
339 |
/** |
340 |
* Checks if the given object is an instance of {@link SplittableResource} |
341 |
* and gives it back as splittable. |
342 |
* |
343 |
* @param splittableResource |
344 |
* @throws UnsupportedOperationException |
345 |
* if not instance of {@link SplittableResource} |
346 |
* @return the object as {@link SplittableResource} |
347 |
*/ |
348 |
protected SplittableResource checkSplittable(Object splittableResource) { |
349 |
if (!(splittableResource instanceof SplittableResource)) |
350 |
throw new UnsupportedOperationException( |
351 |
"Operation failed: the argument for 'addToSplitControl' \n " |
352 |
+ "must be an instance of SplittableResource! (like e.g. SplittableLLProxyGrid).\n " |
353 |
+ "You can add non splittable Objects as Parameters of the SPMDTasks! For arrays" |
354 |
+ "of SplittableResources use addToMultiSplitControl"); |
355 |
return (SplittableResource) splittableResource; |
356 |
} |
357 |
|
358 |
/** |
359 |
* checks if every object of the array is an instance of |
360 |
* {@link SplittableResource} and gives it back as splittable |
361 |
* |
362 |
* @param splittableResources |
363 |
* an array of (hopefully) |
364 |
* {@link SplittableResource SplittableResources} |
365 |
* @throws UnsupportedOperationException |
366 |
* if not all elements are instances of |
367 |
* {@link SplittableResource} or if the splitHeights or |
368 |
* SplitWidths do not match |
369 |
* @return the object as {@link SplittableResource} array |
370 |
*/ |
371 |
protected SplittableResource[] checkSplittableArray( |
372 |
Object[] splittableResources) { |
373 |
SplittableResource[] res = new SplittableResource[splittableResources.length]; |
374 |
for (int i = 0; i < splittableResources.length; i++) { |
375 |
if (!(splittableResources[i] instanceof SplittableResource)) |
376 |
throw new UnsupportedOperationException( |
377 |
"Operation failed: the argument must be an instance " |
378 |
+ "of SplittableResource! (like e.g. SplittableLLProxyGrid).\n " |
379 |
+ "You can add non splittable Objects as Parameters of the SPMDTasks!"); |
380 |
res[i] = (SplittableResource) splittableResources[i]; |
381 |
} |
382 |
// check if the splitLengths match |
383 |
if (res.length == 0) |
384 |
return res; |
385 |
int splitHeight = res[0].getSplitHeight(); |
386 |
int splitWidth = res[0].getSplitWidth(); |
387 |
for (SplittableResource resource : res) { |
388 |
if (resource.getSplitHeight() != splitHeight |
389 |
|| resource.getSplitWidth() != splitWidth) |
390 |
throw new UnsupportedOperationException( |
391 |
"Operation Failed: Splitvalues (height/width) of the array elements do not match!"); |
392 |
} |
393 |
return res; |
394 |
} |
395 |
|
396 |
/** |
397 |
* Throws a {@link UnsupportedOperationException} if the state does not |
398 |
* match the required state |
399 |
* |
400 |
* @param requiredState |
401 |
* the required state |
402 |
*/ |
403 |
private void checkState(STATE requiredState) { |
404 |
if (requiredState != state) |
405 |
throw new UnsupportedOperationException( |
406 |
"This Operation is not available: " + "You are in state" |
407 |
+ state + " but you should be in state " |
408 |
+ requiredState + ". See documentation of " |
409 |
+ state.getClass().getName() |
410 |
+ " for more information "); |
411 |
} |
412 |
|
413 |
/** |
414 |
* disconnects from all servers |
415 |
*/ |
416 |
public void close() { |
417 |
disconnectAll(); |
418 |
} |
419 |
|
420 |
/** |
421 |
* connects to all servers |
422 |
*/ |
423 |
@SuppressWarnings("unchecked") |
424 |
private void connectAll() { |
425 |
checkState(STATE.INIT); |
426 |
long l = System.currentTimeMillis(); |
427 |
Future[] futureResults = new Future[servers.length]; |
428 |
for (int i = 0; i < servers.length; i++) { |
429 |
futureResults[i] = executor.submit(new ComputingResourceThread( |
430 |
servers[i], serverInfos[i], null, CommType.CONNECT, |
431 |
eventProxy, true) { |
432 |
public Object run() throws Exception { |
433 |
getServer().connect(); |
434 |
return null; |
435 |
} |
436 |
}); |
437 |
} |
438 |
try { |
439 |
// wait for threads to finish |
440 |
for (Future future : futureResults) { |
441 |
future.get(); |
442 |
} |
443 |
} catch (Exception e) { |
444 |
// TODO Auto-generated catch block |
445 |
e.printStackTrace(); |
446 |
} |
447 |
System.out.println("Connect time: " + (System.currentTimeMillis() - l) |
448 |
+ " ms"); |
449 |
} |
450 |
|
451 |
/** |
452 |
* disconnect from all servers |
453 |
*/ |
454 |
private void disconnectAll() { |
455 |
Future[] futureResults = new Future[servers.length]; |
456 |
for (int i = 0; i < servers.length; i++) { |
457 |
futureResults[i] = executor.submit(new ComputingResourceThread( |
458 |
servers[i], serverInfos[i], null, CommType.DISCONNECT, |
459 |
eventProxy, true) { |
460 |
public Object run() throws Exception { |
461 |
getServer().disconnect(); |
462 |
return null; |
463 |
} |
464 |
}); |
465 |
} |
466 |
try { |
467 |
// wait threads to finish |
468 |
for (Future future : futureResults) { |
469 |
future.get(); |
470 |
} |
471 |
} catch (Exception e) { |
472 |
// TODO Auto-generated catch block |
473 |
e.printStackTrace(); |
474 |
} |
475 |
|
476 |
} |
477 |
|
478 |
/** |
479 |
* @return the actual splitmap. The referenceResource must be set before |
480 |
* call! |
481 |
*/ |
482 |
private SplitMap getSplitMap() { |
483 |
if (referenceResourceID == -1) { |
484 |
LOG.error("NO MAP CREATED YET!"); |
485 |
return null; |
486 |
} else |
487 |
return splitMap; |
488 |
} |
489 |
|
490 |
/** |
491 |
* @return the current state |
492 |
*/ |
493 |
public STATE getState() { |
494 |
return state; |
495 |
} |
496 |
|
497 |
/** |
498 |
* Creates a {@link SplitMap} for the specified resource. Use |
499 |
* {@link #getSplitMap()} for retrieval. |
500 |
* |
501 |
* @param splittable |
502 |
* the resource, for which the {@link SplitMap} is created |
503 |
*/ |
504 |
private void makeSplitMap(SplittableResource splittable) { |
505 |
SplitMap map = null; |
506 |
|
507 |
// get splitMap implementation for this splittable from XuluConfig |
508 |
String classname = XuluConfig.getXuluConfig().getProperty( |
509 |
splitMapClassPropertyName + "." |
510 |
+ splittable.getClass().getSimpleName()); |
511 |
// if no entry was found lookup default splitter |
512 |
if (classname == null) |
513 |
classname = XuluConfig.getXuluConfig().getProperty( |
514 |
splitMapClassPropertyName + "." + "default"); |
515 |
try { |
516 |
map = (SplitMap) Class.forName(classname).newInstance(); |
517 |
map.setParameters(splittable, neighborhoodRange, noOfPartitions, |
518 |
boxingMode); |
519 |
map.setWeights(weights); |
520 |
map.makeMap(); |
521 |
} catch (Exception e) { |
522 |
String error = "Could not create Splitmap from classname : '" |
523 |
+ classname + "' out of property '" |
524 |
+ splitMapClassPropertyName + "'. Nested errormessage is :" |
525 |
+ e.getMessage(); |
526 |
LOG.fatal(error, e); |
527 |
throw new UnsupportedOperationException(error); |
528 |
|
529 |
} |
530 |
this.splitMap = map; |
531 |
|
532 |
} |
533 |
|
534 |
/* |
535 |
* (non-Javadoc) |
536 |
* |
537 |
* @see appl.parallel.spmd.SPMDClientInterface#mergeAllPartitions() |
538 |
*/ |
539 |
@SuppressWarnings("unchecked") |
540 |
public synchronized void mergeAllPartitions() { |
541 |
// clone the list first, because mergePartition(int) removes elements |
542 |
// from the list, which causes |
543 |
// problems with the for-each loop |
544 |
Vector<Integer> activeIDs = (Vector<Integer>) listOfAllActivelyControlledData |
545 |
.clone(); |
546 |
// Vector<Future> futures = new Vector<Future>(); |
547 |
// for (Integer id : activeIDs) { |
548 |
// futures.add(executor.submit(new DataServerThread(null, id) { |
549 |
// public Object call() throws Exception { |
550 |
// mergePartition(getIntArgument()); |
551 |
// return null; |
552 |
// } |
553 |
// })); |
554 |
// } |
555 |
// // wait on tasks to finish |
556 |
// for (Future future : futures) { |
557 |
// try { |
558 |
// future.get(); |
559 |
// } catch (Exception e) { |
560 |
// e.printStackTrace(); |
561 |
// } |
562 |
// |
563 |
// } |
564 |
for (Integer id : activeIDs) { |
565 |
mergePartition((int) id); |
566 |
} |
567 |
} |
568 |
|
569 |
/* |
570 |
* (non-Javadoc) |
571 |
* |
572 |
* @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject) |
573 |
*/ |
574 |
public synchronized void mergeMultiData(MultiDataObject multidata) { |
575 |
// the problem is that perhaps on serverside grids were added, but not |
576 |
// on client side. It is assumed that all servers have created the |
577 |
// same number of grids with the same names (which is assured by |
578 |
// multiDataInfo) |
579 |
|
580 |
// first lookup the local multiData info and the info of any |
581 |
// (here the first) dataserver multiInfo - as i said: all should be the |
582 |
// same |
583 |
MultiDataInfo localInfo = multidata.getMultiInfo(); |
584 |
String name = multidata.getName(); |
585 |
MultiDataInfo remoteInfo; |
586 |
try { |
587 |
// all remote infos should be the same, so it is enough to retrieve |
588 |
// the first one |
589 |
remoteInfo = dataServers[0].getMultiDataInfo(name); |
590 |
if (localInfo == null || remoteInfo == null) { |
591 |
LOG.error("Could not lookup MultidataInfo with name " + name |
592 |
+ ". localInfo was " + localInfo + " remote info was " |
593 |
+ remoteInfo); |
594 |
return; |
595 |
} |
596 |
// merge the ids which are there on both sides: |
597 |
int i = 0; |
598 |
for (; i < localInfo.getCount(); i++) |
599 |
mergePartition(localInfo.getMultiID(i)); |
600 |
// merge the new partitions |
601 |
for (; i < remoteInfo.getCount(); i++) { |
602 |
// get the new ID (the same as on server side) |
603 |
int newID = remoteInfo.getMultiID(i); |
604 |
// first create the new data element out of the first element in |
605 |
// the list: |
606 |
multidata.addElement(newID); |
607 |
SplittableResource newResource = (SplittableResource) multidata |
608 |
.getElement(multidata.getCount() - 1); |
609 |
spmdClient.addData(newResource); |
610 |
// now merge |
611 |
mergePartition(newID); |
612 |
} |
613 |
} catch (RemoteException e) { |
614 |
LOG.error("ERROR while trying to merge multi data: " |
615 |
+ e.getMessage(), e); |
616 |
} |
617 |
} |
618 |
|
619 |
/* |
620 |
* (non-Javadoc) |
621 |
* |
622 |
* @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject, |
623 |
* int) |
624 |
*/ |
625 |
public void mergeMultiData(MultiDataObject multidata, int idx) { |
626 |
// the problem is that perhaps on serverside grids were added, but not |
627 |
// on client side. It is assumed that all servers have created the |
628 |
// same number of grids with the same names (which is assured by |
629 |
// multiDataInfo) |
630 |
|
631 |
// first lookup the local multiData info and the info of any |
632 |
// (here the first) dataserver multiInfo - as i said: all should be the |
633 |
// same |
634 |
MultiDataInfo localInfo = multidata.getMultiInfo(); |
635 |
String name = multidata.getName(); |
636 |
MultiDataInfo remoteInfo; |
637 |
try { |
638 |
// all remote infos should be the same, so it is enough to retrieve |
639 |
// the first one |
640 |
remoteInfo = dataServers[0].getMultiDataInfo(name); |
641 |
if (localInfo == null || remoteInfo == null) { |
642 |
LOG.error("Could not lookup MultidataInfo with name " + name |
643 |
+ ". localInfo was " + localInfo + " remote info was " |
644 |
+ remoteInfo); |
645 |
return; |
646 |
} |
647 |
// merge the grid with the given index |
648 |
// for a local idx: |
649 |
if (idx < localInfo.getCount()) |
650 |
mergePartition(localInfo.getMultiID(idx)); |
651 |
// else create a new grid |
652 |
else { |
653 |
// get the new ID (the same as on server side) |
654 |
int newID = remoteInfo.getMultiID(idx); |
655 |
if (newID == 0) |
656 |
throw new UnsupportedOperationException( |
657 |
"For the requested index (" + idx |
658 |
+ ") was no grid found on the servers"); |
659 |
// first create the new data element out of the first element in |
660 |
// the list: |
661 |
multidata.addElement(newID); |
662 |
SplittableResource newResource = (SplittableResource) multidata |
663 |
.getElement(multidata.getCount() - 1); |
664 |
spmdClient.addData(newResource); |
665 |
// now merge the serverdata into the new clientgrid |
666 |
mergePartition(newID); |
667 |
} |
668 |
} catch (RemoteException e) { |
669 |
LOG.error("ERROR while trying to merge multi data: " |
670 |
+ e.getMessage(), e); |
671 |
} |
672 |
} |
673 |
|
674 |
/* |
675 |
* (non-Javadoc) |
676 |
* |
677 |
* @see appl.parallel.spmd.SPMDClientInterface#mergePartition(int) |
678 |
*/ |
679 |
public synchronized void mergePartition(int rootID) { |
680 |
checkState(STATE.RUN); |
681 |
Future[] futures = new Future[noOfPartitions]; |
682 |
for (int i = 0; i < noOfPartitions; i++) |
683 |
futures[i] = executor.submit(new DataServerThread(dataServers[i], |
684 |
serverInfos[i], rootID, CommType.CLIENT_MERGE, eventProxy) { |
685 |
public Object run() throws Exception { |
686 |
getServer().unloadToSource(getIntArgument()); |
687 |
return null; |
688 |
} |
689 |
}); |
690 |
// wait on tasks to finish: |
691 |
try { |
692 |
// wait threads to finish |
693 |
for (Future future : futures) { |
694 |
future.get(); |
695 |
} |
696 |
} catch (Exception e) { |
697 |
// TODO Auto-generated catch block |
698 |
e.printStackTrace(); |
699 |
} |
700 |
listOfAllActivelyControlledData.remove((Integer) rootID); |
701 |
} |
702 |
|
703 |
/* |
704 |
* (non-Javadoc) |
705 |
* |
706 |
* @see appl.parallel.spmd.SPMDClientInterface#mergePartition(java.lang.Object) |
707 |
*/ |
708 |
public void mergePartition(Object splittableResource) { |
709 |
checkSplittable(splittableResource); |
710 |
checkState(STATE.RUN); |
711 |
mergePartition(((SplittableResource) splittableResource).getRootID()); |
712 |
} |
713 |
|
714 |
/* |
715 |
* (non-Javadoc) |
716 |
* |
717 |
* @see appl.parallel.spmd.SPMDClientInterface#runSPMDModelTask(appl.parallel.spmd.SPMDTask, |
718 |
* java.lang.Object[]) |
719 |
*/ |
720 |
@SuppressWarnings("unchecked") |
721 |
public Object[] runSPMDModelTask(final SPMDTask task, Object... parameters) |
722 |
throws Throwable { |
723 |
state = STATE.RUN; |
724 |
// transfer needed data |
725 |
if (dataToTransfer) { |
726 |
transferDataToServers(); |
727 |
} |
728 |
// submit task and arguments to the resources |
729 |
// each task is calculated in its own thread; |
730 |
// create tasks |
731 |
parameters = (parameters == null) ? new Object[0] : parameters; |
732 |
try { |
733 |
Future futureResults[] = new Future[servers.length]; |
734 |
for (int i = 0; i < servers.length; i++) { |
735 |
futureResults[i] = executor.submit(new ComputingResourceThread( |
736 |
servers[i], serverInfos[i], new Object[] { task, |
737 |
referenceResourceID, parameters }, |
738 |
CommType.CLIENT_EXECUTION, eventProxy) { |
739 |
public Object run() throws Exception { |
740 |
SPMDResource server = (SPMDResource) getServer(); |
741 |
return server.runSPMDModelTask(task.getClass() |
742 |
.getName(), |
743 |
(Integer) getObjectArrayArgument()[1], |
744 |
(Object[]) getObjectArrayArgument()[2]); |
745 |
} |
746 |
}); |
747 |
} |
748 |
|
749 |
Vector<Object> results = new Vector<Object>(15); |
750 |
|
751 |
// aim of the next loop is to get single Object-array containing all |
752 |
// results! |
753 |
// Notice that every server result is a result array. |
754 |
// This is because serverexecution can happen in multiple threads |
755 |
// (when multiple processors are available) each serverthread will |
756 |
// produce its own result |
757 |
for (int i = 0; i < noOfPartitions; i++) { |
758 |
// wait for threads finished and collect results |
759 |
Object[] result = (Object[]) futureResults[i].get(); |
760 |
for (int j = 0; j < result.length; j++) |
761 |
results.add(result[j]); |
762 |
} |
763 |
return results.toArray(); |
764 |
|
765 |
} catch (ExecutionException e) { |
766 |
LOG.fatal("Error while trying to execute task " |
767 |
+ task.getClass().getName() + ": " + e.getMessage(), e); |
768 |
throw e.getCause(); |
769 |
} catch (Exception e) { |
770 |
LOG.error("Error while trying to execute task " |
771 |
+ task.getClass().getName() + ": " + e.getMessage(), e); |
772 |
e.printStackTrace(); |
773 |
} |
774 |
return null; |
775 |
|
776 |
} |
777 |
|
778 |
/* |
779 |
* (non-Javadoc) |
780 |
* |
781 |
* @see appl.parallel.spmd.SPMDClientInterface#setBoxingMode(appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode) |
782 |
*/ |
783 |
public void setBoxingMode(AbstractSplitMap.NeighborhoodBoxingMode boxingMode) { |
784 |
checkState(STATE.INIT); |
785 |
this.boxingMode = boxingMode; |
786 |
} |
787 |
|
788 |
/* |
789 |
* (non-Javadoc) |
790 |
* |
791 |
* @see appl.parallel.spmd.SPMDClientInterface#setNeighborhoodRange(int) |
792 |
*/ |
793 |
public void setNeighborhoodRange(int neighborhoodRange) { |
794 |
checkState(STATE.INIT); |
795 |
this.neighborhoodRange = neighborhoodRange; |
796 |
} |
797 |
|
798 |
/* |
799 |
* (non-Javadoc) |
800 |
* |
801 |
* @see appl.parallel.spmd.SPMDClientInterface#setReferenceResource(java.lang.Object) |
802 |
*/ |
803 |
public void setReferenceResource(Object splittableResource) { |
804 |
checkSplittable(splittableResource); |
805 |
SplittableResource res = (SplittableResource) splittableResource; |
806 |
checkState(STATE.INIT); |
807 |
if (res instanceof SplittableResource) { |
808 |
referenceResourceID = res.getRootID(); |
809 |
makeSplitMap(res); |
810 |
} else |
811 |
LOG |
812 |
.warn("Set reference failed: given resource was not an instance of SplittableResource!"); |
813 |
|
814 |
} |
815 |
|
816 |
/** |
817 |
* transfers all collected data to the servers. (The data is collected to |
818 |
* save communication time) |
819 |
*/ |
820 |
private void transferDataToServers() { |
821 |
long time = System.currentTimeMillis(); |
822 |
Vector<Future> futures = new Vector<Future>(); |
823 |
if (toTransferPartitionInfos[0].size() > 0) |
824 |
for (int i = 0; i < servers.length; i++) { |
825 |
futures.add(executor.submit(new DataServerThread( |
826 |
dataServers[i], serverInfos[i], |
827 |
toTransferPartitionInfos[i], |
828 |
CommType.TRANSFER_METADATA, eventProxy, true) { |
829 |
public Object run() throws Exception { |
830 |
getServer() |
831 |
.addPartitionInfos( |
832 |
(Vector<SinglePartitionInfo>) getObjectArgument()); |
833 |
return null; |
834 |
} |
835 |
})); |
836 |
} |
837 |
if (toTransferMultiDataObjects.size() > 0) |
838 |
for (int i = 0; i < servers.length; i++) { |
839 |
futures.add(executor.submit(new DataServerThread( |
840 |
dataServers[i], serverInfos[i], |
841 |
toTransferMultiDataObjects, CommType.TRANSFER_METADATA, |
842 |
eventProxy, true) { |
843 |
public Object run() throws Exception { |
844 |
getServer() |
845 |
.addMultiDataInfos( |
846 |
(HashMap<String, MultiDataInfo>) getObjectArgument()); |
847 |
return null; |
848 |
} |
849 |
})); |
850 |
} |
851 |
if (toTransferBaseParameters.size() > 0) |
852 |
for (int i = 0; i < servers.length; i++) { |
853 |
futures.add(executor.submit(new DataServerThread( |
854 |
dataServers[i], serverInfos[i], |
855 |
toTransferBaseParameters, CommType.TRANSFER_PARAMETERS, |
856 |
eventProxy) { |
857 |
public Object run() throws Exception { |
858 |
getServer().updateBaseParameter( |
859 |
(HashMap) getObjectArgument()); |
860 |
return null; |
861 |
} |
862 |
})); |
863 |
} |
864 |
// wait on threads to finish |
865 |
for (Future future : futures) { |
866 |
try { |
867 |
future.get(); |
868 |
} catch (InterruptedException e) { |
869 |
LOG.error(e); |
870 |
e.printStackTrace(); |
871 |
} catch (ExecutionException e) { |
872 |
LOG.error(e); |
873 |
e.printStackTrace(); |
874 |
} |
875 |
} |
876 |
dataToTransfer = false; |
877 |
toTransferBaseParameters.clear(); |
878 |
toTransferBaseParameters.clear(); |
879 |
toTransferMultiDataObjects.clear(); |
880 |
if (LOG.isDebugEnabled()) |
881 |
LOG.debug("Transfered Data to clients in " |
882 |
+ (System.currentTimeMillis() - time) / 1000000 + " ms"); |
883 |
} |
884 |
|
885 |
/* |
886 |
* (non-Javadoc) |
887 |
* |
888 |
* @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(java.lang.Object) |
889 |
*/ |
890 |
public void updateNeighborhood(Object splittableResource) { |
891 |
checkSplittable(splittableResource); |
892 |
SplittableResource res = (SplittableResource) splittableResource; |
893 |
updateNeighborhood(res.getRootID()); |
894 |
} |
895 |
|
896 |
/* |
897 |
* (non-Javadoc) |
898 |
* |
899 |
* @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(appl.parallel.spmd.MultiDataObject, |
900 |
* int) |
901 |
*/ |
902 |
public void updateNeighborhood(MultiDataObject multiDataObject, int index) { |
903 |
this.updateNeighborhood(multiDataObject.getMultiInfo() |
904 |
.getMultiID(index)); |
905 |
} |
906 |
|
907 |
/** |
908 |
* updates the neighborhood of the splittable with the given id on the |
909 |
* servers |
910 |
*/ |
911 |
private void updateNeighborhood(int rootID) { |
912 |
checkState(STATE.RUN); |
913 |
Future[] futureResults = new Future[servers.length]; |
914 |
for (int i = 0; i < servers.length; i++) { |
915 |
futureResults[i] = executor.submit(new DataServerThread( |
916 |
dataServers[i], serverInfos[i], rootID, |
917 |
CommType.CLIENT_UPDATE, eventProxy) { |
918 |
public Object run() throws Exception { |
919 |
getServer().updateFromNeighbors(getIntArgument()); |
920 |
return null; |
921 |
} |
922 |
}); |
923 |
} |
924 |
try { |
925 |
// wait threads to finish |
926 |
for (Future future : futureResults) { |
927 |
future.get(); |
928 |
} |
929 |
} catch (Exception e) { |
930 |
// TODO Auto-generated catch block |
931 |
e.printStackTrace(); |
932 |
} |
933 |
} |
934 |
} |