/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/services/MulticastDiscoveryService.java
ViewVC logotype

Contents of /branches/1.8-gt2-2.6/src/appl/parallel/services/MulticastDiscoveryService.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 33 - (show annotations)
Fri Jun 19 14:20:37 2009 UTC (15 years, 6 months ago) by alfonx
Original Path: trunk/src/appl/parallel/services/MulticastDiscoveryService.java
File size: 11720 byte(s)
* Renamed classes:
LayerPanel to MapLegend (works on StyledObj)
AtlasLayerPanel to AtlasMapLegend (works on DpLayer)
LayerPanelGroup to MapLayerLegend (works on StyledObj)
AtlasLayerPanelGroup to AtlasMapLayerLegend (works on DpLayer)

Updated all JARs and committed them.

TODO: Some "ISDSS person" might want to rename:
de.isdss.util.framework.ui.panel.LayerPanelScrollPane to something like MapLegendScrollPane.java or FWMapLegendScrollPane.java

* Also committing some substitutions of "@returns" with "@return" 
* Removed testsomethong.java from xulu
1 package appl.parallel.services;
2
3 import java.io.IOException;
4 import java.net.DatagramPacket;
5 import java.net.InetAddress;
6 import java.net.MulticastSocket;
7 import java.net.UnknownHostException;
8 import java.rmi.Naming;
9 import java.rmi.RemoteException;
10 import java.util.Iterator;
11 import java.util.Vector;
12
13 import org.apache.log4j.BasicConfigurator;
14 import org.apache.log4j.LogManager;
15 import org.apache.log4j.Logger;
16
17 import appl.ext.XuluConfig;
18 import appl.parallel.ComputingResource;
19 import appl.parallel.ComputingResourceContainer;
20 import appl.parallel.server.XuluServer;
21 import appl.parallel.test.PingTestObject;
22 import appl.parallel.util.Helper;
23
24 /**
25 * Responsible for discovery of RemoteResources on the network, especially but
26 * not only {@link XuluServer XuluServers}. It listens for the following
27 * messages: <br>
28 * <br>
29 * "Hello Xulu from XuluServer" - sent by RemoteResources like
30 * {@link XuluServer} when starting <br>
31 * "Goodbye Xulu" - sent by RemoteResources like {@link XuluServer} when
32 * stopping <br>
33 * <br>
34 * and sends: <br>
35 * "Hello Servers" - when started If multicasting does not work try to disable
36 * your firewall or better configure it to allow multicast packages. Notice
37 * also, that multicasting may not work in virtual machines like MS Virtual PC
38 * 2004. <Warning><b>THIS CLASS IS STILL FOR DEMONSTRATION ONLY</b></Warning>
39 *
40 * @author Dominik Appl
41 * @see XuluServer
42 */
43 public class MulticastDiscoveryService implements DiscoveryService {
44
45 // the data to send
46 final String sendHello = "Hello Servers";
47
48 // data to receive
49 final String helloFromXuluServers = "Hello Xulu from XuluServer";
50
51 final String recGoodBye = "Goodbye Xulu";
52
53 // the Thread which waits on Server hello/goodbyes
54 volatile ReceiveThread receiveThread;
55
56 // volatile RenewalThread renewalThread;
57
58 // the thread that checks if servers are alive
59 // volatile CheckThread checkThread;
60
61 // the data is read from XuluConfig
62 InetAddress group;
63
64 int port;
65
66 int lease;
67
68 MulticastSocket socket;
69
70 boolean isRunning = false;
71
72 private final Logger LOG = LogManager.getLogger(this.getClass().getName());
73
74 /**
75 * Constructs a new service reading port and lease time from
76 * {@link appl.ext.XuluConfig}
77 */
78 public MulticastDiscoveryService() {
79 XuluConfig config = XuluConfig.getXuluConfig();
80 port = Integer.valueOf(config
81 .getProperty("DiscoveryServices.multicast.port"));
82 // lease = Integer.valueOf(config
83 // .getProperty("MulticastDiscoveryService.lease"));
84 try {
85 group = InetAddress.getByName(config
86 .getProperty("DiscoveryServices.multicast.group"));
87 } catch (UnknownHostException e) {
88 // TODO Auto-generated catch block
89 e.printStackTrace();
90 }
91 }
92
93 /**
94 * Constructs a new service
95 *
96 * @param port
97 * port where the multicasting happens
98 * @param lease
99 * timeintervall (in ms) after whi
100 */
101 public MulticastDiscoveryService(int port, int lease) {
102 this();
103 this.port = port;
104 this.lease = lease;
105 }
106
107 /*
108 * (non-Javadoc)
109 *
110 * @see appl.parallel.services.Service#startService()
111 */
112 public void startService() {
113 try {
114 LOG.info("Discoveryservice started");
115 // All Servers must listen on the port specified, in order fo
116 // the socket to receive
117 if (socket == null)
118 socket = new MulticastSocket(port);
119 // open a multicast group and join it
120
121 socket.joinGroup(group);
122
123 socket.setSoTimeout(5000);
124
125 // create and send Hello world
126 DatagramPacket datagram = new DatagramPacket(sendHello.getBytes(),
127 sendHello.length(), group, port);
128 System.out.println(group);
129 LOG.info("Send hello to all listening servers with broadcast IP "
130 + group);
131
132 socket.send(datagram);
133
134 if (receiveThread == null) {
135 receiveThread = new ReceiveThread(socket, helloFromXuluServers,
136 recGoodBye);
137 }
138 if (!(receiveThread.isAlive()))
139 receiveThread.start();
140
141 } catch (UnknownHostException e) {
142 e.printStackTrace();
143 } catch (IOException e) {
144 e.printStackTrace();
145 }
146
147 }
148
149 /*
150 * (non-Javadoc)
151 *
152 * @see appl.parallel.services.DiscoveryService#getResources()
153 */
154 public Vector<ComputingResourceContainer> getResources() {
155 return receiveThread.getResources();
156 }
157
158 /*
159 * (non-Javadoc)
160 *
161 * @see appl.parallel.services.Service#stopService()
162 */
163 public void stopService() {
164 receiveThread.stopThread();
165 // renewalThread.stopThread();
166 try {
167 Thread.currentThread().sleep(100);
168 } catch (InterruptedException e) {
169 // TODO Auto-generated catch block
170 e.printStackTrace();
171 }
172 socket.close();
173
174 }
175
176 /*
177 * (non-Javadoc)
178 *
179 * @see appl.parallel.services.Service#isRunning()
180 */
181 public boolean isRunning() {
182 return isRunning;
183 }
184
185 /** waits for incoming calls */
186
187 class ReceiveThread extends Thread {
188
189 // Resources discovered so far
190 Vector<ComputingResource> discoveredResources;
191
192 private volatile Thread receiveThread;
193
194 boolean exit = false;
195
196 MulticastSocket socket;
197
198 private final String recHello2;
199
200 private final String recGoodBye2;
201
202 private boolean discoveryInProgress = false;
203
204 /**
205 * @return a Vector of discovered Ressources before return, all values
206 * are pinged and only returned if alive
207 */
208 public Vector<ComputingResourceContainer> getResources() {
209 return Helper.getResourceContainersForVector(discoveredResources);
210 }
211
212 /**
213 * @param socket2
214 * Socket which is used for listening
215 * @param helloFromXuluServers
216 * String that equals the hello message of the server
217 * @param recGoodBye
218 * String that equals the Goordbye message of the server
219 */
220 public ReceiveThread(MulticastSocket socket, String recHello,
221 String recGoodBye) {
222 recHello2 = recHello;
223 recGoodBye2 = recGoodBye;
224 this.socket = socket;
225 discoveredResources = new Vector<ComputingResource>();
226 }
227
228 /**
229 * Safe method to stop the Thread. Use this method instead of
230 * {@link Thread}
231 */
232 public void stopThread() {
233 if (receiveThread != null) {
234 exit = true;
235 // the only way to stop the Thread is to send a package so that
236 // the receive operation no longer blocks
237 DatagramPacket datagram = new DatagramPacket(
238 "Stopping receive Thread".getBytes(),
239 "Stopping receive Thread".length(), group, port);
240 try {
241 socket.send(datagram);
242 // wait on stop
243 Thread.currentThread().sleep(100);
244 } catch (IOException e) {
245 e.printStackTrace();
246 } catch (InterruptedException e) {
247 // TODO Auto-generated catch block
248 e.printStackTrace();
249 } finally {
250 exit = false;
251 }
252 exit = false;
253
254 }
255
256 }
257
258 public void run() {
259 receiveThread = Thread.currentThread();
260 while (!exit) {
261 try {
262 // receive buffer
263
264 byte[] buffer = new byte[256];
265 DatagramPacket datagram = new DatagramPacket(buffer,
266 buffer.length);
267 socket.receive(datagram);
268 // get the message in the right
269 byte[] result = new byte[datagram.getLength()];
270 System.arraycopy(datagram.getData(), 0, result, 0, datagram
271 .getLength());
272 // check if message is from XuluServer
273 if (new String(result).equals(helloFromXuluServers)) {
274 LOG.debug("received hello from server "
275 + datagram.getAddress());
276
277 String newname = "rmi:/" + datagram.getAddress()
278 + "/XuluServer";
279
280 ComputingResource server = (ComputingResource) Naming
281 .lookup(newname);
282 addRessource(server, datagram.getAddress().toString());
283
284 } else if (LOG.isDebugEnabled())
285 LOG.debug("received message |" + new String(result)
286 + "| from " + datagram.getAddress());
287
288 } catch (java.net.SocketTimeoutException e) {
289 // catch timeout exception
290 }
291
292 catch (IOException e) {
293 // TODO Auto-generated catch block
294 e.printStackTrace();
295 } catch (Exception e) {
296 if (!(e instanceof InterruptedException))
297 e.printStackTrace();
298 }
299 }
300 System.out.println("exit Thread " + this.toString());
301 }
302
303 /**
304 * adds ressource to vector but first looks if its not already added
305 *
306 * @param server
307 */
308 private void addRessource(ComputingResource server, String IP) {
309 // check for double entries
310 for (ComputingResource s : discoveredResources) {
311 if (s.equals(server)) {
312 LOG.debug("Not adding " + server.toString()
313 + " to discovered Ressources again!");
314 return;
315 }
316 }
317 LOG.info("Adding Xulu Server: " + IP + "to discovered Ressources");
318 // try {
319 // Thread.currentThread().sleep(2000);
320 // } catch (InterruptedException e) {
321 // // TODO Auto-generated catch block
322 // e.printStackTrace();
323 // }
324 discoveredResources.add(server);
325 }
326
327 /**
328 * Gets the {@link Resource
329 */
330 public void pingResources() {
331 PingTestObject po = new PingTestObject(32);
332 // cannot make a for each loop because of
333 // java.util.ConcurrentModificationException
334 int i = discoveredResources.size() - 1;
335 while (i >= 0) {
336 boolean error = true;
337 ComputingResource r = discoveredResources.get(i);
338 if (!(r == null)) {
339
340 try {
341 r.ping();
342 error = false;
343 LOG.debug("Ping to " + r + " was successfull!");
344 } catch (Exception e) {
345 e.printStackTrace();
346 }
347 // if not successfull remove object
348 if (error) {
349 LOG.debug("Ping to " + r
350 + " was NOT successfull! Removing ressource");
351
352 discoveredResources.remove(r);
353 }
354 }
355 i--;
356 }
357 }
358
359 }
360
361 /**
362 * simple Thread that caused rediscovery of Ressources at a given time
363 * intervall **
364 */
365
366 class RenewalThread extends Thread {
367
368 private volatile Thread thisThread;
369
370 boolean exit = false;
371
372 private final long timeintervall;
373
374 private volatile ReceiveThread discoveryToRenew;
375
376 /**
377 * invokes rediscovery on the given thread at the given time intervall
378 */
379 public RenewalThread(ReceiveThread discoveryToRenew, long timeintervall) {
380 this.discoveryToRenew = discoveryToRenew;
381 this.timeintervall = timeintervall;
382
383 }
384
385 /**
386 * Safe method to stop this Thread. Use this method instead of
387 * {@link Thread}
388 */
389 public void stopThread() {
390 if (thisThread != null)
391 exit = true;
392 }
393
394 public void run() {
395 thisThread = Thread.currentThread();
396 while (!exit) {
397 discoveryToRenew.pingResources();
398 try {
399 Thread.currentThread().sleep(timeintervall);
400 } catch (InterruptedException e) {
401 // TODO Auto-generated catch block
402 e.printStackTrace();
403 }
404 }
405 System.out.println("exit renewal Thread");
406 }
407
408 }
409
410 /**
411 * just for testing..edit as you like
412 */
413 public static void main(String[] args) {
414 BasicConfigurator.configure();
415 MulticastDiscoveryService bro = new MulticastDiscoveryService();
416 bro.startService();
417 try {
418 Thread.currentThread().sleep(5000);
419 } catch (InterruptedException e) {
420 e.printStackTrace();
421 }
422 // bro.stopService();
423 while (true)
424 try {
425 Vector<ComputingResourceContainer> v = bro.getResources();
426 for (ComputingResourceContainer container : v) {
427 System.out.print("Active Object: ");
428 System.out.println(container.getInformation().getProperty(
429 "name")
430 + " with IP "
431 + container.getInformation().getProperty("IP"));
432 }
433 Thread.currentThread().sleep(5000);
434 } catch (InterruptedException e) {
435
436 }
437 }
438
439 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26