/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/spmd/AdvancedSPMDClientController.java
ViewVC logotype

Contents of /branches/1.8-gt2-2.6/src/appl/parallel/spmd/AdvancedSPMDClientController.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2 - (show annotations)
Wed Feb 25 11:54:01 2009 UTC (15 years, 9 months ago) by mojays
Original Path: trunk/src/appl/parallel/spmd/AdvancedSPMDClientController.java
File size: 5962 byte(s)
First Commit, corresponds to Revision 1008 of Wikisquare-SVN 
1 package appl.parallel.spmd;
2
3 import java.io.PrintStream;
4 import java.util.HashMap;
5 import java.util.Iterator;
6 import java.util.Vector;
7
8 import org.apache.log4j.LogManager;
9 import org.apache.log4j.Logger;
10
11 import appl.parallel.ComputingResourceContainer;
12 import appl.parallel.client.ClientDataServer;
13 import appl.parallel.event.CommEventSink;
14 import appl.parallel.thread.OneMethodThread;
15
16 /**
17 * Performance optimizations on client side can be made with this class. Allows
18 * heavy multithreading.<br>
19 * <br>
20 * Merging can be done in a separate thread (parallel to further computation)<br>
21 *
22 * @see SPMDClientController for details on the usage of a client controller
23 *
24 * @author Dominik Appl
25 */
26 public class AdvancedSPMDClientController extends SPMDClientController
27 implements AdvancedSPMDClientInterface {
28
29 private final Logger LOG = LogManager.getLogger(this.getClass().getName());
30
31 // used for synchronization
32 private static final Object syncOperationMontitor = new Object();
33
34 /**
35 * For each {@link SyncPoint} a vector of threads associated with the point
36 * is created
37 */
38 HashMap<SyncPoint, Vector<OneMethodThread>> vectorMap = new HashMap<SyncPoint, Vector<OneMethodThread>>();
39
40 /**
41 * Empty thread which is used as indicator for a already synchronized sync
42 * point
43 */
44 private static final Vector<OneMethodThread> alreadySynchronized = new Vector<OneMethodThread>(
45 1);
46
47 /**
48 * same parameters as superclass
49 *
50 * @see SPMDClientController#SPMDClientController(Vector, double[],
51 * ClientDataServer, CommEventSink)
52 */
53 public AdvancedSPMDClientController(
54 Vector<ComputingResourceContainer> computingResources,
55 double[] weights, ClientDataServer spmdClient,
56 CommEventSink eventProxy) {
57 super(computingResources, weights, spmdClient, eventProxy);
58 }
59
60 /*
61 * (non-Javadoc)
62 *
63 * @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergePartition(java.lang.Object,
64 * appl.parallel.spmd.SyncPoint)
65 */
66 public void mergePartition(final Object partition, SyncPoint s,
67 final PrintStream stream, final String message) {
68 checkSplittable(partition);
69 // create a new thread
70 OneMethodThread newThread = new OneMethodThread("mergeThread"
71 + s.getId(), s.getPriority()) {
72 @Override
73 public void run() {
74 mergePartition(partition);
75 if (stream != null) {
76 stream.println(message);
77 }
78 }
79 };
80 mapThreadToSyncPoint(newThread, s);
81 newThread.start();
82 }
83
84 /*
85 * (non-Javadoc)
86 *
87 * @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergePartition(java.lang.Object,
88 * appl.parallel.spmd.SyncPoint)
89 */
90 public synchronized void mergePartition(Object partition, SyncPoint s) {
91 mergePartition(partition, s, null, null);
92 }
93
94 /**
95 * Puts a Thread/Syncpoint pair into the Hashmap/Vector structure
96 */
97 private void mapThreadToSyncPoint(OneMethodThread thread, SyncPoint s) {
98 // access to the syncpoints must be synchronized when working with
99 // multithreading
100 synchronized (syncOperationMontitor) {
101 Vector<OneMethodThread> threads = vectorMap.get(s);
102 // if this is the first thread for the syncPoint: create a new
103 // Vector
104 if (threads == null) {
105 threads = new Vector<OneMethodThread>(5);
106 vectorMap.put(s, threads);
107 }
108 threads.add(thread);
109 }
110 }
111
112 /*
113 * (non-Javadoc)
114 *
115 * @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject,
116 * int, appl.parallel.spmd.SyncPoint, java.io.PrintStream,
117 * java.lang.String)
118 */
119 public synchronized void mergeMultiData(MultiDataObject multidata, int idx,
120 SyncPoint s, PrintStream stream, String message) {
121 // create a new thread
122 OneMethodThread newThread = new OneMethodThread("mergeThread-"
123 + s.getId(), s.getPriority(), multidata, idx, stream, message) {
124 @Override
125 public void run() {
126 mergeMultiData((MultiDataObject) getParameter(0),
127 (Integer) getParameter(1));
128 if (getParameter(2) != null) {
129 PrintStream stream = (PrintStream) getParameter(2);
130 stream.println(getParameter(3));
131 }
132 }
133 };
134 mapThreadToSyncPoint(newThread, s);
135 newThread.start();
136 }
137
138 /*
139 * (non-Javadoc)
140 *
141 * @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergeMultiData(appl.parallel.spmd.MultiDataObject,
142 * int, appl.parallel.spmd.SyncPoint)
143 */
144 public void mergeMultiData(MultiDataObject multidata, int idx, SyncPoint s) {
145 mergeMultiData(multidata, idx, s, null, null);
146 }
147
148 /*
149 * (non-Javadoc)
150 *
151 * @see appl.parallel.spmd.AdvancedSPMDClientInterface#synchronizeToSyncPoint(appl.parallel.spmd.SyncPoint)
152 */
153 public void synchronizeToSyncPoint(SyncPoint s) {
154 // access to the syncpoints must be synchronized when working with
155 // multithreading
156 synchronized (syncOperationMontitor) {
157 // join all threads associated with the given syncPoint
158 Vector<OneMethodThread> threads = vectorMap.get(s);
159 // Multithreading: check if another thread has already synchronized
160 // to
161 // the given point
162 if (threads == alreadySynchronized) {
163 if (LOG.isDebugEnabled())
164 LOG.debug("SyncPoint " + s.getId()
165 + " was already synchronized");
166 return;
167 }
168 // if no thread was found: Some error occured.
169 if (threads == null) {
170 UnsupportedOperationException e = new UnsupportedOperationException(
171 "The Thread with the SyncPoint '" + s.getId()
172 + "' does not exist. Synchronisation failed!!");
173 LOG.error("The Thread with the SyncPoint '" + s.getId()
174 + "' was not found. Synchronisation failed!!", e);
175 throw e;
176 }
177 for (Iterator iter = threads.iterator(); iter.hasNext();) {
178 OneMethodThread thread = (OneMethodThread) iter.next();
179 thread.join();
180 }
181 // indicate as synchronized
182 vectorMap.put(s, alreadySynchronized);
183 }
184 }
185 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26