/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/services/HostnameDiscoveryService.java
ViewVC logotype

Contents of /branches/1.8-gt2-2.6/src/appl/parallel/services/HostnameDiscoveryService.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 47 - (show annotations)
Mon Aug 31 14:23:19 2009 UTC (15 years, 3 months ago) by mojays
File size: 8073 byte(s)
Branch 1.8-gt2-2.6 (from rev 45) for geotools 2.6 migration
1 package appl.parallel.services;
2
3 import java.net.MalformedURLException;
4 import java.rmi.ConnectException;
5 import java.rmi.Naming;
6 import java.rmi.NotBoundException;
7 import java.rmi.RemoteException;
8 import java.rmi.UnknownHostException;
9 import java.util.Iterator;
10 import java.util.Vector;
11 import java.util.concurrent.Callable;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.TimeUnit;
17 import java.util.concurrent.TimeoutException;
18
19 import org.apache.log4j.LogManager;
20 import org.apache.log4j.Logger;
21
22 import appl.ext.XuluConfig;
23 import appl.parallel.ComputingResource;
24 import appl.parallel.ComputingResourceContainer;
25 import appl.parallel.starter.Starter;
26 import appl.parallel.starter.client.StarterContainer;
27 import appl.parallel.starter.server.XuluServerStarter;
28 import appl.parallel.test.PingTestObject;
29
30 /**
31 * Very simple Discovery Service that simply looks up hosts from Property <br>
32 * <br>
33 * <code>DiscoveryServices.hostname.hosts</code> <br>
34 * <br>
35 * of the {@link XuluConfig} <br>
36 * <br>
37 * Refreshes the hostlist on every {@link #getResources()}, if <br>
38 * <br>
39 * <code> DiscoveryServices.hostname.refresh</code> <br>
40 * <br>
41 * is set to 1, it gets the ips from Property <br>
42 * <br>
43 * DiscoveryService.hostname.hosts <br>
44 * <br>
45 * in {@link XuluConfig}
46 *
47 * @author Dominik Appl
48 */
49 public class HostnameDiscoveryService implements DiscoveryService {
50 private final Logger LOG = LogManager.getLogger(this.getClass().getName());
51
52 Vector<ComputingResource> res = new Vector<ComputingResource>();
53
54 ExecutorService executor = Executors.newCachedThreadPool();
55
56 private String[] lookupStrings;
57
58 private int timeout;
59
60 private boolean firstRefresh = true;
61
62 /** @return true
63 * @see appl.parallel.services.Service#isRunning()
64 */
65 public boolean isRunning() {
66 return true;
67 }
68
69 /**
70 * @see appl.parallel.services.Service#startService()
71 */
72 public void startService() {
73 executor = Executors.newCachedThreadPool();
74 }
75
76 /**
77 * stops the service
78 */
79 public void stopService() {
80 executor.shutdownNow();
81 }
82
83 /**
84 * tries to (re-)discover every resource in Property <code>HostnameDiscoveryService.hosts</code>
85 * of {@link XuluConfig}. After the time in ms specified in DiscoveryServices.timeout
86 *
87 */
88 @SuppressWarnings("unchecked")
89 public synchronized void refresh() {
90 //get timeout
91 initVariables();
92 res.addAll(getResourcesForLookupStrings());
93 }
94
95 private void initVariables() {
96 timeout = XuluConfig.getXuluConfig().getIntProperty(
97 "DiscoveryServices.timeout");
98 if (timeout == 0) {
99 LOG
100 .warn("DiscoveryServices.timeout was not set or was 0! Setting connect timeout to 500...");
101 timeout = 500;
102 }
103 //get IPs
104 res.clear();
105 String[] ips = XuluConfig.getXuluConfig().getMultiProperty(
106 "DiscoveryServices.hostname.hosts");
107 if (ips.length == 0 || (ips.length == 1 && ips[0].equals(""))) {
108 LOG.warn("No Resources found in DiscoveryServices.hostname.hosts");
109 ips = new String[0];
110 }
111 //generate lookupStrings:
112 lookupStrings = ips;
113 }
114
115 /** gets resources, but pings them first to see if they are alive. Not
116 * reachable resouces are removed.
117 *
118 * @see appl.parallel.services.DiscoveryService#getResources()
119 */
120 public Vector<ComputingResourceContainer> getResources() {
121 if (firstRefresh
122 || XuluConfig.getXuluConfig().getBooleanProperty(
123 "DiscoveryServices.hostname.refresh") == true)
124 refresh();
125 firstRefresh = false;
126 return appl.parallel.util.Helper.getResourceContainersForVector(res);
127 }
128
129 /**
130 * @return starterContainers for {@link XuluServerStarter}s
131 */
132 public Vector<StarterContainer> getStarterContainers() {
133 initVariables();
134 String[] lookupNames = new String[lookupStrings.length];
135 for (int i = 0; i < lookupStrings.length; i++)
136 lookupNames[i] = "rmi://" + lookupStrings[i] + "/XuluServerStarter";
137
138 Vector<Starter> discoveredStarters = new Vector<Starter>();
139 Vector<StarterContainer> containers = new Vector<StarterContainer>();
140 // make a new Thread for each resouce
141 Future[] futures = new Future[lookupStrings.length];
142 for (int i = 0; i < lookupStrings.length; i++) {
143 futures[i] = executor.submit(new LookupCallable(lookupNames[i]));
144 }
145
146 // the time at which the result must be there:
147 long timeoutTime = System.currentTimeMillis() + timeout;
148 for (int i = 0; i < futures.length; i++) {
149 // servers previous in iterations may have caused a delay
150 // so the remaining time is calculated
151 long remainingTime = Math.max(10, timeoutTime
152 - System.currentTimeMillis());
153
154 try {
155 Starter starter = (Starter) futures[i].get(remainingTime,
156 TimeUnit.MILLISECONDS);
157 if (starter != null) {
158 discoveredStarters.add(starter);
159 containers.add(new StarterContainer(starter,
160 lookupStrings[i]));
161 LOG.debug("Discovered Starter: " + lookupNames[i]);
162 }
163 } catch (InterruptedException e) {
164 LOG.error(e);
165 e.printStackTrace();
166 } catch (ExecutionException e) {
167 LOG.error("Error while connecting to " + lookupNames[i]
168 + " (Connect exception)");
169 } catch (TimeoutException e) {
170 LOG.warn("Could not find Starter " + lookupNames[i]
171 + ". Timed out after " + timeout + "ms");
172 futures[i].cancel(true);
173 }
174 }
175 return containers;
176 }
177
178 private Vector<ComputingResource> getResourcesForLookupStrings() {
179 initVariables();
180 String[] lookupNames = new String[lookupStrings.length];
181 for (int i = 0; i < lookupStrings.length; i++)
182 lookupNames[i] = "rmi://" + lookupStrings[i] + "/XuluServer";
183
184 Vector<ComputingResource> discoveredResources = new Vector<ComputingResource>(
185 lookupNames.length);
186 // make a new Thread for each resource
187 Future[] futures = new Future[lookupNames.length];
188 for (int i = 0; i < lookupNames.length; i++) {
189 futures[i] = executor.submit(new LookupCallable(lookupNames[i]));
190 }
191
192 // the time at which the result must be there:
193 long timeoutTime = System.currentTimeMillis() + timeout;
194 for (int i = 0; i < futures.length; i++) {
195 // servers previous in iterations may have caused a delay
196 // so the remaining time is calculated
197 long remainingTime = Math.max(0, timeoutTime
198 - System.currentTimeMillis());
199
200 try {
201 ComputingResource server = (ComputingResource) futures[i].get(
202 remainingTime, TimeUnit.MILLISECONDS);
203 discoveredResources.add(server);
204 LOG.debug("Discovered resource: " + lookupNames[i]);
205 } catch (InterruptedException e) {
206 LOG.error(e);
207 e.printStackTrace();
208 } catch (ExecutionException e) {
209 LOG.error("Error while connecting to " + lookupNames[i], e);
210 } catch (TimeoutException e) {
211 LOG.warn("Could not add server " + lookupNames[i]
212 + ". Timed out after " + timeout + "ms");
213 futures[i].cancel(true);
214 }
215 }
216 return discoveredResources;
217 }
218
219 /**
220 * This class is used for lookup of remote resources. It handles all errors
221 * and submits them to the LOG
222 *
223 * @author Dominik Appl
224 */
225 private class LookupCallable implements Callable {
226
227 private final String lookupString;
228
229 private ComputingResource server = null;
230
231 public LookupCallable(String lookupString) {
232 this.lookupString = lookupString;
233
234 }
235
236 /* (non-Javadoc)
237 * @see java.util.concurrent.Callable#call()
238 */
239 public Object call() throws Exception {
240 try {
241 return Naming.lookup(lookupString);
242 } catch (UnknownHostException e) {
243 LOG.warn("(Unknown Host) Could not find host " + lookupString);
244 } catch (java.rmi.NotBoundException e) {
245 LOG.warn("(Not Bound) Could not find host " + lookupString);
246 } catch (java.rmi.ConnectException e) {
247 LOG.warn("(No Connection) Could not connect to host "
248 + lookupString);
249 }
250 return null;
251 }
252 }
253
254 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26