/[xulu]/trunk/src/appl/parallel/services/MulticastDiscoveryService.java
ViewVC logotype

Annotation of /trunk/src/appl/parallel/services/MulticastDiscoveryService.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 33 - (hide annotations)
Fri Jun 19 14:20:37 2009 UTC (15 years, 7 months ago) by alfonx
File size: 11720 byte(s)
* Renamed classes:
LayerPanel to MapLegend (works on StyledObj)
AtlasLayerPanel to AtlasMapLegend (works on DpLayer)
LayerPanelGroup to MapLayerLegend (works on StyledObj)
AtlasLayerPanelGroup to AtlasMapLayerLegend (works on DpLayer)

Updated all JARs and committed them.

TODO: Some "ISDSS person" might want to rename:
de.isdss.util.framework.ui.panel.LayerPanelScrollPane to something like MapLegendScrollPane.java or FWMapLegendScrollPane.java

* Also committing some substitutions of "@returns" with "@return" 
* Removed testsomethong.java from xulu
1 mojays 2 package appl.parallel.services;
2    
3     import java.io.IOException;
4     import java.net.DatagramPacket;
5     import java.net.InetAddress;
6     import java.net.MulticastSocket;
7     import java.net.UnknownHostException;
8     import java.rmi.Naming;
9     import java.rmi.RemoteException;
10     import java.util.Iterator;
11     import java.util.Vector;
12    
13     import org.apache.log4j.BasicConfigurator;
14     import org.apache.log4j.LogManager;
15     import org.apache.log4j.Logger;
16    
17     import appl.ext.XuluConfig;
18     import appl.parallel.ComputingResource;
19     import appl.parallel.ComputingResourceContainer;
20     import appl.parallel.server.XuluServer;
21     import appl.parallel.test.PingTestObject;
22     import appl.parallel.util.Helper;
23    
24     /**
25     * Responsible for discovery of RemoteResources on the network, especially but
26     * not only {@link XuluServer XuluServers}. It listens for the following
27     * messages: <br>
28     * <br>
29     * "Hello Xulu from XuluServer" - sent by RemoteResources like
30     * {@link XuluServer} when starting <br>
31     * "Goodbye Xulu" - sent by RemoteResources like {@link XuluServer} when
32     * stopping <br>
33     * <br>
34     * and sends: <br>
35     * "Hello Servers" - when started If multicasting does not work try to disable
36     * your firewall or better configure it to allow multicast packages. Notice
37     * also, that multicasting may not work in virtual machines like MS Virtual PC
38     * 2004. <Warning><b>THIS CLASS IS STILL FOR DEMONSTRATION ONLY</b></Warning>
39     *
40     * @author Dominik Appl
41     * @see XuluServer
42     */
43     public class MulticastDiscoveryService implements DiscoveryService {
44    
45     // the data to send
46     final String sendHello = "Hello Servers";
47    
48     // data to receive
49     final String helloFromXuluServers = "Hello Xulu from XuluServer";
50    
51     final String recGoodBye = "Goodbye Xulu";
52    
53     // the Thread which waits on Server hello/goodbyes
54     volatile ReceiveThread receiveThread;
55    
56     // volatile RenewalThread renewalThread;
57    
58     // the thread that checks if servers are alive
59     // volatile CheckThread checkThread;
60    
61     // the data is read from XuluConfig
62     InetAddress group;
63    
64     int port;
65    
66     int lease;
67    
68     MulticastSocket socket;
69    
70     boolean isRunning = false;
71    
72     private final Logger LOG = LogManager.getLogger(this.getClass().getName());
73    
74     /**
75     * Constructs a new service reading port and lease time from
76     * {@link appl.ext.XuluConfig}
77     */
78     public MulticastDiscoveryService() {
79     XuluConfig config = XuluConfig.getXuluConfig();
80     port = Integer.valueOf(config
81     .getProperty("DiscoveryServices.multicast.port"));
82     // lease = Integer.valueOf(config
83     // .getProperty("MulticastDiscoveryService.lease"));
84     try {
85     group = InetAddress.getByName(config
86     .getProperty("DiscoveryServices.multicast.group"));
87     } catch (UnknownHostException e) {
88     // TODO Auto-generated catch block
89     e.printStackTrace();
90     }
91     }
92    
93     /**
94     * Constructs a new service
95     *
96     * @param port
97     * port where the multicasting happens
98     * @param lease
99     * timeintervall (in ms) after whi
100     */
101     public MulticastDiscoveryService(int port, int lease) {
102     this();
103     this.port = port;
104     this.lease = lease;
105     }
106    
107     /*
108     * (non-Javadoc)
109     *
110     * @see appl.parallel.services.Service#startService()
111     */
112     public void startService() {
113     try {
114     LOG.info("Discoveryservice started");
115     // All Servers must listen on the port specified, in order fo
116     // the socket to receive
117     if (socket == null)
118     socket = new MulticastSocket(port);
119     // open a multicast group and join it
120    
121     socket.joinGroup(group);
122    
123     socket.setSoTimeout(5000);
124    
125     // create and send Hello world
126     DatagramPacket datagram = new DatagramPacket(sendHello.getBytes(),
127     sendHello.length(), group, port);
128     System.out.println(group);
129     LOG.info("Send hello to all listening servers with broadcast IP "
130     + group);
131    
132     socket.send(datagram);
133    
134     if (receiveThread == null) {
135     receiveThread = new ReceiveThread(socket, helloFromXuluServers,
136     recGoodBye);
137     }
138     if (!(receiveThread.isAlive()))
139     receiveThread.start();
140    
141     } catch (UnknownHostException e) {
142     e.printStackTrace();
143     } catch (IOException e) {
144     e.printStackTrace();
145     }
146    
147     }
148    
149     /*
150     * (non-Javadoc)
151     *
152     * @see appl.parallel.services.DiscoveryService#getResources()
153     */
154     public Vector<ComputingResourceContainer> getResources() {
155     return receiveThread.getResources();
156     }
157    
158     /*
159     * (non-Javadoc)
160     *
161     * @see appl.parallel.services.Service#stopService()
162     */
163     public void stopService() {
164     receiveThread.stopThread();
165     // renewalThread.stopThread();
166     try {
167     Thread.currentThread().sleep(100);
168     } catch (InterruptedException e) {
169     // TODO Auto-generated catch block
170     e.printStackTrace();
171     }
172     socket.close();
173    
174     }
175    
176     /*
177     * (non-Javadoc)
178     *
179     * @see appl.parallel.services.Service#isRunning()
180     */
181     public boolean isRunning() {
182     return isRunning;
183     }
184    
185     /** waits for incoming calls */
186    
187     class ReceiveThread extends Thread {
188    
189     // Resources discovered so far
190     Vector<ComputingResource> discoveredResources;
191    
192     private volatile Thread receiveThread;
193    
194     boolean exit = false;
195    
196     MulticastSocket socket;
197    
198     private final String recHello2;
199    
200     private final String recGoodBye2;
201    
202     private boolean discoveryInProgress = false;
203    
204     /**
205 alfonx 33 * @return a Vector of discovered Ressources before return, all values
206 mojays 2 * are pinged and only returned if alive
207     */
208     public Vector<ComputingResourceContainer> getResources() {
209     return Helper.getResourceContainersForVector(discoveredResources);
210     }
211    
212     /**
213     * @param socket2
214     * Socket which is used for listening
215     * @param helloFromXuluServers
216     * String that equals the hello message of the server
217     * @param recGoodBye
218     * String that equals the Goordbye message of the server
219     */
220     public ReceiveThread(MulticastSocket socket, String recHello,
221     String recGoodBye) {
222     recHello2 = recHello;
223     recGoodBye2 = recGoodBye;
224     this.socket = socket;
225     discoveredResources = new Vector<ComputingResource>();
226     }
227    
228     /**
229     * Safe method to stop the Thread. Use this method instead of
230     * {@link Thread}
231     */
232     public void stopThread() {
233     if (receiveThread != null) {
234     exit = true;
235     // the only way to stop the Thread is to send a package so that
236     // the receive operation no longer blocks
237     DatagramPacket datagram = new DatagramPacket(
238     "Stopping receive Thread".getBytes(),
239     "Stopping receive Thread".length(), group, port);
240     try {
241     socket.send(datagram);
242     // wait on stop
243     Thread.currentThread().sleep(100);
244     } catch (IOException e) {
245     e.printStackTrace();
246     } catch (InterruptedException e) {
247     // TODO Auto-generated catch block
248     e.printStackTrace();
249     } finally {
250     exit = false;
251     }
252     exit = false;
253    
254     }
255    
256     }
257    
258     public void run() {
259     receiveThread = Thread.currentThread();
260     while (!exit) {
261     try {
262     // receive buffer
263    
264     byte[] buffer = new byte[256];
265     DatagramPacket datagram = new DatagramPacket(buffer,
266     buffer.length);
267     socket.receive(datagram);
268     // get the message in the right
269     byte[] result = new byte[datagram.getLength()];
270     System.arraycopy(datagram.getData(), 0, result, 0, datagram
271     .getLength());
272     // check if message is from XuluServer
273     if (new String(result).equals(helloFromXuluServers)) {
274     LOG.debug("received hello from server "
275     + datagram.getAddress());
276    
277     String newname = "rmi:/" + datagram.getAddress()
278     + "/XuluServer";
279    
280     ComputingResource server = (ComputingResource) Naming
281     .lookup(newname);
282     addRessource(server, datagram.getAddress().toString());
283    
284     } else if (LOG.isDebugEnabled())
285     LOG.debug("received message |" + new String(result)
286     + "| from " + datagram.getAddress());
287    
288     } catch (java.net.SocketTimeoutException e) {
289     // catch timeout exception
290     }
291    
292     catch (IOException e) {
293     // TODO Auto-generated catch block
294     e.printStackTrace();
295     } catch (Exception e) {
296     if (!(e instanceof InterruptedException))
297     e.printStackTrace();
298     }
299     }
300     System.out.println("exit Thread " + this.toString());
301     }
302    
303     /**
304     * adds ressource to vector but first looks if its not already added
305     *
306     * @param server
307     */
308     private void addRessource(ComputingResource server, String IP) {
309     // check for double entries
310     for (ComputingResource s : discoveredResources) {
311     if (s.equals(server)) {
312     LOG.debug("Not adding " + server.toString()
313     + " to discovered Ressources again!");
314     return;
315     }
316     }
317     LOG.info("Adding Xulu Server: " + IP + "to discovered Ressources");
318     // try {
319     // Thread.currentThread().sleep(2000);
320     // } catch (InterruptedException e) {
321     // // TODO Auto-generated catch block
322     // e.printStackTrace();
323     // }
324     discoveredResources.add(server);
325     }
326    
327     /**
328     * Gets the {@link Resource
329     */
330     public void pingResources() {
331     PingTestObject po = new PingTestObject(32);
332     // cannot make a for each loop because of
333     // java.util.ConcurrentModificationException
334     int i = discoveredResources.size() - 1;
335     while (i >= 0) {
336     boolean error = true;
337     ComputingResource r = discoveredResources.get(i);
338     if (!(r == null)) {
339    
340     try {
341     r.ping();
342     error = false;
343     LOG.debug("Ping to " + r + " was successfull!");
344     } catch (Exception e) {
345     e.printStackTrace();
346     }
347     // if not successfull remove object
348     if (error) {
349     LOG.debug("Ping to " + r
350     + " was NOT successfull! Removing ressource");
351    
352     discoveredResources.remove(r);
353     }
354     }
355     i--;
356     }
357     }
358    
359     }
360    
361     /**
362     * simple Thread that caused rediscovery of Ressources at a given time
363     * intervall **
364     */
365    
366     class RenewalThread extends Thread {
367    
368     private volatile Thread thisThread;
369    
370     boolean exit = false;
371    
372     private final long timeintervall;
373    
374     private volatile ReceiveThread discoveryToRenew;
375    
376     /**
377     * invokes rediscovery on the given thread at the given time intervall
378     */
379     public RenewalThread(ReceiveThread discoveryToRenew, long timeintervall) {
380     this.discoveryToRenew = discoveryToRenew;
381     this.timeintervall = timeintervall;
382    
383     }
384    
385     /**
386     * Safe method to stop this Thread. Use this method instead of
387     * {@link Thread}
388     */
389     public void stopThread() {
390     if (thisThread != null)
391     exit = true;
392     }
393    
394     public void run() {
395     thisThread = Thread.currentThread();
396     while (!exit) {
397     discoveryToRenew.pingResources();
398     try {
399     Thread.currentThread().sleep(timeintervall);
400     } catch (InterruptedException e) {
401     // TODO Auto-generated catch block
402     e.printStackTrace();
403     }
404     }
405     System.out.println("exit renewal Thread");
406     }
407    
408     }
409    
410     /**
411     * just for testing..edit as you like
412     */
413     public static void main(String[] args) {
414     BasicConfigurator.configure();
415     MulticastDiscoveryService bro = new MulticastDiscoveryService();
416     bro.startService();
417     try {
418     Thread.currentThread().sleep(5000);
419     } catch (InterruptedException e) {
420     e.printStackTrace();
421     }
422     // bro.stopService();
423     while (true)
424     try {
425     Vector<ComputingResourceContainer> v = bro.getResources();
426     for (ComputingResourceContainer container : v) {
427     System.out.print("Active Object: ");
428     System.out.println(container.getInformation().getProperty(
429     "name")
430     + " with IP "
431     + container.getInformation().getProperty("IP"));
432     }
433     Thread.currentThread().sleep(5000);
434     } catch (InterruptedException e) {
435    
436     }
437     }
438    
439     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26