/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/server/PartitionDataManager.java
ViewVC logotype

Contents of /branches/1.8-gt2-2.6/src/appl/parallel/server/PartitionDataManager.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2 - (show annotations)
Wed Feb 25 11:54:01 2009 UTC (15 years, 9 months ago) by mojays
Original Path: trunk/src/appl/parallel/server/PartitionDataManager.java
File size: 20603 byte(s)
First Commit, corresponds to Revision 1008 of Wikisquare-SVN 
1 package appl.parallel.server;
2
3 import java.awt.Rectangle;
4 import java.net.InetAddress;
5 import java.net.UnknownHostException;
6 import java.rmi.Naming;
7 import java.rmi.NotBoundException;
8 import java.rmi.RemoteException;
9 import java.rmi.registry.LocateRegistry;
10 import java.rmi.registry.Registry;
11 import java.rmi.server.UnicastRemoteObject;
12 import java.util.HashMap;
13 import java.util.Hashtable;
14 import java.util.Vector;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.Future;
19
20 import org.apache.log4j.LogManager;
21 import org.apache.log4j.Logger;
22
23 import appl.data.LoadingException;
24 import appl.parallel.client.DataServer;
25 import appl.parallel.client.ClientDataServer;
26 import appl.parallel.data.PartitionDataHandler;
27 import appl.parallel.event.CommEventSink;
28 import appl.parallel.event.TimeEvent;
29 import appl.parallel.event.TransferEvent;
30 import appl.parallel.event.CommEvent.CommType;
31 import appl.parallel.services.RemoteEventProxy;
32 import appl.parallel.spmd.MultiDataInfo;
33 import appl.parallel.spmd.MultiDataPartitionObject;
34 import appl.parallel.spmd.SPMDClientController;
35 import appl.parallel.spmd.split.DataPartition;
36 import appl.parallel.spmd.split.PartitionInfo;
37 import appl.parallel.spmd.split.SinglePartitionInfo;
38 import appl.parallel.spmd.split.SplitMap;
39 import appl.parallel.thread.DataServerThread;
40
41 /**
42 * Manages the all data for the {@link XuluServer}. This includes retrieval of
43 * partitions from connected resources like the {@link DataServer} or, as part
44 * of the update of neighborhood regions, from other
45 * {@link XuluServer XuluServers}.
46 *
47 * @author Dominik Appl
48 */
49 public class PartitionDataManager extends UnicastRemoteObject implements
50 PartitionDataServer {
51
52 /**
53 * The Thread is used for updating the partitions neighborhood region from
54 * the other servers.
55 *
56 * @author Dominik Appl
57 */
58 class UpdateThread extends DataServerThread {
59
60 private final DataPartition thisPartition;
61
62 private final Rectangle updateBounds;
63
64 private final int id;
65
66 private final String hostname;
67
68 public UpdateThread(PartitionDataServer dataServer,
69 String ipOrHostname, DataPartition thisPartition,
70 Rectangle updateBounds, int id, CommEventSink sink) {
71 super(dataServer, null, null, CommType.REMOTE_UPDATE, sink);
72 hostname = ipOrHostname;
73 // TODO Auto-generated constructor stub
74 this.thisPartition = thisPartition;
75 this.updateBounds = updateBounds;
76 this.id = id;
77 }
78
79 /*
80 * (non-Javadoc)
81 *
82 * @see appl.parallel.thread.ExecutionThread#fireTimeEvents(long,
83 * java.lang.Object)
84 */
85 @Override
86 protected void fireTimeEvents(long execTime, Object result) {
87 try {
88 eventSink.fireRemoteEvent(new TimeEvent(execTime,
89 dataServerName, hostname, commType));
90 } catch (RemoteException e) {
91 e.printStackTrace();
92 }
93 }
94
95 /*
96 * (non-Javadoc)
97 *
98 * @see appl.parallel.thread.ExecutionThread#fireTransferEvent(java.lang.Object)
99 */
100 @Override
101 protected void fireTransferEvent(Object result) {
102 try {
103 eventSink.fireRemoteEvent(new TransferEvent(dataServerName,
104 hostname, commType, new Object[] { result }));
105 } catch (RemoteException e) {
106 e.printStackTrace();
107 }
108 }
109
110 /*
111 * (non-Javadoc)
112 *
113 * @see appl.parallel.thread.ExecutionThread#run()
114 */
115 @Override
116 protected Object run() throws Exception {
117 try {
118 DataPartition updateData = getServer().getPartition(id,
119 updateBounds);
120 // apply update:
121 this.thisPartition.setPartition(updateData, updateBounds);
122 return updateData;
123 } catch (Exception e) {
124 LOG.error("Error while retrieving partition " + updateBounds
125 + "from PartitionDataServer with ip " + hostname);
126 }
127 return null;
128 }
129 }
130
131 private final String bindingName = "PartitionDataServer";
132
133 private final Logger LOG = LogManager.getLogger(this.getClass().getName());
134
135 /**
136 * all partition data. The key is the id. Is an Hashtable instead of
137 * HashMasp for thread-safty.
138 */
139 private Hashtable<Integer, DataPartition> data = new Hashtable<Integer, DataPartition>(
140 15);
141
142 /**
143 * the names of the data objects
144 */
145 private HashMap<String, Integer> NameToIDMapping = new HashMap<String, Integer>(
146 15);
147
148 private HashMap<Integer, SinglePartitionInfo> infos = new HashMap<Integer, SinglePartitionInfo>(
149 10);
150
151 private HashMap<String, Object> baseParameters = new HashMap<String, Object>(
152 10);
153
154 private final PartitionDataServer[] dataServers;
155
156 private final ExecutorService executor;
157
158 private final String[] IPs;
159
160 private boolean dataServersInitialized = false;
161
162 private HashMap<String, MultiDataInfo> multiDataInfos = new HashMap<String, MultiDataInfo>(
163 10);
164
165 private final String dataServerName;
166
167 private final CommEventSink remoteEventReceiver;
168
169 private String[] hostnames;
170
171 private ClientDataServer localSPMDClient;
172
173 private final int registryPort;
174
175 /**
176 * Creates and inits a new PartitionDataserver.
177 *
178 * @param IPs
179 * the IPs of participating Servers for neighborhood updates(may
180 * include port specification)
181 * @param remoteEventReceiver
182 * generated events are forwarded to the receiver
183 * @param registryPort
184 * port of the registry to which the {@link PartitionDataManager}
185 * should be bound
186 * @throws RemoteException
187 * if a connection to a participating server fails
188 */
189 public PartitionDataManager(String[] IPs,
190 CommEventSink remoteEventReceiver, int registryPort)
191 throws RemoteException {
192 // create NameToID and IdToInfo-Mapping
193 this.IPs = IPs;
194 this.remoteEventReceiver = remoteEventReceiver;
195 this.registryPort = registryPort;
196
197 dataServers = new PartitionDataServer[IPs.length];
198 startTheServer();
199
200 // start Thread service
201 executor = Executors.newCachedThreadPool();
202 String name = "unknown PartitionDataServer";
203 try {
204 name = InetAddress.getLocalHost().getHostName();
205 } catch (UnknownHostException e) {
206 // TODO Auto-generated catch block
207 e.printStackTrace();
208 }
209 dataServerName = name;
210 // try to get the hostnames
211 hostnames = new String[IPs.length];
212 for (int i = 0; i < IPs.length; i++) {
213 // try {
214 hostnames[i] = IPs[i];
215 // hostnames[i] = InetAddress.getByName(IPs[i]).getHostName();
216 // } catch (UnknownHostException e) {
217 // LOG.warn("Exception while retrivieving hostname of address: " +
218 // IPs[i]);
219 // }
220 }
221 }
222
223 /**
224 * Same as {@link #PartitionDataManager(String[], CommEventSink, int)}, but uses
225 * a local SPMDClient. <br>
226 * This is used when a Server is running inside the Xulu-Client. It has
227 * performance advantages (direct access - no TCP/IP)
228 *
229 * @param IPs
230 * the IPs of participating Servers for neighborhood updates(may
231 * include port specification)
232 * @param remoteEventReceiver
233 * generated events are forwarded to the receiver
234 * @param registryport
235 * port of the registry to which the {@link PartitionDataManager}
236 * should be bound
237 * @param localSPMDClient
238 * a {@link ClientDataServer} for fast local access
239 * @throws RemoteException
240 */
241 public PartitionDataManager(String[] IPs,
242 RemoteEventProxy remoteEventReceiver, int registryport,
243 ClientDataServer localSPMDClient) throws RemoteException {
244 this(IPs, remoteEventReceiver, registryport);
245 this.localSPMDClient = localSPMDClient;
246 }
247
248 /*
249 * (non-Javadoc)
250 *
251 * @see appl.parallel.client.DataServer#addData(appl.parallel.spmd.split.DataPartition)
252 */
253 public void addData(DataPartition partition) throws RemoteException {
254 if (partition == null) {
255 LOG.warn("To add partition was null - no data added");
256 return;
257 }
258 data.put(partition.getRootID(), partition);
259
260 }
261
262 /*
263 * (non-Javadoc)
264 *
265 * @see appl.parallel.server.PartitionDataServer#addMultiDataInfos(java.util.HashMap)
266 */
267 public void addMultiDataInfos(
268 HashMap<String, MultiDataInfo> newMultiDataInfos)
269 throws RemoteException {
270 this.multiDataInfos.putAll(newMultiDataInfos);
271 }
272
273 /*
274 * (non-Javadoc)
275 *
276 * @see appl.parallel.server.PartitionDataServer#addPartitionInfos(java.util.Vector)
277 */
278 public void addPartitionInfos(
279 Vector<SinglePartitionInfo> singlePartitionInfos)
280 throws RemoteException {
281 for (SinglePartitionInfo singlePartitionInfo : singlePartitionInfos) {
282 NameToIDMapping.put(singlePartitionInfo.getBaseResourceName(),
283 singlePartitionInfo.getBaseResourceID());
284 infos.put(singlePartitionInfo.getBaseResourceID(),
285 singlePartitionInfo);
286 }
287 }
288
289 /**
290 * @param name
291 * @param idx
292 */
293 public void destroyMultiPartition(String name, int idx) {
294
295 }
296
297 /**
298 * returns the baseParameter by name
299 *
300 * @param name
301 * the name of the parameter
302 * @return the parameter
303 */
304 public Object getBaseParameter(String name) {
305 return baseParameters.get(name);
306 }
307
308 /**
309 * Returns a a (remote) PartitionDataServer for an IP-Address. Looks up the
310 * PartitionDataServer in the registry
311 */
312 private PartitionDataServer getConnection(String ip) {
313 PartitionDataServer server;
314 // lookup server
315 try {
316 server = (PartitionDataServer) Naming.lookup("rmi://" + ip + "/"
317 + bindingName);
318 } catch (Exception e) {
319 LOG.error("bindig of " + ip + " failed" + e.getMessage(), e);
320 e.printStackTrace();
321 return null;
322 }
323 return server;
324 }
325
326 /**
327 * Returns the {@link DataPartition} associated with the given ID
328 *
329 * @param id
330 * the id of the data
331 * @return the data
332 */
333 public synchronized DataPartition getData(int id) throws RemoteException {
334 // check if in hashtable already
335 DataPartition partition = data.get(id);
336 if (partition != null)
337 return partition;
338 // if not in hashtable:
339 // load the data and put it into hash
340 // find the element with the ID:
341 SinglePartitionInfo info = infos.get(id);
342
343 if (info == null) {
344 LOG.warn("No partition with id " + id + " found!");
345 return null;
346 }
347 // retrieve data from client or other location
348 try {
349 // if there is a local server try to get the local partition
350 PartitionDataHandler partitionDataHandler = info
351 .getPartitionDataHandler();
352 if (localSPMDClient != null)
353 partitionDataHandler.setSPMDClient(localSPMDClient);
354 partition = partitionDataHandler.load();
355 } catch (LoadingException e) {
356 // TODO Auto-generated catch block
357 LOG.error("Loading of partition with id '" + id
358 + "' failed. Null returned!");
359 e.printStackTrace();
360 return null;
361 }
362 if (partition == null) {
363 LOG.error("Loading of partition with id '" + id
364 + "' failed. Null returned!");
365 return null;
366 }
367
368 // enter data in partition table
369 data.put(id, partition);
370 return partition;
371 }
372
373 /**
374 * Gets a partition by name
375 *
376 * @param name
377 * the name of the resource (probably given by the programmer)
378 * @return the partition or null if not found
379 */
380 public DataPartition getData(String name) {
381 Integer id = NameToIDMapping.get(name);
382 if (id == null) {
383 LOG.warn("No data for name " + name + " found! Returning null");
384 return null;
385 }
386 try {
387 return getData(id);
388 } catch (RemoteException e) {
389 // should never be reached (because the method call is local
390 e.printStackTrace();
391 }
392 return null;
393 }
394
395 /**
396 * Returns the partition info with the specified id
397 *
398 * @param id
399 * the id of the partition
400 * @return the info
401 */
402 public PartitionInfo getInfo(int id) {
403 return infos.get(id);
404 }
405
406 /**
407 * Gets a partition of a multi-data element. NOTE THAT ALL PARTITIONS OF THE
408 * MULTIGRID WILL BE LOADED (watch out for loading performance!).
409 *
410 * @param name
411 * the name of the resource (probably given by the programmer)
412 * @return the partition
413 * @see SPMDClientController#addToMultiDataSplitControl(Object[], String)
414 */
415 public DataPartition[] getMultiData(String name) {
416 // fill an array with the partitions
417 DataPartition[] partitions = new DataPartition[multiDataInfos.get(name)
418 .getCount()];
419 for (int i = 0; i < partitions.length; i++) {
420 partitions[i] = getMultiData(name, i);
421 }
422 return partitions;
423 }
424
425 /**
426 * Gets a partition of a multi-data element. Only the resource at the given
427 * position will be loaded.
428 *
429 * @param name
430 * the name of the resource (probably given by the programmer)
431 * @param pos
432 * the index of the requested element
433 * @return the partition
434 * @see SPMDClientController#addToMultiDataSplitControl(Object[], String)
435 */
436 public DataPartition getMultiData(String name, int pos) {
437 try {
438 return getData(multiDataInfos.get(name).getMultiID(pos));
439 } catch (RemoteException e) {
440 // TODO Auto-generated catch block
441 e.printStackTrace();
442 }
443 return null;
444 }
445
446 /*
447 * (non-Javadoc)
448 *
449 * @see appl.parallel.server.PartitionDataServer#getMultiDataInfo(java.lang.String)
450 */
451 public MultiDataInfo getMultiDataInfo(String name) throws RemoteException {
452 return multiDataInfos.get(name);
453 }
454
455 /*
456 * (non-Javadoc)
457 *
458 * @see appl.parallel.server.PartitionDataServer#getMultiDataObject(java.lang.String)
459 */
460 public MultiDataPartitionObject getMultiDataObject(String name)
461 throws RemoteException {
462 return new MultiDataPartitionObject(multiDataInfos.get(name), this);
463 }
464
465 /*
466 * (non-Javadoc)
467 *
468 * @see appl.parallel.server.PartitionDataServer#getPartition(int,
469 * java.awt.Rectangle)
470 */
471 public synchronized DataPartition getPartition(int id, Rectangle bounds)
472 throws RemoteException {
473 DataPartition thisPartition = (DataPartition) data.get(id);
474 if (thisPartition == null && LOG.isDebugEnabled()) {
475 LOG.error("Partition with id " + id + " and bounds " + bounds
476 + "was requested but not found on DataManager");
477 return null;
478 }
479
480 if (LOG.isDebugEnabled())
481 LOG.debug("Data retrieved from DataManager: ID:" + id
482 + " partition: " + bounds);
483 return thisPartition.getPartition(bounds);
484 }
485
486 /*
487 * (non-Javadoc)
488 *
489 * @see appl.parallel.server.PartitionDataServer#getPartitionInfo(int)
490 */
491 public SinglePartitionInfo getPartitionInfo(int rootID)
492 throws RemoteException {
493 return infos.get(rootID);
494 }
495
496 /**
497 * other dataservers may not be initialized on creation of this object.
498 * Therefore the dataServers must be looked up later
499 */
500 private void initializeDataServers() {
501 if (dataServersInitialized == true)
502 return;
503 for (int i = 0; i < dataServers.length; i++) {
504 dataServers[i] = getConnection(IPs[i]);
505 }
506 dataServersInitialized = true;
507 }
508
509 /*
510 * (non-Javadoc)
511 *
512 * @see appl.parallel.client.DataServer#removeData(int)
513 */
514 public void removeData(int id) {
515 data.remove(id);
516
517 }
518
519 /**
520 * Removes the partition from the database
521 *
522 * @param name
523 * the name of the partition to remove
524 */
525 public void removeData(String name) {
526 data.remove(NameToIDMapping.get(name));
527 }
528
529 /**
530 * Removes the partition from the database
531 *
532 * @param partition
533 * the Partition to remove
534 */
535 public void removePartition(DataPartition partition) {
536 data.remove(partition.getRootID());
537 }
538
539 /**
540 * binds the server to the registry
541 */
542 private void startTheServer() throws RemoteException {
543 Registry registry = LocateRegistry.getRegistry(registryPort);
544 registry.rebind(bindingName, (PartitionDataServer) this);
545 LOG.info("PartitionDataServer bound to port " + registryPort
546 + " and running....");
547 }
548
549 /**
550 * unbinds the server from the registry, removes all data and runs the
551 * garbage collector
552 */
553 public void stop() {
554 data.clear();
555 NameToIDMapping.clear();
556 infos.clear();
557
558 Registry registry;
559 try {
560 registry = LocateRegistry.getRegistry();
561 registry.unbind(bindingName);
562 LOG.debug("Sucessfully removed " + bindingName + " from reggie");
563 } catch (RemoteException e) {
564 LOG.warn("tried to unbind " + bindingName
565 + " from reggie, but an exception occured: "
566 + e.getMessage());
567 } catch (NotBoundException e) {
568 LOG.warn("tried to unbind " + bindingName
569 + " from reggie, but the name was not bound "
570 + e.getMessage());
571 }
572 System.gc();
573 }
574
575 /**
576 * @param rootID
577 */
578 public void unloadToSource(int rootID) throws RemoteException {
579 SinglePartitionInfo info = infos.get(rootID);
580 if (info == null) {
581 LOG.error("Could not unload partition with id " + rootID
582 + ". The partitioninfo was not found!");
583 return;
584 }
585 // start unloading
586 info.getPartitionDataHandler().setBasePartition(this.getData(rootID));
587 info.getPartitionDataHandler().unload();
588 // // remove data from memory and run garbage collection
589 // data.remove(rootID);
590 // System.gc();
591 }
592
593 /*
594 * (non-Javadoc)
595 *
596 * @see appl.parallel.server.PartitionDataServer#updateBaseParameter(java.lang.Object[],
597 * java.lang.String[])
598 */
599 public void updateBaseParameter(HashMap<String, Object> newParameters)
600 throws RemoteException {
601 // update the mapping. Old values are replaced automaticly
602 baseParameters.putAll(newParameters);
603 }
604
605 /**
606 * Updates the specified partition. For this the the neighborhood partitions
607 * are updated from remote DataServers using the {@link SplitMap} of the
608 * {@link SinglePartitionInfo}.
609 *
610 * @param id
611 */
612 public void updateFromNeighbors(int id) {
613 initializeDataServers();
614 // get the data to be updated:
615 DataPartition thisPartition = data.get(id);
616 PartitionInfo info = infos.get(id);
617 if (thisPartition == null || info == null) {
618 LOG.error("Update with id " + id
619 + " failed, because no data was found with this id");
620 return;
621 }
622
623 // get some other needed infos
624 SplitMap map = info.getSplitMap();
625 int mapPosition = info.getSplitMapPos();
626 // check if something to do
627 if (map.getNeighborhoodRange() == 0) {
628 LOG.debug("Update has nothing to do (neighborhoodrange is 0)");
629 return;
630 }
631
632 // calculate (for each neighbor) the needed partitions and get
633 // the updates (implemented using Threads)
634 int[] neighbors = map.getNeighborsForPosition(mapPosition);
635 Future futures[] = new Future[neighbors.length];
636 for (int j = 0; j < neighbors.length; j++) {
637 int neighborIdx = neighbors[j];
638 // get the overlap of the neighborhood area of the current partion
639 // with the calcultation area of the remote partition
640 Rectangle updateBounds = map.getPartitionNeighborhoodBounds(
641 mapPosition).intersection(
642 map.getPartitionCalculationBounds(neighborIdx));
643 // Get the partition from remote server:
644 DataPartition updateData;
645 futures[j] = executor.submit(new UpdateThread(
646 dataServers[neighborIdx], hostnames[neighborIdx],
647 thisPartition, updateBounds, id, remoteEventReceiver));
648 }
649
650 for (Future future : futures) {
651 try {
652 future.get();
653 } catch (InterruptedException e) {
654 e.printStackTrace();
655 } catch (ExecutionException e) {
656 LOG.error("The Remote update of a partition failed", e);
657 }
658 }
659
660 }
661
662 /**
663 * Does the same as {@link #updateFromNeighbors(int)}
664 *
665 * @param name
666 * the name of the resource
667 */
668 public void updateFromNeighbors(String name) {
669 // get by ID
670 updateFromNeighbors(NameToIDMapping.get(name));
671 }
672
673 /*
674 * (non-Javadoc)
675 *
676 * @see appl.parallel.server.PartitionDataServer#setPartition(int,
677 * appl.parallel.spmd.split.DataPartition, java.awt.Rectangle)
678 */
679 public synchronized void updatePartition(int id, DataPartition partition,
680 Rectangle bounds) throws RemoteException {
681 DataPartition thisPartition = (DataPartition) data.get(id);
682 if (thisPartition == null && LOG.isDebugEnabled()) {
683 LOG
684 .error("Partition of data with id " + id + " and bounds "
685 + bounds
686 + "should be set but was not found in DataManager");
687 return;
688 }
689 thisPartition.setPartition(partition, bounds);
690 if (LOG.isDebugEnabled())
691 LOG.debug("Partition of data set on DataManager with id " + id
692 + " and partition bounds: " + bounds);
693
694 }
695
696 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26