1 |
package appl.parallel.spmd; |
2 |
|
3 |
import java.net.InetAddress; |
4 |
import java.rmi.RemoteException; |
5 |
import java.util.HashMap; |
6 |
import java.util.Vector; |
7 |
import java.util.concurrent.Callable; |
8 |
import java.util.concurrent.ExecutionException; |
9 |
import java.util.concurrent.ExecutorService; |
10 |
import java.util.concurrent.Executors; |
11 |
import java.util.concurrent.Future; |
12 |
import java.util.concurrent.TimeUnit; |
13 |
|
14 |
import org.apache.log4j.LogManager; |
15 |
import org.apache.log4j.Logger; |
16 |
|
17 |
import edu.bonn.xulu.plugin.data.grid.MultiGrid; |
18 |
import appl.data.DataLoader; |
19 |
import appl.ext.XuluConfig; |
20 |
import appl.parallel.ComputingResource; |
21 |
import appl.parallel.ComputingResourceContainer; |
22 |
import appl.parallel.ComputingResourceProperties; |
23 |
import appl.parallel.client.RemoteEventHandler; |
24 |
import appl.parallel.client.ClientDataServer; |
25 |
import appl.parallel.data.PartitionDataHandler; |
26 |
import appl.parallel.data.PartitionHandlerFactory; |
27 |
import appl.parallel.data.XuluClientLoader; |
28 |
import appl.parallel.event.CommEvent; |
29 |
import appl.parallel.event.CommEventSink; |
30 |
import appl.parallel.event.TimeEvent; |
31 |
import appl.parallel.event.TimeMonitor; |
32 |
import appl.parallel.event.TransferEvent; |
33 |
import appl.parallel.event.TransferMonitor; |
34 |
import appl.parallel.event.CommEvent.CommType; |
35 |
import appl.parallel.server.PartitionDataServer; |
36 |
import appl.parallel.server.SPMDResource; |
37 |
import appl.parallel.spmd.split.AbstractSplitMap; |
38 |
import appl.parallel.spmd.split.DataPartition; |
39 |
import appl.parallel.spmd.split.SinglePartitionInfo; |
40 |
import appl.parallel.spmd.split.SplitMap; |
41 |
import appl.parallel.spmd.split.SplittableResource; |
42 |
import appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode; |
43 |
import appl.parallel.thread.ComputingResourceThread; |
44 |
import appl.parallel.thread.DataServerThread; |
45 |
import appl.parallel.model.AbstractParallelStepModel; |
46 |
|
47 |
/** |
48 |
* This class controls all the parallelization action on the client side and is |
49 |
* the counterpart to {@link SPMDServerController}. It is accessed by the model |
50 |
* developer by retrieving the {@link SPMDClientInterface} from the |
51 |
* {@link AbstractParallelStepModel}. |
52 |
* |
53 |
* @author Dominik Appl |
54 |
*/ |
55 |
public class SPMDClientController implements SPMDClientInterface { |
56 |
|
57 |
/**
 * Lifecycle phases of the controller.
 * <p>
 * <b>{@link #INIT}</b> is the initializing phase. While it is active it is
 * allowed to
 * <ul>
 * <li>add resources to split control</li>
 * <li>change the neighborhood range, the boxing mode and the reference
 * resource</li>
 * </ul>
 * All other methods are disabled and will throw an
 * {@link UnsupportedOperationException}.
 * <p>
 * <b>{@link #RUN}</b> is the running phase. All methods of {@link #INIT} are
 * disabled and will throw an {@link UnsupportedOperationException}. This
 * phase is entered automatically by the first call to
 * {@link SPMDClientController#runSPMDModelTask(SPMDTask, Object...)}.
 */
public enum STATE {
    /** Initializing phase: resources and split settings may be changed. */
    INIT,
    /** Running phase: entered by the first task execution. */
    RUN
}
76 |
|
77 |
private STATE state = STATE.INIT; |
78 |
|
79 |
private final String splitMapClassPropertyName = "Parallel.splitmapforclass"; |
80 |
|
81 |
private final Logger LOG = LogManager.getLogger(this.getClass().getName()); |
82 |
|
83 |
/** |
84 |
* participating servers |
85 |
*/ |
86 |
private final SPMDResource[] servers; |
87 |
|
88 |
private final ComputingResourceProperties[] serverInfos; |
89 |
|
90 |
/** |
91 |
* stores the {@link PartitionDataServer DataServers} of the resources. They |
92 |
* are retrieved, after all resources are submitted to split control. |
93 |
*/ |
94 |
private final PartitionDataServer[] dataServers;; |
95 |
|
96 |
/** |
97 |
* number of partitions (== no of participating resources) |
98 |
*/ |
99 |
private final int noOfPartitions; |
100 |
|
101 |
/** |
102 |
* contains the {@link SinglePartitionInfo} of the |
103 |
* {@link SplittableResource splitted resources} for each server |
104 |
*/ |
105 |
private final Vector<SinglePartitionInfo>[] singlePartitionInfos; |
106 |
|
107 |
private final Vector<SinglePartitionInfo>[] toTransferPartitionInfos; |
108 |
|
109 |
private HashMap<String, Object> toTransferBaseParameters; |
110 |
|
111 |
private final HashMap<String, MultiDataInfo> multiDataInfos = new HashMap<String, MultiDataInfo>( |
112 |
10); |
113 |
|
114 |
private final HashMap<String, MultiDataInfo> toTransferMultiDataObjects = new HashMap<String, MultiDataInfo>( |
115 |
10); |
116 |
|
117 |
/** The IP-Addresses of the resources */ |
118 |
private String[] IPs; |
119 |
|
120 |
/*************************************************************************** |
121 |
* This Vector contains all IDs of the resources, which are currently under |
122 |
* splitControl |
123 |
* |
124 |
* @see #addToSplitControl(SplittableResource, String) |
125 |
* @see #mergePartition(SplittableResource) |
126 |
**************************************************************************/ |
127 |
private Vector<Integer> listOfAllActivelyControlledData = new Vector<Integer>(); |
128 |
|
129 |
/** |
130 |
* @see AbstractSplitMap.NeighborhoodBoxingMode |
131 |
*/ |
132 |
private NeighborhoodBoxingMode boxingMode = AbstractSplitMap.NeighborhoodBoxingMode.inBoxing; |
133 |
|
134 |
/** current neighborhood range, default is 0 */ |
135 |
private int neighborhoodRange = 0; |
136 |
|
137 |
private final ClientDataServer spmdClient; |
138 |
|
139 |
/** |
140 |
* Local calculation bounds on server side are calculated using the |
141 |
* reference resource This is the ID of that resource |
142 |
*/ |
143 |
protected int referenceResourceID = -1; |
144 |
|
145 |
/** Thread execution pool */ |
146 |
private final ExecutorService executor; |
147 |
|
148 |
/** true if there is data which has to be transfered to the servers */ |
149 |
private boolean dataToTransfer = false; |
150 |
|
151 |
private SplitMap splitMap; |
152 |
|
153 |
private final CommEventSink eventProxy; |
154 |
|
155 |
private final double[] weights; |
156 |
|
157 |
/**
 * Creates a new client controller, connects to all participating servers
 * and lets every server create its {@link PartitionDataServer}.
 *
 * @param computingResources
 *            the resources which are used by this controller; must contain
 *            at least one element
 * @param weights
 *            the weights for the distribution over the computing resources
 *            (values with a sum of 1) or <code>null</code> (distribution
 *            will be average)
 * @param spmdClient
 *            the spmd client responsible for the data retrieval
 * @param eventProxy
 *            a {@link RemoteEventHandler} for event handling
 * @throws UnsupportedOperationException
 *             if <code>computingResources</code> is empty
 */
@SuppressWarnings("unchecked")
public SPMDClientController(
        Vector<ComputingResourceContainer> computingResources,
        double[] weights, ClientDataServer spmdClient,
        CommEventSink eventProxy) {
    if (weights == null)
        weights = averageWeights(computingResources.size());
    this.weights = weights;

    this.eventProxy = eventProxy;
    this.noOfPartitions = computingResources.size();
    if (noOfPartitions == 0)
        throw new UnsupportedOperationException(
                "No Computing Ressources found");

    this.spmdClient = spmdClient;
    dataServers = new PartitionDataServer[noOfPartitions];
    serverInfos = new ComputingResourceProperties[noOfPartitions];
    servers = new SPMDResource[noOfPartitions];
    for (int i = 0; i < noOfPartitions; i++) {
        ComputingResourceContainer container = computingResources.get(i);
        serverInfos[i] = container.getInformation();
        servers[i] = (SPMDResource) container.getResource();
    }

    // extract the IPs
    IPs = new String[noOfPartitions];
    for (int i = 0; i < IPs.length; i++) {
        String ip = serverInfos[i].getIP();
        if (ip == null) {
            // The check has to happen before the port is appended:
            // otherwise a missing IP becomes the string "null:<port>" and
            // the problem is never reported.
            LOG.fatal("The IP-Information of a computingRessource was NULL. "
                    + "This will result in failure of the computation!");
            continue;
        }
        Object port = serverInfos[i].getPort();
        IPs[i] = (port == null) ? ip : ip + ":" + port;
    }

    // initialize the singlePartitionInfos and tasks. Notice that the array
    // singlePartitionInfos is final.
    // Each vector of infos is permanently associated with one server.
    this.singlePartitionInfos = new Vector[noOfPartitions];
    this.toTransferPartitionInfos = new Vector[noOfPartitions];
    this.toTransferBaseParameters = new HashMap<String, Object>();
    for (int i = 0; i < noOfPartitions; i++) {
        singlePartitionInfos[i] = new Vector<SinglePartitionInfo>();
        toTransferPartitionInfos[i] = new Vector<SinglePartitionInfo>();
    }

    // initialize executor
    executor = Executors.newCachedThreadPool();

    // connect to all participating servers
    connectAll();
    // initialize DataServers (must be after connect)
    for (int i = 0; i < noOfPartitions; i++) {
        try {
            dataServers[i] = servers[i].createDataServer(IPs);
        } catch (RemoteException e) {
            // the entry stays null; log with stack trace instead of
            // printing to stderr
            LOG.fatal("Could not create the data server on resource "
                    + IPs[i], e);
        }
    }
}
233 |
|
234 |
private double[] averageWeights(int noResources) { |
235 |
// create weighted rating with the same weight |
236 |
double[] weights = new double[noResources]; |
237 |
for (int i = 0; i < weights.length; i++) { |
238 |
weights[i] = 1 / noResources; |
239 |
} |
240 |
return weights; |
241 |
} |
242 |
|
243 |
/* |
244 |
* (non-Javadoc) |
245 |
* |
246 |
* @see appl.parallel.spmd.SPMDClientInterface#addBaseParameter(java.lang.Object, |
247 |
* java.lang.String) |
248 |
*/ |
249 |
public void addBaseParameter(Object parameter, String parameterName) { |
250 |
toTransferBaseParameters.put(parameterName, parameter); |
251 |
dataToTransfer = true; |
252 |
} |
253 |
|
254 |
/* |
255 |
* (non-Javadoc) |
256 |
* |
257 |
* @see appl.parallel.spmd.SPMDClientInterface#addBaseParameters(java.lang.Object[], |
258 |
* java.lang.String[]) |
259 |
*/ |
260 |
public void addBaseParameters(Object[] parameters, String[] parameterNames) { |
261 |
for (int i = 0; i < parameterNames.length; i++) { |
262 |
addBaseParameter(parameters[i], parameterNames[i]); |
263 |
} |
264 |
} |
265 |
|
266 |
/* |
267 |
* (non-Javadoc) |
268 |
* |
269 |
* @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(java.lang.Object[], |
270 |
* java.lang.String) |
271 |
*/ |
272 |
public MultiDataObject addToMultiDataSplitControl( |
273 |
Object splittableResources[], String name) { |
274 |
if (splittableResources.length == 0) { |
275 |
throw new UnsupportedOperationException( |
276 |
"There must be at least one resource to create a MultiData Element"); |
277 |
} |
278 |
// Check if adding is allowed: |
279 |
checkState(STATE.INIT); |
280 |
SplittableResource[] resources = checkSplittableArray(splittableResources); |
281 |
|
282 |
// add each element to splitcontrol. The constant gives each element a |
283 |
// unique name and identifies it on the server side as belonging to a multisplit |
284 |
|
285 |
MultiDataInfo multi = new MultiDataInfo(new int[0], name); |
286 |
for (int i = 0; i < resources.length; i++) { |
287 |
multi.addElement(resources[i].getRootID()); |
288 |
addToSplitControl(resources[i], MultiDataInfo.getNameWithIdx(i, |
289 |
name)); |
290 |
} |
291 |
toTransferMultiDataObjects.put(name, multi); |
292 |
multiDataInfos.put(name, multi); |
293 |
dataToTransfer = true; |
294 |
return new MultiDataObject(multi, spmdClient); |
295 |
} |
296 |
|
297 |
/* |
298 |
* (non-Javadoc) |
299 |
* |
300 |
* @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(edu.bonn.xulu.plugin.data.grid.MultiGrid, |
301 |
* java.lang.String) |
302 |
*/ |
303 |
public MultiDataObject addToMultiDataSplitControl(MultiGrid multiGrid, |
304 |
String name) { |
305 |
MultiDataObject dataObject = addToMultiDataSplitControl(multiGrid |
306 |
.toArray(), name); |
307 |
dataObject.setManagedGrid(multiGrid); |
308 |
return dataObject; |
309 |
} |
310 |
|
311 |
/* |
312 |
* (non-Javadoc) |
313 |
* |
314 |
* @see appl.parallel.spmd.SPMDClientInterface#addToSplitControl(java.lang.Object, |
315 |
* java.lang.String) |
316 |
*/ |
317 |
public void addToSplitControl(Object splittableResource, String name) { |
318 |
|
319 |
SplittableResource resource = checkSplittable(splittableResource); |
320 |
// Check if adding is allowed: |
321 |
checkState(STATE.INIT); |
322 |
// if first call, than this is the reference for now - a map will be |
323 |
// generated |
324 |
if (referenceResourceID == -1) |
325 |
setReferenceResource(resource); |
326 |
|
327 |
// add the data to the SPMDClient for server retrieval |
328 |
spmdClient.addData(resource); |
329 |
// add the the resource to the controlled resources |
330 |
listOfAllActivelyControlledData.add(resource.getRootID()); |
331 |
// make the singlePartitionInfos for each participating server and store |
332 |
// the |
333 |
// info for later use |
334 |
|
335 |
SplitMap map = getSplitMap(); |
336 |
// create a partition info for each Server (only the |
337 |
// splitMapPosition differs) |
338 |
for (int i = 0; i < singlePartitionInfos.length; i++) { |
339 |
PartitionDataHandler loader = PartitionHandlerFactory.newInstance( |
340 |
resource.getRootID(), spmdClient, |
341 |
map.getPartitionBounds(i), map |
342 |
.getPartitionCalculationBounds(i)); |
343 |
SinglePartitionInfo info = new SinglePartitionInfo(resource |
344 |
.getRootID(), name, loader, map, i); |
345 |
singlePartitionInfos[i].add(info); |
346 |
toTransferPartitionInfos[i].add(info); |
347 |
dataToTransfer = true; |
348 |
} |
349 |
} |
350 |
|
351 |
/** |
352 |
* Checks if the given object is an instance of {@link SplittableResource} |
353 |
* and gives it back as splittable. |
354 |
* |
355 |
* @param splittableResource |
356 |
* @throws UnsupportedOperationException |
357 |
* if not instance of {@link SplittableResource} |
358 |
* @return the object as {@link SplittableResource} |
359 |
*/ |
360 |
protected SplittableResource checkSplittable(Object splittableResource) { |
361 |
if (!(splittableResource instanceof SplittableResource)) |
362 |
throw new UnsupportedOperationException( |
363 |
"Operation failed: the argument for 'addToSplitControl' \n " |
364 |
+ "must be an instance of SplittableResource! (like e.g. SplittableLLProxyGrid).\n " |
365 |
+ "You can add non splittable Objects as Parameters of the SPMDTasks! For arrays" |
366 |
+ "of SplittableResources use addToMultiSplitControl"); |
367 |
return (SplittableResource) splittableResource; |
368 |
} |
369 |
|
370 |
/** |
371 |
* checks if every object of the array is an instance of |
372 |
* {@link SplittableResource} and gives it back as splittable |
373 |
* |
374 |
* @param splittableResources |
375 |
* an array of (hopefully) |
376 |
* {@link SplittableResource SplittableResources} |
377 |
* @throws UnsupportedOperationException |
378 |
* if not all elements are instances of |
379 |
* {@link SplittableResource} or if the splitHeights or |
380 |
* SplitWidths do not match |
381 |
* @return the object as {@link SplittableResource} array |
382 |
*/ |
383 |
protected SplittableResource[] checkSplittableArray( |
384 |
Object[] splittableResources) { |
385 |
SplittableResource[] res = new SplittableResource[splittableResources.length]; |
386 |
for (int i = 0; i < splittableResources.length; i++) { |
387 |
if (!(splittableResources[i] instanceof SplittableResource)) |
388 |
throw new UnsupportedOperationException( |
389 |
"Operation failed: the argument must be an instance " |
390 |
+ "of SplittableResource! (like e.g. SplittableLLProxyGrid).\n " |
391 |
+ "You can add non splittable Objects as Parameters of the SPMDTasks!"); |
392 |
res[i] = (SplittableResource) splittableResources[i]; |
393 |
} |
394 |
// check if the splitLengths match |
395 |
if (res.length == 0) |
396 |
return res; |
397 |
int splitHeight = res[0].getSplitHeight(); |
398 |
int splitWidth = res[0].getSplitWidth(); |
399 |
for (SplittableResource resource : res) { |
400 |
if (resource.getSplitHeight() != splitHeight |
401 |
|| resource.getSplitWidth() != splitWidth) |
402 |
throw new UnsupportedOperationException( |
403 |
"Operation Failed: Splitvalues (height/width) of the array elements do not match!"); |
404 |
} |
405 |
return res; |
406 |
} |
407 |
|
408 |
/** |
409 |
* Throws a {@link UnsupportedOperationException} if the state does not |
410 |
* match the required state |
411 |
* |
412 |
* @param requiredState |
413 |
* the required state |
414 |
*/ |
415 |
private void checkState(STATE requiredState) { |
416 |
if (requiredState != state) |
417 |
throw new UnsupportedOperationException( |
418 |
"This Operation is not available: " + "You are in state" |
419 |
+ state + " but you should be in state " |
420 |
+ requiredState + ". See documentation of " |
421 |
+ state.getClass().getName() |
422 |
+ " for more information "); |
423 |
} |
424 |
|
425 |
/** |
426 |
* disconnects from all servers |
427 |
*/ |
428 |
public void close() { |
429 |
disconnectAll(); |
430 |
} |
431 |
|
432 |
/** |
433 |
* connects to all servers |
434 |
*/ |
435 |
@SuppressWarnings("unchecked") |
436 |
private void connectAll() { |
437 |
checkState(STATE.INIT); |
438 |
long l = System.currentTimeMillis(); |
439 |
Future[] futureResults = new Future[servers.length]; |
440 |
for (int i = 0; i < servers.length; i++) { |
441 |
futureResults[i] = executor.submit(new ComputingResourceThread( |
442 |
servers[i], serverInfos[i], null, CommType.CONNECT, |
443 |
eventProxy, true) { |
444 |
public Object run() throws Exception { |
445 |
getServer().connect(); |
446 |
return null; |
447 |
} |
448 |
}); |
449 |
} |
450 |
try { |
451 |
// wait for threads to finish |
452 |
for (Future future : futureResults) { |
453 |
future.get(); |
454 |
} |
455 |
} catch (Exception e) { |
456 |
// TODO Auto-generated catch block |
457 |
e.printStackTrace(); |
458 |
} |
459 |
System.out.println("Connect time: " + (System.currentTimeMillis() - l) |
460 |
+ " ms"); |
461 |
} |
462 |
|
463 |
/** |
464 |
* disconnect from all servers |
465 |
*/ |
466 |
private void disconnectAll() { |
467 |
Future[] futureResults = new Future[servers.length]; |
468 |
for (int i = 0; i < servers.length; i++) { |
469 |
futureResults[i] = executor.submit(new ComputingResourceThread( |
470 |
servers[i], serverInfos[i], null, CommType.DISCONNECT, |
471 |
eventProxy, true) { |
472 |
public Object run() throws Exception { |
473 |
getServer().disconnect(); |
474 |
return null; |
475 |
} |
476 |
}); |
477 |
} |
478 |
try { |
479 |
// wait threads to finish |
480 |
for (Future future : futureResults) { |
481 |
future.get(); |
482 |
} |
483 |
} catch (Exception e) { |
484 |
// TODO Auto-generated catch block |
485 |
e.printStackTrace(); |
486 |
} |
487 |
|
488 |
} |
489 |
|
490 |
/** |
491 |
* @return the actual splitmap. The referenceResource must be set before |
492 |
* call! |
493 |
*/ |
494 |
private SplitMap getSplitMap() { |
495 |
if (referenceResourceID == -1) { |
496 |
LOG.error("NO MAP CREATED YET!"); |
497 |
return null; |
498 |
} else |
499 |
return splitMap; |
500 |
} |
501 |
|
502 |
/** |
503 |
* @return the current state |
504 |
*/ |
505 |
public STATE getState() { |
506 |
return state; |
507 |
} |
508 |
|
509 |
/** |
510 |
* Creates a {@link SplitMap} for the specified resource. Use |
511 |
* {@link #getSplitMap()} for retrieval. |
512 |
* |
513 |
* @param splittable |
514 |
* the resource, for which the {@link SplitMap} is created |
515 |
*/ |
516 |
private void makeSplitMap(SplittableResource splittable) { |
517 |
SplitMap map = null; |
518 |
|
519 |
// get splitMap implementation for this splittable from XuluConfig |
520 |
String classname = XuluConfig.getXuluConfig().getProperty( |
521 |
splitMapClassPropertyName + "." |
522 |
+ splittable.getClass().getSimpleName()); |
523 |
// if no entry was found lookup default splitter |
524 |
if (classname == null) |
525 |
classname = XuluConfig.getXuluConfig().getProperty( |
526 |
splitMapClassPropertyName + "." + "default"); |
527 |
try { |
528 |
map = (SplitMap) Class.forName(classname).newInstance(); |
529 |
map.setParameters(splittable, neighborhoodRange, noOfPartitions, |
530 |
boxingMode); |
531 |
map.setWeights(weights); |
532 |
map.makeMap(); |
533 |
} catch (Exception e) { |
534 |
String error = "Could not create Splitmap from classname : '" |
535 |
+ classname + "' out of property '" |
536 |
+ splitMapClassPropertyName + "'. Nested errormessage is :" |
537 |
+ e.getMessage(); |
538 |
LOG.fatal(error, e); |
539 |
throw new UnsupportedOperationException(error); |
540 |
|
541 |
} |
542 |
this.splitMap = map; |
543 |
|
544 |
} |
545 |
|
546 |
/* |
547 |
* (non-Javadoc) |
548 |
* |
549 |
* @see appl.parallel.spmd.SPMDClientInterface#mergeAllPartitions() |
550 |
*/ |
551 |
@SuppressWarnings("unchecked") |
552 |
public synchronized void mergeAllPartitions() { |
553 |
// clone the list first, because mergePartition(int) removes elements |
554 |
// from the list, which causes |
555 |
// problems with the for-each loop |
556 |
Vector<Integer> activeIDs = (Vector<Integer>) listOfAllActivelyControlledData |
557 |
.clone(); |
558 |
// Vector<Future> futures = new Vector<Future>(); |
559 |
// for (Integer id : activeIDs) { |
560 |
// futures.add(executor.submit(new DataServerThread(null, id) { |
561 |
// public Object call() throws Exception { |
562 |
// mergePartition(getIntArgument()); |
563 |
// return null; |
564 |
// } |
565 |
// })); |
566 |
// } |
567 |
// // wait on tasks to finish |
568 |
// for (Future future : futures) { |
569 |
// try { |
570 |
// future.get(); |
571 |
// } catch (Exception e) { |
572 |
// e.printStackTrace(); |
573 |
// } |
574 |
// |
575 |
// } |
576 |
for (Integer id : activeIDs) { |
577 |
mergePartition((int) id); |
578 |
} |
579 |
} |
580 |
|
581 |
/* |
582 |
* (non-Javadoc) |
583 |
* |
584 |
* @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject) |
585 |
*/ |
586 |
public synchronized void mergeMultiData(MultiDataObject multidata) { |
587 |
// the problem is that perhaps on serverside grids were added, but not |
588 |
// on client side. It is assumed that all servers have created the |
589 |
// same number of grids with the same names (which is assured by |
590 |
// multiDataInfo) |
591 |
|
592 |
// first lookup the local multiData info and the info of any |
593 |
// (here the first) dataserver multiInfo - as i said: all should be the |
594 |
// same |
595 |
MultiDataInfo localInfo = multidata.getMultiInfo(); |
596 |
String name = multidata.getName(); |
597 |
MultiDataInfo remoteInfo; |
598 |
try { |
599 |
// all remote infos should be the same, so it is enough to retrieve |
600 |
// the first one |
601 |
remoteInfo = dataServers[0].getMultiDataInfo(name); |
602 |
if (localInfo == null || remoteInfo == null) { |
603 |
LOG.error("Could not lookup MultidataInfo with name " + name |
604 |
+ ". localInfo was " + localInfo + " remote info was " |
605 |
+ remoteInfo); |
606 |
return; |
607 |
} |
608 |
// merge the ids which are there on both sides: |
609 |
int i = 0; |
610 |
for (; i < localInfo.getCount(); i++) |
611 |
mergePartition(localInfo.getMultiID(i)); |
612 |
// merge the new partitions |
613 |
for (; i < remoteInfo.getCount(); i++) { |
614 |
// get the new ID (the same as on server side) |
615 |
int newID = remoteInfo.getMultiID(i); |
616 |
// first create the new data element out of the first element in |
617 |
// the list: |
618 |
multidata.addElement(newID); |
619 |
SplittableResource newResource = (SplittableResource) multidata |
620 |
.getElement(multidata.getCount() - 1); |
621 |
spmdClient.addData(newResource); |
622 |
// now merge |
623 |
mergePartition(newID); |
624 |
} |
625 |
} catch (RemoteException e) { |
626 |
LOG.error("ERROR while trying to merge multi data: " |
627 |
+ e.getMessage(), e); |
628 |
} |
629 |
} |
630 |
|
631 |
/* |
632 |
* (non-Javadoc) |
633 |
* |
634 |
* @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject, |
635 |
* int) |
636 |
*/ |
637 |
public void mergeMultiData(MultiDataObject multidata, int idx) { |
638 |
// the problem is that perhaps on serverside grids were added, but not |
639 |
// on client side. It is assumed that all servers have created the |
640 |
// same number of grids with the same names (which is assured by |
641 |
// multiDataInfo) |
642 |
|
643 |
// first lookup the local multiData info and the info of any |
644 |
// (here the first) dataserver multiInfo - as i said: all should be the |
645 |
// same |
646 |
MultiDataInfo localInfo = multidata.getMultiInfo(); |
647 |
String name = multidata.getName(); |
648 |
MultiDataInfo remoteInfo; |
649 |
try { |
650 |
// all remote infos should be the same, so it is enough to retrieve |
651 |
// the first one |
652 |
remoteInfo = dataServers[0].getMultiDataInfo(name); |
653 |
if (localInfo == null || remoteInfo == null) { |
654 |
LOG.error("Could not lookup MultidataInfo with name " + name |
655 |
+ ". localInfo was " + localInfo + " remote info was " |
656 |
+ remoteInfo); |
657 |
return; |
658 |
} |
659 |
// merge the grid with the given index |
660 |
// for a local idx: |
661 |
if (idx < localInfo.getCount()) |
662 |
mergePartition(localInfo.getMultiID(idx)); |
663 |
// else create a new grid |
664 |
else { |
665 |
// get the new ID (the same as on server side) |
666 |
int newID = remoteInfo.getMultiID(idx); |
667 |
if (newID == 0) |
668 |
throw new UnsupportedOperationException( |
669 |
"For the requested index (" + idx |
670 |
+ ") was no grid found on the servers"); |
671 |
// first create the new data element out of the first element in |
672 |
// the list: |
673 |
multidata.addElement(newID); |
674 |
SplittableResource newResource = (SplittableResource) multidata |
675 |
.getElement(multidata.getCount() - 1); |
676 |
spmdClient.addData(newResource); |
677 |
// now merge the serverdata into the new clientgrid |
678 |
mergePartition(newID); |
679 |
} |
680 |
} catch (RemoteException e) { |
681 |
LOG.error("ERROR while trying to merge multi data: " |
682 |
+ e.getMessage(), e); |
683 |
} |
684 |
} |
685 |
|
686 |
/* |
687 |
* (non-Javadoc) |
688 |
* |
689 |
* @see appl.parallel.spmd.SPMDClientInterface#mergePartition(int) |
690 |
*/ |
691 |
public synchronized void mergePartition(int rootID) { |
692 |
checkState(STATE.RUN); |
693 |
Future[] futures = new Future[noOfPartitions]; |
694 |
for (int i = 0; i < noOfPartitions; i++) |
695 |
futures[i] = executor.submit(new DataServerThread(dataServers[i], |
696 |
serverInfos[i], rootID, CommType.CLIENT_MERGE, eventProxy) { |
697 |
public Object run() throws Exception { |
698 |
getServer().unloadToSource(getIntArgument()); |
699 |
return null; |
700 |
} |
701 |
}); |
702 |
// wait on tasks to finish: |
703 |
try { |
704 |
// wait threads to finish |
705 |
for (Future future : futures) { |
706 |
future.get(); |
707 |
} |
708 |
} catch (Exception e) { |
709 |
// TODO Auto-generated catch block |
710 |
e.printStackTrace(); |
711 |
} |
712 |
listOfAllActivelyControlledData.remove((Integer) rootID); |
713 |
} |
714 |
|
715 |
/* |
716 |
* (non-Javadoc) |
717 |
* |
718 |
* @see appl.parallel.spmd.SPMDClientInterface#mergePartition(java.lang.Object) |
719 |
*/ |
720 |
public void mergePartition(Object splittableResource) { |
721 |
checkSplittable(splittableResource); |
722 |
checkState(STATE.RUN); |
723 |
mergePartition(((SplittableResource) splittableResource).getRootID()); |
724 |
} |
725 |
|
726 |
/* |
727 |
* (non-Javadoc) |
728 |
* |
729 |
* @see appl.parallel.spmd.SPMDClientInterface#runSPMDModelTask(appl.parallel.spmd.SPMDTask, |
730 |
* java.lang.Object[]) |
731 |
*/ |
732 |
@SuppressWarnings("unchecked") |
733 |
public Object[] runSPMDModelTask(final SPMDTask task, Object... parameters) |
734 |
throws Throwable { |
735 |
state = STATE.RUN; |
736 |
// transfer needed data |
737 |
if (dataToTransfer) { |
738 |
transferDataToServers(); |
739 |
} |
740 |
// submit task and arguments to the resources |
741 |
// each task is calculated in its own thread; |
742 |
// create tasks |
743 |
parameters = (parameters == null) ? new Object[0] : parameters; |
744 |
try { |
745 |
Future futureResults[] = new Future[servers.length]; |
746 |
for (int i = 0; i < servers.length; i++) { |
747 |
futureResults[i] = executor.submit(new ComputingResourceThread( |
748 |
servers[i], serverInfos[i], new Object[] { task, |
749 |
referenceResourceID, parameters }, |
750 |
CommType.CLIENT_EXECUTION, eventProxy) { |
751 |
public Object run() throws Exception { |
752 |
SPMDResource server = (SPMDResource) getServer(); |
753 |
return server.runSPMDModelTask(task.getClass() |
754 |
.getName(), |
755 |
(Integer) getObjectArrayArgument()[1], |
756 |
(Object[]) getObjectArrayArgument()[2]); |
757 |
} |
758 |
}); |
759 |
} |
760 |
|
761 |
Vector<Object> results = new Vector<Object>(15); |
762 |
|
763 |
// aim of the next loop is to get single Object-array containing all |
764 |
// results! |
765 |
// Notice that every server result is a result array. |
766 |
// This is because serverexecution can happen in multiple threads |
767 |
// (when multiple processors are available) each serverthread will |
768 |
// produce its own result |
769 |
for (int i = 0; i < noOfPartitions; i++) { |
770 |
// wait for threads finished and collect results |
771 |
Object[] result = (Object[]) futureResults[i].get(); |
772 |
for (int j = 0; j < result.length; j++) |
773 |
results.add(result[j]); |
774 |
} |
775 |
return results.toArray(); |
776 |
|
777 |
} catch (ExecutionException e) { |
778 |
LOG.fatal("Error while trying to execute task " |
779 |
+ task.getClass().getName() + ": " + e.getMessage(), e); |
780 |
throw e.getCause(); |
781 |
} catch (Exception e) { |
782 |
LOG.error("Error while trying to execute task " |
783 |
+ task.getClass().getName() + ": " + e.getMessage(), e); |
784 |
e.printStackTrace(); |
785 |
} |
786 |
return null; |
787 |
|
788 |
} |
789 |
|
790 |
/* |
791 |
* (non-Javadoc) |
792 |
* |
793 |
* @see appl.parallel.spmd.SPMDClientInterface#setBoxingMode(appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode) |
794 |
*/ |
795 |
public void setBoxingMode(AbstractSplitMap.NeighborhoodBoxingMode boxingMode) { |
796 |
checkState(STATE.INIT); |
797 |
this.boxingMode = boxingMode; |
798 |
} |
799 |
|
800 |
/* |
801 |
* (non-Javadoc) |
802 |
* |
803 |
* @see appl.parallel.spmd.SPMDClientInterface#setNeighborhoodRange(int) |
804 |
*/ |
805 |
public void setNeighborhoodRange(int neighborhoodRange) { |
806 |
checkState(STATE.INIT); |
807 |
this.neighborhoodRange = neighborhoodRange; |
808 |
} |
809 |
|
810 |
/* |
811 |
* (non-Javadoc) |
812 |
* |
813 |
* @see appl.parallel.spmd.SPMDClientInterface#setReferenceResource(java.lang.Object) |
814 |
*/ |
815 |
public void setReferenceResource(Object splittableResource) { |
816 |
checkSplittable(splittableResource); |
817 |
SplittableResource res = (SplittableResource) splittableResource; |
818 |
checkState(STATE.INIT); |
819 |
if (res instanceof SplittableResource) { |
820 |
referenceResourceID = res.getRootID(); |
821 |
makeSplitMap(res); |
822 |
} else |
823 |
LOG |
824 |
.warn("Set reference failed: given resource was not an instance of SplittableResource!"); |
825 |
|
826 |
} |
827 |
|
828 |
/** |
829 |
* transfers all collected data to the servers. (The data is collected to |
830 |
* save communication time) |
831 |
*/ |
832 |
private void transferDataToServers() { |
833 |
long time = System.currentTimeMillis(); |
834 |
Vector<Future> futures = new Vector<Future>(); |
835 |
if (toTransferPartitionInfos[0].size() > 0) |
836 |
for (int i = 0; i < servers.length; i++) { |
837 |
futures.add(executor.submit(new DataServerThread( |
838 |
dataServers[i], serverInfos[i], |
839 |
toTransferPartitionInfos[i], |
840 |
CommType.TRANSFER_METADATA, eventProxy, true) { |
841 |
public Object run() throws Exception { |
842 |
getServer() |
843 |
.addPartitionInfos( |
844 |
(Vector<SinglePartitionInfo>) getObjectArgument()); |
845 |
return null; |
846 |
} |
847 |
})); |
848 |
} |
849 |
if (toTransferMultiDataObjects.size() > 0) |
850 |
for (int i = 0; i < servers.length; i++) { |
851 |
futures.add(executor.submit(new DataServerThread( |
852 |
dataServers[i], serverInfos[i], |
853 |
toTransferMultiDataObjects, CommType.TRANSFER_METADATA, |
854 |
eventProxy, true) { |
855 |
public Object run() throws Exception { |
856 |
getServer() |
857 |
.addMultiDataInfos( |
858 |
(HashMap<String, MultiDataInfo>) getObjectArgument()); |
859 |
return null; |
860 |
} |
861 |
})); |
862 |
} |
863 |
if (toTransferBaseParameters.size() > 0) |
864 |
for (int i = 0; i < servers.length; i++) { |
865 |
futures.add(executor.submit(new DataServerThread( |
866 |
dataServers[i], serverInfos[i], |
867 |
toTransferBaseParameters, CommType.TRANSFER_PARAMETERS, |
868 |
eventProxy) { |
869 |
public Object run() throws Exception { |
870 |
getServer().updateBaseParameter( |
871 |
(HashMap) getObjectArgument()); |
872 |
return null; |
873 |
} |
874 |
})); |
875 |
} |
876 |
// wait on threads to finish |
877 |
for (Future future : futures) { |
878 |
try { |
879 |
future.get(); |
880 |
} catch (InterruptedException e) { |
881 |
LOG.error(e); |
882 |
e.printStackTrace(); |
883 |
} catch (ExecutionException e) { |
884 |
LOG.error(e); |
885 |
e.printStackTrace(); |
886 |
} |
887 |
} |
888 |
dataToTransfer = false; |
889 |
toTransferBaseParameters.clear(); |
890 |
toTransferBaseParameters.clear(); |
891 |
toTransferMultiDataObjects.clear(); |
892 |
if (LOG.isDebugEnabled()) |
893 |
LOG.debug("Transfered Data to clients in " |
894 |
+ (System.currentTimeMillis() - time) / 1000000 + " ms"); |
895 |
} |
896 |
|
897 |
/* |
898 |
* (non-Javadoc) |
899 |
* |
900 |
* @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(java.lang.Object) |
901 |
*/ |
902 |
public void updateNeighborhood(Object splittableResource) { |
903 |
checkSplittable(splittableResource); |
904 |
SplittableResource res = (SplittableResource) splittableResource; |
905 |
updateNeighborhood(res.getRootID()); |
906 |
} |
907 |
|
908 |
/* |
909 |
* (non-Javadoc) |
910 |
* |
911 |
* @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(appl.parallel.spmd.MultiDataObject, |
912 |
* int) |
913 |
*/ |
914 |
public void updateNeighborhood(MultiDataObject multiDataObject, int index) { |
915 |
this.updateNeighborhood(multiDataObject.getMultiInfo() |
916 |
.getMultiID(index)); |
917 |
} |
918 |
|
919 |
/** |
920 |
* updates the neighborhood of the splittable with the given id on the |
921 |
* servers |
922 |
*/ |
923 |
private void updateNeighborhood(int rootID) { |
924 |
checkState(STATE.RUN); |
925 |
Future[] futureResults = new Future[servers.length]; |
926 |
for (int i = 0; i < servers.length; i++) { |
927 |
futureResults[i] = executor.submit(new DataServerThread( |
928 |
dataServers[i], serverInfos[i], rootID, |
929 |
CommType.CLIENT_UPDATE, eventProxy) { |
930 |
public Object run() throws Exception { |
931 |
getServer().updateFromNeighbors(getIntArgument()); |
932 |
return null; |
933 |
} |
934 |
}); |
935 |
} |
936 |
try { |
937 |
// wait threads to finish |
938 |
for (Future future : futureResults) { |
939 |
future.get(); |
940 |
} |
941 |
} catch (Exception e) { |
942 |
// TODO Auto-generated catch block |
943 |
e.printStackTrace(); |
944 |
} |
945 |
} |
946 |
} |