/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/services/RemoteEventProxy.java
ViewVC logotype

Annotation of /branches/1.8-gt2-2.6/src/appl/parallel/services/RemoteEventProxy.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 60 - (hide annotations)
Sun Oct 4 16:54:52 2009 UTC (15 years, 2 months ago) by alfonx
File size: 4464 byte(s)
* organized imports
1 mojays 2 package appl.parallel.services;
2    
3     import java.rmi.RemoteException;
4     import java.util.Vector;
5     import java.util.concurrent.ConcurrentLinkedQueue;
6    
7     import org.apache.log4j.LogManager;
8     import org.apache.log4j.Logger;
9    
10     import appl.parallel.event.CommEventSink;
11     import appl.parallel.event.RemoteEvent;
12    
13     /**
14     * Delays events and fires them after the delay all <b>at once</b> in one
15     * object (this will save communication time)
16     *
17     * @author Dominik Appl
18     */
19     public class RemoteEventProxy implements CommEventSink, Service {
20    
21     /**
22     * This Thread submits the events after the delay
23     *
24     * @author Dominik Appl
25     */
26     private class SubmitThread extends Thread {
27    
28     private boolean exit = false;
29    
30     private final ConcurrentLinkedQueue<RemoteEvent> queue2;
31    
32     private final CommEventSink sink;
33    
34     private final long delay;
35    
36     SubmitThread(ConcurrentLinkedQueue<RemoteEvent> queue,
37     CommEventSink sink, long delay) {
38     queue2 = queue;
39     this.sink = sink;
40     this.delay = Math.max(100, delay);
41     }
42    
43     /*
44     * (non-Javadoc)
45     *
46     * @see java.lang.Thread#run()
47     */
48     public void run() {
49     while (!exit) {
50     try {
51     this.sleep(delay);
52     } catch (InterruptedException e) {
53     // TODO Auto-generated catch block
54     e.printStackTrace();
55     }
56     // add all new events to a vector submit them at once
57     Vector<RemoteEvent> events = new Vector<RemoteEvent>();
58     while (queue.peek() != null)
59     events.add(queue.poll());
60     try {
61     if (events.size() > 0)
62     sink.fireRemoteEvents((RemoteEvent[]) events
63     .toArray(new RemoteEvent[events.size()]));
64     } catch (RemoteException e) {
65     // TODO Auto-generated catch block
66     LOG.warn("Could not transmit events." + e.getMessage(), e);
67     events.clear();
68     }
69     }
70     }
71    
72     public void startThread() {
73     exit = false;
74     this.start();
75     }
76    
77     public void stopThread() {
78     exit = true;
79     }
80     }
81    
82     private final Logger LOG = LogManager.getLogger(this.getClass().getName());
83    
84     private boolean timeMonitoring = false;
85    
86     private boolean transferMonitoring = false;
87    
88     private boolean isRunning;
89    
90     SubmitThread submitThread;
91    
92     ConcurrentLinkedQueue<RemoteEvent> queue;
93    
94     private final CommEventSink eventSink;
95    
96     private final long delay;
97    
98     /**
99     * Creates a new proxy
100     *
101     * @param targetSink
102     * the sink where the events should be submitted to
103     * @param delay
104     * the delay (in milliseconds)
105     */
106     public RemoteEventProxy(CommEventSink targetSink, long delay) {
107     this.eventSink = targetSink;
108     this.delay = delay;
109     queue = new ConcurrentLinkedQueue<RemoteEvent>();
110    
111     }
112    
113     /*
114     * (non-Javadoc)
115     *
116     * @see appl.parallel.event.RemoteEventSink#fireRemoteEvent(appl.parallel.event.RemoteEvent)
117     */
118     public void fireRemoteEvent(RemoteEvent e) throws RemoteException {
119     queue.add(e);
120     }
121    
122     /*
123     * (non-Javadoc)
124     *
125     * @see appl.parallel.event.RemoteEventSink#fireRemoteEvents(appl.parallel.event.RemoteEvent[])
126     */
127     public void fireRemoteEvents(RemoteEvent[] e) throws RemoteException {
128     for (RemoteEvent event : e) {
129     fireRemoteEvent(event);
130     }
131     }
132    
133     /*
134     * (non-Javadoc)
135     *
136     * @see appl.parallel.services.Service#isRunning()
137     */
138     public boolean isRunning() {
139     return isRunning;
140     }
141    
142     /**
143     * @return true if the service is running and time monitoring is enabled
144     */
145     public boolean isTimeMonitoringEnabled() throws RemoteException {
146     return timeMonitoring;
147     }
148    
149     /**
150     * @return true if the service is running and tansfer monitoring is enabled
151     */
152     public boolean isTransferMonitoringEnabled() throws RemoteException {
153     return transferMonitoring;
154    
155     }
156    
157     /*
158     * (non-Javadoc)
159     *
160     * @see appl.parallel.services.Service#startService()
161     */
162     public void startService() {
163     isRunning = true;
164     if (submitThread != null)
165     submitThread.stopThread();
166     submitThread = new SubmitThread(queue, eventSink, delay);
167     submitThread.startThread();
168     try {
169     this.timeMonitoring = eventSink.isTimeMonitoringEnabled();
170     this.transferMonitoring = eventSink.isTransferMonitoringEnabled();
171     } catch (RemoteException e) {
172     LOG.error("Could not connect to CommEventSink!!!");
173     }
174    
175     }
176    
177     /*
178     * (non-Javadoc)
179     *
180     * @see appl.parallel.services.Service#stopService()
181     */
182     public void stopService() {
183     isRunning = false;
184     if (submitThread != null)
185     submitThread.stopThread();
186     }
187     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26