1 |
mojays |
2 |
package appl.parallel.spmd; |
2 |
|
|
|
3 |
|
|
import java.rmi.RemoteException; |
4 |
|
|
import java.util.HashMap; |
5 |
|
|
import java.util.Vector; |
6 |
|
|
import java.util.concurrent.ExecutionException; |
7 |
|
|
import java.util.concurrent.ExecutorService; |
8 |
|
|
import java.util.concurrent.Executors; |
9 |
|
|
import java.util.concurrent.Future; |
10 |
|
|
|
11 |
|
|
import org.apache.log4j.LogManager; |
12 |
|
|
import org.apache.log4j.Logger; |
13 |
|
|
|
14 |
|
|
import appl.ext.XuluConfig; |
15 |
|
|
import appl.parallel.ComputingResourceContainer; |
16 |
|
|
import appl.parallel.ComputingResourceProperties; |
17 |
alfonx |
60 |
import appl.parallel.client.ClientDataServer; |
18 |
mojays |
2 |
import appl.parallel.client.RemoteEventHandler; |
19 |
|
|
import appl.parallel.data.PartitionDataHandler; |
20 |
|
|
import appl.parallel.data.PartitionHandlerFactory; |
21 |
|
|
import appl.parallel.event.CommEventSink; |
22 |
|
|
import appl.parallel.event.CommEvent.CommType; |
23 |
alfonx |
60 |
import appl.parallel.model.AbstractParallelStepModel; |
24 |
mojays |
2 |
import appl.parallel.server.PartitionDataServer; |
25 |
|
|
import appl.parallel.server.SPMDResource; |
26 |
|
|
import appl.parallel.spmd.split.AbstractSplitMap; |
27 |
|
|
import appl.parallel.spmd.split.SinglePartitionInfo; |
28 |
|
|
import appl.parallel.spmd.split.SplitMap; |
29 |
|
|
import appl.parallel.spmd.split.SplittableResource; |
30 |
|
|
import appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode; |
31 |
|
|
import appl.parallel.thread.ComputingResourceThread; |
32 |
|
|
import appl.parallel.thread.DataServerThread; |
33 |
alfonx |
60 |
import edu.bonn.xulu.plugin.data.grid.MultiGrid; |
34 |
mojays |
2 |
|
35 |
|
|
/** |
36 |
|
|
* This class controls all the parallelization action on the client side and is |
37 |
|
|
* the counterpart to {@link SPMDServerController}. It is accessed by the model |
38 |
|
|
* developer by retrieving the {@link SPMDClientInterface} from the |
39 |
|
|
* {@link AbstractParallelStepModel}. |
40 |
|
|
* |
41 |
|
|
* @author Dominik Appl |
42 |
|
|
*/ |
43 |
|
|
public class SPMDClientController implements SPMDClientInterface { |
44 |
|
|
|
45 |
|
|
/**
 * Lifecycle phases of the controller.
 * <p>
 * <b>{@link #INIT}</b> is the initializing phase. It is the phase in which
 * <ul>
 * <li>resources are added to split control,</li>
 * <li>the neighborhood range, the boxing mode and the reference resource
 * are changed.</li>
 * </ul>
 * <p>
 * <b>{@link #RUN}</b> is the running phase. The controller switches to it on
 * the first call to
 * {@link SPMDClientController#runSPMDModelTask(SPMDTask, Object...)}.
 * <p>
 * Operations that are guarded for one phase throw an
 * {@link UnsupportedOperationException} when called in the other phase.
 */
public enum STATE {
    /** initializing phase */
    INIT,
    /** running phase */
    RUN
}
64 |
|
|
|
65 |
|
|
private STATE state = STATE.INIT; |
66 |
|
|
|
67 |
|
|
private final String splitMapClassPropertyName = "Parallel.splitmapforclass"; |
68 |
|
|
|
69 |
|
|
private final Logger LOG = LogManager.getLogger(this.getClass().getName()); |
70 |
|
|
|
71 |
|
|
/** |
72 |
|
|
* participating servers |
73 |
|
|
*/ |
74 |
|
|
private final SPMDResource[] servers; |
75 |
|
|
|
76 |
|
|
private final ComputingResourceProperties[] serverInfos; |
77 |
|
|
|
78 |
|
|
/** |
79 |
|
|
* stores the {@link PartitionDataServer DataServers} of the resources. They |
80 |
|
|
* are retrieved, after all resources are submitted to split control. |
81 |
|
|
*/ |
82 |
|
|
private final PartitionDataServer[] dataServers;; |
83 |
|
|
|
84 |
|
|
/** |
85 |
|
|
* number of partitions (== no of participating resources) |
86 |
|
|
*/ |
87 |
|
|
private final int noOfPartitions; |
88 |
|
|
|
89 |
|
|
/** |
90 |
|
|
* contains the {@link SinglePartitionInfo} of the |
91 |
|
|
* {@link SplittableResource splitted resources} for each server |
92 |
|
|
*/ |
93 |
|
|
private final Vector<SinglePartitionInfo>[] singlePartitionInfos; |
94 |
|
|
|
95 |
|
|
private final Vector<SinglePartitionInfo>[] toTransferPartitionInfos; |
96 |
|
|
|
97 |
|
|
private HashMap<String, Object> toTransferBaseParameters; |
98 |
|
|
|
99 |
|
|
private final HashMap<String, MultiDataInfo> multiDataInfos = new HashMap<String, MultiDataInfo>( |
100 |
|
|
10); |
101 |
|
|
|
102 |
|
|
private final HashMap<String, MultiDataInfo> toTransferMultiDataObjects = new HashMap<String, MultiDataInfo>( |
103 |
|
|
10); |
104 |
|
|
|
105 |
|
|
/** The IP-Addresses of the resources */ |
106 |
|
|
private String[] IPs; |
107 |
|
|
|
108 |
|
|
/*************************************************************************** |
109 |
|
|
* This Vector contains all IDs of the resources, which are currently under |
110 |
|
|
* splitControl |
111 |
|
|
* |
112 |
|
|
* @see #addToSplitControl(SplittableResource, String) |
113 |
|
|
* @see #mergePartition(SplittableResource) |
114 |
|
|
**************************************************************************/ |
115 |
|
|
private Vector<Integer> listOfAllActivelyControlledData = new Vector<Integer>(); |
116 |
|
|
|
117 |
|
|
/** |
118 |
|
|
* @see AbstractSplitMap.NeighborhoodBoxingMode |
119 |
|
|
*/ |
120 |
|
|
private NeighborhoodBoxingMode boxingMode = AbstractSplitMap.NeighborhoodBoxingMode.inBoxing; |
121 |
|
|
|
122 |
|
|
/** current neighborhood range, default is 0 */ |
123 |
|
|
private int neighborhoodRange = 0; |
124 |
|
|
|
125 |
|
|
private final ClientDataServer spmdClient; |
126 |
|
|
|
127 |
|
|
/** |
128 |
|
|
* Local calculation bounds on server side are calculated using the |
129 |
|
|
* reference resource This is the ID of that resource |
130 |
|
|
*/ |
131 |
|
|
protected int referenceResourceID = -1; |
132 |
|
|
|
133 |
|
|
/** Thread execution pool */ |
134 |
|
|
private final ExecutorService executor; |
135 |
|
|
|
136 |
|
|
/** true if there is data which has to be transfered to the servers */ |
137 |
|
|
private boolean dataToTransfer = false; |
138 |
|
|
|
139 |
|
|
private SplitMap splitMap; |
140 |
|
|
|
141 |
|
|
private final CommEventSink eventProxy; |
142 |
|
|
|
143 |
|
|
private final double[] weights; |
144 |
|
|
|
145 |
|
|
/** |
146 |
|
|
* Creates a new Client controller. |
147 |
|
|
* |
148 |
|
|
* @param computingResources |
149 |
|
|
* the resources which are used by this controller |
150 |
|
|
* @param spmdClient |
151 |
|
|
* the spmd client responsible for the data retrieval |
152 |
|
|
* @param weights |
153 |
|
|
* the weights for the distribution over the computing resources |
154 |
|
|
* (values with a sum of 1) or null (distribution will be |
155 |
|
|
* average) |
156 |
|
|
* @param eventProxy |
157 |
|
|
* a {@link RemoteEventHandler} for eventHandling |
158 |
|
|
*/ |
159 |
|
|
@SuppressWarnings("unchecked") |
160 |
|
|
public SPMDClientController( |
161 |
|
|
Vector<ComputingResourceContainer> computingResources, |
162 |
|
|
double[] weights, ClientDataServer spmdClient, |
163 |
|
|
CommEventSink eventProxy) { |
164 |
|
|
if (weights == null) |
165 |
|
|
weights = averageWeights(computingResources.size()); |
166 |
|
|
this.weights = weights; |
167 |
|
|
|
168 |
|
|
this.eventProxy = eventProxy; |
169 |
|
|
this.noOfPartitions = computingResources.size(); |
170 |
|
|
if (noOfPartitions == 0) |
171 |
|
|
throw new UnsupportedOperationException( |
172 |
|
|
"No Computing Ressources found"); |
173 |
|
|
|
174 |
|
|
this.spmdClient = spmdClient; |
175 |
|
|
dataServers = new PartitionDataServer[noOfPartitions]; |
176 |
|
|
serverInfos = new ComputingResourceProperties[noOfPartitions]; |
177 |
|
|
servers = new SPMDResource[noOfPartitions]; |
178 |
|
|
for (int i = 0; i < computingResources.size(); i++) { |
179 |
|
|
serverInfos[i] = computingResources.get(i).getInformation(); |
180 |
|
|
servers[i] = (SPMDResource) computingResources.get(i).getResource(); |
181 |
|
|
|
182 |
|
|
} |
183 |
|
|
// extract the IPs |
184 |
|
|
IPs = new String[noOfPartitions]; |
185 |
|
|
for (int i = 0; i < IPs.length; i++) { |
186 |
|
|
IPs[i] = serverInfos[i].getIP(); |
187 |
|
|
IPs[i] = serverInfos[i].getPort() == null ? IPs[i] : IPs[i] + ":" |
188 |
|
|
+ serverInfos[i].getPort(); |
189 |
|
|
|
190 |
|
|
if (IPs[i] == null) |
191 |
|
|
LOG |
192 |
|
|
.fatal("The IP-Information of a computingRessource was NULL. " |
193 |
|
|
+ "This will result in failure of the computation!"); |
194 |
|
|
} |
195 |
|
|
|
196 |
|
|
// initialize the singlePartitionInfos and tasks. Notice that the array |
197 |
|
|
// singlePartitionInfos is final. |
198 |
|
|
// Each vector of infos is permanently associated with one server. |
199 |
|
|
this.singlePartitionInfos = new Vector[noOfPartitions]; |
200 |
|
|
this.toTransferPartitionInfos = new Vector[noOfPartitions]; |
201 |
|
|
this.toTransferBaseParameters = new HashMap<String, Object>(); |
202 |
|
|
for (int i = 0; i < noOfPartitions; i++) { |
203 |
|
|
singlePartitionInfos[i] = new Vector<SinglePartitionInfo>(); |
204 |
|
|
toTransferPartitionInfos[i] = new Vector<SinglePartitionInfo>(); |
205 |
|
|
} |
206 |
|
|
|
207 |
|
|
// initialize executor |
208 |
|
|
executor = Executors.newCachedThreadPool(); |
209 |
|
|
|
210 |
|
|
// connect to all participating servers |
211 |
|
|
connectAll(); |
212 |
|
|
// initialize DataServers (must be after connect) |
213 |
|
|
for (int i = 0; i < noOfPartitions; i++) |
214 |
|
|
try { |
215 |
|
|
dataServers[i] = servers[i].createDataServer(IPs); |
216 |
|
|
} catch (RemoteException e) { |
217 |
|
|
LOG.fatal(e); |
218 |
|
|
e.printStackTrace(); |
219 |
|
|
} |
220 |
|
|
} |
221 |
|
|
|
222 |
|
|
private double[] averageWeights(int noResources) { |
223 |
|
|
// create weighted rating with the same weight |
224 |
|
|
double[] weights = new double[noResources]; |
225 |
|
|
for (int i = 0; i < weights.length; i++) { |
226 |
|
|
weights[i] = 1 / noResources; |
227 |
|
|
} |
228 |
|
|
return weights; |
229 |
|
|
} |
230 |
|
|
|
231 |
|
|
/* |
232 |
|
|
* (non-Javadoc) |
233 |
|
|
* |
234 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#addBaseParameter(java.lang.Object, |
235 |
|
|
* java.lang.String) |
236 |
|
|
*/ |
237 |
|
|
public void addBaseParameter(Object parameter, String parameterName) { |
238 |
|
|
toTransferBaseParameters.put(parameterName, parameter); |
239 |
|
|
dataToTransfer = true; |
240 |
|
|
} |
241 |
|
|
|
242 |
|
|
/* |
243 |
|
|
* (non-Javadoc) |
244 |
|
|
* |
245 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#addBaseParameters(java.lang.Object[], |
246 |
|
|
* java.lang.String[]) |
247 |
|
|
*/ |
248 |
|
|
public void addBaseParameters(Object[] parameters, String[] parameterNames) { |
249 |
|
|
for (int i = 0; i < parameterNames.length; i++) { |
250 |
|
|
addBaseParameter(parameters[i], parameterNames[i]); |
251 |
|
|
} |
252 |
|
|
} |
253 |
|
|
|
254 |
|
|
/* |
255 |
|
|
* (non-Javadoc) |
256 |
|
|
* |
257 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(java.lang.Object[], |
258 |
|
|
* java.lang.String) |
259 |
|
|
*/ |
260 |
|
|
public MultiDataObject addToMultiDataSplitControl( |
261 |
|
|
Object splittableResources[], String name) { |
262 |
|
|
if (splittableResources.length == 0) { |
263 |
|
|
throw new UnsupportedOperationException( |
264 |
|
|
"There must be at least one resource to create a MultiData Element"); |
265 |
|
|
} |
266 |
|
|
// Check if adding is allowed: |
267 |
|
|
checkState(STATE.INIT); |
268 |
|
|
SplittableResource[] resources = checkSplittableArray(splittableResources); |
269 |
|
|
|
270 |
|
|
// add each element to splitcontrol. The constant gives each element a |
271 |
|
|
// unique name and identifies it on the server side as belonging to a multisplit |
272 |
|
|
|
273 |
|
|
MultiDataInfo multi = new MultiDataInfo(new int[0], name); |
274 |
|
|
for (int i = 0; i < resources.length; i++) { |
275 |
|
|
multi.addElement(resources[i].getRootID()); |
276 |
|
|
addToSplitControl(resources[i], MultiDataInfo.getNameWithIdx(i, |
277 |
|
|
name)); |
278 |
|
|
} |
279 |
|
|
toTransferMultiDataObjects.put(name, multi); |
280 |
|
|
multiDataInfos.put(name, multi); |
281 |
|
|
dataToTransfer = true; |
282 |
|
|
return new MultiDataObject(multi, spmdClient); |
283 |
|
|
} |
284 |
|
|
|
285 |
|
|
/* |
286 |
|
|
* (non-Javadoc) |
287 |
|
|
* |
288 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#addToMultiDataSplitControl(edu.bonn.xulu.plugin.data.grid.MultiGrid, |
289 |
|
|
* java.lang.String) |
290 |
|
|
*/ |
291 |
|
|
public MultiDataObject addToMultiDataSplitControl(MultiGrid multiGrid, |
292 |
|
|
String name) { |
293 |
|
|
MultiDataObject dataObject = addToMultiDataSplitControl(multiGrid |
294 |
|
|
.toArray(), name); |
295 |
|
|
dataObject.setManagedGrid(multiGrid); |
296 |
|
|
return dataObject; |
297 |
|
|
} |
298 |
|
|
|
299 |
|
|
/* |
300 |
|
|
* (non-Javadoc) |
301 |
|
|
* |
302 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#addToSplitControl(java.lang.Object, |
303 |
|
|
* java.lang.String) |
304 |
|
|
*/ |
305 |
|
|
public void addToSplitControl(Object splittableResource, String name) { |
306 |
|
|
|
307 |
|
|
SplittableResource resource = checkSplittable(splittableResource); |
308 |
|
|
// Check if adding is allowed: |
309 |
|
|
checkState(STATE.INIT); |
310 |
|
|
// if first call, than this is the reference for now - a map will be |
311 |
|
|
// generated |
312 |
|
|
if (referenceResourceID == -1) |
313 |
|
|
setReferenceResource(resource); |
314 |
|
|
|
315 |
|
|
// add the data to the SPMDClient for server retrieval |
316 |
|
|
spmdClient.addData(resource); |
317 |
|
|
// add the the resource to the controlled resources |
318 |
|
|
listOfAllActivelyControlledData.add(resource.getRootID()); |
319 |
|
|
// make the singlePartitionInfos for each participating server and store |
320 |
|
|
// the |
321 |
|
|
// info for later use |
322 |
|
|
|
323 |
|
|
SplitMap map = getSplitMap(); |
324 |
|
|
// create a partition info for each Server (only the |
325 |
|
|
// splitMapPosition differs) |
326 |
|
|
for (int i = 0; i < singlePartitionInfos.length; i++) { |
327 |
|
|
PartitionDataHandler loader = PartitionHandlerFactory.newInstance( |
328 |
|
|
resource.getRootID(), spmdClient, |
329 |
|
|
map.getPartitionBounds(i), map |
330 |
|
|
.getPartitionCalculationBounds(i)); |
331 |
|
|
SinglePartitionInfo info = new SinglePartitionInfo(resource |
332 |
|
|
.getRootID(), name, loader, map, i); |
333 |
|
|
singlePartitionInfos[i].add(info); |
334 |
|
|
toTransferPartitionInfos[i].add(info); |
335 |
|
|
dataToTransfer = true; |
336 |
|
|
} |
337 |
|
|
} |
338 |
|
|
|
339 |
|
|
/** |
340 |
|
|
* Checks if the given object is an instance of {@link SplittableResource} |
341 |
|
|
* and gives it back as splittable. |
342 |
|
|
* |
343 |
|
|
* @param splittableResource |
344 |
|
|
* @throws UnsupportedOperationException |
345 |
|
|
* if not instance of {@link SplittableResource} |
346 |
|
|
* @return the object as {@link SplittableResource} |
347 |
|
|
*/ |
348 |
|
|
protected SplittableResource checkSplittable(Object splittableResource) { |
349 |
|
|
if (!(splittableResource instanceof SplittableResource)) |
350 |
|
|
throw new UnsupportedOperationException( |
351 |
|
|
"Operation failed: the argument for 'addToSplitControl' \n " |
352 |
|
|
+ "must be an instance of SplittableResource! (like e.g. SplittableLLProxyGrid).\n " |
353 |
|
|
+ "You can add non splittable Objects as Parameters of the SPMDTasks! For arrays" |
354 |
|
|
+ "of SplittableResources use addToMultiSplitControl"); |
355 |
|
|
return (SplittableResource) splittableResource; |
356 |
|
|
} |
357 |
|
|
|
358 |
|
|
/** |
359 |
|
|
* checks if every object of the array is an instance of |
360 |
|
|
* {@link SplittableResource} and gives it back as splittable |
361 |
|
|
* |
362 |
|
|
* @param splittableResources |
363 |
|
|
* an array of (hopefully) |
364 |
|
|
* {@link SplittableResource SplittableResources} |
365 |
|
|
* @throws UnsupportedOperationException |
366 |
|
|
* if not all elements are instances of |
367 |
|
|
* {@link SplittableResource} or if the splitHeights or |
368 |
|
|
* SplitWidths do not match |
369 |
|
|
* @return the object as {@link SplittableResource} array |
370 |
|
|
*/ |
371 |
|
|
protected SplittableResource[] checkSplittableArray( |
372 |
|
|
Object[] splittableResources) { |
373 |
|
|
SplittableResource[] res = new SplittableResource[splittableResources.length]; |
374 |
|
|
for (int i = 0; i < splittableResources.length; i++) { |
375 |
|
|
if (!(splittableResources[i] instanceof SplittableResource)) |
376 |
|
|
throw new UnsupportedOperationException( |
377 |
|
|
"Operation failed: the argument must be an instance " |
378 |
|
|
+ "of SplittableResource! (like e.g. SplittableLLProxyGrid).\n " |
379 |
|
|
+ "You can add non splittable Objects as Parameters of the SPMDTasks!"); |
380 |
|
|
res[i] = (SplittableResource) splittableResources[i]; |
381 |
|
|
} |
382 |
|
|
// check if the splitLengths match |
383 |
|
|
if (res.length == 0) |
384 |
|
|
return res; |
385 |
|
|
int splitHeight = res[0].getSplitHeight(); |
386 |
|
|
int splitWidth = res[0].getSplitWidth(); |
387 |
|
|
for (SplittableResource resource : res) { |
388 |
|
|
if (resource.getSplitHeight() != splitHeight |
389 |
|
|
|| resource.getSplitWidth() != splitWidth) |
390 |
|
|
throw new UnsupportedOperationException( |
391 |
|
|
"Operation Failed: Splitvalues (height/width) of the array elements do not match!"); |
392 |
|
|
} |
393 |
|
|
return res; |
394 |
|
|
} |
395 |
|
|
|
396 |
|
|
/** |
397 |
|
|
* Throws a {@link UnsupportedOperationException} if the state does not |
398 |
|
|
* match the required state |
399 |
|
|
* |
400 |
|
|
* @param requiredState |
401 |
|
|
* the required state |
402 |
|
|
*/ |
403 |
|
|
private void checkState(STATE requiredState) { |
404 |
|
|
if (requiredState != state) |
405 |
|
|
throw new UnsupportedOperationException( |
406 |
|
|
"This Operation is not available: " + "You are in state" |
407 |
|
|
+ state + " but you should be in state " |
408 |
|
|
+ requiredState + ". See documentation of " |
409 |
|
|
+ state.getClass().getName() |
410 |
|
|
+ " for more information "); |
411 |
|
|
} |
412 |
|
|
|
413 |
|
|
/** |
414 |
|
|
* disconnects from all servers |
415 |
|
|
*/ |
416 |
|
|
public void close() { |
417 |
|
|
disconnectAll(); |
418 |
|
|
} |
419 |
|
|
|
420 |
|
|
/** |
421 |
|
|
* connects to all servers |
422 |
|
|
*/ |
423 |
|
|
@SuppressWarnings("unchecked") |
424 |
|
|
private void connectAll() { |
425 |
|
|
checkState(STATE.INIT); |
426 |
|
|
long l = System.currentTimeMillis(); |
427 |
|
|
Future[] futureResults = new Future[servers.length]; |
428 |
|
|
for (int i = 0; i < servers.length; i++) { |
429 |
|
|
futureResults[i] = executor.submit(new ComputingResourceThread( |
430 |
|
|
servers[i], serverInfos[i], null, CommType.CONNECT, |
431 |
|
|
eventProxy, true) { |
432 |
|
|
public Object run() throws Exception { |
433 |
|
|
getServer().connect(); |
434 |
|
|
return null; |
435 |
|
|
} |
436 |
|
|
}); |
437 |
|
|
} |
438 |
|
|
try { |
439 |
|
|
// wait for threads to finish |
440 |
|
|
for (Future future : futureResults) { |
441 |
|
|
future.get(); |
442 |
|
|
} |
443 |
|
|
} catch (Exception e) { |
444 |
|
|
// TODO Auto-generated catch block |
445 |
|
|
e.printStackTrace(); |
446 |
|
|
} |
447 |
|
|
System.out.println("Connect time: " + (System.currentTimeMillis() - l) |
448 |
|
|
+ " ms"); |
449 |
|
|
} |
450 |
|
|
|
451 |
|
|
/** |
452 |
|
|
* disconnect from all servers |
453 |
|
|
*/ |
454 |
|
|
private void disconnectAll() { |
455 |
|
|
Future[] futureResults = new Future[servers.length]; |
456 |
|
|
for (int i = 0; i < servers.length; i++) { |
457 |
|
|
futureResults[i] = executor.submit(new ComputingResourceThread( |
458 |
|
|
servers[i], serverInfos[i], null, CommType.DISCONNECT, |
459 |
|
|
eventProxy, true) { |
460 |
|
|
public Object run() throws Exception { |
461 |
|
|
getServer().disconnect(); |
462 |
|
|
return null; |
463 |
|
|
} |
464 |
|
|
}); |
465 |
|
|
} |
466 |
|
|
try { |
467 |
|
|
// wait threads to finish |
468 |
|
|
for (Future future : futureResults) { |
469 |
|
|
future.get(); |
470 |
|
|
} |
471 |
|
|
} catch (Exception e) { |
472 |
|
|
// TODO Auto-generated catch block |
473 |
|
|
e.printStackTrace(); |
474 |
|
|
} |
475 |
|
|
|
476 |
|
|
} |
477 |
|
|
|
478 |
|
|
/** |
479 |
|
|
* @return the actual splitmap. The referenceResource must be set before |
480 |
|
|
* call! |
481 |
|
|
*/ |
482 |
|
|
private SplitMap getSplitMap() { |
483 |
|
|
if (referenceResourceID == -1) { |
484 |
|
|
LOG.error("NO MAP CREATED YET!"); |
485 |
|
|
return null; |
486 |
|
|
} else |
487 |
|
|
return splitMap; |
488 |
|
|
} |
489 |
|
|
|
490 |
|
|
/** |
491 |
|
|
* @return the current state |
492 |
|
|
*/ |
493 |
|
|
public STATE getState() { |
494 |
|
|
return state; |
495 |
|
|
} |
496 |
|
|
|
497 |
|
|
/** |
498 |
|
|
* Creates a {@link SplitMap} for the specified resource. Use |
499 |
|
|
* {@link #getSplitMap()} for retrieval. |
500 |
|
|
* |
501 |
|
|
* @param splittable |
502 |
|
|
* the resource, for which the {@link SplitMap} is created |
503 |
|
|
*/ |
504 |
|
|
private void makeSplitMap(SplittableResource splittable) { |
505 |
|
|
SplitMap map = null; |
506 |
|
|
|
507 |
|
|
// get splitMap implementation for this splittable from XuluConfig |
508 |
|
|
String classname = XuluConfig.getXuluConfig().getProperty( |
509 |
|
|
splitMapClassPropertyName + "." |
510 |
|
|
+ splittable.getClass().getSimpleName()); |
511 |
|
|
// if no entry was found lookup default splitter |
512 |
|
|
if (classname == null) |
513 |
|
|
classname = XuluConfig.getXuluConfig().getProperty( |
514 |
|
|
splitMapClassPropertyName + "." + "default"); |
515 |
|
|
try { |
516 |
|
|
map = (SplitMap) Class.forName(classname).newInstance(); |
517 |
|
|
map.setParameters(splittable, neighborhoodRange, noOfPartitions, |
518 |
|
|
boxingMode); |
519 |
|
|
map.setWeights(weights); |
520 |
|
|
map.makeMap(); |
521 |
|
|
} catch (Exception e) { |
522 |
|
|
String error = "Could not create Splitmap from classname : '" |
523 |
|
|
+ classname + "' out of property '" |
524 |
|
|
+ splitMapClassPropertyName + "'. Nested errormessage is :" |
525 |
|
|
+ e.getMessage(); |
526 |
|
|
LOG.fatal(error, e); |
527 |
|
|
throw new UnsupportedOperationException(error); |
528 |
|
|
|
529 |
|
|
} |
530 |
|
|
this.splitMap = map; |
531 |
|
|
|
532 |
|
|
} |
533 |
|
|
|
534 |
|
|
/* |
535 |
|
|
* (non-Javadoc) |
536 |
|
|
* |
537 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#mergeAllPartitions() |
538 |
|
|
*/ |
539 |
|
|
@SuppressWarnings("unchecked") |
540 |
|
|
public synchronized void mergeAllPartitions() { |
541 |
|
|
// clone the list first, because mergePartition(int) removes elements |
542 |
|
|
// from the list, which causes |
543 |
|
|
// problems with the for-each loop |
544 |
|
|
Vector<Integer> activeIDs = (Vector<Integer>) listOfAllActivelyControlledData |
545 |
|
|
.clone(); |
546 |
|
|
// Vector<Future> futures = new Vector<Future>(); |
547 |
|
|
// for (Integer id : activeIDs) { |
548 |
|
|
// futures.add(executor.submit(new DataServerThread(null, id) { |
549 |
|
|
// public Object call() throws Exception { |
550 |
|
|
// mergePartition(getIntArgument()); |
551 |
|
|
// return null; |
552 |
|
|
// } |
553 |
|
|
// })); |
554 |
|
|
// } |
555 |
|
|
// // wait on tasks to finish |
556 |
|
|
// for (Future future : futures) { |
557 |
|
|
// try { |
558 |
|
|
// future.get(); |
559 |
|
|
// } catch (Exception e) { |
560 |
|
|
// e.printStackTrace(); |
561 |
|
|
// } |
562 |
|
|
// |
563 |
|
|
// } |
564 |
|
|
for (Integer id : activeIDs) { |
565 |
|
|
mergePartition((int) id); |
566 |
|
|
} |
567 |
|
|
} |
568 |
|
|
|
569 |
|
|
/* |
570 |
|
|
* (non-Javadoc) |
571 |
|
|
* |
572 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject) |
573 |
|
|
*/ |
574 |
|
|
public synchronized void mergeMultiData(MultiDataObject multidata) { |
575 |
|
|
// the problem is that perhaps on serverside grids were added, but not |
576 |
|
|
// on client side. It is assumed that all servers have created the |
577 |
|
|
// same number of grids with the same names (which is assured by |
578 |
|
|
// multiDataInfo) |
579 |
|
|
|
580 |
|
|
// first lookup the local multiData info and the info of any |
581 |
|
|
// (here the first) dataserver multiInfo - as i said: all should be the |
582 |
|
|
// same |
583 |
|
|
MultiDataInfo localInfo = multidata.getMultiInfo(); |
584 |
|
|
String name = multidata.getName(); |
585 |
|
|
MultiDataInfo remoteInfo; |
586 |
|
|
try { |
587 |
|
|
// all remote infos should be the same, so it is enough to retrieve |
588 |
|
|
// the first one |
589 |
|
|
remoteInfo = dataServers[0].getMultiDataInfo(name); |
590 |
|
|
if (localInfo == null || remoteInfo == null) { |
591 |
|
|
LOG.error("Could not lookup MultidataInfo with name " + name |
592 |
|
|
+ ". localInfo was " + localInfo + " remote info was " |
593 |
|
|
+ remoteInfo); |
594 |
|
|
return; |
595 |
|
|
} |
596 |
|
|
// merge the ids which are there on both sides: |
597 |
|
|
int i = 0; |
598 |
|
|
for (; i < localInfo.getCount(); i++) |
599 |
|
|
mergePartition(localInfo.getMultiID(i)); |
600 |
|
|
// merge the new partitions |
601 |
|
|
for (; i < remoteInfo.getCount(); i++) { |
602 |
|
|
// get the new ID (the same as on server side) |
603 |
|
|
int newID = remoteInfo.getMultiID(i); |
604 |
|
|
// first create the new data element out of the first element in |
605 |
|
|
// the list: |
606 |
|
|
multidata.addElement(newID); |
607 |
|
|
SplittableResource newResource = (SplittableResource) multidata |
608 |
|
|
.getElement(multidata.getCount() - 1); |
609 |
|
|
spmdClient.addData(newResource); |
610 |
|
|
// now merge |
611 |
|
|
mergePartition(newID); |
612 |
|
|
} |
613 |
|
|
} catch (RemoteException e) { |
614 |
|
|
LOG.error("ERROR while trying to merge multi data: " |
615 |
|
|
+ e.getMessage(), e); |
616 |
|
|
} |
617 |
|
|
} |
618 |
|
|
|
619 |
|
|
/* |
620 |
|
|
* (non-Javadoc) |
621 |
|
|
* |
622 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject, |
623 |
|
|
* int) |
624 |
|
|
*/ |
625 |
|
|
public void mergeMultiData(MultiDataObject multidata, int idx) { |
626 |
|
|
// the problem is that perhaps on serverside grids were added, but not |
627 |
|
|
// on client side. It is assumed that all servers have created the |
628 |
|
|
// same number of grids with the same names (which is assured by |
629 |
|
|
// multiDataInfo) |
630 |
|
|
|
631 |
|
|
// first lookup the local multiData info and the info of any |
632 |
|
|
// (here the first) dataserver multiInfo - as i said: all should be the |
633 |
|
|
// same |
634 |
|
|
MultiDataInfo localInfo = multidata.getMultiInfo(); |
635 |
|
|
String name = multidata.getName(); |
636 |
|
|
MultiDataInfo remoteInfo; |
637 |
|
|
try { |
638 |
|
|
// all remote infos should be the same, so it is enough to retrieve |
639 |
|
|
// the first one |
640 |
|
|
remoteInfo = dataServers[0].getMultiDataInfo(name); |
641 |
|
|
if (localInfo == null || remoteInfo == null) { |
642 |
|
|
LOG.error("Could not lookup MultidataInfo with name " + name |
643 |
|
|
+ ". localInfo was " + localInfo + " remote info was " |
644 |
|
|
+ remoteInfo); |
645 |
|
|
return; |
646 |
|
|
} |
647 |
|
|
// merge the grid with the given index |
648 |
|
|
// for a local idx: |
649 |
|
|
if (idx < localInfo.getCount()) |
650 |
|
|
mergePartition(localInfo.getMultiID(idx)); |
651 |
|
|
// else create a new grid |
652 |
|
|
else { |
653 |
|
|
// get the new ID (the same as on server side) |
654 |
|
|
int newID = remoteInfo.getMultiID(idx); |
655 |
|
|
if (newID == 0) |
656 |
|
|
throw new UnsupportedOperationException( |
657 |
|
|
"For the requested index (" + idx |
658 |
|
|
+ ") was no grid found on the servers"); |
659 |
|
|
// first create the new data element out of the first element in |
660 |
|
|
// the list: |
661 |
|
|
multidata.addElement(newID); |
662 |
|
|
SplittableResource newResource = (SplittableResource) multidata |
663 |
|
|
.getElement(multidata.getCount() - 1); |
664 |
|
|
spmdClient.addData(newResource); |
665 |
|
|
// now merge the serverdata into the new clientgrid |
666 |
|
|
mergePartition(newID); |
667 |
|
|
} |
668 |
|
|
} catch (RemoteException e) { |
669 |
|
|
LOG.error("ERROR while trying to merge multi data: " |
670 |
|
|
+ e.getMessage(), e); |
671 |
|
|
} |
672 |
|
|
} |
673 |
|
|
|
674 |
|
|
/* |
675 |
|
|
* (non-Javadoc) |
676 |
|
|
* |
677 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#mergePartition(int) |
678 |
|
|
*/ |
679 |
|
|
public synchronized void mergePartition(int rootID) { |
680 |
|
|
checkState(STATE.RUN); |
681 |
|
|
Future[] futures = new Future[noOfPartitions]; |
682 |
|
|
for (int i = 0; i < noOfPartitions; i++) |
683 |
|
|
futures[i] = executor.submit(new DataServerThread(dataServers[i], |
684 |
|
|
serverInfos[i], rootID, CommType.CLIENT_MERGE, eventProxy) { |
685 |
|
|
public Object run() throws Exception { |
686 |
|
|
getServer().unloadToSource(getIntArgument()); |
687 |
|
|
return null; |
688 |
|
|
} |
689 |
|
|
}); |
690 |
|
|
// wait on tasks to finish: |
691 |
|
|
try { |
692 |
|
|
// wait threads to finish |
693 |
|
|
for (Future future : futures) { |
694 |
|
|
future.get(); |
695 |
|
|
} |
696 |
|
|
} catch (Exception e) { |
697 |
|
|
// TODO Auto-generated catch block |
698 |
|
|
e.printStackTrace(); |
699 |
|
|
} |
700 |
|
|
listOfAllActivelyControlledData.remove((Integer) rootID); |
701 |
|
|
} |
702 |
|
|
|
703 |
|
|
/* |
704 |
|
|
* (non-Javadoc) |
705 |
|
|
* |
706 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#mergePartition(java.lang.Object) |
707 |
|
|
*/ |
708 |
|
|
public void mergePartition(Object splittableResource) { |
709 |
|
|
checkSplittable(splittableResource); |
710 |
|
|
checkState(STATE.RUN); |
711 |
|
|
mergePartition(((SplittableResource) splittableResource).getRootID()); |
712 |
|
|
} |
713 |
|
|
|
714 |
|
|
/* |
715 |
|
|
* (non-Javadoc) |
716 |
|
|
* |
717 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#runSPMDModelTask(appl.parallel.spmd.SPMDTask, |
718 |
|
|
* java.lang.Object[]) |
719 |
|
|
*/ |
720 |
|
|
@SuppressWarnings("unchecked") |
721 |
|
|
public Object[] runSPMDModelTask(final SPMDTask task, Object... parameters) |
722 |
|
|
throws Throwable { |
723 |
|
|
state = STATE.RUN; |
724 |
|
|
// transfer needed data |
725 |
|
|
if (dataToTransfer) { |
726 |
|
|
transferDataToServers(); |
727 |
|
|
} |
728 |
|
|
// submit task and arguments to the resources |
729 |
|
|
// each task is calculated in its own thread; |
730 |
|
|
// create tasks |
731 |
|
|
parameters = (parameters == null) ? new Object[0] : parameters; |
732 |
|
|
try { |
733 |
|
|
Future futureResults[] = new Future[servers.length]; |
734 |
|
|
for (int i = 0; i < servers.length; i++) { |
735 |
|
|
futureResults[i] = executor.submit(new ComputingResourceThread( |
736 |
|
|
servers[i], serverInfos[i], new Object[] { task, |
737 |
|
|
referenceResourceID, parameters }, |
738 |
|
|
CommType.CLIENT_EXECUTION, eventProxy) { |
739 |
|
|
public Object run() throws Exception { |
740 |
|
|
SPMDResource server = (SPMDResource) getServer(); |
741 |
|
|
return server.runSPMDModelTask(task.getClass() |
742 |
|
|
.getName(), |
743 |
|
|
(Integer) getObjectArrayArgument()[1], |
744 |
|
|
(Object[]) getObjectArrayArgument()[2]); |
745 |
|
|
} |
746 |
|
|
}); |
747 |
|
|
} |
748 |
|
|
|
749 |
|
|
Vector<Object> results = new Vector<Object>(15); |
750 |
|
|
|
751 |
|
|
// aim of the next loop is to get single Object-array containing all |
752 |
|
|
// results! |
753 |
|
|
// Notice that every server result is a result array. |
754 |
|
|
// This is because serverexecution can happen in multiple threads |
755 |
|
|
// (when multiple processors are available) each serverthread will |
756 |
|
|
// produce its own result |
757 |
|
|
for (int i = 0; i < noOfPartitions; i++) { |
758 |
|
|
// wait for threads finished and collect results |
759 |
|
|
Object[] result = (Object[]) futureResults[i].get(); |
760 |
|
|
for (int j = 0; j < result.length; j++) |
761 |
|
|
results.add(result[j]); |
762 |
|
|
} |
763 |
|
|
return results.toArray(); |
764 |
|
|
|
765 |
|
|
} catch (ExecutionException e) { |
766 |
|
|
LOG.fatal("Error while trying to execute task " |
767 |
|
|
+ task.getClass().getName() + ": " + e.getMessage(), e); |
768 |
|
|
throw e.getCause(); |
769 |
|
|
} catch (Exception e) { |
770 |
|
|
LOG.error("Error while trying to execute task " |
771 |
|
|
+ task.getClass().getName() + ": " + e.getMessage(), e); |
772 |
|
|
e.printStackTrace(); |
773 |
|
|
} |
774 |
|
|
return null; |
775 |
|
|
|
776 |
|
|
} |
777 |
|
|
|
778 |
|
|
/* |
779 |
|
|
* (non-Javadoc) |
780 |
|
|
* |
781 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#setBoxingMode(appl.parallel.spmd.split.AbstractSplitMap.NeighborhoodBoxingMode) |
782 |
|
|
*/ |
783 |
|
|
public void setBoxingMode(AbstractSplitMap.NeighborhoodBoxingMode boxingMode) { |
784 |
|
|
checkState(STATE.INIT); |
785 |
|
|
this.boxingMode = boxingMode; |
786 |
|
|
} |
787 |
|
|
|
788 |
|
|
/* |
789 |
|
|
* (non-Javadoc) |
790 |
|
|
* |
791 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#setNeighborhoodRange(int) |
792 |
|
|
*/ |
793 |
|
|
public void setNeighborhoodRange(int neighborhoodRange) { |
794 |
|
|
checkState(STATE.INIT); |
795 |
|
|
this.neighborhoodRange = neighborhoodRange; |
796 |
|
|
} |
797 |
|
|
|
798 |
|
|
/* |
799 |
|
|
* (non-Javadoc) |
800 |
|
|
* |
801 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#setReferenceResource(java.lang.Object) |
802 |
|
|
*/ |
803 |
|
|
public void setReferenceResource(Object splittableResource) { |
804 |
|
|
checkSplittable(splittableResource); |
805 |
|
|
SplittableResource res = (SplittableResource) splittableResource; |
806 |
|
|
checkState(STATE.INIT); |
807 |
|
|
if (res instanceof SplittableResource) { |
808 |
|
|
referenceResourceID = res.getRootID(); |
809 |
|
|
makeSplitMap(res); |
810 |
|
|
} else |
811 |
|
|
LOG |
812 |
|
|
.warn("Set reference failed: given resource was not an instance of SplittableResource!"); |
813 |
|
|
|
814 |
|
|
} |
815 |
|
|
|
816 |
|
|
/** |
817 |
|
|
* transfers all collected data to the servers. (The data is collected to |
818 |
|
|
* save communication time) |
819 |
|
|
*/ |
820 |
|
|
private void transferDataToServers() { |
821 |
|
|
long time = System.currentTimeMillis(); |
822 |
|
|
Vector<Future> futures = new Vector<Future>(); |
823 |
|
|
if (toTransferPartitionInfos[0].size() > 0) |
824 |
|
|
for (int i = 0; i < servers.length; i++) { |
825 |
|
|
futures.add(executor.submit(new DataServerThread( |
826 |
|
|
dataServers[i], serverInfos[i], |
827 |
|
|
toTransferPartitionInfos[i], |
828 |
|
|
CommType.TRANSFER_METADATA, eventProxy, true) { |
829 |
|
|
public Object run() throws Exception { |
830 |
|
|
getServer() |
831 |
|
|
.addPartitionInfos( |
832 |
|
|
(Vector<SinglePartitionInfo>) getObjectArgument()); |
833 |
|
|
return null; |
834 |
|
|
} |
835 |
|
|
})); |
836 |
|
|
} |
837 |
|
|
if (toTransferMultiDataObjects.size() > 0) |
838 |
|
|
for (int i = 0; i < servers.length; i++) { |
839 |
|
|
futures.add(executor.submit(new DataServerThread( |
840 |
|
|
dataServers[i], serverInfos[i], |
841 |
|
|
toTransferMultiDataObjects, CommType.TRANSFER_METADATA, |
842 |
|
|
eventProxy, true) { |
843 |
|
|
public Object run() throws Exception { |
844 |
|
|
getServer() |
845 |
|
|
.addMultiDataInfos( |
846 |
|
|
(HashMap<String, MultiDataInfo>) getObjectArgument()); |
847 |
|
|
return null; |
848 |
|
|
} |
849 |
|
|
})); |
850 |
|
|
} |
851 |
|
|
if (toTransferBaseParameters.size() > 0) |
852 |
|
|
for (int i = 0; i < servers.length; i++) { |
853 |
|
|
futures.add(executor.submit(new DataServerThread( |
854 |
|
|
dataServers[i], serverInfos[i], |
855 |
|
|
toTransferBaseParameters, CommType.TRANSFER_PARAMETERS, |
856 |
|
|
eventProxy) { |
857 |
|
|
public Object run() throws Exception { |
858 |
|
|
getServer().updateBaseParameter( |
859 |
|
|
(HashMap) getObjectArgument()); |
860 |
|
|
return null; |
861 |
|
|
} |
862 |
|
|
})); |
863 |
|
|
} |
864 |
|
|
// wait on threads to finish |
865 |
|
|
for (Future future : futures) { |
866 |
|
|
try { |
867 |
|
|
future.get(); |
868 |
|
|
} catch (InterruptedException e) { |
869 |
|
|
LOG.error(e); |
870 |
|
|
e.printStackTrace(); |
871 |
|
|
} catch (ExecutionException e) { |
872 |
|
|
LOG.error(e); |
873 |
|
|
e.printStackTrace(); |
874 |
|
|
} |
875 |
|
|
} |
876 |
|
|
dataToTransfer = false; |
877 |
|
|
toTransferBaseParameters.clear(); |
878 |
|
|
toTransferBaseParameters.clear(); |
879 |
|
|
toTransferMultiDataObjects.clear(); |
880 |
|
|
if (LOG.isDebugEnabled()) |
881 |
|
|
LOG.debug("Transfered Data to clients in " |
882 |
|
|
+ (System.currentTimeMillis() - time) / 1000000 + " ms"); |
883 |
|
|
} |
884 |
|
|
|
885 |
|
|
/* |
886 |
|
|
* (non-Javadoc) |
887 |
|
|
* |
888 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(java.lang.Object) |
889 |
|
|
*/ |
890 |
|
|
public void updateNeighborhood(Object splittableResource) { |
891 |
|
|
checkSplittable(splittableResource); |
892 |
|
|
SplittableResource res = (SplittableResource) splittableResource; |
893 |
|
|
updateNeighborhood(res.getRootID()); |
894 |
|
|
} |
895 |
|
|
|
896 |
|
|
/* |
897 |
|
|
* (non-Javadoc) |
898 |
|
|
* |
899 |
|
|
* @see appl.parallel.spmd.SPMDClientInterface#updateNeighborhood(appl.parallel.spmd.MultiDataObject, |
900 |
|
|
* int) |
901 |
|
|
*/ |
902 |
|
|
public void updateNeighborhood(MultiDataObject multiDataObject, int index) { |
903 |
|
|
this.updateNeighborhood(multiDataObject.getMultiInfo() |
904 |
|
|
.getMultiID(index)); |
905 |
|
|
} |
906 |
|
|
|
907 |
|
|
/** |
908 |
|
|
* updates the neighborhood of the splittable with the given id on the |
909 |
|
|
* servers |
910 |
|
|
*/ |
911 |
|
|
private void updateNeighborhood(int rootID) { |
912 |
|
|
checkState(STATE.RUN); |
913 |
|
|
Future[] futureResults = new Future[servers.length]; |
914 |
|
|
for (int i = 0; i < servers.length; i++) { |
915 |
|
|
futureResults[i] = executor.submit(new DataServerThread( |
916 |
|
|
dataServers[i], serverInfos[i], rootID, |
917 |
|
|
CommType.CLIENT_UPDATE, eventProxy) { |
918 |
|
|
public Object run() throws Exception { |
919 |
|
|
getServer().updateFromNeighbors(getIntArgument()); |
920 |
|
|
return null; |
921 |
|
|
} |
922 |
|
|
}); |
923 |
|
|
} |
924 |
|
|
try { |
925 |
|
|
// wait threads to finish |
926 |
|
|
for (Future future : futureResults) { |
927 |
|
|
future.get(); |
928 |
|
|
} |
929 |
|
|
} catch (Exception e) { |
930 |
|
|
// TODO Auto-generated catch block |
931 |
|
|
e.printStackTrace(); |
932 |
|
|
} |
933 |
|
|
} |
934 |
|
|
} |