/[xulu]/trunk.bakup/src/appl/parallel/spmd/AdvancedSPMDServerController.java
ViewVC logotype

Annotation of /trunk.bakup/src/appl/parallel/spmd/AdvancedSPMDServerController.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 77 - (hide annotations)
Wed Feb 10 16:14:27 2010 UTC (14 years, 11 months ago) by alfonx
File size: 7900 byte(s)
backup of trunk
1 mojays 2 package appl.parallel.spmd;
2    
3     import java.io.PrintStream;
4     import java.util.HashMap;
5     import java.util.Iterator;
6     import java.util.Vector;
7    
8     import org.apache.log4j.LogManager;
9     import org.apache.log4j.Logger;
10    
11     import appl.parallel.ComputingResourceContainer;
12     import appl.parallel.client.DataServer;
13     import appl.parallel.client.ClientDataServer;
14     import appl.parallel.event.CommEventSink;
15     import appl.parallel.server.PartitionDataManager;
16     import appl.parallel.spmd.split.DataPartition;
17     import appl.parallel.thread.OneMethodThread;
18    
19     /**
20     * Performance optimizations can be made with this class. Allows heavy
21     * multithreading. Because multiple threads may use this class concurrently
22     * nearly all additional methods of this class are internally synchronized using a static variable.
23     * Also allows preloading of partitions using {@link SyncPoint}s.
24     *
25     * @author Dominik Appl
26     */
27     public class AdvancedSPMDServerController extends SPMDServerController
28     implements AdvancedSPMDServerInterface {
29    
30     private final Logger LOG = LogManager.getLogger(this.getClass().getName());
31    
32     private static final HashMap<String, String> partitionPreloadStatus =
33     new HashMap<String, String>();
34    
35     // private static final HashMap<Integer,String> multiPartitionPreloadStatus
36     // = new HashMap<Integer, String>();
37     /**
38     * For each {@link SyncPoint} a vector of threads associated with the id of
39     * a SyncPoint is created. The {@link HashMap} is static so all possible threads use
40     * the same map. Access to this map must be synchronized
41     */
42     private static final HashMap<Integer, Vector<OneMethodThread>> vectors =
43     new HashMap<Integer, Vector<OneMethodThread>>();
44    
45     private boolean isMaster = true;
46    
47     /**
48     * same parameters as superclass
49     *
50     * @see SPMDServerController#SPMDServerController(PartitionDataManager, int)
51     */
52     public AdvancedSPMDServerController(PartitionDataManager dataManager,
53     int referenceResouceID) {
54     super(dataManager, referenceResouceID);
55     // TODO Auto-generated constructor stub
56     }
57    
58     /**
59     * The partition is loaded in a separate thread into the local
60     * {@link PartitionDataManager}<br>
61     * You must specify the partition name and a {@link SyncPoint}.<br>
62     * You can specify a priority for the thread in the {@link SyncPoint}.
63     * <br>
64     * <br>
65     * <b>Preloading will only be done one time independent of the number of
66     * parallel threads calling this method. (synchronized)<br>
67     * </b>
68     *
69     * @param partition
70     * the name of the partition for identification
71     * @param s
72     * the associated {@link SyncPoint}
73     * @see SPMDServerController#getPartition(String)
74     * @see #isMasterThread()
75     */
76     public void preloadPartition(String partition, SyncPoint s) {
77     // create a new thread only if some other thread has not already
78     // triggered the preloading
79     synchronized (vectors) {
80     if (partitionPreloadStatus.get(partition) == null) {
81     partitionPreloadStatus.put(partition, "inProgress");
82     OneMethodThread newThread = new OneMethodThread(
83     "getPartitionThread " + s.getId(), s.getPriority(),
84     partition) {
85     @Override
86     public void run() {
87     // retrieving the partition will as a sideeffect trigger
88     // loading the
89     // partition into the PartitionDataManager
90     getPartition((String) getParameter(0));
91     }
92     };
93     mapThreadToSyncPoint(newThread, s);
94     newThread.start();
95     }
96     }
97     }
98    
99     /**
100     * The partition is loaded in a separate thread into the local
101     * {@link PartitionDataManager} You must specify the partition name and a
102     * {@link SyncPoint}. You can specify a priority for the thread in the
103     * {@link SyncPoint} <br>
104     * <b>Preloading will only be done one time independent of the number of
105     * parallel threads calling this method. (synchronized)<br>
106     * </b>
107     *
108     * @param multiPartition
109     * the name of the partition for identification
110     * @param idx
111     * the index of the partition in the multidata object
112     * @param s
113     * the associated {@link SyncPoint}
114     * @see appl.parallel.spmd.AdvancedSPMDClientInterface#mergePartition(java.lang.Object,
115     * appl.parallel.spmd.SyncPoint)
116     * @see SPMDServerController#getPartition(String)
117     */
118     public void preloadMultiPartition(String multiPartition, int idx,
119     SyncPoint s) {
120     synchronized (vectors) {
121     // create a new thread only if some other thread has not already
122     // triggered the preloading
123     if (partitionPreloadStatus.get(multiPartition
124     + MultiDataInfo.MULTISPLITCONSTANT + idx) == null) {
125     partitionPreloadStatus.put(multiPartition
126     + MultiDataInfo.MULTISPLITCONSTANT + idx, "inProgress");
127    
128     OneMethodThread newThread = new OneMethodThread(
129     "Preload thread - getMultiPartition SID:" + s.getId(),
130     s.getPriority(), multiPartition, idx) {
131     @Override
132     public void run() {
133     // retrieving the partition will also trigger loading
134     // the
135     // partition into the PartitionDataManager
136     getMultiPartition((String) getParameter(0),
137     (Integer) getParameter(1));
138     }
139     };
140     mapThreadToSyncPoint(newThread, s);
141     newThread.start();
142     }
143     }
144     }
145    
146     /*
147     * (non-Javadoc)
148     *
149     * @see appl.parallel.spmd.AdvancedSPMDServerInterface#synchronizeToSyncPoint(appl.parallel.spmd.SyncPoint)
150     */
151     public void synchronizeToSyncPoint(SyncPoint s) {
152     // join all threads associated with the given syncPoint
153     synchronized (vectors) {
154     Vector<OneMethodThread> threads = vectors.get(s.getId());
155     if (threads == null) {
156     UnsupportedOperationException e = new UnsupportedOperationException(
157     "The Thread with the SyncPoint '"
158     + s.getId()
159     + "' was not found or was earlier synchronized. Synchronisation failed!!");
160     LOG.error("The Thread with the SyncPoint '" + s.getId()
161     + "' was not found. Synchronisation failed!!", e);
162     throw e;
163     }
164     for (Iterator iter = threads.iterator(); iter.hasNext();) {
165     OneMethodThread thread = (OneMethodThread) iter.next();
166     thread.join();
167    
168     }
169     }
170     }
171    
172     /**
173     * You should call this method from time to time to clear all syncpoints and
174     * the associated threads out of the memory
175     */
176     public void clearSyncData() {
177     synchronized (vectors) {
178     vectors.clear();
179     partitionPreloadStatus.clear();
180     }
181     }
182    
183     /**
184     * Puts a thread/Syncpoint pair into the Hashmap/Vector structure
185     */
186     private static void mapThreadToSyncPoint(OneMethodThread thread, SyncPoint s) {
187     Vector<OneMethodThread> threads = vectors.get(s.getId());
188     // if this is the first thread for the syncPoint: create a new
189     // Vector
190     if (threads == null) {
191     threads = new Vector<OneMethodThread>(5);
192     vectors.put(s.getId(), threads);
193     }
194     threads.add(thread);
195    
196     }
197    
198     /**
199     * If there are multiple processors on the machine and the current task
200     * {@link SPMDTask#supportsMultiThreading() supports multithreading} there
201     * are several threads created. Each thread uses a different
202     * {@link #getLocalBounds(DataPartition) calculation area}. Now there might
203     * be parts of the task code were multithreading is undesired, e.g. when
204     * operations affect the whole partition. In this case you should use this
205     * method to guarantee that only one thread has access to a code block.
206     *
207     * @return whether this thread is the "master" thread, which means whether
208     * this thread refers to the first task created.
209     */
210     public boolean isMasterThread() {
211     return isMaster;
212    
213     }
214    
215     /**
216     * @param isMaster
217     * true, if this should be the master thread
218     * @see #isMasterThread()
219     */
220     public void setMasterThread(boolean isMaster) {
221     this.isMaster = isMaster;
222     }
223     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26