/[xulu]/trunk/src/appl/parallel/services/MulticastDiscoveryService.java
ViewVC logotype

Annotation of /trunk/src/appl/parallel/services/MulticastDiscoveryService.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 78 - (hide annotations)
Wed Feb 10 16:43:46 2010 UTC (14 years, 10 months ago) by alfonx
File size: 11658 byte(s)
Merged branch 1.8-gt2-2.6 to trunk. Now the trunk is based on GeoTools 2.6.1 and schmitzm-2.0.x
1 mojays 2 package appl.parallel.services;
2    
3     import java.io.IOException;
4     import java.net.DatagramPacket;
5     import java.net.InetAddress;
6     import java.net.MulticastSocket;
7     import java.net.UnknownHostException;
8     import java.rmi.Naming;
9     import java.util.Vector;
10    
11     import org.apache.log4j.BasicConfigurator;
12     import org.apache.log4j.LogManager;
13     import org.apache.log4j.Logger;
14    
15     import appl.ext.XuluConfig;
16     import appl.parallel.ComputingResource;
17     import appl.parallel.ComputingResourceContainer;
18     import appl.parallel.server.XuluServer;
19     import appl.parallel.test.PingTestObject;
20     import appl.parallel.util.Helper;
21    
22     /**
23     * Responsible for discovery of RemoteResources on the network, especially but
24     * not only {@link XuluServer XuluServers}. It listens for the following
25     * messages: <br>
26     * <br>
27     * "Hello Xulu from XuluServer" - sent by RemoteResources like
28     * {@link XuluServer} when starting <br>
29     * "Goodbye Xulu" - sent by RemoteResources like {@link XuluServer} when
30     * stopping <br>
31     * <br>
32     * and sends: <br>
33     * "Hello Servers" - when started If multicasting does not work try to disable
34     * your firewall or better configure it to allow multicast packages. Notice
35     * also, that multicasting may not work in virtual machines like MS Virtual PC
36     * 2004. <Warning><b>THIS CLASS IS STILL FOR DEMONSTRATION ONLY</b></Warning>
37     *
38     * @author Dominik Appl
39     * @see XuluServer
40     */
41     public class MulticastDiscoveryService implements DiscoveryService {
42    
43     // the data to send
44     final String sendHello = "Hello Servers";
45    
46     // data to receive
47     final String helloFromXuluServers = "Hello Xulu from XuluServer";
48    
49     final String recGoodBye = "Goodbye Xulu";
50    
51     // the Thread which waits on Server hello/goodbyes
52     volatile ReceiveThread receiveThread;
53    
54     // volatile RenewalThread renewalThread;
55    
56     // the thread that checks if servers are alive
57     // volatile CheckThread checkThread;
58    
59     // the data is read from XuluConfig
60     InetAddress group;
61    
62     int port;
63    
64     int lease;
65    
66     MulticastSocket socket;
67    
68     boolean isRunning = false;
69    
70     private final Logger LOG = LogManager.getLogger(this.getClass().getName());
71    
72     /**
73     * Constructs a new service reading port and lease time from
74     * {@link appl.ext.XuluConfig}
75     */
76     public MulticastDiscoveryService() {
77     XuluConfig config = XuluConfig.getXuluConfig();
78     port = Integer.valueOf(config
79     .getProperty("DiscoveryServices.multicast.port"));
80     // lease = Integer.valueOf(config
81     // .getProperty("MulticastDiscoveryService.lease"));
82     try {
83     group = InetAddress.getByName(config
84     .getProperty("DiscoveryServices.multicast.group"));
85     } catch (UnknownHostException e) {
86     // TODO Auto-generated catch block
87     e.printStackTrace();
88     }
89     }
90    
91     /**
92     * Constructs a new service
93     *
94     * @param port
95     * port where the multicasting happens
96     * @param lease
97     * timeintervall (in ms) after whi
98     */
99     public MulticastDiscoveryService(int port, int lease) {
100     this();
101     this.port = port;
102     this.lease = lease;
103     }
104    
105     /*
106     * (non-Javadoc)
107     *
108     * @see appl.parallel.services.Service#startService()
109     */
110     public void startService() {
111     try {
112     LOG.info("Discoveryservice started");
113     // All Servers must listen on the port specified, in order fo
114     // the socket to receive
115     if (socket == null)
116     socket = new MulticastSocket(port);
117     // open a multicast group and join it
118    
119     socket.joinGroup(group);
120    
121     socket.setSoTimeout(5000);
122    
123     // create and send Hello world
124     DatagramPacket datagram = new DatagramPacket(sendHello.getBytes(),
125     sendHello.length(), group, port);
126     System.out.println(group);
127     LOG.info("Send hello to all listening servers with broadcast IP "
128     + group);
129    
130     socket.send(datagram);
131    
132     if (receiveThread == null) {
133     receiveThread = new ReceiveThread(socket, helloFromXuluServers,
134     recGoodBye);
135     }
136     if (!(receiveThread.isAlive()))
137     receiveThread.start();
138    
139     } catch (UnknownHostException e) {
140     e.printStackTrace();
141     } catch (IOException e) {
142     e.printStackTrace();
143     }
144    
145     }
146    
147     /*
148     * (non-Javadoc)
149     *
150     * @see appl.parallel.services.DiscoveryService#getResources()
151     */
152     public Vector<ComputingResourceContainer> getResources() {
153     return receiveThread.getResources();
154     }
155    
156     /*
157     * (non-Javadoc)
158     *
159     * @see appl.parallel.services.Service#stopService()
160     */
161     public void stopService() {
162     receiveThread.stopThread();
163     // renewalThread.stopThread();
164     try {
165     Thread.currentThread().sleep(100);
166     } catch (InterruptedException e) {
167     // TODO Auto-generated catch block
168     e.printStackTrace();
169     }
170     socket.close();
171    
172     }
173    
174     /*
175     * (non-Javadoc)
176     *
177     * @see appl.parallel.services.Service#isRunning()
178     */
179     public boolean isRunning() {
180     return isRunning;
181     }
182    
183     /** waits for incoming calls */
184    
185     class ReceiveThread extends Thread {
186    
187     // Resources discovered so far
188     Vector<ComputingResource> discoveredResources;
189    
190     private volatile Thread receiveThread;
191    
192     boolean exit = false;
193    
194     MulticastSocket socket;
195    
196     private final String recHello2;
197    
198     private final String recGoodBye2;
199    
200     private boolean discoveryInProgress = false;
201    
202     /**
203 alfonx 33 * @return a Vector of discovered Ressources before return, all values
204 mojays 2 * are pinged and only returned if alive
205     */
206     public Vector<ComputingResourceContainer> getResources() {
207     return Helper.getResourceContainersForVector(discoveredResources);
208     }
209    
210     /**
211     * @param socket2
212     * Socket which is used for listening
213     * @param helloFromXuluServers
214     * String that equals the hello message of the server
215     * @param recGoodBye
216     * String that equals the Goordbye message of the server
217     */
218     public ReceiveThread(MulticastSocket socket, String recHello,
219     String recGoodBye) {
220     recHello2 = recHello;
221     recGoodBye2 = recGoodBye;
222     this.socket = socket;
223     discoveredResources = new Vector<ComputingResource>();
224     }
225    
226     /**
227     * Safe method to stop the Thread. Use this method instead of
228     * {@link Thread}
229     */
230     public void stopThread() {
231     if (receiveThread != null) {
232     exit = true;
233     // the only way to stop the Thread is to send a package so that
234     // the receive operation no longer blocks
235     DatagramPacket datagram = new DatagramPacket(
236     "Stopping receive Thread".getBytes(),
237     "Stopping receive Thread".length(), group, port);
238     try {
239     socket.send(datagram);
240     // wait on stop
241     Thread.currentThread().sleep(100);
242     } catch (IOException e) {
243     e.printStackTrace();
244     } catch (InterruptedException e) {
245     // TODO Auto-generated catch block
246     e.printStackTrace();
247     } finally {
248     exit = false;
249     }
250     exit = false;
251    
252     }
253    
254     }
255    
256     public void run() {
257     receiveThread = Thread.currentThread();
258     while (!exit) {
259     try {
260     // receive buffer
261    
262     byte[] buffer = new byte[256];
263     DatagramPacket datagram = new DatagramPacket(buffer,
264     buffer.length);
265     socket.receive(datagram);
266     // get the message in the right
267     byte[] result = new byte[datagram.getLength()];
268     System.arraycopy(datagram.getData(), 0, result, 0, datagram
269     .getLength());
270     // check if message is from XuluServer
271     if (new String(result).equals(helloFromXuluServers)) {
272     LOG.debug("received hello from server "
273     + datagram.getAddress());
274    
275     String newname = "rmi:/" + datagram.getAddress()
276     + "/XuluServer";
277    
278     ComputingResource server = (ComputingResource) Naming
279     .lookup(newname);
280     addRessource(server, datagram.getAddress().toString());
281    
282     } else if (LOG.isDebugEnabled())
283     LOG.debug("received message |" + new String(result)
284     + "| from " + datagram.getAddress());
285    
286     } catch (java.net.SocketTimeoutException e) {
287     // catch timeout exception
288     }
289    
290     catch (IOException e) {
291     // TODO Auto-generated catch block
292     e.printStackTrace();
293     } catch (Exception e) {
294     if (!(e instanceof InterruptedException))
295     e.printStackTrace();
296     }
297     }
298     System.out.println("exit Thread " + this.toString());
299     }
300    
301     /**
302     * adds ressource to vector but first looks if its not already added
303     *
304     * @param server
305     */
306     private void addRessource(ComputingResource server, String IP) {
307     // check for double entries
308     for (ComputingResource s : discoveredResources) {
309     if (s.equals(server)) {
310     LOG.debug("Not adding " + server.toString()
311     + " to discovered Ressources again!");
312     return;
313     }
314     }
315     LOG.info("Adding Xulu Server: " + IP + "to discovered Ressources");
316     // try {
317     // Thread.currentThread().sleep(2000);
318     // } catch (InterruptedException e) {
319     // // TODO Auto-generated catch block
320     // e.printStackTrace();
321     // }
322     discoveredResources.add(server);
323     }
324    
325     /**
326     * Gets the {@link Resource
327     */
328     public void pingResources() {
329     PingTestObject po = new PingTestObject(32);
330     // cannot make a for each loop because of
331     // java.util.ConcurrentModificationException
332     int i = discoveredResources.size() - 1;
333     while (i >= 0) {
334     boolean error = true;
335     ComputingResource r = discoveredResources.get(i);
336     if (!(r == null)) {
337    
338     try {
339     r.ping();
340     error = false;
341     LOG.debug("Ping to " + r + " was successfull!");
342     } catch (Exception e) {
343     e.printStackTrace();
344     }
345     // if not successfull remove object
346     if (error) {
347     LOG.debug("Ping to " + r
348     + " was NOT successfull! Removing ressource");
349    
350     discoveredResources.remove(r);
351     }
352     }
353     i--;
354     }
355     }
356    
357     }
358    
359     /**
360     * simple Thread that caused rediscovery of Ressources at a given time
361     * intervall **
362     */
363    
364     class RenewalThread extends Thread {
365    
366     private volatile Thread thisThread;
367    
368     boolean exit = false;
369    
370     private final long timeintervall;
371    
372     private volatile ReceiveThread discoveryToRenew;
373    
374     /**
375     * invokes rediscovery on the given thread at the given time intervall
376     */
377     public RenewalThread(ReceiveThread discoveryToRenew, long timeintervall) {
378     this.discoveryToRenew = discoveryToRenew;
379     this.timeintervall = timeintervall;
380    
381     }
382    
383     /**
384     * Safe method to stop this Thread. Use this method instead of
385     * {@link Thread}
386     */
387     public void stopThread() {
388     if (thisThread != null)
389     exit = true;
390     }
391    
392     public void run() {
393     thisThread = Thread.currentThread();
394     while (!exit) {
395     discoveryToRenew.pingResources();
396     try {
397     Thread.currentThread().sleep(timeintervall);
398     } catch (InterruptedException e) {
399     // TODO Auto-generated catch block
400     e.printStackTrace();
401     }
402     }
403     System.out.println("exit renewal Thread");
404     }
405    
406     }
407    
408     /**
409     * just for testing..edit as you like
410     */
411     public static void main(String[] args) {
412     BasicConfigurator.configure();
413     MulticastDiscoveryService bro = new MulticastDiscoveryService();
414     bro.startService();
415     try {
416     Thread.currentThread().sleep(5000);
417     } catch (InterruptedException e) {
418     e.printStackTrace();
419     }
420     // bro.stopService();
421     while (true)
422     try {
423     Vector<ComputingResourceContainer> v = bro.getResources();
424     for (ComputingResourceContainer container : v) {
425     System.out.print("Active Object: ");
426     System.out.println(container.getInformation().getProperty(
427     "name")
428     + " with IP "
429     + container.getInformation().getProperty("IP"));
430     }
431     Thread.currentThread().sleep(5000);
432     } catch (InterruptedException e) {
433    
434     }
435     }
436    
437     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26