/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/spmd/AdvancedSPMDClientController.java
ViewVC logotype

Annotation of /branches/1.8-gt2-2.6/src/appl/parallel/spmd/AdvancedSPMDClientController.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 47 - (hide annotations)
Mon Aug 31 14:23:19 2009 UTC (15 years, 3 months ago) by mojays
File size: 5962 byte(s)
Branch 1.8-gt2-2.6 (from rev 45) for geotools 2.6 migration
1 mojays 2 package appl.parallel.spmd;
2    
3     import java.io.PrintStream;
4     import java.util.HashMap;
5     import java.util.Iterator;
6     import java.util.Vector;
7    
8     import org.apache.log4j.LogManager;
9     import org.apache.log4j.Logger;
10    
11     import appl.parallel.ComputingResourceContainer;
12     import appl.parallel.client.ClientDataServer;
13     import appl.parallel.event.CommEventSink;
14     import appl.parallel.thread.OneMethodThread;
15    
16     /**
17     * Performance optimizations on client side can be made with this class. Allows
18     * heavy multithreading.<br>
19     * <br>
20     * Merging can be done in a separate thread (parallel to further computation)<br>
21     *
22     * @see SPMDClientController for details on the usage of a client controller
23     *
24     * @author Dominik Appl
25     */
26     public class AdvancedSPMDClientController extends SPMDClientController
27     implements AdvancedSPMDClientInterface {
28    
29     private final Logger LOG = LogManager.getLogger(this.getClass().getName());
30    
31     // used for synchronization
32     private static final Object syncOperationMontitor = new Object();
33    
34     /**
35     * For each {@link SyncPoint} a vector of threads associated with the point
36     * is created
37     */
38     HashMap<SyncPoint, Vector<OneMethodThread>> vectorMap = new HashMap<SyncPoint, Vector<OneMethodThread>>();
39    
40     /**
41     * Empty thread which is used as indicator for a already synchronized sync
42     * point
43     */
44     private static final Vector<OneMethodThread> alreadySynchronized = new Vector<OneMethodThread>(
45     1);
46    
47     /**
48     * same parameters as superclass
49     *
50     * @see SPMDClientController#SPMDClientController(Vector, double[],
51     * ClientDataServer, CommEventSink)
52     */
53     public AdvancedSPMDClientController(
54     Vector<ComputingResourceContainer> computingResources,
55     double[] weights, ClientDataServer spmdClient,
56     CommEventSink eventProxy) {
57     super(computingResources, weights, spmdClient, eventProxy);
58     }
59    
60     /*
61     * (non-Javadoc)
62     *
63     * @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergePartition(java.lang.Object,
64     * appl.parallel.spmd.SyncPoint)
65     */
66     public void mergePartition(final Object partition, SyncPoint s,
67     final PrintStream stream, final String message) {
68     checkSplittable(partition);
69     // create a new thread
70     OneMethodThread newThread = new OneMethodThread("mergeThread"
71     + s.getId(), s.getPriority()) {
72     @Override
73     public void run() {
74     mergePartition(partition);
75     if (stream != null) {
76     stream.println(message);
77     }
78     }
79     };
80     mapThreadToSyncPoint(newThread, s);
81     newThread.start();
82     }
83    
84     /*
85     * (non-Javadoc)
86     *
87     * @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergePartition(java.lang.Object,
88     * appl.parallel.spmd.SyncPoint)
89     */
90     public synchronized void mergePartition(Object partition, SyncPoint s) {
91     mergePartition(partition, s, null, null);
92     }
93    
94     /**
95     * Puts a Thread/Syncpoint pair into the Hashmap/Vector structure
96     */
97     private void mapThreadToSyncPoint(OneMethodThread thread, SyncPoint s) {
98     // access to the syncpoints must be synchronized when working with
99     // multithreading
100     synchronized (syncOperationMontitor) {
101     Vector<OneMethodThread> threads = vectorMap.get(s);
102     // if this is the first thread for the syncPoint: create a new
103     // Vector
104     if (threads == null) {
105     threads = new Vector<OneMethodThread>(5);
106     vectorMap.put(s, threads);
107     }
108     threads.add(thread);
109     }
110     }
111    
112     /*
113     * (non-Javadoc)
114     *
115     * @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject,
116     * int, appl.parallel.spmd.SyncPoint, java.io.PrintStream,
117     * java.lang.String)
118     */
119     public synchronized void mergeMultiData(MultiDataObject multidata, int idx,
120     SyncPoint s, PrintStream stream, String message) {
121     // create a new thread
122     OneMethodThread newThread = new OneMethodThread("mergeThread-"
123     + s.getId(), s.getPriority(), multidata, idx, stream, message) {
124     @Override
125     public void run() {
126     mergeMultiData((MultiDataObject) getParameter(0),
127     (Integer) getParameter(1));
128     if (getParameter(2) != null) {
129     PrintStream stream = (PrintStream) getParameter(2);
130     stream.println(getParameter(3));
131     }
132     }
133     };
134     mapThreadToSyncPoint(newThread, s);
135     newThread.start();
136     }
137    
138     /*
139     * (non-Javadoc)
140     *
141     * @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject,
142     * int, appl.parallel.spmd.SyncPoint)
143     */
144     public void mergeMultiData(MultiDataObject multidata, int idx, SyncPoint s) {
145     mergeMultiData(multidata, idx, s, null, null);
146     }
147    
148     /*
149     * (non-Javadoc)
150     *
151     * @see appl.parallel.spmd.AdvancedSPMDClientInterface#synchronizeToSyncPoint(appl.parallel.spmd.SyncPoint)
152     */
153     public void synchronizeToSyncPoint(SyncPoint s) {
154     // access to the syncpoints must be synchronized when working with
155     // multithreading
156     synchronized (syncOperationMontitor) {
157     // join all threads associated with the given syncPoint
158     Vector<OneMethodThread> threads = vectorMap.get(s);
159     // Multithreading: check if another thread has already synchronized
160     // to
161     // the given point
162     if (threads == alreadySynchronized) {
163     if (LOG.isDebugEnabled())
164     LOG.debug("SyncPoint " + s.getId()
165     + " was already synchronized");
166     return;
167     }
168     // if no thread was found: Some error occured.
169     if (threads == null) {
170     UnsupportedOperationException e = new UnsupportedOperationException(
171     "The Thread with the SyncPoint '" + s.getId()
172     + "' does not exist. Synchronisation failed!!");
173     LOG.error("The Thread with the SyncPoint '" + s.getId()
174     + "' was not found. Synchronisation failed!!", e);
175     throw e;
176     }
177     for (Iterator iter = threads.iterator(); iter.hasNext();) {
178     OneMethodThread thread = (OneMethodThread) iter.next();
179     thread.join();
180     }
181     // indicate as synchronized
182     vectorMap.put(s, alreadySynchronized);
183     }
184     }
185     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26