/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/server/PartitionDataManager.java
ViewVC logotype

Annotation of /branches/1.8-gt2-2.6/src/appl/parallel/server/PartitionDataManager.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 47 - (hide annotations)
Mon Aug 31 14:23:19 2009 UTC (15 years, 3 months ago) by mojays
File size: 20603 byte(s)
Branch 1.8-gt2-2.6 (from rev 45) for geotools 2.6 migration
1 mojays 2 package appl.parallel.server;
2    
3     import java.awt.Rectangle;
4     import java.net.InetAddress;
5     import java.net.UnknownHostException;
6     import java.rmi.Naming;
7     import java.rmi.NotBoundException;
8     import java.rmi.RemoteException;
9     import java.rmi.registry.LocateRegistry;
10     import java.rmi.registry.Registry;
11     import java.rmi.server.UnicastRemoteObject;
12     import java.util.HashMap;
13     import java.util.Hashtable;
14     import java.util.Vector;
15     import java.util.concurrent.ExecutionException;
16     import java.util.concurrent.ExecutorService;
17     import java.util.concurrent.Executors;
18     import java.util.concurrent.Future;
19    
20     import org.apache.log4j.LogManager;
21     import org.apache.log4j.Logger;
22    
23     import appl.data.LoadingException;
24     import appl.parallel.client.DataServer;
25     import appl.parallel.client.ClientDataServer;
26     import appl.parallel.data.PartitionDataHandler;
27     import appl.parallel.event.CommEventSink;
28     import appl.parallel.event.TimeEvent;
29     import appl.parallel.event.TransferEvent;
30     import appl.parallel.event.CommEvent.CommType;
31     import appl.parallel.services.RemoteEventProxy;
32     import appl.parallel.spmd.MultiDataInfo;
33     import appl.parallel.spmd.MultiDataPartitionObject;
34     import appl.parallel.spmd.SPMDClientController;
35     import appl.parallel.spmd.split.DataPartition;
36     import appl.parallel.spmd.split.PartitionInfo;
37     import appl.parallel.spmd.split.SinglePartitionInfo;
38     import appl.parallel.spmd.split.SplitMap;
39     import appl.parallel.thread.DataServerThread;
40    
41     /**
42     * Manages all the data for the {@link XuluServer}. This includes retrieval of
43     * partitions from connected resources like the {@link DataServer} or, as part
44     * of the update of neighborhood regions, from other
45     * {@link XuluServer XuluServers}.
46     *
47     * @author Dominik Appl
48     */
49     public class PartitionDataManager extends UnicastRemoteObject implements
50     PartitionDataServer {
51    
52     /**
53     * The Thread is used for updating the partitions neighborhood region from
54     * the other servers.
55     *
56     * @author Dominik Appl
57     */
58     class UpdateThread extends DataServerThread {
59    
60     private final DataPartition thisPartition;
61    
62     private final Rectangle updateBounds;
63    
64     private final int id;
65    
66     private final String hostname;
67    
68     public UpdateThread(PartitionDataServer dataServer,
69     String ipOrHostname, DataPartition thisPartition,
70     Rectangle updateBounds, int id, CommEventSink sink) {
71     super(dataServer, null, null, CommType.REMOTE_UPDATE, sink);
72     hostname = ipOrHostname;
73     // TODO Auto-generated constructor stub
74     this.thisPartition = thisPartition;
75     this.updateBounds = updateBounds;
76     this.id = id;
77     }
78    
79     /*
80     * (non-Javadoc)
81     *
82     * @see appl.parallel.thread.ExecutionThread#fireTimeEvents(long,
83     * java.lang.Object)
84     */
85     @Override
86     protected void fireTimeEvents(long execTime, Object result) {
87     try {
88     eventSink.fireRemoteEvent(new TimeEvent(execTime,
89     dataServerName, hostname, commType));
90     } catch (RemoteException e) {
91     e.printStackTrace();
92     }
93     }
94    
95     /*
96     * (non-Javadoc)
97     *
98     * @see appl.parallel.thread.ExecutionThread#fireTransferEvent(java.lang.Object)
99     */
100     @Override
101     protected void fireTransferEvent(Object result) {
102     try {
103     eventSink.fireRemoteEvent(new TransferEvent(dataServerName,
104     hostname, commType, new Object[] { result }));
105     } catch (RemoteException e) {
106     e.printStackTrace();
107     }
108     }
109    
110     /*
111     * (non-Javadoc)
112     *
113     * @see appl.parallel.thread.ExecutionThread#run()
114     */
115     @Override
116     protected Object run() throws Exception {
117     try {
118     DataPartition updateData = getServer().getPartition(id,
119     updateBounds);
120     // apply update:
121     this.thisPartition.setPartition(updateData, updateBounds);
122     return updateData;
123     } catch (Exception e) {
124     LOG.error("Error while retrieving partition " + updateBounds
125     + "from PartitionDataServer with ip " + hostname);
126     }
127     return null;
128     }
129     }
130    
131     private final String bindingName = "PartitionDataServer";
132    
133     private final Logger LOG = LogManager.getLogger(this.getClass().getName());
134    
135     /**
136     * all partition data. The key is the id. Is an Hashtable instead of
137     * HashMasp for thread-safty.
138     */
139     private Hashtable<Integer, DataPartition> data = new Hashtable<Integer, DataPartition>(
140     15);
141    
142     /**
143     * the names of the data objects
144     */
145     private HashMap<String, Integer> NameToIDMapping = new HashMap<String, Integer>(
146     15);
147    
148     private HashMap<Integer, SinglePartitionInfo> infos = new HashMap<Integer, SinglePartitionInfo>(
149     10);
150    
151     private HashMap<String, Object> baseParameters = new HashMap<String, Object>(
152     10);
153    
154     private final PartitionDataServer[] dataServers;
155    
156     private final ExecutorService executor;
157    
158     private final String[] IPs;
159    
160     private boolean dataServersInitialized = false;
161    
162     private HashMap<String, MultiDataInfo> multiDataInfos = new HashMap<String, MultiDataInfo>(
163     10);
164    
165     private final String dataServerName;
166    
167     private final CommEventSink remoteEventReceiver;
168    
169     private String[] hostnames;
170    
171     private ClientDataServer localSPMDClient;
172    
173     private final int registryPort;
174    
175     /**
176     * Creates and inits a new PartitionDataserver.
177     *
178     * @param IPs
179     * the IPs of participating Servers for neighborhood updates(may
180     * include port specification)
181     * @param remoteEventReceiver
182     * generated events are forwarded to the receiver
183     * @param registryPort
184     * port of the registry to which the {@link PartitionDataManager}
185     * should be bound
186     * @throws RemoteException
187     * if a connection to a participating server fails
188     */
189     public PartitionDataManager(String[] IPs,
190     CommEventSink remoteEventReceiver, int registryPort)
191     throws RemoteException {
192     // create NameToID and IdToInfo-Mapping
193     this.IPs = IPs;
194     this.remoteEventReceiver = remoteEventReceiver;
195     this.registryPort = registryPort;
196    
197     dataServers = new PartitionDataServer[IPs.length];
198     startTheServer();
199    
200     // start Thread service
201     executor = Executors.newCachedThreadPool();
202     String name = "unknown PartitionDataServer";
203     try {
204     name = InetAddress.getLocalHost().getHostName();
205     } catch (UnknownHostException e) {
206     // TODO Auto-generated catch block
207     e.printStackTrace();
208     }
209     dataServerName = name;
210     // try to get the hostnames
211     hostnames = new String[IPs.length];
212     for (int i = 0; i < IPs.length; i++) {
213     // try {
214     hostnames[i] = IPs[i];
215     // hostnames[i] = InetAddress.getByName(IPs[i]).getHostName();
216     // } catch (UnknownHostException e) {
217     // LOG.warn("Exception while retrivieving hostname of address: " +
218     // IPs[i]);
219     // }
220     }
221     }
222    
/**
 * Variant of {@link #PartitionDataManager(String[], CommEventSink, int)}
 * that additionally stores a local SPMD client. <br>
 * Intended for a server running inside the Xulu-Client, where the local
 * client allows direct access without TCP/IP.
 *
 * @param IPs
 *            the IPs of participating servers for neighborhood updates
 *            (may include port specification)
 * @param remoteEventReceiver
 *            generated events are forwarded to the receiver
 * @param registryport
 *            port of the registry to which the
 *            {@link PartitionDataManager} should be bound
 * @param localSPMDClient
 *            a {@link ClientDataServer} for fast local access
 * @throws RemoteException
 *             if the delegated constructor fails
 */
public PartitionDataManager(
        String[] IPs,
        RemoteEventProxy remoteEventReceiver,
        int registryport,
        ClientDataServer localSPMDClient) throws RemoteException {
    // All common setup is done by the three-argument constructor.
    this(IPs, remoteEventReceiver, registryport);
    this.localSPMDClient = localSPMDClient;
}
247    
248     /*
249     * (non-Javadoc)
250     *
251     * @see appl.parallel.client.DataServer#addData(appl.parallel.spmd.split.DataPartition)
252     */
253     public void addData(DataPartition partition) throws RemoteException {
254     if (partition == null) {
255     LOG.warn("To add partition was null - no data added");
256     return;
257     }
258     data.put(partition.getRootID(), partition);
259    
260     }
261    
262     /*
263     * (non-Javadoc)
264     *
265     * @see appl.parallel.server.PartitionDataServer#addMultiDataInfos(java.util.HashMap)
266     */
267     public void addMultiDataInfos(
268     HashMap<String, MultiDataInfo> newMultiDataInfos)
269     throws RemoteException {
270     this.multiDataInfos.putAll(newMultiDataInfos);
271     }
272    
273     /*
274     * (non-Javadoc)
275     *
276     * @see appl.parallel.server.PartitionDataServer#addPartitionInfos(java.util.Vector)
277     */
278     public void addPartitionInfos(
279     Vector<SinglePartitionInfo> singlePartitionInfos)
280     throws RemoteException {
281     for (SinglePartitionInfo singlePartitionInfo : singlePartitionInfos) {
282     NameToIDMapping.put(singlePartitionInfo.getBaseResourceName(),
283     singlePartitionInfo.getBaseResourceID());
284     infos.put(singlePartitionInfo.getBaseResourceID(),
285     singlePartitionInfo);
286     }
287     }
288    
289     /**
290     * @param name
291     * @param idx
292     */
293     public void destroyMultiPartition(String name, int idx) {
294    
295     }
296    
297     /**
298     * returns the baseParameter by name
299     *
300     * @param name
301     * the name of the parameter
302     * @return the parameter
303     */
304     public Object getBaseParameter(String name) {
305     return baseParameters.get(name);
306     }
307    
308     /**
309     * Returns a a (remote) PartitionDataServer for an IP-Address. Looks up the
310     * PartitionDataServer in the registry
311     */
312     private PartitionDataServer getConnection(String ip) {
313     PartitionDataServer server;
314     // lookup server
315     try {
316     server = (PartitionDataServer) Naming.lookup("rmi://" + ip + "/"
317     + bindingName);
318     } catch (Exception e) {
319     LOG.error("bindig of " + ip + " failed" + e.getMessage(), e);
320     e.printStackTrace();
321     return null;
322     }
323     return server;
324     }
325    
326     /**
327     * Returns the {@link DataPartition} associated with the given ID
328     *
329     * @param id
330     * the id of the data
331     * @return the data
332     */
333     public synchronized DataPartition getData(int id) throws RemoteException {
334     // check if in hashtable already
335     DataPartition partition = data.get(id);
336     if (partition != null)
337     return partition;
338     // if not in hashtable:
339     // load the data and put it into hash
340     // find the element with the ID:
341     SinglePartitionInfo info = infos.get(id);
342    
343     if (info == null) {
344     LOG.warn("No partition with id " + id + " found!");
345     return null;
346     }
347     // retrieve data from client or other location
348     try {
349     // if there is a local server try to get the local partition
350     PartitionDataHandler partitionDataHandler = info
351     .getPartitionDataHandler();
352     if (localSPMDClient != null)
353     partitionDataHandler.setSPMDClient(localSPMDClient);
354     partition = partitionDataHandler.load();
355     } catch (LoadingException e) {
356     // TODO Auto-generated catch block
357     LOG.error("Loading of partition with id '" + id
358     + "' failed. Null returned!");
359     e.printStackTrace();
360     return null;
361     }
362     if (partition == null) {
363     LOG.error("Loading of partition with id '" + id
364     + "' failed. Null returned!");
365     return null;
366     }
367    
368     // enter data in partition table
369     data.put(id, partition);
370     return partition;
371     }
372    
373     /**
374     * Gets a partition by name
375     *
376     * @param name
377     * the name of the resource (probably given by the programmer)
378     * @return the partition or null if not found
379     */
380     public DataPartition getData(String name) {
381     Integer id = NameToIDMapping.get(name);
382     if (id == null) {
383     LOG.warn("No data for name " + name + " found! Returning null");
384     return null;
385     }
386     try {
387     return getData(id);
388     } catch (RemoteException e) {
389     // should never be reached (because the method call is local
390     e.printStackTrace();
391     }
392     return null;
393     }
394    
395     /**
396     * Returns the partition info with the specified id
397     *
398     * @param id
399     * the id of the partition
400     * @return the info
401     */
402     public PartitionInfo getInfo(int id) {
403     return infos.get(id);
404     }
405    
406     /**
407     * Gets a partition of a multi-data element. NOTE THAT ALL PARTITIONS OF THE
408     * MULTIGRID WILL BE LOADED (watch out for loading performance!).
409     *
410     * @param name
411     * the name of the resource (probably given by the programmer)
412     * @return the partition
413     * @see SPMDClientController#addToMultiDataSplitControl(Object[], String)
414     */
415     public DataPartition[] getMultiData(String name) {
416     // fill an array with the partitions
417     DataPartition[] partitions = new DataPartition[multiDataInfos.get(name)
418     .getCount()];
419     for (int i = 0; i < partitions.length; i++) {
420     partitions[i] = getMultiData(name, i);
421     }
422     return partitions;
423     }
424    
425     /**
426     * Gets a partition of a multi-data element. Only the resource at the given
427     * position will be loaded.
428     *
429     * @param name
430     * the name of the resource (probably given by the programmer)
431     * @param pos
432     * the index of the requested element
433     * @return the partition
434     * @see SPMDClientController#addToMultiDataSplitControl(Object[], String)
435     */
436     public DataPartition getMultiData(String name, int pos) {
437     try {
438     return getData(multiDataInfos.get(name).getMultiID(pos));
439     } catch (RemoteException e) {
440     // TODO Auto-generated catch block
441     e.printStackTrace();
442     }
443     return null;
444     }
445    
446     /*
447     * (non-Javadoc)
448     *
449     * @see appl.parallel.server.PartitionDataServer#getMultiDataInfo(java.lang.String)
450     */
451     public MultiDataInfo getMultiDataInfo(String name) throws RemoteException {
452     return multiDataInfos.get(name);
453     }
454    
455     /*
456     * (non-Javadoc)
457     *
458     * @see appl.parallel.server.PartitionDataServer#getMultiDataObject(java.lang.String)
459     */
460     public MultiDataPartitionObject getMultiDataObject(String name)
461     throws RemoteException {
462     return new MultiDataPartitionObject(multiDataInfos.get(name), this);
463     }
464    
465     /*
466     * (non-Javadoc)
467     *
468     * @see appl.parallel.server.PartitionDataServer#getPartition(int,
469     * java.awt.Rectangle)
470     */
471     public synchronized DataPartition getPartition(int id, Rectangle bounds)
472     throws RemoteException {
473     DataPartition thisPartition = (DataPartition) data.get(id);
474     if (thisPartition == null && LOG.isDebugEnabled()) {
475     LOG.error("Partition with id " + id + " and bounds " + bounds
476     + "was requested but not found on DataManager");
477     return null;
478     }
479    
480     if (LOG.isDebugEnabled())
481     LOG.debug("Data retrieved from DataManager: ID:" + id
482     + " partition: " + bounds);
483     return thisPartition.getPartition(bounds);
484     }
485    
486     /*
487     * (non-Javadoc)
488     *
489     * @see appl.parallel.server.PartitionDataServer#getPartitionInfo(int)
490     */
491     public SinglePartitionInfo getPartitionInfo(int rootID)
492     throws RemoteException {
493     return infos.get(rootID);
494     }
495    
496     /**
497     * other dataservers may not be initialized on creation of this object.
498     * Therefore the dataServers must be looked up later
499     */
500     private void initializeDataServers() {
501     if (dataServersInitialized == true)
502     return;
503     for (int i = 0; i < dataServers.length; i++) {
504     dataServers[i] = getConnection(IPs[i]);
505     }
506     dataServersInitialized = true;
507     }
508    
509     /*
510     * (non-Javadoc)
511     *
512     * @see appl.parallel.client.DataServer#removeData(int)
513     */
514     public void removeData(int id) {
515     data.remove(id);
516    
517     }
518    
519     /**
520     * Removes the partition from the database
521     *
522     * @param name
523     * the name of the partition to remove
524     */
525     public void removeData(String name) {
526     data.remove(NameToIDMapping.get(name));
527     }
528    
529     /**
530     * Removes the partition from the database
531     *
532     * @param partition
533     * the Partition to remove
534     */
535     public void removePartition(DataPartition partition) {
536     data.remove(partition.getRootID());
537     }
538    
539     /**
540     * binds the server to the registry
541     */
542     private void startTheServer() throws RemoteException {
543     Registry registry = LocateRegistry.getRegistry(registryPort);
544     registry.rebind(bindingName, (PartitionDataServer) this);
545     LOG.info("PartitionDataServer bound to port " + registryPort
546     + " and running....");
547     }
548    
549     /**
550     * unbinds the server from the registry, removes all data and runs the
551     * garbage collector
552     */
553     public void stop() {
554     data.clear();
555     NameToIDMapping.clear();
556     infos.clear();
557    
558     Registry registry;
559     try {
560     registry = LocateRegistry.getRegistry();
561     registry.unbind(bindingName);
562     LOG.debug("Sucessfully removed " + bindingName + " from reggie");
563     } catch (RemoteException e) {
564     LOG.warn("tried to unbind " + bindingName
565     + " from reggie, but an exception occured: "
566     + e.getMessage());
567     } catch (NotBoundException e) {
568     LOG.warn("tried to unbind " + bindingName
569     + " from reggie, but the name was not bound "
570     + e.getMessage());
571     }
572     System.gc();
573     }
574    
575     /**
576     * @param rootID
577     */
578     public void unloadToSource(int rootID) throws RemoteException {
579     SinglePartitionInfo info = infos.get(rootID);
580     if (info == null) {
581     LOG.error("Could not unload partition with id " + rootID
582     + ". The partitioninfo was not found!");
583     return;
584     }
585     // start unloading
586     info.getPartitionDataHandler().setBasePartition(this.getData(rootID));
587     info.getPartitionDataHandler().unload();
588     // // remove data from memory and run garbage collection
589     // data.remove(rootID);
590     // System.gc();
591     }
592    
593     /*
594     * (non-Javadoc)
595     *
596     * @see appl.parallel.server.PartitionDataServer#updateBaseParameter(java.lang.Object[],
597     * java.lang.String[])
598     */
599     public void updateBaseParameter(HashMap<String, Object> newParameters)
600     throws RemoteException {
601     // update the mapping. Old values are replaced automaticly
602     baseParameters.putAll(newParameters);
603     }
604    
605     /**
606     * Updates the specified partition. For this the the neighborhood partitions
607     * are updated from remote DataServers using the {@link SplitMap} of the
608     * {@link SinglePartitionInfo}.
609     *
610     * @param id
611     */
612     public void updateFromNeighbors(int id) {
613     initializeDataServers();
614     // get the data to be updated:
615     DataPartition thisPartition = data.get(id);
616     PartitionInfo info = infos.get(id);
617     if (thisPartition == null || info == null) {
618     LOG.error("Update with id " + id
619     + " failed, because no data was found with this id");
620     return;
621     }
622    
623     // get some other needed infos
624     SplitMap map = info.getSplitMap();
625     int mapPosition = info.getSplitMapPos();
626     // check if something to do
627     if (map.getNeighborhoodRange() == 0) {
628     LOG.debug("Update has nothing to do (neighborhoodrange is 0)");
629     return;
630     }
631    
632     // calculate (for each neighbor) the needed partitions and get
633     // the updates (implemented using Threads)
634     int[] neighbors = map.getNeighborsForPosition(mapPosition);
635     Future futures[] = new Future[neighbors.length];
636     for (int j = 0; j < neighbors.length; j++) {
637     int neighborIdx = neighbors[j];
638     // get the overlap of the neighborhood area of the current partion
639     // with the calcultation area of the remote partition
640     Rectangle updateBounds = map.getPartitionNeighborhoodBounds(
641     mapPosition).intersection(
642     map.getPartitionCalculationBounds(neighborIdx));
643     // Get the partition from remote server:
644     DataPartition updateData;
645     futures[j] = executor.submit(new UpdateThread(
646     dataServers[neighborIdx], hostnames[neighborIdx],
647     thisPartition, updateBounds, id, remoteEventReceiver));
648     }
649    
650     for (Future future : futures) {
651     try {
652     future.get();
653     } catch (InterruptedException e) {
654     e.printStackTrace();
655     } catch (ExecutionException e) {
656     LOG.error("The Remote update of a partition failed", e);
657     }
658     }
659    
660     }
661    
662     /**
663     * Does the same as {@link #updateFromNeighbors(int)}
664     *
665     * @param name
666     * the name of the resource
667     */
668     public void updateFromNeighbors(String name) {
669     // get by ID
670     updateFromNeighbors(NameToIDMapping.get(name));
671     }
672    
673     /*
674     * (non-Javadoc)
675     *
676     * @see appl.parallel.server.PartitionDataServer#setPartition(int,
677     * appl.parallel.spmd.split.DataPartition, java.awt.Rectangle)
678     */
679     public synchronized void updatePartition(int id, DataPartition partition,
680     Rectangle bounds) throws RemoteException {
681     DataPartition thisPartition = (DataPartition) data.get(id);
682     if (thisPartition == null && LOG.isDebugEnabled()) {
683     LOG
684     .error("Partition of data with id " + id + " and bounds "
685     + bounds
686     + "should be set but was not found in DataManager");
687     return;
688     }
689     thisPartition.setPartition(partition, bounds);
690     if (LOG.isDebugEnabled())
691     LOG.debug("Partition of data set on DataManager with id " + id
692     + " and partition bounds: " + bounds);
693    
694     }
695    
696     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26