/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/spmd/AdvancedSPMDServerController.java
ViewVC logotype

Contents of /branches/1.8-gt2-2.6/src/appl/parallel/spmd/AdvancedSPMDServerController.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 60 - (show annotations)
Sun Oct 4 16:54:52 2009 UTC (15 years, 2 months ago) by alfonx
File size: 7690 byte(s)
* organized imports
1 package appl.parallel.spmd;
2
3 import java.util.HashMap;
4 import java.util.Iterator;
5 import java.util.Vector;
6
7 import org.apache.log4j.LogManager;
8 import org.apache.log4j.Logger;
9
10 import appl.parallel.server.PartitionDataManager;
11 import appl.parallel.spmd.split.DataPartition;
12 import appl.parallel.thread.OneMethodThread;
13
14 /**
15 * Performance optimizations can be made with this class. Allows heavy
16 * multithreading. Because multiple threads may use this class concurrently
17 * nearly all additional methods of this class are internally synchronized using a static variable.
18 * Also allows preloading of partitions using {@link SyncPoint}s.
19 *
20 * @author Dominik Appl
21 */
22 public class AdvancedSPMDServerController extends SPMDServerController
23 implements AdvancedSPMDServerInterface {
24
25 private final Logger LOG = LogManager.getLogger(this.getClass().getName());
26
27 private static final HashMap<String, String> partitionPreloadStatus =
28 new HashMap<String, String>();
29
30 // private static final HashMap<Integer,String> multiPartitionPreloadStatus
31 // = new HashMap<Integer, String>();
32 /**
33 * For each {@link SyncPoint} a vector of threads associated with the id of
34 * a SyncPoint is created. The {@link HashMap} is static so all possible threads use
35 * the same map. Access to this map must be synchronized
36 */
37 private static final HashMap<Integer, Vector<OneMethodThread>> vectors =
38 new HashMap<Integer, Vector<OneMethodThread>>();
39
40 private boolean isMaster = true;
41
42 /**
43 * same parameters as superclass
44 *
45 * @see SPMDServerController#SPMDServerController(PartitionDataManager, int)
46 */
47 public AdvancedSPMDServerController(PartitionDataManager dataManager,
48 int referenceResouceID) {
49 super(dataManager, referenceResouceID);
50 // TODO Auto-generated constructor stub
51 }
52
53 /**
54 * The partition is loaded in a separate thread into the local
55 * {@link PartitionDataManager}<br>
56 * You must specify the partition name and a {@link SyncPoint}.<br>
57 * You can specify a priority for the thread in the {@link SyncPoint}.
58 * <br>
59 * <br>
60 * <b>Preloading will only be done one time independent of the number of
61 * parallel threads calling this method. (synchronized)<br>
62 * </b>
63 *
64 * @param partition
65 * the name of the partition for identification
66 * @param s
67 * the associated {@link SyncPoint}
68 * @see SPMDServerController#getPartition(String)
69 * @see #isMasterThread()
70 */
71 public void preloadPartition(String partition, SyncPoint s) {
72 // create a new thread only if some other thread has not already
73 // triggered the preloading
74 synchronized (vectors) {
75 if (partitionPreloadStatus.get(partition) == null) {
76 partitionPreloadStatus.put(partition, "inProgress");
77 OneMethodThread newThread = new OneMethodThread(
78 "getPartitionThread " + s.getId(), s.getPriority(),
79 partition) {
80 @Override
81 public void run() {
82 // retrieving the partition will as a sideeffect trigger
83 // loading the
84 // partition into the PartitionDataManager
85 getPartition((String) getParameter(0));
86 }
87 };
88 mapThreadToSyncPoint(newThread, s);
89 newThread.start();
90 }
91 }
92 }
93
94 /**
95 * The partition is loaded in a separate thread into the local
96 * {@link PartitionDataManager} You must specify the partition name and a
97 * {@link SyncPoint}. You can specify a priority for the thread in the
98 * {@link SyncPoint} <br>
99 * <b>Preloading will only be done one time independent of the number of
100 * parallel threads calling this method. (synchronized)<br>
101 * </b>
102 *
103 * @param multiPartition
104 * the name of the partition for identification
105 * @param idx
106 * the index of the partition in the multidata object
107 * @param s
108 * the associated {@link SyncPoint}
109 * @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergePartition(java.lang.Object,
110 * appl.parallel.spmd.SyncPoint)
111 * @see SPMDServerController#getPartition(String)
112 */
113 public void preloadMultiPartition(String multiPartition, int idx,
114 SyncPoint s) {
115 synchronized (vectors) {
116 // create a new thread only if some other thread has not already
117 // triggered the preloading
118 if (partitionPreloadStatus.get(multiPartition
119 + MultiDataInfo.MULTISPLITCONSTANT + idx) == null) {
120 partitionPreloadStatus.put(multiPartition
121 + MultiDataInfo.MULTISPLITCONSTANT + idx, "inProgress");
122
123 OneMethodThread newThread = new OneMethodThread(
124 "Preload thread - getMultiPartition SID:" + s.getId(),
125 s.getPriority(), multiPartition, idx) {
126 @Override
127 public void run() {
128 // retrieving the partition will also trigger loading
129 // the
130 // partition into the PartitionDataManager
131 getMultiPartition((String) getParameter(0),
132 (Integer) getParameter(1));
133 }
134 };
135 mapThreadToSyncPoint(newThread, s);
136 newThread.start();
137 }
138 }
139 }
140
141 /*
142 * (non-Javadoc)
143 *
144 * @see appl.parallel.spmd.AdvancedSPMDServerInterface#synchronizeToSyncPoint(appl.parallel.spmd.SyncPoint)
145 */
146 public void synchronizeToSyncPoint(SyncPoint s) {
147 // join all threads associated with the given syncPoint
148 synchronized (vectors) {
149 Vector<OneMethodThread> threads = vectors.get(s.getId());
150 if (threads == null) {
151 UnsupportedOperationException e = new UnsupportedOperationException(
152 "The Thread with the SyncPoint '"
153 + s.getId()
154 + "' was not found or was earlier synchronized. Synchronisation failed!!");
155 LOG.error("The Thread with the SyncPoint '" + s.getId()
156 + "' was not found. Synchronisation failed!!", e);
157 throw e;
158 }
159 for (Iterator iter = threads.iterator(); iter.hasNext();) {
160 OneMethodThread thread = (OneMethodThread) iter.next();
161 thread.join();
162
163 }
164 }
165 }
166
167 /**
168 * You should call this method from time to time to clear all syncpoints and
169 * the associated threads out of the memory
170 */
171 public void clearSyncData() {
172 synchronized (vectors) {
173 vectors.clear();
174 partitionPreloadStatus.clear();
175 }
176 }
177
178 /**
179 * Puts a thread/Syncpoint pair into the Hashmap/Vector structure
180 */
181 private static void mapThreadToSyncPoint(OneMethodThread thread, SyncPoint s) {
182 Vector<OneMethodThread> threads = vectors.get(s.getId());
183 // if this is the first thread for the syncPoint: create a new
184 // Vector
185 if (threads == null) {
186 threads = new Vector<OneMethodThread>(5);
187 vectors.put(s.getId(), threads);
188 }
189 threads.add(thread);
190
191 }
192
193 /**
194 * If there are multiple processors on the machine and the current task
195 * {@link SPMDTask#supportsMultiThreading() supports multithreading} there
196 * are several threads created. Each thread uses a different
197 * {@link #getLocalBounds(DataPartition) calculation area}. Now there might
198 * be parts of the task code were multithreading is undesired, e.g. when
199 * operations affect the whole partition. In this case you should use this
200 * method to guarantee that only one thread has access to a code block.
201 *
202 * @return whether this thread is the "master" thread, which means whether
203 * this thread refers to the first task created.
204 */
205 public boolean isMasterThread() {
206 return isMaster;
207
208 }
209
210 /**
211 * @param isMaster
212 * true, if this should be the master thread
213 * @see #isMasterThread()
214 */
215 public void setMasterThread(boolean isMaster) {
216 this.isMaster = isMaster;
217 }
218 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26