1 |
mojays |
2 |
package appl.parallel.spmd; |
2 |
|
|
|
3 |
|
|
import java.io.PrintStream; |
4 |
|
|
import java.util.HashMap; |
5 |
|
|
import java.util.Iterator; |
6 |
|
|
import java.util.Vector; |
7 |
|
|
|
8 |
|
|
import org.apache.log4j.LogManager; |
9 |
|
|
import org.apache.log4j.Logger; |
10 |
|
|
|
11 |
|
|
import appl.parallel.ComputingResourceContainer; |
12 |
|
|
import appl.parallel.client.ClientDataServer; |
13 |
|
|
import appl.parallel.event.CommEventSink; |
14 |
|
|
import appl.parallel.thread.OneMethodThread; |
15 |
|
|
|
16 |
|
|
/** |
17 |
|
|
* Performance optimizations on client side can be made with this class. Allows |
18 |
|
|
* heavy multithreading.<br> |
19 |
|
|
* <br> |
20 |
|
|
* Merging can be done in a separate thread (parallel to further computation)<br> |
21 |
|
|
* |
22 |
|
|
* @see SPMDClientController for details on the usage of a client controller |
23 |
|
|
* |
24 |
|
|
* @author Dominik Appl |
25 |
|
|
*/ |
26 |
|
|
public class AdvancedSPMDClientController extends SPMDClientController |
27 |
|
|
implements AdvancedSPMDClientInterface { |
28 |
|
|
|
29 |
|
|
private final Logger LOG = LogManager.getLogger(this.getClass().getName()); |
30 |
|
|
|
31 |
|
|
// used for synchronization |
32 |
|
|
private static final Object syncOperationMontitor = new Object(); |
33 |
|
|
|
34 |
|
|
/** |
35 |
|
|
* For each {@link SyncPoint} a vector of threads associated with the point |
36 |
|
|
* is created |
37 |
|
|
*/ |
38 |
|
|
HashMap<SyncPoint, Vector<OneMethodThread>> vectorMap = new HashMap<SyncPoint, Vector<OneMethodThread>>(); |
39 |
|
|
|
40 |
|
|
/** |
41 |
|
|
* Empty thread which is used as indicator for a already synchronized sync |
42 |
|
|
* point |
43 |
|
|
*/ |
44 |
|
|
private static final Vector<OneMethodThread> alreadySynchronized = new Vector<OneMethodThread>( |
45 |
|
|
1); |
46 |
|
|
|
47 |
|
|
/** |
48 |
|
|
* same parameters as superclass |
49 |
|
|
* |
50 |
|
|
* @see SPMDClientController#SPMDClientController(Vector, double[], |
51 |
|
|
* ClientDataServer, CommEventSink) |
52 |
|
|
*/ |
53 |
|
|
public AdvancedSPMDClientController( |
54 |
|
|
Vector<ComputingResourceContainer> computingResources, |
55 |
|
|
double[] weights, ClientDataServer spmdClient, |
56 |
|
|
CommEventSink eventProxy) { |
57 |
|
|
super(computingResources, weights, spmdClient, eventProxy); |
58 |
|
|
} |
59 |
|
|
|
60 |
|
|
/* |
61 |
|
|
* (non-Javadoc) |
62 |
|
|
* |
63 |
|
|
* @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergePartition(java.lang.Object, |
64 |
|
|
* appl.parallel.spmd.SyncPoint) |
65 |
|
|
*/ |
66 |
|
|
public void mergePartition(final Object partition, SyncPoint s, |
67 |
|
|
final PrintStream stream, final String message) { |
68 |
|
|
checkSplittable(partition); |
69 |
|
|
// create a new thread |
70 |
|
|
OneMethodThread newThread = new OneMethodThread("mergeThread" |
71 |
|
|
+ s.getId(), s.getPriority()) { |
72 |
|
|
@Override |
73 |
|
|
public void run() { |
74 |
|
|
mergePartition(partition); |
75 |
|
|
if (stream != null) { |
76 |
|
|
stream.println(message); |
77 |
|
|
} |
78 |
|
|
} |
79 |
|
|
}; |
80 |
|
|
mapThreadToSyncPoint(newThread, s); |
81 |
|
|
newThread.start(); |
82 |
|
|
} |
83 |
|
|
|
84 |
|
|
/* |
85 |
|
|
* (non-Javadoc) |
86 |
|
|
* |
87 |
|
|
* @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergePartition(java.lang.Object, |
88 |
|
|
* appl.parallel.spmd.SyncPoint) |
89 |
|
|
*/ |
90 |
|
|
public synchronized void mergePartition(Object partition, SyncPoint s) { |
91 |
|
|
mergePartition(partition, s, null, null); |
92 |
|
|
} |
93 |
|
|
|
94 |
|
|
/** |
95 |
|
|
* Puts a Thread/Syncpoint pair into the Hashmap/Vector structure |
96 |
|
|
*/ |
97 |
|
|
private void mapThreadToSyncPoint(OneMethodThread thread, SyncPoint s) { |
98 |
|
|
// access to the syncpoints must be synchronized when working with |
99 |
|
|
// multithreading |
100 |
|
|
synchronized (syncOperationMontitor) { |
101 |
|
|
Vector<OneMethodThread> threads = vectorMap.get(s); |
102 |
|
|
// if this is the first thread for the syncPoint: create a new |
103 |
|
|
// Vector |
104 |
|
|
if (threads == null) { |
105 |
|
|
threads = new Vector<OneMethodThread>(5); |
106 |
|
|
vectorMap.put(s, threads); |
107 |
|
|
} |
108 |
|
|
threads.add(thread); |
109 |
|
|
} |
110 |
|
|
} |
111 |
|
|
|
112 |
|
|
/* |
113 |
|
|
* (non-Javadoc) |
114 |
|
|
* |
115 |
|
|
* @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject, |
116 |
|
|
* int, appl.parallel.spmd.SyncPoint, java.io.PrintStream, |
117 |
|
|
* java.lang.String) |
118 |
|
|
*/ |
119 |
|
|
public synchronized void mergeMultiData(MultiDataObject multidata, int idx, |
120 |
|
|
SyncPoint s, PrintStream stream, String message) { |
121 |
|
|
// create a new thread |
122 |
|
|
OneMethodThread newThread = new OneMethodThread("mergeThread-" |
123 |
|
|
+ s.getId(), s.getPriority(), multidata, idx, stream, message) { |
124 |
|
|
@Override |
125 |
|
|
public void run() { |
126 |
|
|
mergeMultiData((MultiDataObject) getParameter(0), |
127 |
|
|
(Integer) getParameter(1)); |
128 |
|
|
if (getParameter(2) != null) { |
129 |
|
|
PrintStream stream = (PrintStream) getParameter(2); |
130 |
|
|
stream.println(getParameter(3)); |
131 |
|
|
} |
132 |
|
|
} |
133 |
|
|
}; |
134 |
|
|
mapThreadToSyncPoint(newThread, s); |
135 |
|
|
newThread.start(); |
136 |
|
|
} |
137 |
|
|
|
138 |
|
|
/* |
139 |
|
|
* (non-Javadoc) |
140 |
|
|
* |
141 |
|
|
* @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject, |
142 |
|
|
* int, appl.parallel.spmd.SyncPoint) |
143 |
|
|
*/ |
144 |
|
|
public void mergeMultiData(MultiDataObject multidata, int idx, SyncPoint s) { |
145 |
|
|
mergeMultiData(multidata, idx, s, null, null); |
146 |
|
|
} |
147 |
|
|
|
148 |
|
|
/* |
149 |
|
|
* (non-Javadoc) |
150 |
|
|
* |
151 |
|
|
* @see appl.parallel.spmd.AdvancedSPMDClientInterface#synchronizeToSyncPoint(appl.parallel.spmd.SyncPoint) |
152 |
|
|
*/ |
153 |
|
|
public void synchronizeToSyncPoint(SyncPoint s) { |
154 |
|
|
// access to the syncpoints must be synchronized when working with |
155 |
|
|
// multithreading |
156 |
|
|
synchronized (syncOperationMontitor) { |
157 |
|
|
// join all threads associated with the given syncPoint |
158 |
|
|
Vector<OneMethodThread> threads = vectorMap.get(s); |
159 |
|
|
// Multithreading: check if another thread has already synchronized |
160 |
|
|
// to |
161 |
|
|
// the given point |
162 |
|
|
if (threads == alreadySynchronized) { |
163 |
|
|
if (LOG.isDebugEnabled()) |
164 |
|
|
LOG.debug("SyncPoint " + s.getId() |
165 |
|
|
+ " was already synchronized"); |
166 |
|
|
return; |
167 |
|
|
} |
168 |
|
|
// if no thread was found: Some error occured. |
169 |
|
|
if (threads == null) { |
170 |
|
|
UnsupportedOperationException e = new UnsupportedOperationException( |
171 |
|
|
"The Thread with the SyncPoint '" + s.getId() |
172 |
|
|
+ "' does not exist. Synchronisation failed!!"); |
173 |
|
|
LOG.error("The Thread with the SyncPoint '" + s.getId() |
174 |
|
|
+ "' was not found. Synchronisation failed!!", e); |
175 |
|
|
throw e; |
176 |
|
|
} |
177 |
|
|
for (Iterator iter = threads.iterator(); iter.hasNext();) { |
178 |
|
|
OneMethodThread thread = (OneMethodThread) iter.next(); |
179 |
|
|
thread.join(); |
180 |
|
|
} |
181 |
|
|
// indicate as synchronized |
182 |
|
|
vectorMap.put(s, alreadySynchronized); |
183 |
|
|
} |
184 |
|
|
} |
185 |
|
|
} |