/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/services/MulticastDiscoveryService.java
ViewVC logotype

Contents of /branches/1.8-gt2-2.6/src/appl/parallel/services/MulticastDiscoveryService.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2 - (show annotations)
Wed Feb 25 11:54:01 2009 UTC (15 years, 9 months ago) by mojays
Original Path: trunk/src/appl/parallel/services/MulticastDiscoveryService.java
File size: 11721 byte(s)
First Commit, corresponds to Revision 1008 of Wikisquare-SVN 
1 package appl.parallel.services;
2
3 import java.io.IOException;
4 import java.net.DatagramPacket;
5 import java.net.InetAddress;
6 import java.net.MulticastSocket;
7 import java.net.UnknownHostException;
8 import java.rmi.Naming;
9 import java.rmi.RemoteException;
10 import java.util.Iterator;
11 import java.util.Vector;
12
13 import org.apache.log4j.BasicConfigurator;
14 import org.apache.log4j.LogManager;
15 import org.apache.log4j.Logger;
16
17 import appl.ext.XuluConfig;
18 import appl.parallel.ComputingResource;
19 import appl.parallel.ComputingResourceContainer;
20 import appl.parallel.server.XuluServer;
21 import appl.parallel.test.PingTestObject;
22 import appl.parallel.util.Helper;
23
24 /**
25 * Responsible for discovery of RemoteResources on the network, especially but
26 * not only {@link XuluServer XuluServers}. It listens for the following
27 * messages: <br>
28 * <br>
29 * "Hello Xulu from XuluServer" - sent by RemoteResources like
30 * {@link XuluServer} when starting <br>
31 * "Goodbye Xulu" - sent by RemoteResources like {@link XuluServer} when
32 * stopping <br>
33 * <br>
34 * and sends: <br>
35 * "Hello Servers" - when started If multicasting does not work try to disable
36 * your firewall or better configure it to allow multicast packages. Notice
37 * also, that multicasting may not work in virtual machines like MS Virtual PC
38 * 2004. <Warning><b>THIS CLASS IS STILL FOR DEMONSTRATION ONLY</b></Warning>
39 *
40 * @author Dominik Appl
41 * @see XuluServer
42 */
43 public class MulticastDiscoveryService implements DiscoveryService {
44
45 // the data to send
46 final String sendHello = "Hello Servers";
47
48 // data to receive
49 final String helloFromXuluServers = "Hello Xulu from XuluServer";
50
51 final String recGoodBye = "Goodbye Xulu";
52
53 // the Thread which waits on Server hello/goodbyes
54 volatile ReceiveThread receiveThread;
55
56 // volatile RenewalThread renewalThread;
57
58 // the thread that checks if servers are alive
59 // volatile CheckThread checkThread;
60
61 // the data is read from XuluConfig
62 InetAddress group;
63
64 int port;
65
66 int lease;
67
68 MulticastSocket socket;
69
70 boolean isRunning = false;
71
72 private final Logger LOG = LogManager.getLogger(this.getClass().getName());
73
74 /**
75 * Constructs a new service reading port and lease time from
76 * {@link appl.ext.XuluConfig}
77 */
78 public MulticastDiscoveryService() {
79 XuluConfig config = XuluConfig.getXuluConfig();
80 port = Integer.valueOf(config
81 .getProperty("DiscoveryServices.multicast.port"));
82 // lease = Integer.valueOf(config
83 // .getProperty("MulticastDiscoveryService.lease"));
84 try {
85 group = InetAddress.getByName(config
86 .getProperty("DiscoveryServices.multicast.group"));
87 } catch (UnknownHostException e) {
88 // TODO Auto-generated catch block
89 e.printStackTrace();
90 }
91 }
92
93 /**
94 * Constructs a new service
95 *
96 * @param port
97 * port where the multicasting happens
98 * @param lease
99 * timeintervall (in ms) after whi
100 */
101 public MulticastDiscoveryService(int port, int lease) {
102 this();
103 this.port = port;
104 this.lease = lease;
105 }
106
107 /*
108 * (non-Javadoc)
109 *
110 * @see appl.parallel.services.Service#startService()
111 */
112 public void startService() {
113 try {
114 LOG.info("Discoveryservice started");
115 // All Servers must listen on the port specified, in order fo
116 // the socket to receive
117 if (socket == null)
118 socket = new MulticastSocket(port);
119 // open a multicast group and join it
120
121 socket.joinGroup(group);
122
123 socket.setSoTimeout(5000);
124
125 // create and send Hello world
126 DatagramPacket datagram = new DatagramPacket(sendHello.getBytes(),
127 sendHello.length(), group, port);
128 System.out.println(group);
129 LOG.info("Send hello to all listening servers with broadcast IP "
130 + group);
131
132 socket.send(datagram);
133
134 if (receiveThread == null) {
135 receiveThread = new ReceiveThread(socket, helloFromXuluServers,
136 recGoodBye);
137 }
138 if (!(receiveThread.isAlive()))
139 receiveThread.start();
140
141 } catch (UnknownHostException e) {
142 e.printStackTrace();
143 } catch (IOException e) {
144 e.printStackTrace();
145 }
146
147 }
148
149 /*
150 * (non-Javadoc)
151 *
152 * @see appl.parallel.services.DiscoveryService#getResources()
153 */
154 public Vector<ComputingResourceContainer> getResources() {
155 return receiveThread.getResources();
156 }
157
158 /*
159 * (non-Javadoc)
160 *
161 * @see appl.parallel.services.Service#stopService()
162 */
163 public void stopService() {
164 receiveThread.stopThread();
165 // renewalThread.stopThread();
166 try {
167 Thread.currentThread().sleep(100);
168 } catch (InterruptedException e) {
169 // TODO Auto-generated catch block
170 e.printStackTrace();
171 }
172 socket.close();
173
174 }
175
176 /*
177 * (non-Javadoc)
178 *
179 * @see appl.parallel.services.Service#isRunning()
180 */
181 public boolean isRunning() {
182 return isRunning;
183 }
184
185 /** waits for incoming calls */
186
187 class ReceiveThread extends Thread {
188
189 // Resources discovered so far
190 Vector<ComputingResource> discoveredResources;
191
192 private volatile Thread receiveThread;
193
194 boolean exit = false;
195
196 MulticastSocket socket;
197
198 private final String recHello2;
199
200 private final String recGoodBye2;
201
202 private boolean discoveryInProgress = false;
203
204 /**
205 * @returns a Vector of discovered Ressources before return, all values
206 * are pinged and only returned if alive
207 */
208 public Vector<ComputingResourceContainer> getResources() {
209 return Helper.getResourceContainersForVector(discoveredResources);
210 }
211
212 /**
213 * @param socket2
214 * Socket which is used for listening
215 * @param helloFromXuluServers
216 * String that equals the hello message of the server
217 * @param recGoodBye
218 * String that equals the Goordbye message of the server
219 */
220 public ReceiveThread(MulticastSocket socket, String recHello,
221 String recGoodBye) {
222 recHello2 = recHello;
223 recGoodBye2 = recGoodBye;
224 this.socket = socket;
225 discoveredResources = new Vector<ComputingResource>();
226 }
227
228 /**
229 * Safe method to stop the Thread. Use this method instead of
230 * {@link Thread}
231 */
232 public void stopThread() {
233 if (receiveThread != null) {
234 exit = true;
235 // the only way to stop the Thread is to send a package so that
236 // the receive operation no longer blocks
237 DatagramPacket datagram = new DatagramPacket(
238 "Stopping receive Thread".getBytes(),
239 "Stopping receive Thread".length(), group, port);
240 try {
241 socket.send(datagram);
242 // wait on stop
243 Thread.currentThread().sleep(100);
244 } catch (IOException e) {
245 e.printStackTrace();
246 } catch (InterruptedException e) {
247 // TODO Auto-generated catch block
248 e.printStackTrace();
249 } finally {
250 exit = false;
251 }
252 exit = false;
253
254 }
255
256 }
257
258 public void run() {
259 receiveThread = Thread.currentThread();
260 while (!exit) {
261 try {
262 // receive buffer
263
264 byte[] buffer = new byte[256];
265 DatagramPacket datagram = new DatagramPacket(buffer,
266 buffer.length);
267 socket.receive(datagram);
268 // get the message in the right
269 byte[] result = new byte[datagram.getLength()];
270 System.arraycopy(datagram.getData(), 0, result, 0, datagram
271 .getLength());
272 // check if message is from XuluServer
273 if (new String(result).equals(helloFromXuluServers)) {
274 LOG.debug("received hello from server "
275 + datagram.getAddress());
276
277 String newname = "rmi:/" + datagram.getAddress()
278 + "/XuluServer";
279
280 ComputingResource server = (ComputingResource) Naming
281 .lookup(newname);
282 addRessource(server, datagram.getAddress().toString());
283
284 } else if (LOG.isDebugEnabled())
285 LOG.debug("received message |" + new String(result)
286 + "| from " + datagram.getAddress());
287
288 } catch (java.net.SocketTimeoutException e) {
289 // catch timeout exception
290 }
291
292 catch (IOException e) {
293 // TODO Auto-generated catch block
294 e.printStackTrace();
295 } catch (Exception e) {
296 if (!(e instanceof InterruptedException))
297 e.printStackTrace();
298 }
299 }
300 System.out.println("exit Thread " + this.toString());
301 }
302
303 /**
304 * adds ressource to vector but first looks if its not already added
305 *
306 * @param server
307 */
308 private void addRessource(ComputingResource server, String IP) {
309 // check for double entries
310 for (ComputingResource s : discoveredResources) {
311 if (s.equals(server)) {
312 LOG.debug("Not adding " + server.toString()
313 + " to discovered Ressources again!");
314 return;
315 }
316 }
317 LOG.info("Adding Xulu Server: " + IP + "to discovered Ressources");
318 // try {
319 // Thread.currentThread().sleep(2000);
320 // } catch (InterruptedException e) {
321 // // TODO Auto-generated catch block
322 // e.printStackTrace();
323 // }
324 discoveredResources.add(server);
325 }
326
327 /**
328 * Gets the {@link Resource
329 */
330 public void pingResources() {
331 PingTestObject po = new PingTestObject(32);
332 // cannot make a for each loop because of
333 // java.util.ConcurrentModificationException
334 int i = discoveredResources.size() - 1;
335 while (i >= 0) {
336 boolean error = true;
337 ComputingResource r = discoveredResources.get(i);
338 if (!(r == null)) {
339
340 try {
341 r.ping();
342 error = false;
343 LOG.debug("Ping to " + r + " was successfull!");
344 } catch (Exception e) {
345 e.printStackTrace();
346 }
347 // if not successfull remove object
348 if (error) {
349 LOG.debug("Ping to " + r
350 + " was NOT successfull! Removing ressource");
351
352 discoveredResources.remove(r);
353 }
354 }
355 i--;
356 }
357 }
358
359 }
360
361 /**
362 * simple Thread that caused rediscovery of Ressources at a given time
363 * intervall **
364 */
365
366 class RenewalThread extends Thread {
367
368 private volatile Thread thisThread;
369
370 boolean exit = false;
371
372 private final long timeintervall;
373
374 private volatile ReceiveThread discoveryToRenew;
375
376 /**
377 * invokes rediscovery on the given thread at the given time intervall
378 */
379 public RenewalThread(ReceiveThread discoveryToRenew, long timeintervall) {
380 this.discoveryToRenew = discoveryToRenew;
381 this.timeintervall = timeintervall;
382
383 }
384
385 /**
386 * Safe method to stop this Thread. Use this method instead of
387 * {@link Thread}
388 */
389 public void stopThread() {
390 if (thisThread != null)
391 exit = true;
392 }
393
394 public void run() {
395 thisThread = Thread.currentThread();
396 while (!exit) {
397 discoveryToRenew.pingResources();
398 try {
399 Thread.currentThread().sleep(timeintervall);
400 } catch (InterruptedException e) {
401 // TODO Auto-generated catch block
402 e.printStackTrace();
403 }
404 }
405 System.out.println("exit renewal Thread");
406 }
407
408 }
409
410 /**
411 * just for testing..edit as you like
412 */
413 public static void main(String[] args) {
414 BasicConfigurator.configure();
415 MulticastDiscoveryService bro = new MulticastDiscoveryService();
416 bro.startService();
417 try {
418 Thread.currentThread().sleep(5000);
419 } catch (InterruptedException e) {
420 e.printStackTrace();
421 }
422 // bro.stopService();
423 while (true)
424 try {
425 Vector<ComputingResourceContainer> v = bro.getResources();
426 for (ComputingResourceContainer container : v) {
427 System.out.print("Active Object: ");
428 System.out.println(container.getInformation().getProperty(
429 "name")
430 + " with IP "
431 + container.getInformation().getProperty("IP"));
432 }
433 Thread.currentThread().sleep(5000);
434 } catch (InterruptedException e) {
435
436 }
437 }
438
439 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26