1 |
package appl.parallel.spmd; |
2 |
|
3 |
import java.io.PrintStream; |
4 |
import java.util.HashMap; |
5 |
import java.util.Iterator; |
6 |
import java.util.Vector; |
7 |
|
8 |
import org.apache.log4j.LogManager; |
9 |
import org.apache.log4j.Logger; |
10 |
|
11 |
import appl.parallel.ComputingResourceContainer; |
12 |
import appl.parallel.client.ClientDataServer; |
13 |
import appl.parallel.event.CommEventSink; |
14 |
import appl.parallel.thread.OneMethodThread; |
15 |
|
16 |
/** |
17 |
* Performance optimizations on client side can be made with this class. Allows |
18 |
* heavy multithreading.<br> |
19 |
* <br> |
20 |
* Merging can be done in a separate thread (parallel to further computation)<br> |
21 |
* |
22 |
* @see SPMDClientController for details on the usage of a client controller |
23 |
* |
24 |
* @author Dominik Appl |
25 |
*/ |
26 |
public class AdvancedSPMDClientController extends SPMDClientController |
27 |
implements AdvancedSPMDClientInterface { |
28 |
|
29 |
private final Logger LOG = LogManager.getLogger(this.getClass().getName()); |
30 |
|
31 |
// used for synchronization |
32 |
private static final Object syncOperationMontitor = new Object(); |
33 |
|
34 |
/** |
35 |
* For each {@link SyncPoint} a vector of threads associated with the point |
36 |
* is created |
37 |
*/ |
38 |
HashMap<SyncPoint, Vector<OneMethodThread>> vectorMap = new HashMap<SyncPoint, Vector<OneMethodThread>>(); |
39 |
|
40 |
/** |
41 |
* Empty thread which is used as indicator for a already synchronized sync |
42 |
* point |
43 |
*/ |
44 |
private static final Vector<OneMethodThread> alreadySynchronized = new Vector<OneMethodThread>( |
45 |
1); |
46 |
|
47 |
/** |
48 |
* same parameters as superclass |
49 |
* |
50 |
* @see SPMDClientController#SPMDClientController(Vector, double[], |
51 |
* ClientDataServer, CommEventSink) |
52 |
*/ |
53 |
public AdvancedSPMDClientController( |
54 |
Vector<ComputingResourceContainer> computingResources, |
55 |
double[] weights, ClientDataServer spmdClient, |
56 |
CommEventSink eventProxy) { |
57 |
super(computingResources, weights, spmdClient, eventProxy); |
58 |
} |
59 |
|
60 |
/* |
61 |
* (non-Javadoc) |
62 |
* |
63 |
* @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergePartition(java.lang.Object, |
64 |
* appl.parallel.spmd.SyncPoint) |
65 |
*/ |
66 |
public void mergePartition(final Object partition, SyncPoint s, |
67 |
final PrintStream stream, final String message) { |
68 |
checkSplittable(partition); |
69 |
// create a new thread |
70 |
OneMethodThread newThread = new OneMethodThread("mergeThread" |
71 |
+ s.getId(), s.getPriority()) { |
72 |
@Override |
73 |
public void run() { |
74 |
mergePartition(partition); |
75 |
if (stream != null) { |
76 |
stream.println(message); |
77 |
} |
78 |
} |
79 |
}; |
80 |
mapThreadToSyncPoint(newThread, s); |
81 |
newThread.start(); |
82 |
} |
83 |
|
84 |
/* |
85 |
* (non-Javadoc) |
86 |
* |
87 |
* @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergePartition(java.lang.Object, |
88 |
* appl.parallel.spmd.SyncPoint) |
89 |
*/ |
90 |
public synchronized void mergePartition(Object partition, SyncPoint s) { |
91 |
mergePartition(partition, s, null, null); |
92 |
} |
93 |
|
94 |
/** |
95 |
* Puts a Thread/Syncpoint pair into the Hashmap/Vector structure |
96 |
*/ |
97 |
private void mapThreadToSyncPoint(OneMethodThread thread, SyncPoint s) { |
98 |
// access to the syncpoints must be synchronized when working with |
99 |
// multithreading |
100 |
synchronized (syncOperationMontitor) { |
101 |
Vector<OneMethodThread> threads = vectorMap.get(s); |
102 |
// if this is the first thread for the syncPoint: create a new |
103 |
// Vector |
104 |
if (threads == null) { |
105 |
threads = new Vector<OneMethodThread>(5); |
106 |
vectorMap.put(s, threads); |
107 |
} |
108 |
threads.add(thread); |
109 |
} |
110 |
} |
111 |
|
112 |
/* |
113 |
* (non-Javadoc) |
114 |
* |
115 |
* @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject, |
116 |
* int, appl.parallel.spmd.SyncPoint, java.io.PrintStream, |
117 |
* java.lang.String) |
118 |
*/ |
119 |
public synchronized void mergeMultiData(MultiDataObject multidata, int idx, |
120 |
SyncPoint s, PrintStream stream, String message) { |
121 |
// create a new thread |
122 |
OneMethodThread newThread = new OneMethodThread("mergeThread-" |
123 |
+ s.getId(), s.getPriority(), multidata, idx, stream, message) { |
124 |
@Override |
125 |
public void run() { |
126 |
mergeMultiData((MultiDataObject) getParameter(0), |
127 |
(Integer) getParameter(1)); |
128 |
if (getParameter(2) != null) { |
129 |
PrintStream stream = (PrintStream) getParameter(2); |
130 |
stream.println(getParameter(3)); |
131 |
} |
132 |
} |
133 |
}; |
134 |
mapThreadToSyncPoint(newThread, s); |
135 |
newThread.start(); |
136 |
} |
137 |
|
138 |
/* |
139 |
* (non-Javadoc) |
140 |
* |
141 |
* @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject, |
142 |
* int, appl.parallel.spmd.SyncPoint) |
143 |
*/ |
144 |
public void mergeMultiData(MultiDataObject multidata, int idx, SyncPoint s) { |
145 |
mergeMultiData(multidata, idx, s, null, null); |
146 |
} |
147 |
|
148 |
/* |
149 |
* (non-Javadoc) |
150 |
* |
151 |
* @see appl.parallel.spmd.AdvancedSPMDClientInterface#synchronizeToSyncPoint(appl.parallel.spmd.SyncPoint) |
152 |
*/ |
153 |
public void synchronizeToSyncPoint(SyncPoint s) { |
154 |
// access to the syncpoints must be synchronized when working with |
155 |
// multithreading |
156 |
synchronized (syncOperationMontitor) { |
157 |
// join all threads associated with the given syncPoint |
158 |
Vector<OneMethodThread> threads = vectorMap.get(s); |
159 |
// Multithreading: check if another thread has already synchronized |
160 |
// to |
161 |
// the given point |
162 |
if (threads == alreadySynchronized) { |
163 |
if (LOG.isDebugEnabled()) |
164 |
LOG.debug("SyncPoint " + s.getId() |
165 |
+ " was already synchronized"); |
166 |
return; |
167 |
} |
168 |
// if no thread was found: Some error occured. |
169 |
if (threads == null) { |
170 |
UnsupportedOperationException e = new UnsupportedOperationException( |
171 |
"The Thread with the SyncPoint '" + s.getId() |
172 |
+ "' does not exist. Synchronisation failed!!"); |
173 |
LOG.error("The Thread with the SyncPoint '" + s.getId() |
174 |
+ "' was not found. Synchronisation failed!!", e); |
175 |
throw e; |
176 |
} |
177 |
for (Iterator iter = threads.iterator(); iter.hasNext();) { |
178 |
OneMethodThread thread = (OneMethodThread) iter.next(); |
179 |
thread.join(); |
180 |
} |
181 |
// indicate as synchronized |
182 |
vectorMap.put(s, alreadySynchronized); |
183 |
} |
184 |
} |
185 |
} |