1 |
mojays |
2 |
package appl.parallel.server; |
2 |
|
|
|
3 |
|
|
import java.awt.Rectangle; |
4 |
|
|
import java.net.InetAddress; |
5 |
|
|
import java.net.UnknownHostException; |
6 |
|
|
import java.rmi.Naming; |
7 |
|
|
import java.rmi.NotBoundException; |
8 |
|
|
import java.rmi.RemoteException; |
9 |
|
|
import java.rmi.registry.LocateRegistry; |
10 |
|
|
import java.rmi.registry.Registry; |
11 |
|
|
import java.rmi.server.UnicastRemoteObject; |
12 |
|
|
import java.util.HashMap; |
13 |
|
|
import java.util.Hashtable; |
14 |
|
|
import java.util.Vector; |
15 |
|
|
import java.util.concurrent.ExecutionException; |
16 |
|
|
import java.util.concurrent.ExecutorService; |
17 |
|
|
import java.util.concurrent.Executors; |
18 |
|
|
import java.util.concurrent.Future; |
19 |
|
|
|
20 |
|
|
import org.apache.log4j.LogManager; |
21 |
|
|
import org.apache.log4j.Logger; |
22 |
|
|
|
23 |
|
|
import appl.data.LoadingException; |
24 |
alfonx |
78 |
import appl.parallel.client.ClientDataServer; |
25 |
mojays |
2 |
import appl.parallel.client.DataServer; |
26 |
|
|
import appl.parallel.data.PartitionDataHandler; |
27 |
|
|
import appl.parallel.event.CommEventSink; |
28 |
|
|
import appl.parallel.event.TimeEvent; |
29 |
|
|
import appl.parallel.event.TransferEvent; |
30 |
|
|
import appl.parallel.event.CommEvent.CommType; |
31 |
|
|
import appl.parallel.services.RemoteEventProxy; |
32 |
|
|
import appl.parallel.spmd.MultiDataInfo; |
33 |
|
|
import appl.parallel.spmd.MultiDataPartitionObject; |
34 |
|
|
import appl.parallel.spmd.SPMDClientController; |
35 |
|
|
import appl.parallel.spmd.split.DataPartition; |
36 |
|
|
import appl.parallel.spmd.split.PartitionInfo; |
37 |
|
|
import appl.parallel.spmd.split.SinglePartitionInfo; |
38 |
|
|
import appl.parallel.spmd.split.SplitMap; |
39 |
|
|
import appl.parallel.thread.DataServerThread; |
40 |
|
|
|
41 |
|
|
/**
 * Manages all the data for the {@link XuluServer}. This includes retrieval of
 * partitions from connected resources like the {@link DataServer} or, as part
 * of the update of neighborhood regions, from other
 * {@link XuluServer XuluServers}.
 *
 * @author Dominik Appl
 */
49 |
|
|
public class PartitionDataManager extends UnicastRemoteObject implements |
50 |
|
|
PartitionDataServer { |
51 |
|
|
|
52 |
|
|
/** |
53 |
|
|
* The Thread is used for updating the partitions neighborhood region from |
54 |
|
|
* the other servers. |
55 |
|
|
* |
56 |
|
|
* @author Dominik Appl |
57 |
|
|
*/ |
58 |
|
|
class UpdateThread extends DataServerThread { |
59 |
|
|
|
60 |
|
|
private final DataPartition thisPartition; |
61 |
|
|
|
62 |
|
|
private final Rectangle updateBounds; |
63 |
|
|
|
64 |
|
|
private final int id; |
65 |
|
|
|
66 |
|
|
private final String hostname; |
67 |
|
|
|
68 |
|
|
public UpdateThread(PartitionDataServer dataServer, |
69 |
|
|
String ipOrHostname, DataPartition thisPartition, |
70 |
|
|
Rectangle updateBounds, int id, CommEventSink sink) { |
71 |
|
|
super(dataServer, null, null, CommType.REMOTE_UPDATE, sink); |
72 |
|
|
hostname = ipOrHostname; |
73 |
|
|
// TODO Auto-generated constructor stub |
74 |
|
|
this.thisPartition = thisPartition; |
75 |
|
|
this.updateBounds = updateBounds; |
76 |
|
|
this.id = id; |
77 |
|
|
} |
78 |
|
|
|
79 |
|
|
/* |
80 |
|
|
* (non-Javadoc) |
81 |
|
|
* |
82 |
|
|
* @see appl.parallel.thread.ExecutionThread#fireTimeEvents(long, |
83 |
|
|
* java.lang.Object) |
84 |
|
|
*/ |
85 |
|
|
@Override |
86 |
|
|
protected void fireTimeEvents(long execTime, Object result) { |
87 |
|
|
try { |
88 |
|
|
eventSink.fireRemoteEvent(new TimeEvent(execTime, |
89 |
|
|
dataServerName, hostname, commType)); |
90 |
|
|
} catch (RemoteException e) { |
91 |
|
|
e.printStackTrace(); |
92 |
|
|
} |
93 |
|
|
} |
94 |
|
|
|
95 |
|
|
/* |
96 |
|
|
* (non-Javadoc) |
97 |
|
|
* |
98 |
|
|
* @see appl.parallel.thread.ExecutionThread#fireTransferEvent(java.lang.Object) |
99 |
|
|
*/ |
100 |
|
|
@Override |
101 |
|
|
protected void fireTransferEvent(Object result) { |
102 |
|
|
try { |
103 |
|
|
eventSink.fireRemoteEvent(new TransferEvent(dataServerName, |
104 |
|
|
hostname, commType, new Object[] { result })); |
105 |
|
|
} catch (RemoteException e) { |
106 |
|
|
e.printStackTrace(); |
107 |
|
|
} |
108 |
|
|
} |
109 |
|
|
|
110 |
|
|
/* |
111 |
|
|
* (non-Javadoc) |
112 |
|
|
* |
113 |
|
|
* @see appl.parallel.thread.ExecutionThread#run() |
114 |
|
|
*/ |
115 |
|
|
@Override |
116 |
|
|
protected Object run() throws Exception { |
117 |
|
|
try { |
118 |
|
|
DataPartition updateData = getServer().getPartition(id, |
119 |
|
|
updateBounds); |
120 |
|
|
// apply update: |
121 |
|
|
this.thisPartition.setPartition(updateData, updateBounds); |
122 |
|
|
return updateData; |
123 |
|
|
} catch (Exception e) { |
124 |
|
|
LOG.error("Error while retrieving partition " + updateBounds |
125 |
|
|
+ "from PartitionDataServer with ip " + hostname); |
126 |
|
|
} |
127 |
|
|
return null; |
128 |
|
|
} |
129 |
|
|
} |
130 |
|
|
|
131 |
|
|
private final String bindingName = "PartitionDataServer"; |
132 |
|
|
|
133 |
|
|
private final Logger LOG = LogManager.getLogger(this.getClass().getName()); |
134 |
|
|
|
135 |
|
|
/** |
136 |
|
|
* all partition data. The key is the id. Is an Hashtable instead of |
137 |
|
|
* HashMasp for thread-safty. |
138 |
|
|
*/ |
139 |
|
|
private Hashtable<Integer, DataPartition> data = new Hashtable<Integer, DataPartition>( |
140 |
|
|
15); |
141 |
|
|
|
142 |
|
|
/** |
143 |
|
|
* the names of the data objects |
144 |
|
|
*/ |
145 |
|
|
private HashMap<String, Integer> NameToIDMapping = new HashMap<String, Integer>( |
146 |
|
|
15); |
147 |
|
|
|
148 |
|
|
private HashMap<Integer, SinglePartitionInfo> infos = new HashMap<Integer, SinglePartitionInfo>( |
149 |
|
|
10); |
150 |
|
|
|
151 |
|
|
private HashMap<String, Object> baseParameters = new HashMap<String, Object>( |
152 |
|
|
10); |
153 |
|
|
|
154 |
|
|
private final PartitionDataServer[] dataServers; |
155 |
|
|
|
156 |
|
|
private final ExecutorService executor; |
157 |
|
|
|
158 |
|
|
private final String[] IPs; |
159 |
|
|
|
160 |
|
|
private boolean dataServersInitialized = false; |
161 |
|
|
|
162 |
|
|
private HashMap<String, MultiDataInfo> multiDataInfos = new HashMap<String, MultiDataInfo>( |
163 |
|
|
10); |
164 |
|
|
|
165 |
|
|
private final String dataServerName; |
166 |
|
|
|
167 |
|
|
private final CommEventSink remoteEventReceiver; |
168 |
|
|
|
169 |
|
|
private String[] hostnames; |
170 |
|
|
|
171 |
|
|
private ClientDataServer localSPMDClient; |
172 |
|
|
|
173 |
|
|
private final int registryPort; |
174 |
|
|
|
175 |
|
|
/** |
176 |
|
|
* Creates and inits a new PartitionDataserver. |
177 |
|
|
* |
178 |
|
|
* @param IPs |
179 |
|
|
* the IPs of participating Servers for neighborhood updates(may |
180 |
|
|
* include port specification) |
181 |
|
|
* @param remoteEventReceiver |
182 |
|
|
* generated events are forwarded to the receiver |
183 |
|
|
* @param registryPort |
184 |
|
|
* port of the registry to which the {@link PartitionDataManager} |
185 |
|
|
* should be bound |
186 |
|
|
* @throws RemoteException |
187 |
|
|
* if a connection to a participating server fails |
188 |
|
|
*/ |
189 |
|
|
public PartitionDataManager(String[] IPs, |
190 |
|
|
CommEventSink remoteEventReceiver, int registryPort) |
191 |
|
|
throws RemoteException { |
192 |
|
|
// create NameToID and IdToInfo-Mapping |
193 |
|
|
this.IPs = IPs; |
194 |
|
|
this.remoteEventReceiver = remoteEventReceiver; |
195 |
|
|
this.registryPort = registryPort; |
196 |
|
|
|
197 |
|
|
dataServers = new PartitionDataServer[IPs.length]; |
198 |
|
|
startTheServer(); |
199 |
|
|
|
200 |
|
|
// start Thread service |
201 |
|
|
executor = Executors.newCachedThreadPool(); |
202 |
|
|
String name = "unknown PartitionDataServer"; |
203 |
|
|
try { |
204 |
|
|
name = InetAddress.getLocalHost().getHostName(); |
205 |
|
|
} catch (UnknownHostException e) { |
206 |
|
|
// TODO Auto-generated catch block |
207 |
|
|
e.printStackTrace(); |
208 |
|
|
} |
209 |
|
|
dataServerName = name; |
210 |
|
|
// try to get the hostnames |
211 |
|
|
hostnames = new String[IPs.length]; |
212 |
|
|
for (int i = 0; i < IPs.length; i++) { |
213 |
|
|
// try { |
214 |
|
|
hostnames[i] = IPs[i]; |
215 |
|
|
// hostnames[i] = InetAddress.getByName(IPs[i]).getHostName(); |
216 |
|
|
// } catch (UnknownHostException e) { |
217 |
|
|
// LOG.warn("Exception while retrivieving hostname of address: " + |
218 |
|
|
// IPs[i]); |
219 |
|
|
// } |
220 |
|
|
} |
221 |
|
|
} |
222 |
|
|
|
223 |
|
|
/**
 * Variant of {@link #PartitionDataManager(String[], CommEventSink, int)}
 * that additionally stores a local SPMDClient. <br>
 * Intended for a Server that runs inside the Xulu-Client, where direct
 * access avoids the TCP/IP round trip.
 *
 * @param IPs
 *            the IPs of participating Servers for neighborhood updates (may
 *            include port specification)
 * @param remoteEventReceiver
 *            generated events are forwarded to the receiver
 * @param registryport
 *            port of the registry to which the {@link PartitionDataManager}
 *            should be bound
 * @param localSPMDClient
 *            a {@link ClientDataServer} for fast local access
 * @throws RemoteException
 *             if the delegated constructor fails
 */
public PartitionDataManager(String[] IPs,
        RemoteEventProxy remoteEventReceiver, int registryport,
        ClientDataServer localSPMDClient) throws RemoteException {
    // common setup first, then remember the local client
    this(IPs, remoteEventReceiver, registryport);
    this.localSPMDClient = localSPMDClient;
}
247 |
|
|
|
248 |
|
|
/* |
249 |
|
|
* (non-Javadoc) |
250 |
|
|
* |
251 |
|
|
* @see appl.parallel.client.DataServer#addData(appl.parallel.spmd.split.DataPartition) |
252 |
|
|
*/ |
253 |
|
|
public void addData(DataPartition partition) throws RemoteException { |
254 |
|
|
if (partition == null) { |
255 |
|
|
LOG.warn("To add partition was null - no data added"); |
256 |
|
|
return; |
257 |
|
|
} |
258 |
|
|
data.put(partition.getRootID(), partition); |
259 |
|
|
|
260 |
|
|
} |
261 |
|
|
|
262 |
|
|
/* |
263 |
|
|
* (non-Javadoc) |
264 |
|
|
* |
265 |
|
|
* @see appl.parallel.server.PartitionDataServer#addMultiDataInfos(java.util.HashMap) |
266 |
|
|
*/ |
267 |
|
|
public void addMultiDataInfos( |
268 |
|
|
HashMap<String, MultiDataInfo> newMultiDataInfos) |
269 |
|
|
throws RemoteException { |
270 |
|
|
this.multiDataInfos.putAll(newMultiDataInfos); |
271 |
|
|
} |
272 |
|
|
|
273 |
|
|
/* |
274 |
|
|
* (non-Javadoc) |
275 |
|
|
* |
276 |
|
|
* @see appl.parallel.server.PartitionDataServer#addPartitionInfos(java.util.Vector) |
277 |
|
|
*/ |
278 |
|
|
public void addPartitionInfos( |
279 |
|
|
Vector<SinglePartitionInfo> singlePartitionInfos) |
280 |
|
|
throws RemoteException { |
281 |
|
|
for (SinglePartitionInfo singlePartitionInfo : singlePartitionInfos) { |
282 |
|
|
NameToIDMapping.put(singlePartitionInfo.getBaseResourceName(), |
283 |
|
|
singlePartitionInfo.getBaseResourceID()); |
284 |
|
|
infos.put(singlePartitionInfo.getBaseResourceID(), |
285 |
|
|
singlePartitionInfo); |
286 |
|
|
} |
287 |
|
|
} |
288 |
|
|
|
289 |
|
|
/** |
290 |
|
|
* @param name |
291 |
|
|
* @param idx |
292 |
|
|
*/ |
293 |
|
|
public void destroyMultiPartition(String name, int idx) { |
294 |
|
|
|
295 |
|
|
} |
296 |
|
|
|
297 |
|
|
/** |
298 |
|
|
* returns the baseParameter by name |
299 |
|
|
* |
300 |
|
|
* @param name |
301 |
|
|
* the name of the parameter |
302 |
|
|
* @return the parameter |
303 |
|
|
*/ |
304 |
|
|
public Object getBaseParameter(String name) { |
305 |
|
|
return baseParameters.get(name); |
306 |
|
|
} |
307 |
|
|
|
308 |
|
|
/** |
309 |
|
|
* Returns a a (remote) PartitionDataServer for an IP-Address. Looks up the |
310 |
|
|
* PartitionDataServer in the registry |
311 |
|
|
*/ |
312 |
|
|
private PartitionDataServer getConnection(String ip) { |
313 |
|
|
PartitionDataServer server; |
314 |
|
|
// lookup server |
315 |
|
|
try { |
316 |
|
|
server = (PartitionDataServer) Naming.lookup("rmi://" + ip + "/" |
317 |
|
|
+ bindingName); |
318 |
|
|
} catch (Exception e) { |
319 |
|
|
LOG.error("bindig of " + ip + " failed" + e.getMessage(), e); |
320 |
|
|
e.printStackTrace(); |
321 |
|
|
return null; |
322 |
|
|
} |
323 |
|
|
return server; |
324 |
|
|
} |
325 |
|
|
|
326 |
|
|
/** |
327 |
|
|
* Returns the {@link DataPartition} associated with the given ID |
328 |
|
|
* |
329 |
|
|
* @param id |
330 |
|
|
* the id of the data |
331 |
|
|
* @return the data |
332 |
|
|
*/ |
333 |
|
|
public synchronized DataPartition getData(int id) throws RemoteException { |
334 |
|
|
// check if in hashtable already |
335 |
|
|
DataPartition partition = data.get(id); |
336 |
|
|
if (partition != null) |
337 |
|
|
return partition; |
338 |
|
|
// if not in hashtable: |
339 |
|
|
// load the data and put it into hash |
340 |
|
|
// find the element with the ID: |
341 |
|
|
SinglePartitionInfo info = infos.get(id); |
342 |
|
|
|
343 |
|
|
if (info == null) { |
344 |
|
|
LOG.warn("No partition with id " + id + " found!"); |
345 |
|
|
return null; |
346 |
|
|
} |
347 |
|
|
// retrieve data from client or other location |
348 |
|
|
try { |
349 |
|
|
// if there is a local server try to get the local partition |
350 |
|
|
PartitionDataHandler partitionDataHandler = info |
351 |
|
|
.getPartitionDataHandler(); |
352 |
|
|
if (localSPMDClient != null) |
353 |
|
|
partitionDataHandler.setSPMDClient(localSPMDClient); |
354 |
|
|
partition = partitionDataHandler.load(); |
355 |
|
|
} catch (LoadingException e) { |
356 |
|
|
// TODO Auto-generated catch block |
357 |
|
|
LOG.error("Loading of partition with id '" + id |
358 |
|
|
+ "' failed. Null returned!"); |
359 |
|
|
e.printStackTrace(); |
360 |
|
|
return null; |
361 |
|
|
} |
362 |
|
|
if (partition == null) { |
363 |
|
|
LOG.error("Loading of partition with id '" + id |
364 |
|
|
+ "' failed. Null returned!"); |
365 |
|
|
return null; |
366 |
|
|
} |
367 |
|
|
|
368 |
|
|
// enter data in partition table |
369 |
|
|
data.put(id, partition); |
370 |
|
|
return partition; |
371 |
|
|
} |
372 |
|
|
|
373 |
|
|
/** |
374 |
|
|
* Gets a partition by name |
375 |
|
|
* |
376 |
|
|
* @param name |
377 |
|
|
* the name of the resource (probably given by the programmer) |
378 |
|
|
* @return the partition or null if not found |
379 |
|
|
*/ |
380 |
|
|
public DataPartition getData(String name) { |
381 |
|
|
Integer id = NameToIDMapping.get(name); |
382 |
|
|
if (id == null) { |
383 |
|
|
LOG.warn("No data for name " + name + " found! Returning null"); |
384 |
|
|
return null; |
385 |
|
|
} |
386 |
|
|
try { |
387 |
|
|
return getData(id); |
388 |
|
|
} catch (RemoteException e) { |
389 |
|
|
// should never be reached (because the method call is local |
390 |
|
|
e.printStackTrace(); |
391 |
|
|
} |
392 |
|
|
return null; |
393 |
|
|
} |
394 |
|
|
|
395 |
|
|
/** |
396 |
|
|
* Returns the partition info with the specified id |
397 |
|
|
* |
398 |
|
|
* @param id |
399 |
|
|
* the id of the partition |
400 |
|
|
* @return the info |
401 |
|
|
*/ |
402 |
|
|
public PartitionInfo getInfo(int id) { |
403 |
|
|
return infos.get(id); |
404 |
|
|
} |
405 |
|
|
|
406 |
|
|
/** |
407 |
|
|
* Gets a partition of a multi-data element. NOTE THAT ALL PARTITIONS OF THE |
408 |
|
|
* MULTIGRID WILL BE LOADED (watch out for loading performance!). |
409 |
|
|
* |
410 |
|
|
* @param name |
411 |
|
|
* the name of the resource (probably given by the programmer) |
412 |
|
|
* @return the partition |
413 |
|
|
* @see SPMDClientController#addToMultiDataSplitControl(Object[], String) |
414 |
|
|
*/ |
415 |
|
|
public DataPartition[] getMultiData(String name) { |
416 |
|
|
// fill an array with the partitions |
417 |
|
|
DataPartition[] partitions = new DataPartition[multiDataInfos.get(name) |
418 |
|
|
.getCount()]; |
419 |
|
|
for (int i = 0; i < partitions.length; i++) { |
420 |
|
|
partitions[i] = getMultiData(name, i); |
421 |
|
|
} |
422 |
|
|
return partitions; |
423 |
|
|
} |
424 |
|
|
|
425 |
|
|
/** |
426 |
|
|
* Gets a partition of a multi-data element. Only the resource at the given |
427 |
|
|
* position will be loaded. |
428 |
|
|
* |
429 |
|
|
* @param name |
430 |
|
|
* the name of the resource (probably given by the programmer) |
431 |
|
|
* @param pos |
432 |
|
|
* the index of the requested element |
433 |
|
|
* @return the partition |
434 |
|
|
* @see SPMDClientController#addToMultiDataSplitControl(Object[], String) |
435 |
|
|
*/ |
436 |
|
|
public DataPartition getMultiData(String name, int pos) { |
437 |
|
|
try { |
438 |
|
|
return getData(multiDataInfos.get(name).getMultiID(pos)); |
439 |
|
|
} catch (RemoteException e) { |
440 |
|
|
// TODO Auto-generated catch block |
441 |
|
|
e.printStackTrace(); |
442 |
|
|
} |
443 |
|
|
return null; |
444 |
|
|
} |
445 |
|
|
|
446 |
|
|
/* |
447 |
|
|
* (non-Javadoc) |
448 |
|
|
* |
449 |
|
|
* @see appl.parallel.server.PartitionDataServer#getMultiDataInfo(java.lang.String) |
450 |
|
|
*/ |
451 |
|
|
public MultiDataInfo getMultiDataInfo(String name) throws RemoteException { |
452 |
|
|
return multiDataInfos.get(name); |
453 |
|
|
} |
454 |
|
|
|
455 |
|
|
/* |
456 |
|
|
* (non-Javadoc) |
457 |
|
|
* |
458 |
|
|
* @see appl.parallel.server.PartitionDataServer#getMultiDataObject(java.lang.String) |
459 |
|
|
*/ |
460 |
|
|
public MultiDataPartitionObject getMultiDataObject(String name) |
461 |
|
|
throws RemoteException { |
462 |
|
|
return new MultiDataPartitionObject(multiDataInfos.get(name), this); |
463 |
|
|
} |
464 |
|
|
|
465 |
|
|
/* |
466 |
|
|
* (non-Javadoc) |
467 |
|
|
* |
468 |
|
|
* @see appl.parallel.server.PartitionDataServer#getPartition(int, |
469 |
|
|
* java.awt.Rectangle) |
470 |
|
|
*/ |
471 |
|
|
public synchronized DataPartition getPartition(int id, Rectangle bounds) |
472 |
|
|
throws RemoteException { |
473 |
|
|
DataPartition thisPartition = (DataPartition) data.get(id); |
474 |
|
|
if (thisPartition == null && LOG.isDebugEnabled()) { |
475 |
|
|
LOG.error("Partition with id " + id + " and bounds " + bounds |
476 |
|
|
+ "was requested but not found on DataManager"); |
477 |
|
|
return null; |
478 |
|
|
} |
479 |
|
|
|
480 |
|
|
if (LOG.isDebugEnabled()) |
481 |
|
|
LOG.debug("Data retrieved from DataManager: ID:" + id |
482 |
|
|
+ " partition: " + bounds); |
483 |
|
|
return thisPartition.getPartition(bounds); |
484 |
|
|
} |
485 |
|
|
|
486 |
|
|
/* |
487 |
|
|
* (non-Javadoc) |
488 |
|
|
* |
489 |
|
|
* @see appl.parallel.server.PartitionDataServer#getPartitionInfo(int) |
490 |
|
|
*/ |
491 |
|
|
public SinglePartitionInfo getPartitionInfo(int rootID) |
492 |
|
|
throws RemoteException { |
493 |
|
|
return infos.get(rootID); |
494 |
|
|
} |
495 |
|
|
|
496 |
|
|
/** |
497 |
|
|
* other dataservers may not be initialized on creation of this object. |
498 |
|
|
* Therefore the dataServers must be looked up later |
499 |
|
|
*/ |
500 |
|
|
private void initializeDataServers() { |
501 |
|
|
if (dataServersInitialized == true) |
502 |
|
|
return; |
503 |
|
|
for (int i = 0; i < dataServers.length; i++) { |
504 |
|
|
dataServers[i] = getConnection(IPs[i]); |
505 |
|
|
} |
506 |
|
|
dataServersInitialized = true; |
507 |
|
|
} |
508 |
|
|
|
509 |
|
|
/* |
510 |
|
|
* (non-Javadoc) |
511 |
|
|
* |
512 |
|
|
* @see appl.parallel.client.DataServer#removeData(int) |
513 |
|
|
*/ |
514 |
|
|
public void removeData(int id) { |
515 |
|
|
data.remove(id); |
516 |
|
|
|
517 |
|
|
} |
518 |
|
|
|
519 |
|
|
/** |
520 |
|
|
* Removes the partition from the database |
521 |
|
|
* |
522 |
|
|
* @param name |
523 |
|
|
* the name of the partition to remove |
524 |
|
|
*/ |
525 |
|
|
public void removeData(String name) { |
526 |
|
|
data.remove(NameToIDMapping.get(name)); |
527 |
|
|
} |
528 |
|
|
|
529 |
|
|
/** |
530 |
|
|
* Removes the partition from the database |
531 |
|
|
* |
532 |
|
|
* @param partition |
533 |
|
|
* the Partition to remove |
534 |
|
|
*/ |
535 |
|
|
public void removePartition(DataPartition partition) { |
536 |
|
|
data.remove(partition.getRootID()); |
537 |
|
|
} |
538 |
|
|
|
539 |
|
|
/** |
540 |
|
|
* binds the server to the registry |
541 |
|
|
*/ |
542 |
|
|
private void startTheServer() throws RemoteException { |
543 |
|
|
Registry registry = LocateRegistry.getRegistry(registryPort); |
544 |
|
|
registry.rebind(bindingName, (PartitionDataServer) this); |
545 |
|
|
LOG.info("PartitionDataServer bound to port " + registryPort |
546 |
|
|
+ " and running...."); |
547 |
|
|
} |
548 |
|
|
|
549 |
|
|
/** |
550 |
|
|
* unbinds the server from the registry, removes all data and runs the |
551 |
|
|
* garbage collector |
552 |
|
|
*/ |
553 |
|
|
public void stop() { |
554 |
|
|
data.clear(); |
555 |
|
|
NameToIDMapping.clear(); |
556 |
|
|
infos.clear(); |
557 |
|
|
|
558 |
|
|
Registry registry; |
559 |
|
|
try { |
560 |
|
|
registry = LocateRegistry.getRegistry(); |
561 |
|
|
registry.unbind(bindingName); |
562 |
|
|
LOG.debug("Sucessfully removed " + bindingName + " from reggie"); |
563 |
|
|
} catch (RemoteException e) { |
564 |
|
|
LOG.warn("tried to unbind " + bindingName |
565 |
|
|
+ " from reggie, but an exception occured: " |
566 |
|
|
+ e.getMessage()); |
567 |
|
|
} catch (NotBoundException e) { |
568 |
|
|
LOG.warn("tried to unbind " + bindingName |
569 |
|
|
+ " from reggie, but the name was not bound " |
570 |
|
|
+ e.getMessage()); |
571 |
|
|
} |
572 |
|
|
System.gc(); |
573 |
|
|
} |
574 |
|
|
|
575 |
|
|
/** |
576 |
|
|
* @param rootID |
577 |
|
|
*/ |
578 |
|
|
public void unloadToSource(int rootID) throws RemoteException { |
579 |
|
|
SinglePartitionInfo info = infos.get(rootID); |
580 |
|
|
if (info == null) { |
581 |
|
|
LOG.error("Could not unload partition with id " + rootID |
582 |
|
|
+ ". The partitioninfo was not found!"); |
583 |
|
|
return; |
584 |
|
|
} |
585 |
|
|
// start unloading |
586 |
|
|
info.getPartitionDataHandler().setBasePartition(this.getData(rootID)); |
587 |
|
|
info.getPartitionDataHandler().unload(); |
588 |
|
|
// // remove data from memory and run garbage collection |
589 |
|
|
// data.remove(rootID); |
590 |
|
|
// System.gc(); |
591 |
|
|
} |
592 |
|
|
|
593 |
|
|
/* |
594 |
|
|
* (non-Javadoc) |
595 |
|
|
* |
596 |
|
|
* @see appl.parallel.server.PartitionDataServer#updateBaseParameter(java.lang.Object[], |
597 |
|
|
* java.lang.String[]) |
598 |
|
|
*/ |
599 |
|
|
public void updateBaseParameter(HashMap<String, Object> newParameters) |
600 |
|
|
throws RemoteException { |
601 |
|
|
// update the mapping. Old values are replaced automaticly |
602 |
|
|
baseParameters.putAll(newParameters); |
603 |
|
|
} |
604 |
|
|
|
605 |
|
|
/** |
606 |
|
|
* Updates the specified partition. For this the the neighborhood partitions |
607 |
|
|
* are updated from remote DataServers using the {@link SplitMap} of the |
608 |
|
|
* {@link SinglePartitionInfo}. |
609 |
|
|
* |
610 |
|
|
* @param id |
611 |
|
|
*/ |
612 |
|
|
public void updateFromNeighbors(int id) { |
613 |
|
|
initializeDataServers(); |
614 |
|
|
// get the data to be updated: |
615 |
|
|
DataPartition thisPartition = data.get(id); |
616 |
|
|
PartitionInfo info = infos.get(id); |
617 |
|
|
if (thisPartition == null || info == null) { |
618 |
|
|
LOG.error("Update with id " + id |
619 |
|
|
+ " failed, because no data was found with this id"); |
620 |
|
|
return; |
621 |
|
|
} |
622 |
|
|
|
623 |
|
|
// get some other needed infos |
624 |
|
|
SplitMap map = info.getSplitMap(); |
625 |
|
|
int mapPosition = info.getSplitMapPos(); |
626 |
|
|
// check if something to do |
627 |
|
|
if (map.getNeighborhoodRange() == 0) { |
628 |
|
|
LOG.debug("Update has nothing to do (neighborhoodrange is 0)"); |
629 |
|
|
return; |
630 |
|
|
} |
631 |
|
|
|
632 |
|
|
// calculate (for each neighbor) the needed partitions and get |
633 |
|
|
// the updates (implemented using Threads) |
634 |
|
|
int[] neighbors = map.getNeighborsForPosition(mapPosition); |
635 |
|
|
Future futures[] = new Future[neighbors.length]; |
636 |
|
|
for (int j = 0; j < neighbors.length; j++) { |
637 |
|
|
int neighborIdx = neighbors[j]; |
638 |
|
|
// get the overlap of the neighborhood area of the current partion |
639 |
|
|
// with the calcultation area of the remote partition |
640 |
|
|
Rectangle updateBounds = map.getPartitionNeighborhoodBounds( |
641 |
|
|
mapPosition).intersection( |
642 |
|
|
map.getPartitionCalculationBounds(neighborIdx)); |
643 |
|
|
// Get the partition from remote server: |
644 |
|
|
DataPartition updateData; |
645 |
|
|
futures[j] = executor.submit(new UpdateThread( |
646 |
|
|
dataServers[neighborIdx], hostnames[neighborIdx], |
647 |
|
|
thisPartition, updateBounds, id, remoteEventReceiver)); |
648 |
|
|
} |
649 |
|
|
|
650 |
|
|
for (Future future : futures) { |
651 |
|
|
try { |
652 |
|
|
future.get(); |
653 |
|
|
} catch (InterruptedException e) { |
654 |
|
|
e.printStackTrace(); |
655 |
|
|
} catch (ExecutionException e) { |
656 |
|
|
LOG.error("The Remote update of a partition failed", e); |
657 |
|
|
} |
658 |
|
|
} |
659 |
|
|
|
660 |
|
|
} |
661 |
|
|
|
662 |
|
|
/** |
663 |
|
|
* Does the same as {@link #updateFromNeighbors(int)} |
664 |
|
|
* |
665 |
|
|
* @param name |
666 |
|
|
* the name of the resource |
667 |
|
|
*/ |
668 |
|
|
public void updateFromNeighbors(String name) { |
669 |
|
|
// get by ID |
670 |
|
|
updateFromNeighbors(NameToIDMapping.get(name)); |
671 |
|
|
} |
672 |
|
|
|
673 |
|
|
/* |
674 |
|
|
* (non-Javadoc) |
675 |
|
|
* |
676 |
|
|
* @see appl.parallel.server.PartitionDataServer#setPartition(int, |
677 |
|
|
* appl.parallel.spmd.split.DataPartition, java.awt.Rectangle) |
678 |
|
|
*/ |
679 |
|
|
public synchronized void updatePartition(int id, DataPartition partition, |
680 |
|
|
Rectangle bounds) throws RemoteException { |
681 |
|
|
DataPartition thisPartition = (DataPartition) data.get(id); |
682 |
|
|
if (thisPartition == null && LOG.isDebugEnabled()) { |
683 |
|
|
LOG |
684 |
|
|
.error("Partition of data with id " + id + " and bounds " |
685 |
|
|
+ bounds |
686 |
|
|
+ "should be set but was not found in DataManager"); |
687 |
|
|
return; |
688 |
|
|
} |
689 |
|
|
thisPartition.setPartition(partition, bounds); |
690 |
|
|
if (LOG.isDebugEnabled()) |
691 |
|
|
LOG.debug("Partition of data set on DataManager with id " + id |
692 |
|
|
+ " and partition bounds: " + bounds); |
693 |
|
|
|
694 |
|
|
} |
695 |
|
|
|
696 |
|
|
} |