/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/services/HostnameDiscoveryService.java
ViewVC logotype

Contents of /branches/1.8-gt2-2.6/src/appl/parallel/services/HostnameDiscoveryService.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 60 - (show annotations)
Sun Oct 4 16:54:52 2009 UTC (15 years, 2 months ago) by alfonx
File size: 7857 byte(s)
* organized imports
1 package appl.parallel.services;
2
3 import java.rmi.Naming;
4 import java.rmi.UnknownHostException;
5 import java.util.Vector;
6 import java.util.concurrent.Callable;
7 import java.util.concurrent.ExecutionException;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.Executors;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.TimeUnit;
12 import java.util.concurrent.TimeoutException;
13
14 import org.apache.log4j.LogManager;
15 import org.apache.log4j.Logger;
16
17 import appl.ext.XuluConfig;
18 import appl.parallel.ComputingResource;
19 import appl.parallel.ComputingResourceContainer;
20 import appl.parallel.starter.Starter;
21 import appl.parallel.starter.client.StarterContainer;
22 import appl.parallel.starter.server.XuluServerStarter;
23
24 /**
25 * Very simple Discovery Service that simply looks up hosts from Property <br>
26 * <br>
27 * <code>DiscoveryServices.hostname.hosts</code> <br>
28 * <br>
29 * of the {@link XuluConfig} <br>
30 * <br>
31 * Refreshes the hostlist on every {@link #getResources()}, if <br>
32 * <br>
33 * <code> DiscoveryServices.hostname.refresh</code> <br>
34 * <br>
35 * is set to 1, it gets the ips from Property <br>
36 * <br>
37 * DiscoveryService.hostname.hosts <br>
38 * <br>
39 * in {@link XuluConfig}
40 *
41 * @author Dominik Appl
42 */
43 public class HostnameDiscoveryService implements DiscoveryService {
44 private final Logger LOG = LogManager.getLogger(this.getClass().getName());
45
46 Vector<ComputingResource> res = new Vector<ComputingResource>();
47
48 ExecutorService executor = Executors.newCachedThreadPool();
49
50 private String[] lookupStrings;
51
52 private int timeout;
53
54 private boolean firstRefresh = true;
55
56 /** @return true
57 * @see appl.parallel.services.Service#isRunning()
58 */
59 public boolean isRunning() {
60 return true;
61 }
62
63 /**
64 * @see appl.parallel.services.Service#startService()
65 */
66 public void startService() {
67 executor = Executors.newCachedThreadPool();
68 }
69
70 /**
71 * stops the service
72 */
73 public void stopService() {
74 executor.shutdownNow();
75 }
76
77 /**
78 * tries to (re-)discover every resource in Property <code>HostnameDiscoveryService.hosts</code>
79 * of {@link XuluConfig}. After the time in ms specified in DiscoveryServices.timeout
80 *
81 */
82 @SuppressWarnings("unchecked")
83 public synchronized void refresh() {
84 //get timeout
85 initVariables();
86 res.addAll(getResourcesForLookupStrings());
87 }
88
89 private void initVariables() {
90 timeout = XuluConfig.getXuluConfig().getIntProperty(
91 "DiscoveryServices.timeout");
92 if (timeout == 0) {
93 LOG
94 .warn("DiscoveryServices.timeout was not set or was 0! Setting connect timeout to 500...");
95 timeout = 500;
96 }
97 //get IPs
98 res.clear();
99 String[] ips = XuluConfig.getXuluConfig().getMultiProperty(
100 "DiscoveryServices.hostname.hosts");
101 if (ips.length == 0 || (ips.length == 1 && ips[0].equals(""))) {
102 LOG.warn("No Resources found in DiscoveryServices.hostname.hosts");
103 ips = new String[0];
104 }
105 //generate lookupStrings:
106 lookupStrings = ips;
107 }
108
109 /** gets resources, but pings them first to see if they are alive. Not
110 * reachable resouces are removed.
111 *
112 * @see appl.parallel.services.DiscoveryService#getResources()
113 */
114 public Vector<ComputingResourceContainer> getResources() {
115 if (firstRefresh
116 || XuluConfig.getXuluConfig().getBooleanProperty(
117 "DiscoveryServices.hostname.refresh") == true)
118 refresh();
119 firstRefresh = false;
120 return appl.parallel.util.Helper.getResourceContainersForVector(res);
121 }
122
123 /**
124 * @return starterContainers for {@link XuluServerStarter}s
125 */
126 public Vector<StarterContainer> getStarterContainers() {
127 initVariables();
128 String[] lookupNames = new String[lookupStrings.length];
129 for (int i = 0; i < lookupStrings.length; i++)
130 lookupNames[i] = "rmi://" + lookupStrings[i] + "/XuluServerStarter";
131
132 Vector<Starter> discoveredStarters = new Vector<Starter>();
133 Vector<StarterContainer> containers = new Vector<StarterContainer>();
134 // make a new Thread for each resouce
135 Future[] futures = new Future[lookupStrings.length];
136 for (int i = 0; i < lookupStrings.length; i++) {
137 futures[i] = executor.submit(new LookupCallable(lookupNames[i]));
138 }
139
140 // the time at which the result must be there:
141 long timeoutTime = System.currentTimeMillis() + timeout;
142 for (int i = 0; i < futures.length; i++) {
143 // servers previous in iterations may have caused a delay
144 // so the remaining time is calculated
145 long remainingTime = Math.max(10, timeoutTime
146 - System.currentTimeMillis());
147
148 try {
149 Starter starter = (Starter) futures[i].get(remainingTime,
150 TimeUnit.MILLISECONDS);
151 if (starter != null) {
152 discoveredStarters.add(starter);
153 containers.add(new StarterContainer(starter,
154 lookupStrings[i]));
155 LOG.debug("Discovered Starter: " + lookupNames[i]);
156 }
157 } catch (InterruptedException e) {
158 LOG.error(e);
159 e.printStackTrace();
160 } catch (ExecutionException e) {
161 LOG.error("Error while connecting to " + lookupNames[i]
162 + " (Connect exception)");
163 } catch (TimeoutException e) {
164 LOG.warn("Could not find Starter " + lookupNames[i]
165 + ". Timed out after " + timeout + "ms");
166 futures[i].cancel(true);
167 }
168 }
169 return containers;
170 }
171
172 private Vector<ComputingResource> getResourcesForLookupStrings() {
173 initVariables();
174 String[] lookupNames = new String[lookupStrings.length];
175 for (int i = 0; i < lookupStrings.length; i++)
176 lookupNames[i] = "rmi://" + lookupStrings[i] + "/XuluServer";
177
178 Vector<ComputingResource> discoveredResources = new Vector<ComputingResource>(
179 lookupNames.length);
180 // make a new Thread for each resource
181 Future[] futures = new Future[lookupNames.length];
182 for (int i = 0; i < lookupNames.length; i++) {
183 futures[i] = executor.submit(new LookupCallable(lookupNames[i]));
184 }
185
186 // the time at which the result must be there:
187 long timeoutTime = System.currentTimeMillis() + timeout;
188 for (int i = 0; i < futures.length; i++) {
189 // servers previous in iterations may have caused a delay
190 // so the remaining time is calculated
191 long remainingTime = Math.max(0, timeoutTime
192 - System.currentTimeMillis());
193
194 try {
195 ComputingResource server = (ComputingResource) futures[i].get(
196 remainingTime, TimeUnit.MILLISECONDS);
197 discoveredResources.add(server);
198 LOG.debug("Discovered resource: " + lookupNames[i]);
199 } catch (InterruptedException e) {
200 LOG.error(e);
201 e.printStackTrace();
202 } catch (ExecutionException e) {
203 LOG.error("Error while connecting to " + lookupNames[i], e);
204 } catch (TimeoutException e) {
205 LOG.warn("Could not add server " + lookupNames[i]
206 + ". Timed out after " + timeout + "ms");
207 futures[i].cancel(true);
208 }
209 }
210 return discoveredResources;
211 }
212
213 /**
214 * This class is used for lookup of remote resources. It handles all errors
215 * and submits them to the LOG
216 *
217 * @author Dominik Appl
218 */
219 private class LookupCallable implements Callable {
220
221 private final String lookupString;
222
223 private ComputingResource server = null;
224
225 public LookupCallable(String lookupString) {
226 this.lookupString = lookupString;
227
228 }
229
230 /* (non-Javadoc)
231 * @see java.util.concurrent.Callable#call()
232 */
233 public Object call() throws Exception {
234 try {
235 return Naming.lookup(lookupString);
236 } catch (UnknownHostException e) {
237 LOG.warn("(Unknown Host) Could not find host " + lookupString);
238 } catch (java.rmi.NotBoundException e) {
239 LOG.warn("(Not Bound) Could not find host " + lookupString);
240 } catch (java.rmi.ConnectException e) {
241 LOG.warn("(No Connection) Could not connect to host "
242 + lookupString);
243 }
244 return null;
245 }
246 }
247
248 }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26