/[xulu]/branches/1.8-gt2-2.6/src/appl/parallel/services/HostnameDiscoveryService.java
ViewVC logotype

Annotation of /branches/1.8-gt2-2.6/src/appl/parallel/services/HostnameDiscoveryService.java

Parent Directory Parent Directory | Revision Log Revision Log


Revision 60 - (hide annotations)
Sun Oct 4 16:54:52 2009 UTC (15 years, 3 months ago) by alfonx
File size: 7857 byte(s)
* organized imports
1 mojays 2 package appl.parallel.services;
2    
3     import java.rmi.Naming;
4     import java.rmi.UnknownHostException;
5     import java.util.Vector;
6     import java.util.concurrent.Callable;
7     import java.util.concurrent.ExecutionException;
8     import java.util.concurrent.ExecutorService;
9     import java.util.concurrent.Executors;
10     import java.util.concurrent.Future;
11     import java.util.concurrent.TimeUnit;
12     import java.util.concurrent.TimeoutException;
13    
14     import org.apache.log4j.LogManager;
15     import org.apache.log4j.Logger;
16    
17     import appl.ext.XuluConfig;
18     import appl.parallel.ComputingResource;
19     import appl.parallel.ComputingResourceContainer;
20     import appl.parallel.starter.Starter;
21     import appl.parallel.starter.client.StarterContainer;
22     import appl.parallel.starter.server.XuluServerStarter;
23    
24     /**
25     * Very simple Discovery Service that simply looks up hosts from Property <br>
26     * <br>
27     * <code>DiscoveryServices.hostname.hosts</code> <br>
28     * <br>
29     * of the {@link XuluConfig} <br>
30     * <br>
31     * Refreshes the hostlist on every {@link #getResources()}, if <br>
32     * <br>
33     * <code> DiscoveryServices.hostname.refresh</code> <br>
34     * <br>
35     * is set to 1, it gets the ips from Property <br>
36     * <br>
37     * DiscoveryService.hostname.hosts <br>
38     * <br>
39     * in {@link XuluConfig}
40     *
41     * @author Dominik Appl
42     */
43     public class HostnameDiscoveryService implements DiscoveryService {
44     private final Logger LOG = LogManager.getLogger(this.getClass().getName());
45    
46     Vector<ComputingResource> res = new Vector<ComputingResource>();
47    
48     ExecutorService executor = Executors.newCachedThreadPool();
49    
50     private String[] lookupStrings;
51    
52     private int timeout;
53    
54     private boolean firstRefresh = true;
55    
56     /** @return true
57     * @see appl.parallel.services.Service#isRunning()
58     */
59     public boolean isRunning() {
60     return true;
61     }
62    
63     /**
64     * @see appl.parallel.services.Service#startService()
65     */
66     public void startService() {
67     executor = Executors.newCachedThreadPool();
68     }
69    
70     /**
71     * stops the service
72     */
73     public void stopService() {
74     executor.shutdownNow();
75     }
76    
77     /**
78     * tries to (re-)discover every resource in Property <code>HostnameDiscoveryService.hosts</code>
79     * of {@link XuluConfig}. After the time in ms specified in DiscoveryServices.timeout
80     *
81     */
82     @SuppressWarnings("unchecked")
83     public synchronized void refresh() {
84     //get timeout
85     initVariables();
86     res.addAll(getResourcesForLookupStrings());
87     }
88    
89     private void initVariables() {
90     timeout = XuluConfig.getXuluConfig().getIntProperty(
91     "DiscoveryServices.timeout");
92     if (timeout == 0) {
93     LOG
94     .warn("DiscoveryServices.timeout was not set or was 0! Setting connect timeout to 500...");
95     timeout = 500;
96     }
97     //get IPs
98     res.clear();
99     String[] ips = XuluConfig.getXuluConfig().getMultiProperty(
100     "DiscoveryServices.hostname.hosts");
101     if (ips.length == 0 || (ips.length == 1 && ips[0].equals(""))) {
102     LOG.warn("No Resources found in DiscoveryServices.hostname.hosts");
103     ips = new String[0];
104     }
105     //generate lookupStrings:
106     lookupStrings = ips;
107     }
108    
109     /** gets resources, but pings them first to see if they are alive. Not
110     * reachable resouces are removed.
111     *
112     * @see appl.parallel.services.DiscoveryService#getResources()
113     */
114     public Vector<ComputingResourceContainer> getResources() {
115     if (firstRefresh
116     || XuluConfig.getXuluConfig().getBooleanProperty(
117     "DiscoveryServices.hostname.refresh") == true)
118     refresh();
119     firstRefresh = false;
120     return appl.parallel.util.Helper.getResourceContainersForVector(res);
121     }
122    
123     /**
124     * @return starterContainers for {@link XuluServerStarter}s
125     */
126     public Vector<StarterContainer> getStarterContainers() {
127     initVariables();
128     String[] lookupNames = new String[lookupStrings.length];
129     for (int i = 0; i < lookupStrings.length; i++)
130     lookupNames[i] = "rmi://" + lookupStrings[i] + "/XuluServerStarter";
131    
132     Vector<Starter> discoveredStarters = new Vector<Starter>();
133     Vector<StarterContainer> containers = new Vector<StarterContainer>();
134     // make a new Thread for each resouce
135     Future[] futures = new Future[lookupStrings.length];
136     for (int i = 0; i < lookupStrings.length; i++) {
137     futures[i] = executor.submit(new LookupCallable(lookupNames[i]));
138     }
139    
140     // the time at which the result must be there:
141     long timeoutTime = System.currentTimeMillis() + timeout;
142     for (int i = 0; i < futures.length; i++) {
143     // servers previous in iterations may have caused a delay
144     // so the remaining time is calculated
145     long remainingTime = Math.max(10, timeoutTime
146     - System.currentTimeMillis());
147    
148     try {
149     Starter starter = (Starter) futures[i].get(remainingTime,
150     TimeUnit.MILLISECONDS);
151     if (starter != null) {
152     discoveredStarters.add(starter);
153     containers.add(new StarterContainer(starter,
154     lookupStrings[i]));
155     LOG.debug("Discovered Starter: " + lookupNames[i]);
156     }
157     } catch (InterruptedException e) {
158     LOG.error(e);
159     e.printStackTrace();
160     } catch (ExecutionException e) {
161     LOG.error("Error while connecting to " + lookupNames[i]
162     + " (Connect exception)");
163     } catch (TimeoutException e) {
164     LOG.warn("Could not find Starter " + lookupNames[i]
165     + ". Timed out after " + timeout + "ms");
166     futures[i].cancel(true);
167     }
168     }
169     return containers;
170     }
171    
172     private Vector<ComputingResource> getResourcesForLookupStrings() {
173     initVariables();
174     String[] lookupNames = new String[lookupStrings.length];
175     for (int i = 0; i < lookupStrings.length; i++)
176     lookupNames[i] = "rmi://" + lookupStrings[i] + "/XuluServer";
177    
178     Vector<ComputingResource> discoveredResources = new Vector<ComputingResource>(
179     lookupNames.length);
180     // make a new Thread for each resource
181     Future[] futures = new Future[lookupNames.length];
182     for (int i = 0; i < lookupNames.length; i++) {
183     futures[i] = executor.submit(new LookupCallable(lookupNames[i]));
184     }
185    
186     // the time at which the result must be there:
187     long timeoutTime = System.currentTimeMillis() + timeout;
188     for (int i = 0; i < futures.length; i++) {
189     // servers previous in iterations may have caused a delay
190     // so the remaining time is calculated
191     long remainingTime = Math.max(0, timeoutTime
192     - System.currentTimeMillis());
193    
194     try {
195     ComputingResource server = (ComputingResource) futures[i].get(
196     remainingTime, TimeUnit.MILLISECONDS);
197     discoveredResources.add(server);
198     LOG.debug("Discovered resource: " + lookupNames[i]);
199     } catch (InterruptedException e) {
200     LOG.error(e);
201     e.printStackTrace();
202     } catch (ExecutionException e) {
203     LOG.error("Error while connecting to " + lookupNames[i], e);
204     } catch (TimeoutException e) {
205     LOG.warn("Could not add server " + lookupNames[i]
206     + ". Timed out after " + timeout + "ms");
207     futures[i].cancel(true);
208     }
209     }
210     return discoveredResources;
211     }
212    
213     /**
214     * This class is used for lookup of remote resources. It handles all errors
215     * and submits them to the LOG
216     *
217     * @author Dominik Appl
218     */
219     private class LookupCallable implements Callable {
220    
221     private final String lookupString;
222    
223     private ComputingResource server = null;
224    
225     public LookupCallable(String lookupString) {
226     this.lookupString = lookupString;
227    
228     }
229    
230     /* (non-Javadoc)
231     * @see java.util.concurrent.Callable#call()
232     */
233     public Object call() throws Exception {
234     try {
235     return Naming.lookup(lookupString);
236     } catch (UnknownHostException e) {
237     LOG.warn("(Unknown Host) Could not find host " + lookupString);
238     } catch (java.rmi.NotBoundException e) {
239     LOG.warn("(Not Bound) Could not find host " + lookupString);
240     } catch (java.rmi.ConnectException e) {
241     LOG.warn("(No Connection) Could not connect to host "
242     + lookupString);
243     }
244     return null;
245     }
246     }
247    
248     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26