/[xulu]/trunk/src/appl/parallel/services/HostnameDiscoveryService.java
ViewVC logotype

Annotation of /trunk/src/appl/parallel/services/HostnameDiscoveryService.java

Parent Directory | Revision Log


Revision 2 - (hide annotations)
Wed Feb 25 11:54:01 2009 UTC (15 years, 10 months ago) by mojays
File size: 8073 byte(s)
First Commit, corresponds to Revision 1008 of Wikisquare-SVN 
1 mojays 2 package appl.parallel.services;
2    
3     import java.net.MalformedURLException;
4     import java.rmi.ConnectException;
5     import java.rmi.Naming;
6     import java.rmi.NotBoundException;
7     import java.rmi.RemoteException;
8     import java.rmi.UnknownHostException;
9     import java.util.Iterator;
10     import java.util.Vector;
11     import java.util.concurrent.Callable;
12     import java.util.concurrent.ExecutionException;
13     import java.util.concurrent.ExecutorService;
14     import java.util.concurrent.Executors;
15     import java.util.concurrent.Future;
16     import java.util.concurrent.TimeUnit;
17     import java.util.concurrent.TimeoutException;
18    
19     import org.apache.log4j.LogManager;
20     import org.apache.log4j.Logger;
21    
22     import appl.ext.XuluConfig;
23     import appl.parallel.ComputingResource;
24     import appl.parallel.ComputingResourceContainer;
25     import appl.parallel.starter.Starter;
26     import appl.parallel.starter.client.StarterContainer;
27     import appl.parallel.starter.server.XuluServerStarter;
28     import appl.parallel.test.PingTestObject;
29    
30     /**
31     * Very simple Discovery Service that simply looks up hosts from Property <br>
32     * <br>
33     * <code>DiscoveryServices.hostname.hosts</code> <br>
34     * <br>
35     * of the {@link XuluConfig} <br>
36     * <br>
37     * Refreshes the hostlist on every {@link #getResources()}, if <br>
38     * <br>
39     * <code> DiscoveryServices.hostname.refresh</code> <br>
40     * <br>
41     * is set to 1, it gets the ips from Property <br>
42     * <br>
43     * DiscoveryService.hostname.hosts <br>
44     * <br>
45     * in {@link XuluConfig}
46     *
47     * @author Dominik Appl
48     */
49     public class HostnameDiscoveryService implements DiscoveryService {
50     private final Logger LOG = LogManager.getLogger(this.getClass().getName());
51    
52     Vector<ComputingResource> res = new Vector<ComputingResource>();
53    
54     ExecutorService executor = Executors.newCachedThreadPool();
55    
56     private String[] lookupStrings;
57    
58     private int timeout;
59    
60     private boolean firstRefresh = true;
61    
62     /** @return true
63     * @see appl.parallel.services.Service#isRunning()
64     */
65     public boolean isRunning() {
66     return true;
67     }
68    
69     /**
70     * @see appl.parallel.services.Service#startService()
71     */
72     public void startService() {
73     executor = Executors.newCachedThreadPool();
74     }
75    
76     /**
77     * stops the service
78     */
79     public void stopService() {
80     executor.shutdownNow();
81     }
82    
83     /**
84     * tries to (re-)discover every resource in Property <code>HostnameDiscoveryService.hosts</code>
85     * of {@link XuluConfig}. After the time in ms specified in DiscoveryServices.timeout
86     *
87     */
88     @SuppressWarnings("unchecked")
89     public synchronized void refresh() {
90     //get timeout
91     initVariables();
92     res.addAll(getResourcesForLookupStrings());
93     }
94    
95     private void initVariables() {
96     timeout = XuluConfig.getXuluConfig().getIntProperty(
97     "DiscoveryServices.timeout");
98     if (timeout == 0) {
99     LOG
100     .warn("DiscoveryServices.timeout was not set or was 0! Setting connect timeout to 500...");
101     timeout = 500;
102     }
103     //get IPs
104     res.clear();
105     String[] ips = XuluConfig.getXuluConfig().getMultiProperty(
106     "DiscoveryServices.hostname.hosts");
107     if (ips.length == 0 || (ips.length == 1 && ips[0].equals(""))) {
108     LOG.warn("No Resources found in DiscoveryServices.hostname.hosts");
109     ips = new String[0];
110     }
111     //generate lookupStrings:
112     lookupStrings = ips;
113     }
114    
115     /** gets resources, but pings them first to see if they are alive. Not
116     * reachable resouces are removed.
117     *
118     * @see appl.parallel.services.DiscoveryService#getResources()
119     */
120     public Vector<ComputingResourceContainer> getResources() {
121     if (firstRefresh
122     || XuluConfig.getXuluConfig().getBooleanProperty(
123     "DiscoveryServices.hostname.refresh") == true)
124     refresh();
125     firstRefresh = false;
126     return appl.parallel.util.Helper.getResourceContainersForVector(res);
127     }
128    
129     /**
130     * @return starterContainers for {@link XuluServerStarter}s
131     */
132     public Vector<StarterContainer> getStarterContainers() {
133     initVariables();
134     String[] lookupNames = new String[lookupStrings.length];
135     for (int i = 0; i < lookupStrings.length; i++)
136     lookupNames[i] = "rmi://" + lookupStrings[i] + "/XuluServerStarter";
137    
138     Vector<Starter> discoveredStarters = new Vector<Starter>();
139     Vector<StarterContainer> containers = new Vector<StarterContainer>();
140     // make a new Thread for each resouce
141     Future[] futures = new Future[lookupStrings.length];
142     for (int i = 0; i < lookupStrings.length; i++) {
143     futures[i] = executor.submit(new LookupCallable(lookupNames[i]));
144     }
145    
146     // the time at which the result must be there:
147     long timeoutTime = System.currentTimeMillis() + timeout;
148     for (int i = 0; i < futures.length; i++) {
149     // servers previous in iterations may have caused a delay
150     // so the remaining time is calculated
151     long remainingTime = Math.max(10, timeoutTime
152     - System.currentTimeMillis());
153    
154     try {
155     Starter starter = (Starter) futures[i].get(remainingTime,
156     TimeUnit.MILLISECONDS);
157     if (starter != null) {
158     discoveredStarters.add(starter);
159     containers.add(new StarterContainer(starter,
160     lookupStrings[i]));
161     LOG.debug("Discovered Starter: " + lookupNames[i]);
162     }
163     } catch (InterruptedException e) {
164     LOG.error(e);
165     e.printStackTrace();
166     } catch (ExecutionException e) {
167     LOG.error("Error while connecting to " + lookupNames[i]
168     + " (Connect exception)");
169     } catch (TimeoutException e) {
170     LOG.warn("Could not find Starter " + lookupNames[i]
171     + ". Timed out after " + timeout + "ms");
172     futures[i].cancel(true);
173     }
174     }
175     return containers;
176     }
177    
178     private Vector<ComputingResource> getResourcesForLookupStrings() {
179     initVariables();
180     String[] lookupNames = new String[lookupStrings.length];
181     for (int i = 0; i < lookupStrings.length; i++)
182     lookupNames[i] = "rmi://" + lookupStrings[i] + "/XuluServer";
183    
184     Vector<ComputingResource> discoveredResources = new Vector<ComputingResource>(
185     lookupNames.length);
186     // make a new Thread for each resource
187     Future[] futures = new Future[lookupNames.length];
188     for (int i = 0; i < lookupNames.length; i++) {
189     futures[i] = executor.submit(new LookupCallable(lookupNames[i]));
190     }
191    
192     // the time at which the result must be there:
193     long timeoutTime = System.currentTimeMillis() + timeout;
194     for (int i = 0; i < futures.length; i++) {
195     // servers previous in iterations may have caused a delay
196     // so the remaining time is calculated
197     long remainingTime = Math.max(0, timeoutTime
198     - System.currentTimeMillis());
199    
200     try {
201     ComputingResource server = (ComputingResource) futures[i].get(
202     remainingTime, TimeUnit.MILLISECONDS);
203     discoveredResources.add(server);
204     LOG.debug("Discovered resource: " + lookupNames[i]);
205     } catch (InterruptedException e) {
206     LOG.error(e);
207     e.printStackTrace();
208     } catch (ExecutionException e) {
209     LOG.error("Error while connecting to " + lookupNames[i], e);
210     } catch (TimeoutException e) {
211     LOG.warn("Could not add server " + lookupNames[i]
212     + ". Timed out after " + timeout + "ms");
213     futures[i].cancel(true);
214     }
215     }
216     return discoveredResources;
217     }
218    
219     /**
220     * This class is used for lookup of remote resources. It handles all errors
221     * and submits them to the LOG
222     *
223     * @author Dominik Appl
224     */
225     private class LookupCallable implements Callable {
226    
227     private final String lookupString;
228    
229     private ComputingResource server = null;
230    
231     public LookupCallable(String lookupString) {
232     this.lookupString = lookupString;
233    
234     }
235    
236     /* (non-Javadoc)
237     * @see java.util.concurrent.Callable#call()
238     */
239     public Object call() throws Exception {
240     try {
241     return Naming.lookup(lookupString);
242     } catch (UnknownHostException e) {
243     LOG.warn("(Unknown Host) Could not find host " + lookupString);
244     } catch (java.rmi.NotBoundException e) {
245     LOG.warn("(Not Bound) Could not find host " + lookupString);
246     } catch (java.rmi.ConnectException e) {
247     LOG.warn("(No Connection) Could not connect to host "
248     + lookupString);
249     }
250     return null;
251     }
252     }
253    
254     }

[email protected]
ViewVC Help
Powered by ViewVC 1.1.26