/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/services/RemoteEventProxy.java
ViewVC logotype

Annotation of /branches/1.8-gt2-2.6/src/appl/parallel/services/RemoteEventProxy.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2 - (hide annotations)
Wed Feb 25 11:54:01 2009 UTC (15 years, 9 months ago) by mojays
Original Path: trunk/src/appl/parallel/services/RemoteEventProxy.java
File size: 4591 byte(s)
First Commit, corresponds to Revision 1008 of Wikisquare-SVN 
1 mojays 2 package appl.parallel.services;
2    
3     import java.rmi.RemoteException;
4     import java.util.Vector;
5     import java.util.concurrent.ConcurrentLinkedQueue;
6    
7     import org.apache.log4j.LogManager;
8     import org.apache.log4j.Logger;
9    
10     import appl.parallel.event.CommEventSink;
11     import appl.parallel.event.RemoteEvent;
12     import appl.parallel.event.RemoteEventSink;
13     import appl.parallel.event.TimeEvent;
14     import appl.parallel.event.TransferEvent;
15    
16     /**
17     * Delays events and fires them after the delay all <b>at once</b> in one
18     * object (this will save communication time)
19     *
20     * @author Dominik Appl
21     */
22     public class RemoteEventProxy implements CommEventSink, Service {
23    
24     /**
25     * This Thread submits the events after the delay
26     *
27     * @author Dominik Appl
28     */
29     private class SubmitThread extends Thread {
30    
31     private boolean exit = false;
32    
33     private final ConcurrentLinkedQueue<RemoteEvent> queue2;
34    
35     private final CommEventSink sink;
36    
37     private final long delay;
38    
39     SubmitThread(ConcurrentLinkedQueue<RemoteEvent> queue,
40     CommEventSink sink, long delay) {
41     queue2 = queue;
42     this.sink = sink;
43     this.delay = Math.max(100, delay);
44     }
45    
46     /*
47     * (non-Javadoc)
48     *
49     * @see java.lang.Thread#run()
50     */
51     public void run() {
52     while (!exit) {
53     try {
54     this.sleep(delay);
55     } catch (InterruptedException e) {
56     // TODO Auto-generated catch block
57     e.printStackTrace();
58     }
59     // add all new events to a vector submit them at once
60     Vector<RemoteEvent> events = new Vector<RemoteEvent>();
61     while (queue.peek() != null)
62     events.add(queue.poll());
63     try {
64     if (events.size() > 0)
65     sink.fireRemoteEvents((RemoteEvent[]) events
66     .toArray(new RemoteEvent[events.size()]));
67     } catch (RemoteException e) {
68     // TODO Auto-generated catch block
69     LOG.warn("Could not transmit events." + e.getMessage(), e);
70     events.clear();
71     }
72     }
73     }
74    
75     public void startThread() {
76     exit = false;
77     this.start();
78     }
79    
80     public void stopThread() {
81     exit = true;
82     }
83     }
84    
85     private final Logger LOG = LogManager.getLogger(this.getClass().getName());
86    
87     private boolean timeMonitoring = false;
88    
89     private boolean transferMonitoring = false;
90    
91     private boolean isRunning;
92    
93     SubmitThread submitThread;
94    
95     ConcurrentLinkedQueue<RemoteEvent> queue;
96    
97     private final CommEventSink eventSink;
98    
99     private final long delay;
100    
101     /**
102     * Creates a new proxy
103     *
104     * @param targetSink
105     * the sink where the events should be submitted to
106     * @param delay
107     * the delay (in milliseconds)
108     */
109     public RemoteEventProxy(CommEventSink targetSink, long delay) {
110     this.eventSink = targetSink;
111     this.delay = delay;
112     queue = new ConcurrentLinkedQueue<RemoteEvent>();
113    
114     }
115    
116     /*
117     * (non-Javadoc)
118     *
119     * @see appl.parallel.event.RemoteEventSink#fireRemoteEvent(appl.parallel.event.RemoteEvent)
120     */
121     public void fireRemoteEvent(RemoteEvent e) throws RemoteException {
122     queue.add(e);
123     }
124    
125     /*
126     * (non-Javadoc)
127     *
128     * @see appl.parallel.event.RemoteEventSink#fireRemoteEvents(appl.parallel.event.RemoteEvent[])
129     */
130     public void fireRemoteEvents(RemoteEvent[] e) throws RemoteException {
131     for (RemoteEvent event : e) {
132     fireRemoteEvent(event);
133     }
134     }
135    
136     /*
137     * (non-Javadoc)
138     *
139     * @see appl.parallel.services.Service#isRunning()
140     */
141     public boolean isRunning() {
142     return isRunning;
143     }
144    
145     /**
146     * @return true if the service is running and time monitoring is enabled
147     */
148     public boolean isTimeMonitoringEnabled() throws RemoteException {
149     return timeMonitoring;
150     }
151    
152     /**
153     * @return true if the service is running and tansfer monitoring is enabled
154     */
155     public boolean isTransferMonitoringEnabled() throws RemoteException {
156     return transferMonitoring;
157    
158     }
159    
160     /*
161     * (non-Javadoc)
162     *
163     * @see appl.parallel.services.Service#startService()
164     */
165     public void startService() {
166     isRunning = true;
167     if (submitThread != null)
168     submitThread.stopThread();
169     submitThread = new SubmitThread(queue, eventSink, delay);
170     submitThread.startThread();
171     try {
172     this.timeMonitoring = eventSink.isTimeMonitoringEnabled();
173     this.transferMonitoring = eventSink.isTransferMonitoringEnabled();
174     } catch (RemoteException e) {
175     LOG.error("Could not connect to CommEventSink!!!");
176     }
177    
178     }
179    
180     /*
181     * (non-Javadoc)
182     *
183     * @see appl.parallel.services.Service#stopService()
184     */
185     public void stopService() {
186     isRunning = false;
187     if (submitThread != null)
188     submitThread.stopThread();
189     }
190     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26