/[xulu]/trunk/src/appl/parallel/services/MulticastDiscoveryService.java
ViewVC logotype

Annotation of /trunk/src/appl/parallel/services/MulticastDiscoveryService.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2 - (hide annotations)
Wed Feb 25 11:54:01 2009 UTC (15 years, 11 months ago) by mojays
File size: 11721 byte(s)
First Commit, corresponds to Revision 1008 of Wikisquare-SVN 
1 mojays 2 package appl.parallel.services;
2    
3     import java.io.IOException;
4     import java.net.DatagramPacket;
5     import java.net.InetAddress;
6     import java.net.MulticastSocket;
7     import java.net.UnknownHostException;
8     import java.rmi.Naming;
9     import java.rmi.RemoteException;
10     import java.util.Iterator;
11     import java.util.Vector;
12    
13     import org.apache.log4j.BasicConfigurator;
14     import org.apache.log4j.LogManager;
15     import org.apache.log4j.Logger;
16    
17     import appl.ext.XuluConfig;
18     import appl.parallel.ComputingResource;
19     import appl.parallel.ComputingResourceContainer;
20     import appl.parallel.server.XuluServer;
21     import appl.parallel.test.PingTestObject;
22     import appl.parallel.util.Helper;
23    
24     /**
25     * Responsible for discovery of RemoteResources on the network, especially but
26     * not only {@link XuluServer XuluServers}. It listens for the following
27     * messages: <br>
28     * <br>
29     * "Hello Xulu from XuluServer" - sent by RemoteResources like
30     * {@link XuluServer} when starting <br>
31     * "Goodbye Xulu" - sent by RemoteResources like {@link XuluServer} when
32     * stopping <br>
33     * <br>
34     * and sends: <br>
35     * "Hello Servers" - when started If multicasting does not work try to disable
36     * your firewall or better configure it to allow multicast packages. Notice
37     * also, that multicasting may not work in virtual machines like MS Virtual PC
38     * 2004. <Warning><b>THIS CLASS IS STILL FOR DEMONSTRATION ONLY</b></Warning>
39     *
40     * @author Dominik Appl
41     * @see XuluServer
42     */
43     public class MulticastDiscoveryService implements DiscoveryService {
44    
45     // the data to send
46     final String sendHello = "Hello Servers";
47    
48     // data to receive
49     final String helloFromXuluServers = "Hello Xulu from XuluServer";
50    
51     final String recGoodBye = "Goodbye Xulu";
52    
53     // the Thread which waits on Server hello/goodbyes
54     volatile ReceiveThread receiveThread;
55    
56     // volatile RenewalThread renewalThread;
57    
58     // the thread that checks if servers are alive
59     // volatile CheckThread checkThread;
60    
61     // the data is read from XuluConfig
62     InetAddress group;
63    
64     int port;
65    
66     int lease;
67    
68     MulticastSocket socket;
69    
70     boolean isRunning = false;
71    
72     private final Logger LOG = LogManager.getLogger(this.getClass().getName());
73    
74     /**
75     * Constructs a new service reading port and lease time from
76     * {@link appl.ext.XuluConfig}
77     */
78     public MulticastDiscoveryService() {
79     XuluConfig config = XuluConfig.getXuluConfig();
80     port = Integer.valueOf(config
81     .getProperty("DiscoveryServices.multicast.port"));
82     // lease = Integer.valueOf(config
83     // .getProperty("MulticastDiscoveryService.lease"));
84     try {
85     group = InetAddress.getByName(config
86     .getProperty("DiscoveryServices.multicast.group"));
87     } catch (UnknownHostException e) {
88     // TODO Auto-generated catch block
89     e.printStackTrace();
90     }
91     }
92    
93     /**
94     * Constructs a new service
95     *
96     * @param port
97     * port where the multicasting happens
98     * @param lease
99     * timeintervall (in ms) after whi
100     */
101     public MulticastDiscoveryService(int port, int lease) {
102     this();
103     this.port = port;
104     this.lease = lease;
105     }
106    
107     /*
108     * (non-Javadoc)
109     *
110     * @see appl.parallel.services.Service#startService()
111     */
112     public void startService() {
113     try {
114     LOG.info("Discoveryservice started");
115     // All Servers must listen on the port specified, in order fo
116     // the socket to receive
117     if (socket == null)
118     socket = new MulticastSocket(port);
119     // open a multicast group and join it
120    
121     socket.joinGroup(group);
122    
123     socket.setSoTimeout(5000);
124    
125     // create and send Hello world
126     DatagramPacket datagram = new DatagramPacket(sendHello.getBytes(),
127     sendHello.length(), group, port);
128     System.out.println(group);
129     LOG.info("Send hello to all listening servers with broadcast IP "
130     + group);
131    
132     socket.send(datagram);
133    
134     if (receiveThread == null) {
135     receiveThread = new ReceiveThread(socket, helloFromXuluServers,
136     recGoodBye);
137     }
138     if (!(receiveThread.isAlive()))
139     receiveThread.start();
140    
141     } catch (UnknownHostException e) {
142     e.printStackTrace();
143     } catch (IOException e) {
144     e.printStackTrace();
145     }
146    
147     }
148    
149     /*
150     * (non-Javadoc)
151     *
152     * @see appl.parallel.services.DiscoveryService#getResources()
153     */
154     public Vector<ComputingResourceContainer> getResources() {
155     return receiveThread.getResources();
156     }
157    
158     /*
159     * (non-Javadoc)
160     *
161     * @see appl.parallel.services.Service#stopService()
162     */
163     public void stopService() {
164     receiveThread.stopThread();
165     // renewalThread.stopThread();
166     try {
167     Thread.currentThread().sleep(100);
168     } catch (InterruptedException e) {
169     // TODO Auto-generated catch block
170     e.printStackTrace();
171     }
172     socket.close();
173    
174     }
175    
176     /*
177     * (non-Javadoc)
178     *
179     * @see appl.parallel.services.Service#isRunning()
180     */
181     public boolean isRunning() {
182     return isRunning;
183     }
184    
185     /** waits for incoming calls */
186    
187     class ReceiveThread extends Thread {
188    
189     // Resources discovered so far
190     Vector<ComputingResource> discoveredResources;
191    
192     private volatile Thread receiveThread;
193    
194     boolean exit = false;
195    
196     MulticastSocket socket;
197    
198     private final String recHello2;
199    
200     private final String recGoodBye2;
201    
202     private boolean discoveryInProgress = false;
203    
204     /**
205     * @returns a Vector of discovered Ressources before return, all values
206     * are pinged and only returned if alive
207     */
208     public Vector<ComputingResourceContainer> getResources() {
209     return Helper.getResourceContainersForVector(discoveredResources);
210     }
211    
212     /**
213     * @param socket2
214     * Socket which is used for listening
215     * @param helloFromXuluServers
216     * String that equals the hello message of the server
217     * @param recGoodBye
218     * String that equals the Goordbye message of the server
219     */
220     public ReceiveThread(MulticastSocket socket, String recHello,
221     String recGoodBye) {
222     recHello2 = recHello;
223     recGoodBye2 = recGoodBye;
224     this.socket = socket;
225     discoveredResources = new Vector<ComputingResource>();
226     }
227    
228     /**
229     * Safe method to stop the Thread. Use this method instead of
230     * {@link Thread}
231     */
232     public void stopThread() {
233     if (receiveThread != null) {
234     exit = true;
235     // the only way to stop the Thread is to send a package so that
236     // the receive operation no longer blocks
237     DatagramPacket datagram = new DatagramPacket(
238     "Stopping receive Thread".getBytes(),
239     "Stopping receive Thread".length(), group, port);
240     try {
241     socket.send(datagram);
242     // wait on stop
243     Thread.currentThread().sleep(100);
244     } catch (IOException e) {
245     e.printStackTrace();
246     } catch (InterruptedException e) {
247     // TODO Auto-generated catch block
248     e.printStackTrace();
249     } finally {
250     exit = false;
251     }
252     exit = false;
253    
254     }
255    
256     }
257    
258     public void run() {
259     receiveThread = Thread.currentThread();
260     while (!exit) {
261     try {
262     // receive buffer
263    
264     byte[] buffer = new byte[256];
265     DatagramPacket datagram = new DatagramPacket(buffer,
266     buffer.length);
267     socket.receive(datagram);
268     // get the message in the right
269     byte[] result = new byte[datagram.getLength()];
270     System.arraycopy(datagram.getData(), 0, result, 0, datagram
271     .getLength());
272     // check if message is from XuluServer
273     if (new String(result).equals(helloFromXuluServers)) {
274     LOG.debug("received hello from server "
275     + datagram.getAddress());
276    
277     String newname = "rmi:/" + datagram.getAddress()
278     + "/XuluServer";
279    
280     ComputingResource server = (ComputingResource) Naming
281     .lookup(newname);
282     addRessource(server, datagram.getAddress().toString());
283    
284     } else if (LOG.isDebugEnabled())
285     LOG.debug("received message |" + new String(result)
286     + "| from " + datagram.getAddress());
287    
288     } catch (java.net.SocketTimeoutException e) {
289     // catch timeout exception
290     }
291    
292     catch (IOException e) {
293     // TODO Auto-generated catch block
294     e.printStackTrace();
295     } catch (Exception e) {
296     if (!(e instanceof InterruptedException))
297     e.printStackTrace();
298     }
299     }
300     System.out.println("exit Thread " + this.toString());
301     }
302    
303     /**
304     * adds ressource to vector but first looks if its not already added
305     *
306     * @param server
307     */
308     private void addRessource(ComputingResource server, String IP) {
309     // check for double entries
310     for (ComputingResource s : discoveredResources) {
311     if (s.equals(server)) {
312     LOG.debug("Not adding " + server.toString()
313     + " to discovered Ressources again!");
314     return;
315     }
316     }
317     LOG.info("Adding Xulu Server: " + IP + "to discovered Ressources");
318     // try {
319     // Thread.currentThread().sleep(2000);
320     // } catch (InterruptedException e) {
321     // // TODO Auto-generated catch block
322     // e.printStackTrace();
323     // }
324     discoveredResources.add(server);
325     }
326    
327     /**
328     * Gets the {@link Resource
329     */
330     public void pingResources() {
331     PingTestObject po = new PingTestObject(32);
332     // cannot make a for each loop because of
333     // java.util.ConcurrentModificationException
334     int i = discoveredResources.size() - 1;
335     while (i >= 0) {
336     boolean error = true;
337     ComputingResource r = discoveredResources.get(i);
338     if (!(r == null)) {
339    
340     try {
341     r.ping();
342     error = false;
343     LOG.debug("Ping to " + r + " was successfull!");
344     } catch (Exception e) {
345     e.printStackTrace();
346     }
347     // if not successfull remove object
348     if (error) {
349     LOG.debug("Ping to " + r
350     + " was NOT successfull! Removing ressource");
351    
352     discoveredResources.remove(r);
353     }
354     }
355     i--;
356     }
357     }
358    
359     }
360    
361     /**
362     * simple Thread that caused rediscovery of Ressources at a given time
363     * intervall **
364     */
365    
366     class RenewalThread extends Thread {
367    
368     private volatile Thread thisThread;
369    
370     boolean exit = false;
371    
372     private final long timeintervall;
373    
374     private volatile ReceiveThread discoveryToRenew;
375    
376     /**
377     * invokes rediscovery on the given thread at the given time intervall
378     */
379     public RenewalThread(ReceiveThread discoveryToRenew, long timeintervall) {
380     this.discoveryToRenew = discoveryToRenew;
381     this.timeintervall = timeintervall;
382    
383     }
384    
385     /**
386     * Safe method to stop this Thread. Use this method instead of
387     * {@link Thread}
388     */
389     public void stopThread() {
390     if (thisThread != null)
391     exit = true;
392     }
393    
394     public void run() {
395     thisThread = Thread.currentThread();
396     while (!exit) {
397     discoveryToRenew.pingResources();
398     try {
399     Thread.currentThread().sleep(timeintervall);
400     } catch (InterruptedException e) {
401     // TODO Auto-generated catch block
402     e.printStackTrace();
403     }
404     }
405     System.out.println("exit renewal Thread");
406     }
407    
408     }
409    
410     /**
411     * just for testing..edit as you like
412     */
413     public static void main(String[] args) {
414     BasicConfigurator.configure();
415     MulticastDiscoveryService bro = new MulticastDiscoveryService();
416     bro.startService();
417     try {
418     Thread.currentThread().sleep(5000);
419     } catch (InterruptedException e) {
420     e.printStackTrace();
421     }
422     // bro.stopService();
423     while (true)
424     try {
425     Vector<ComputingResourceContainer> v = bro.getResources();
426     for (ComputingResourceContainer container : v) {
427     System.out.print("Active Object: ");
428     System.out.println(container.getInformation().getProperty(
429     "name")
430     + " with IP "
431     + container.getInformation().getProperty("IP"));
432     }
433     Thread.currentThread().sleep(5000);
434     } catch (InterruptedException e) {
435    
436     }
437     }
438    
439     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26