/[xulu]/trunk/src/appl/parallel/services/RemoteEventProxy.java
ViewVC logotype

Contents of /trunk/src/appl/parallel/services/RemoteEventProxy.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2 - (show annotations)
Wed Feb 25 11:54:01 2009 UTC (15 years, 10 months ago) by mojays
File size: 4591 byte(s)
First Commit, corresponds to Revision 1008 of Wikisquare-SVN 
1 package appl.parallel.services;
2
3 import java.rmi.RemoteException;
4 import java.util.Vector;
5 import java.util.concurrent.ConcurrentLinkedQueue;
6
7 import org.apache.log4j.LogManager;
8 import org.apache.log4j.Logger;
9
10 import appl.parallel.event.CommEventSink;
11 import appl.parallel.event.RemoteEvent;
12 import appl.parallel.event.RemoteEventSink;
13 import appl.parallel.event.TimeEvent;
14 import appl.parallel.event.TransferEvent;
15
16 /**
17 * Delays events and fires them after the delay all <b>at once</b> in one
18 * object (this will save communication time)
19 *
20 * @author Dominik Appl
21 */
22 public class RemoteEventProxy implements CommEventSink, Service {
23
24 /**
25 * This Thread submits the events after the delay
26 *
27 * @author Dominik Appl
28 */
29 private class SubmitThread extends Thread {
30
31 private boolean exit = false;
32
33 private final ConcurrentLinkedQueue<RemoteEvent> queue2;
34
35 private final CommEventSink sink;
36
37 private final long delay;
38
39 SubmitThread(ConcurrentLinkedQueue<RemoteEvent> queue,
40 CommEventSink sink, long delay) {
41 queue2 = queue;
42 this.sink = sink;
43 this.delay = Math.max(100, delay);
44 }
45
46 /*
47 * (non-Javadoc)
48 *
49 * @see java.lang.Thread#run()
50 */
51 public void run() {
52 while (!exit) {
53 try {
54 this.sleep(delay);
55 } catch (InterruptedException e) {
56 // TODO Auto-generated catch block
57 e.printStackTrace();
58 }
59 // add all new events to a vector submit them at once
60 Vector<RemoteEvent> events = new Vector<RemoteEvent>();
61 while (queue.peek() != null)
62 events.add(queue.poll());
63 try {
64 if (events.size() > 0)
65 sink.fireRemoteEvents((RemoteEvent[]) events
66 .toArray(new RemoteEvent[events.size()]));
67 } catch (RemoteException e) {
68 // TODO Auto-generated catch block
69 LOG.warn("Could not transmit events." + e.getMessage(), e);
70 events.clear();
71 }
72 }
73 }
74
75 public void startThread() {
76 exit = false;
77 this.start();
78 }
79
80 public void stopThread() {
81 exit = true;
82 }
83 }
84
85 private final Logger LOG = LogManager.getLogger(this.getClass().getName());
86
87 private boolean timeMonitoring = false;
88
89 private boolean transferMonitoring = false;
90
91 private boolean isRunning;
92
93 SubmitThread submitThread;
94
95 ConcurrentLinkedQueue<RemoteEvent> queue;
96
97 private final CommEventSink eventSink;
98
99 private final long delay;
100
101 /**
102 * Creates a new proxy
103 *
104 * @param targetSink
105 * the sink where the events should be submitted to
106 * @param delay
107 * the delay (in milliseconds)
108 */
109 public RemoteEventProxy(CommEventSink targetSink, long delay) {
110 this.eventSink = targetSink;
111 this.delay = delay;
112 queue = new ConcurrentLinkedQueue<RemoteEvent>();
113
114 }
115
116 /*
117 * (non-Javadoc)
118 *
119 * @see appl.parallel.event.RemoteEventSink#fireRemoteEvent(appl.parallel.event.RemoteEvent)
120 */
121 public void fireRemoteEvent(RemoteEvent e) throws RemoteException {
122 queue.add(e);
123 }
124
125 /*
126 * (non-Javadoc)
127 *
128 * @see appl.parallel.event.RemoteEventSink#fireRemoteEvents(appl.parallel.event.RemoteEvent[])
129 */
130 public void fireRemoteEvents(RemoteEvent[] e) throws RemoteException {
131 for (RemoteEvent event : e) {
132 fireRemoteEvent(event);
133 }
134 }
135
136 /*
137 * (non-Javadoc)
138 *
139 * @see appl.parallel.services.Service#isRunning()
140 */
141 public boolean isRunning() {
142 return isRunning;
143 }
144
145 /**
146 * @return true if the service is running and time monitoring is enabled
147 */
148 public boolean isTimeMonitoringEnabled() throws RemoteException {
149 return timeMonitoring;
150 }
151
152 /**
153 * @return true if the service is running and tansfer monitoring is enabled
154 */
155 public boolean isTransferMonitoringEnabled() throws RemoteException {
156 return transferMonitoring;
157
158 }
159
160 /*
161 * (non-Javadoc)
162 *
163 * @see appl.parallel.services.Service#startService()
164 */
165 public void startService() {
166 isRunning = true;
167 if (submitThread != null)
168 submitThread.stopThread();
169 submitThread = new SubmitThread(queue, eventSink, delay);
170 submitThread.startThread();
171 try {
172 this.timeMonitoring = eventSink.isTimeMonitoringEnabled();
173 this.transferMonitoring = eventSink.isTransferMonitoringEnabled();
174 } catch (RemoteException e) {
175 LOG.error("Could not connect to CommEventSink!!!");
176 }
177
178 }
179
180 /*
181 * (non-Javadoc)
182 *
183 * @see appl.parallel.services.Service#stopService()
184 */
185 public void stopService() {
186 isRunning = false;
187 if (submitThread != null)
188 submitThread.stopThread();
189 }
190 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26