/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/services/MulticastDiscoveryService.java
ViewVC logotype

Contents of /branches/1.8-gt2-2.6/src/appl/parallel/services/MulticastDiscoveryService.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 60 - (show annotations)
Sun Oct 4 16:54:52 2009 UTC (15 years, 3 months ago) by alfonx
File size: 11658 byte(s)
* organized imports
1 package appl.parallel.services;
2
3 import java.io.IOException;
4 import java.net.DatagramPacket;
5 import java.net.InetAddress;
6 import java.net.MulticastSocket;
7 import java.net.UnknownHostException;
8 import java.rmi.Naming;
9 import java.util.Vector;
10
11 import org.apache.log4j.BasicConfigurator;
12 import org.apache.log4j.LogManager;
13 import org.apache.log4j.Logger;
14
15 import appl.ext.XuluConfig;
16 import appl.parallel.ComputingResource;
17 import appl.parallel.ComputingResourceContainer;
18 import appl.parallel.server.XuluServer;
19 import appl.parallel.test.PingTestObject;
20 import appl.parallel.util.Helper;
21
/**
 * Responsible for discovery of RemoteResources on the network, especially but
 * not only {@link XuluServer XuluServers}. It listens for the following
 * messages: <br>
 * <br>
 * "Hello Xulu from XuluServer" - sent by RemoteResources like
 * {@link XuluServer} when starting <br>
 * "Goodbye Xulu" - sent by RemoteResources like {@link XuluServer} when
 * stopping <br>
 * <br>
 * and sends: <br>
 * "Hello Servers" - when started <br>
 * <br>
 * If multicasting does not work, try disabling your firewall or, better,
 * configure it to allow multicast packets. Note also that multicasting may
 * not work in virtual machines like MS Virtual PC 2004. <br>
 * <br>
 * <b>Warning: THIS CLASS IS STILL FOR DEMONSTRATION ONLY.</b>
 *
 * @author Dominik Appl
 * @see XuluServer
 */
41 public class MulticastDiscoveryService implements DiscoveryService {
42
43 // the data to send
44 final String sendHello = "Hello Servers";
45
46 // data to receive
47 final String helloFromXuluServers = "Hello Xulu from XuluServer";
48
49 final String recGoodBye = "Goodbye Xulu";
50
51 // the Thread which waits on Server hello/goodbyes
52 volatile ReceiveThread receiveThread;
53
54 // volatile RenewalThread renewalThread;
55
56 // the thread that checks if servers are alive
57 // volatile CheckThread checkThread;
58
59 // the data is read from XuluConfig
60 InetAddress group;
61
62 int port;
63
64 int lease;
65
66 MulticastSocket socket;
67
68 boolean isRunning = false;
69
70 private final Logger LOG = LogManager.getLogger(this.getClass().getName());
71
72 /**
73 * Constructs a new service reading port and lease time from
74 * {@link appl.ext.XuluConfig}
75 */
76 public MulticastDiscoveryService() {
77 XuluConfig config = XuluConfig.getXuluConfig();
78 port = Integer.valueOf(config
79 .getProperty("DiscoveryServices.multicast.port"));
80 // lease = Integer.valueOf(config
81 // .getProperty("MulticastDiscoveryService.lease"));
82 try {
83 group = InetAddress.getByName(config
84 .getProperty("DiscoveryServices.multicast.group"));
85 } catch (UnknownHostException e) {
86 // TODO Auto-generated catch block
87 e.printStackTrace();
88 }
89 }
90
91 /**
92 * Constructs a new service
93 *
94 * @param port
95 * port where the multicasting happens
96 * @param lease
97 * timeintervall (in ms) after whi
98 */
99 public MulticastDiscoveryService(int port, int lease) {
100 this();
101 this.port = port;
102 this.lease = lease;
103 }
104
105 /*
106 * (non-Javadoc)
107 *
108 * @see appl.parallel.services.Service#startService()
109 */
110 public void startService() {
111 try {
112 LOG.info("Discoveryservice started");
113 // All Servers must listen on the port specified, in order fo
114 // the socket to receive
115 if (socket == null)
116 socket = new MulticastSocket(port);
117 // open a multicast group and join it
118
119 socket.joinGroup(group);
120
121 socket.setSoTimeout(5000);
122
123 // create and send Hello world
124 DatagramPacket datagram = new DatagramPacket(sendHello.getBytes(),
125 sendHello.length(), group, port);
126 System.out.println(group);
127 LOG.info("Send hello to all listening servers with broadcast IP "
128 + group);
129
130 socket.send(datagram);
131
132 if (receiveThread == null) {
133 receiveThread = new ReceiveThread(socket, helloFromXuluServers,
134 recGoodBye);
135 }
136 if (!(receiveThread.isAlive()))
137 receiveThread.start();
138
139 } catch (UnknownHostException e) {
140 e.printStackTrace();
141 } catch (IOException e) {
142 e.printStackTrace();
143 }
144
145 }
146
147 /*
148 * (non-Javadoc)
149 *
150 * @see appl.parallel.services.DiscoveryService#getResources()
151 */
152 public Vector<ComputingResourceContainer> getResources() {
153 return receiveThread.getResources();
154 }
155
156 /*
157 * (non-Javadoc)
158 *
159 * @see appl.parallel.services.Service#stopService()
160 */
161 public void stopService() {
162 receiveThread.stopThread();
163 // renewalThread.stopThread();
164 try {
165 Thread.currentThread().sleep(100);
166 } catch (InterruptedException e) {
167 // TODO Auto-generated catch block
168 e.printStackTrace();
169 }
170 socket.close();
171
172 }
173
174 /*
175 * (non-Javadoc)
176 *
177 * @see appl.parallel.services.Service#isRunning()
178 */
179 public boolean isRunning() {
180 return isRunning;
181 }
182
183 /** waits for incoming calls */
184
185 class ReceiveThread extends Thread {
186
187 // Resources discovered so far
188 Vector<ComputingResource> discoveredResources;
189
190 private volatile Thread receiveThread;
191
192 boolean exit = false;
193
194 MulticastSocket socket;
195
196 private final String recHello2;
197
198 private final String recGoodBye2;
199
200 private boolean discoveryInProgress = false;
201
202 /**
203 * @return a Vector of discovered Ressources before return, all values
204 * are pinged and only returned if alive
205 */
206 public Vector<ComputingResourceContainer> getResources() {
207 return Helper.getResourceContainersForVector(discoveredResources);
208 }
209
210 /**
211 * @param socket2
212 * Socket which is used for listening
213 * @param helloFromXuluServers
214 * String that equals the hello message of the server
215 * @param recGoodBye
216 * String that equals the Goordbye message of the server
217 */
218 public ReceiveThread(MulticastSocket socket, String recHello,
219 String recGoodBye) {
220 recHello2 = recHello;
221 recGoodBye2 = recGoodBye;
222 this.socket = socket;
223 discoveredResources = new Vector<ComputingResource>();
224 }
225
226 /**
227 * Safe method to stop the Thread. Use this method instead of
228 * {@link Thread}
229 */
230 public void stopThread() {
231 if (receiveThread != null) {
232 exit = true;
233 // the only way to stop the Thread is to send a package so that
234 // the receive operation no longer blocks
235 DatagramPacket datagram = new DatagramPacket(
236 "Stopping receive Thread".getBytes(),
237 "Stopping receive Thread".length(), group, port);
238 try {
239 socket.send(datagram);
240 // wait on stop
241 Thread.currentThread().sleep(100);
242 } catch (IOException e) {
243 e.printStackTrace();
244 } catch (InterruptedException e) {
245 // TODO Auto-generated catch block
246 e.printStackTrace();
247 } finally {
248 exit = false;
249 }
250 exit = false;
251
252 }
253
254 }
255
256 public void run() {
257 receiveThread = Thread.currentThread();
258 while (!exit) {
259 try {
260 // receive buffer
261
262 byte[] buffer = new byte[256];
263 DatagramPacket datagram = new DatagramPacket(buffer,
264 buffer.length);
265 socket.receive(datagram);
266 // get the message in the right
267 byte[] result = new byte[datagram.getLength()];
268 System.arraycopy(datagram.getData(), 0, result, 0, datagram
269 .getLength());
270 // check if message is from XuluServer
271 if (new String(result).equals(helloFromXuluServers)) {
272 LOG.debug("received hello from server "
273 + datagram.getAddress());
274
275 String newname = "rmi:/" + datagram.getAddress()
276 + "/XuluServer";
277
278 ComputingResource server = (ComputingResource) Naming
279 .lookup(newname);
280 addRessource(server, datagram.getAddress().toString());
281
282 } else if (LOG.isDebugEnabled())
283 LOG.debug("received message |" + new String(result)
284 + "| from " + datagram.getAddress());
285
286 } catch (java.net.SocketTimeoutException e) {
287 // catch timeout exception
288 }
289
290 catch (IOException e) {
291 // TODO Auto-generated catch block
292 e.printStackTrace();
293 } catch (Exception e) {
294 if (!(e instanceof InterruptedException))
295 e.printStackTrace();
296 }
297 }
298 System.out.println("exit Thread " + this.toString());
299 }
300
301 /**
302 * adds ressource to vector but first looks if its not already added
303 *
304 * @param server
305 */
306 private void addRessource(ComputingResource server, String IP) {
307 // check for double entries
308 for (ComputingResource s : discoveredResources) {
309 if (s.equals(server)) {
310 LOG.debug("Not adding " + server.toString()
311 + " to discovered Ressources again!");
312 return;
313 }
314 }
315 LOG.info("Adding Xulu Server: " + IP + "to discovered Ressources");
316 // try {
317 // Thread.currentThread().sleep(2000);
318 // } catch (InterruptedException e) {
319 // // TODO Auto-generated catch block
320 // e.printStackTrace();
321 // }
322 discoveredResources.add(server);
323 }
324
325 /**
326 * Gets the {@link Resource
327 */
328 public void pingResources() {
329 PingTestObject po = new PingTestObject(32);
330 // cannot make a for each loop because of
331 // java.util.ConcurrentModificationException
332 int i = discoveredResources.size() - 1;
333 while (i >= 0) {
334 boolean error = true;
335 ComputingResource r = discoveredResources.get(i);
336 if (!(r == null)) {
337
338 try {
339 r.ping();
340 error = false;
341 LOG.debug("Ping to " + r + " was successfull!");
342 } catch (Exception e) {
343 e.printStackTrace();
344 }
345 // if not successfull remove object
346 if (error) {
347 LOG.debug("Ping to " + r
348 + " was NOT successfull! Removing ressource");
349
350 discoveredResources.remove(r);
351 }
352 }
353 i--;
354 }
355 }
356
357 }
358
359 /**
360 * simple Thread that caused rediscovery of Ressources at a given time
361 * intervall **
362 */
363
364 class RenewalThread extends Thread {
365
366 private volatile Thread thisThread;
367
368 boolean exit = false;
369
370 private final long timeintervall;
371
372 private volatile ReceiveThread discoveryToRenew;
373
374 /**
375 * invokes rediscovery on the given thread at the given time intervall
376 */
377 public RenewalThread(ReceiveThread discoveryToRenew, long timeintervall) {
378 this.discoveryToRenew = discoveryToRenew;
379 this.timeintervall = timeintervall;
380
381 }
382
383 /**
384 * Safe method to stop this Thread. Use this method instead of
385 * {@link Thread}
386 */
387 public void stopThread() {
388 if (thisThread != null)
389 exit = true;
390 }
391
392 public void run() {
393 thisThread = Thread.currentThread();
394 while (!exit) {
395 discoveryToRenew.pingResources();
396 try {
397 Thread.currentThread().sleep(timeintervall);
398 } catch (InterruptedException e) {
399 // TODO Auto-generated catch block
400 e.printStackTrace();
401 }
402 }
403 System.out.println("exit renewal Thread");
404 }
405
406 }
407
408 /**
409 * just for testing..edit as you like
410 */
411 public static void main(String[] args) {
412 BasicConfigurator.configure();
413 MulticastDiscoveryService bro = new MulticastDiscoveryService();
414 bro.startService();
415 try {
416 Thread.currentThread().sleep(5000);
417 } catch (InterruptedException e) {
418 e.printStackTrace();
419 }
420 // bro.stopService();
421 while (true)
422 try {
423 Vector<ComputingResourceContainer> v = bro.getResources();
424 for (ComputingResourceContainer container : v) {
425 System.out.print("Active Object: ");
426 System.out.println(container.getInformation().getProperty(
427 "name")
428 + " with IP "
429 + container.getInformation().getProperty("IP"));
430 }
431 Thread.currentThread().sleep(5000);
432 } catch (InterruptedException e) {
433
434 }
435 }
436
437 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26