/[xulu]/trunk/src/appl/parallel/spmd/AdvancedSPMDServerController.java
ViewVC logotype

Contents of /trunk/src/appl/parallel/spmd/AdvancedSPMDServerController.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2 - (show annotations)
Wed Feb 25 11:54:01 2009 UTC (15 years, 9 months ago) by mojays
File size: 7900 byte(s)
First Commit, corresponds to Revision 1008 of Wikisquare-SVN 
1 package appl.parallel.spmd;
2
3 import java.io.PrintStream;
4 import java.util.HashMap;
5 import java.util.Iterator;
6 import java.util.Vector;
7
8 import org.apache.log4j.LogManager;
9 import org.apache.log4j.Logger;
10
11 import appl.parallel.ComputingResourceContainer;
12 import appl.parallel.client.DataServer;
13 import appl.parallel.client.ClientDataServer;
14 import appl.parallel.event.CommEventSink;
15 import appl.parallel.server.PartitionDataManager;
16 import appl.parallel.spmd.split.DataPartition;
17 import appl.parallel.thread.OneMethodThread;
18
19 /**
20 * Performance optimizations can be made with this class. Allows heavy
21 * multithreading. Because multiple threads may use this class concurrently
22 * nearly all additional methods of this class are internally synchronized using a static variable.
23 * Also allows preloading of partitions using {@link SyncPoint}s.
24 *
25 * @author Dominik Appl
26 */
27 public class AdvancedSPMDServerController extends SPMDServerController
28 implements AdvancedSPMDServerInterface {
29
30 private final Logger LOG = LogManager.getLogger(this.getClass().getName());
31
32 private static final HashMap<String, String> partitionPreloadStatus =
33 new HashMap<String, String>();
34
35 // private static final HashMap<Integer,String> multiPartitionPreloadStatus
36 // = new HashMap<Integer, String>();
37 /**
38 * For each {@link SyncPoint} a vector of threads associated with the id of
39 * a SyncPoint is created. The {@link HashMap} is static so all possible threads use
40 * the same map. Access to this map must be synchronized
41 */
42 private static final HashMap<Integer, Vector<OneMethodThread>> vectors =
43 new HashMap<Integer, Vector<OneMethodThread>>();
44
45 private boolean isMaster = true;
46
47 /**
48 * same parameters as superclass
49 *
50 * @see SPMDServerController#SPMDServerController(PartitionDataManager, int)
51 */
52 public AdvancedSPMDServerController(PartitionDataManager dataManager,
53 int referenceResouceID) {
54 super(dataManager, referenceResouceID);
55 // TODO Auto-generated constructor stub
56 }
57
58 /**
59 * The partition is loaded in a separate thread into the local
60 * {@link PartitionDataManager}<br>
61 * You must specify the partition name and a {@link SyncPoint}.<br>
62 * You can specify a priority for the thread in the {@link SyncPoint}.
63 * <br>
64 * <br>
65 * <b>Preloading will only be done one time independent of the number of
66 * parallel threads calling this method. (synchronized)<br>
67 * </b>
68 *
69 * @param partition
70 * the name of the partition for identification
71 * @param s
72 * the associated {@link SyncPoint}
73 * @see SPMDServerController#getPartition(String)
74 * @see #isMasterThread()
75 */
76 public void preloadPartition(String partition, SyncPoint s) {
77 // create a new thread only if some other thread has not already
78 // triggered the preloading
79 synchronized (vectors) {
80 if (partitionPreloadStatus.get(partition) == null) {
81 partitionPreloadStatus.put(partition, "inProgress");
82 OneMethodThread newThread = new OneMethodThread(
83 "getPartitionThread " + s.getId(), s.getPriority(),
84 partition) {
85 @Override
86 public void run() {
87 // retrieving the partition will as a sideeffect trigger
88 // loading the
89 // partition into the PartitionDataManager
90 getPartition((String) getParameter(0));
91 }
92 };
93 mapThreadToSyncPoint(newThread, s);
94 newThread.start();
95 }
96 }
97 }
98
99 /**
100 * The partition is loaded in a separate thread into the local
101 * {@link PartitionDataManager} You must specify the partition name and a
102 * {@link SyncPoint}. You can specify a priority for the thread in the
103 * {@link SyncPoint} <br>
104 * <b>Preloading will only be done one time independent of the number of
105 * parallel threads calling this method. (synchronized)<br>
106 * </b>
107 *
108 * @param multiPartition
109 * the name of the partition for identification
110 * @param idx
111 * the index of the partition in the multidata object
112 * @param s
113 * the associated {@link SyncPoint}
114 * @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergePartition(java.lang.Object,
115 * appl.parallel.spmd.SyncPoint)
116 * @see SPMDServerController#getPartition(String)
117 */
118 public void preloadMultiPartition(String multiPartition, int idx,
119 SyncPoint s) {
120 synchronized (vectors) {
121 // create a new thread only if some other thread has not already
122 // triggered the preloading
123 if (partitionPreloadStatus.get(multiPartition
124 + MultiDataInfo.MULTISPLITCONSTANT + idx) == null) {
125 partitionPreloadStatus.put(multiPartition
126 + MultiDataInfo.MULTISPLITCONSTANT + idx, "inProgress");
127
128 OneMethodThread newThread = new OneMethodThread(
129 "Preload thread - getMultiPartition SID:" + s.getId(),
130 s.getPriority(), multiPartition, idx) {
131 @Override
132 public void run() {
133 // retrieving the partition will also trigger loading
134 // the
135 // partition into the PartitionDataManager
136 getMultiPartition((String) getParameter(0),
137 (Integer) getParameter(1));
138 }
139 };
140 mapThreadToSyncPoint(newThread, s);
141 newThread.start();
142 }
143 }
144 }
145
146 /*
147 * (non-Javadoc)
148 *
149 * @see appl.parallel.spmd.AdvancedSPMDServerInterface#synchronizeToSyncPoint(appl.parallel.spmd.SyncPoint)
150 */
151 public void synchronizeToSyncPoint(SyncPoint s) {
152 // join all threads associated with the given syncPoint
153 synchronized (vectors) {
154 Vector<OneMethodThread> threads = vectors.get(s.getId());
155 if (threads == null) {
156 UnsupportedOperationException e = new UnsupportedOperationException(
157 "The Thread with the SyncPoint '"
158 + s.getId()
159 + "' was not found or was earlier synchronized. Synchronisation failed!!");
160 LOG.error("The Thread with the SyncPoint '" + s.getId()
161 + "' was not found. Synchronisation failed!!", e);
162 throw e;
163 }
164 for (Iterator iter = threads.iterator(); iter.hasNext();) {
165 OneMethodThread thread = (OneMethodThread) iter.next();
166 thread.join();
167
168 }
169 }
170 }
171
172 /**
173 * You should call this method from time to time to clear all syncpoints and
174 * the associated threads out of the memory
175 */
176 public void clearSyncData() {
177 synchronized (vectors) {
178 vectors.clear();
179 partitionPreloadStatus.clear();
180 }
181 }
182
183 /**
184 * Puts a thread/Syncpoint pair into the Hashmap/Vector structure
185 */
186 private static void mapThreadToSyncPoint(OneMethodThread thread, SyncPoint s) {
187 Vector<OneMethodThread> threads = vectors.get(s.getId());
188 // if this is the first thread for the syncPoint: create a new
189 // Vector
190 if (threads == null) {
191 threads = new Vector<OneMethodThread>(5);
192 vectors.put(s.getId(), threads);
193 }
194 threads.add(thread);
195
196 }
197
198 /**
199 * If there are multiple processors on the machine and the current task
200 * {@link SPMDTask#supportsMultiThreading() supports multithreading} there
201 * are several threads created. Each thread uses a different
202 * {@link #getLocalBounds(DataPartition) calculation area}. Now there might
203 * be parts of the task code were multithreading is undesired, e.g. when
204 * operations affect the whole partition. In this case you should use this
205 * method to guarantee that only one thread has access to a code block.
206 *
207 * @return whether this thread is the "master" thread, which means whether
208 * this thread refers to the first task created.
209 */
210 public boolean isMasterThread() {
211 return isMaster;
212
213 }
214
215 /**
216 * @param isMaster
217 * true, if this should be the master thread
218 * @see #isMasterThread()
219 */
220 public void setMasterThread(boolean isMaster) {
221 this.isMaster = isMaster;
222 }
223 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26