/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/services/RemoteEventProxy.java
ViewVC logotype

Contents of /branches/1.8-gt2-2.6/src/appl/parallel/services/RemoteEventProxy.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 60 - (show annotations)
Sun Oct 4 16:54:52 2009 UTC (15 years, 2 months ago) by alfonx
File size: 4464 byte(s)
* organized imports
1 package appl.parallel.services;
2
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import appl.parallel.event.CommEventSink;
import appl.parallel.event.RemoteEvent;
12
13 /**
14 * Delays events and fires them after the delay all <b>at once</b> in one
15 * object (this will save communication time)
16 *
17 * @author Dominik Appl
18 */
19 public class RemoteEventProxy implements CommEventSink, Service {
20
21 /**
22 * This Thread submits the events after the delay
23 *
24 * @author Dominik Appl
25 */
26 private class SubmitThread extends Thread {
27
28 private boolean exit = false;
29
30 private final ConcurrentLinkedQueue<RemoteEvent> queue2;
31
32 private final CommEventSink sink;
33
34 private final long delay;
35
36 SubmitThread(ConcurrentLinkedQueue<RemoteEvent> queue,
37 CommEventSink sink, long delay) {
38 queue2 = queue;
39 this.sink = sink;
40 this.delay = Math.max(100, delay);
41 }
42
43 /*
44 * (non-Javadoc)
45 *
46 * @see java.lang.Thread#run()
47 */
48 public void run() {
49 while (!exit) {
50 try {
51 this.sleep(delay);
52 } catch (InterruptedException e) {
53 // TODO Auto-generated catch block
54 e.printStackTrace();
55 }
56 // add all new events to a vector submit them at once
57 Vector<RemoteEvent> events = new Vector<RemoteEvent>();
58 while (queue.peek() != null)
59 events.add(queue.poll());
60 try {
61 if (events.size() > 0)
62 sink.fireRemoteEvents((RemoteEvent[]) events
63 .toArray(new RemoteEvent[events.size()]));
64 } catch (RemoteException e) {
65 // TODO Auto-generated catch block
66 LOG.warn("Could not transmit events." + e.getMessage(), e);
67 events.clear();
68 }
69 }
70 }
71
72 public void startThread() {
73 exit = false;
74 this.start();
75 }
76
77 public void stopThread() {
78 exit = true;
79 }
80 }
81
82 private final Logger LOG = LogManager.getLogger(this.getClass().getName());
83
84 private boolean timeMonitoring = false;
85
86 private boolean transferMonitoring = false;
87
88 private boolean isRunning;
89
90 SubmitThread submitThread;
91
92 ConcurrentLinkedQueue<RemoteEvent> queue;
93
94 private final CommEventSink eventSink;
95
96 private final long delay;
97
98 /**
99 * Creates a new proxy
100 *
101 * @param targetSink
102 * the sink where the events should be submitted to
103 * @param delay
104 * the delay (in milliseconds)
105 */
106 public RemoteEventProxy(CommEventSink targetSink, long delay) {
107 this.eventSink = targetSink;
108 this.delay = delay;
109 queue = new ConcurrentLinkedQueue<RemoteEvent>();
110
111 }
112
113 /*
114 * (non-Javadoc)
115 *
116 * @see appl.parallel.event.RemoteEventSink#fireRemoteEvent(appl.parallel.event.RemoteEvent)
117 */
118 public void fireRemoteEvent(RemoteEvent e) throws RemoteException {
119 queue.add(e);
120 }
121
122 /*
123 * (non-Javadoc)
124 *
125 * @see appl.parallel.event.RemoteEventSink#fireRemoteEvents(appl.parallel.event.RemoteEvent[])
126 */
127 public void fireRemoteEvents(RemoteEvent[] e) throws RemoteException {
128 for (RemoteEvent event : e) {
129 fireRemoteEvent(event);
130 }
131 }
132
133 /*
134 * (non-Javadoc)
135 *
136 * @see appl.parallel.services.Service#isRunning()
137 */
138 public boolean isRunning() {
139 return isRunning;
140 }
141
142 /**
143 * @return true if the service is running and time monitoring is enabled
144 */
145 public boolean isTimeMonitoringEnabled() throws RemoteException {
146 return timeMonitoring;
147 }
148
149 /**
150 * @return true if the service is running and tansfer monitoring is enabled
151 */
152 public boolean isTransferMonitoringEnabled() throws RemoteException {
153 return transferMonitoring;
154
155 }
156
157 /*
158 * (non-Javadoc)
159 *
160 * @see appl.parallel.services.Service#startService()
161 */
162 public void startService() {
163 isRunning = true;
164 if (submitThread != null)
165 submitThread.stopThread();
166 submitThread = new SubmitThread(queue, eventSink, delay);
167 submitThread.startThread();
168 try {
169 this.timeMonitoring = eventSink.isTimeMonitoringEnabled();
170 this.transferMonitoring = eventSink.isTransferMonitoringEnabled();
171 } catch (RemoteException e) {
172 LOG.error("Could not connect to CommEventSink!!!");
173 }
174
175 }
176
177 /*
178 * (non-Javadoc)
179 *
180 * @see appl.parallel.services.Service#stopService()
181 */
182 public void stopService() {
183 isRunning = false;
184 if (submitThread != null)
185 submitThread.stopThread();
186 }
187 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26