001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.discovery.http; 018 019 import java.io.IOException; 020 import java.util.HashMap; 021 import java.util.HashSet; 022 import java.util.Map; 023 import java.util.Scanner; 024 import java.util.Set; 025 import java.util.concurrent.atomic.AtomicBoolean; 026 import java.util.concurrent.atomic.AtomicInteger; 027 import java.util.concurrent.atomic.AtomicReference; 028 029 import org.apache.activemq.Service; 030 import org.apache.activemq.command.DiscoveryEvent; 031 import org.apache.activemq.transport.discovery.DiscoveryAgent; 032 import org.apache.activemq.transport.discovery.DiscoveryListener; 033 import org.apache.activemq.util.IntrospectionSupport; 034 import org.apache.commons.httpclient.HttpClient; 035 import org.apache.commons.httpclient.methods.DeleteMethod; 036 import org.apache.commons.httpclient.methods.GetMethod; 037 import org.apache.commons.httpclient.methods.PutMethod; 038 import org.apache.commons.logging.Log; 039 import org.apache.commons.logging.LogFactory; 040 041 public class HTTPDiscoveryAgent implements DiscoveryAgent { 042 043 private static final Log LOG = LogFactory.getLog(HTTPDiscoveryAgent.class); 044 045 private String registryURL = "http://localhost:8080/discovery-registry/default"; 046 private HttpClient httpClient = new HttpClient(); 047 private AtomicBoolean running=new AtomicBoolean(); 048 private final AtomicReference<DiscoveryListener> discoveryListener = new AtomicReference<DiscoveryListener>(); 049 private final HashSet<String> registeredServices = new HashSet<String>(); 050 private final HashMap<String, SimpleDiscoveryEvent> discoveredServices = new HashMap<String, SimpleDiscoveryEvent>(); 051 private Thread thread; 052 private long updateInterval = 1000*10; 053 private String brokerName; 054 private boolean startEmbeddRegistry=false; 055 private Service jetty; 056 private AtomicInteger startCounter=new AtomicInteger(0); 057 058 059 private long initialReconnectDelay = 1000; 060 private long maxReconnectDelay = 1000 * 30; 061 private long backOffMultiplier = 2; 062 private boolean useExponentialBackOff=true; 063 private int maxReconnectAttempts; 064 private final Object sleepMutex = new Object(); 065 private long minConnectTime = 5000; 066 067 class SimpleDiscoveryEvent extends DiscoveryEvent { 068 069 private int connectFailures; 070 private long reconnectDelay = initialReconnectDelay; 071 private long connectTime = System.currentTimeMillis(); 072 private AtomicBoolean failed = new AtomicBoolean(false); 073 private AtomicBoolean removed = new AtomicBoolean(false); 074 075 public SimpleDiscoveryEvent(String service) { 076 super(service); 077 } 078 079 } 080 081 082 public String getGroup() { 083 return null; 084 } 085 086 public void registerService(String service) throws IOException { 087 synchronized(registeredServices) { 088 registeredServices.add(service); 089 } 090 doRegister(service); 091 } 092 093 synchronized private void doRegister(String service) { 094 String url = registryURL; 095 try { 096 PutMethod method = new PutMethod(url); 097 // method.setParams(createParams()); 098 method.setRequestHeader("service", service); 099 int responseCode = httpClient.executeMethod(method); 100 LOG.debug("PUT to "+url+" got a "+responseCode); 101 } catch (Exception e) { 102 LOG.debug("PUT to "+url+" failed with: "+e); 103 } 104 } 105 106 synchronized private void doUnRegister(String service) { 107 String url = registryURL; 108 try { 109 DeleteMethod method = new DeleteMethod(url); 110 // method.setParams(createParams()); 111 method.setRequestHeader("service", service); 112 int responseCode = httpClient.executeMethod(method); 113 LOG.debug("DELETE to "+url+" got a "+responseCode); 114 } catch (Exception e) { 115 LOG.debug("DELETE to "+url+" failed with: "+e); 116 } 117 } 118 119 // private HttpMethodParams createParams() { 120 // HttpMethodParams params = new HttpMethodParams(); 121 // params.setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(0,false)); 122 // return params; 123 // } 124 125 synchronized private Set<String> doLookup(long freshness) { 126 String url = registryURL+"?freshness="+freshness; 127 try { 128 GetMethod method = new GetMethod(url); 129 // method.setParams(createParams()); 130 int responseCode = httpClient.executeMethod(method); 131 LOG.debug("GET to "+url+" got a "+responseCode); 132 if( responseCode == 200 ) { 133 Set<String> rc = new HashSet<String>(); 134 Scanner scanner = new Scanner(method.getResponseBodyAsStream()); 135 while( scanner.hasNextLine() ) { 136 String service = scanner.nextLine(); 137 if( service.trim().length() != 0 ) { 138 rc.add(service); 139 } 140 } 141 return rc; 142 } else { 143 LOG.debug("GET to "+url+" failed with response code: "+responseCode); 144 return null; 145 } 146 } catch (Exception e) { 147 LOG.debug("GET to "+url+" failed with: "+e); 148 return null; 149 } 150 } 151 152 public void serviceFailed(DiscoveryEvent devent) throws IOException { 153 154 final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent; 155 if (event.failed.compareAndSet(false, true)) { 156 discoveryListener.get().onServiceRemove(event); 157 if(!event.removed.get()) { 158 // Setup a thread to re-raise the event... 159 Thread thread = new Thread() { 160 public void run() { 161 162 // We detect a failed connection attempt because the service 163 // fails right away. 164 if (event.connectTime + minConnectTime > System.currentTimeMillis()) { 165 LOG.debug("Failure occured soon after the discovery event was generated. It will be clasified as a connection failure: "+event); 166 167 event.connectFailures++; 168 169 if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) { 170 LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled."); 171 return; 172 } 173 174 synchronized (sleepMutex) { 175 try { 176 if (!running.get() || event.removed.get()) { 177 return; 178 } 179 LOG.debug("Waiting "+event.reconnectDelay+" ms before attepting to reconnect."); 180 sleepMutex.wait(event.reconnectDelay); 181 } catch (InterruptedException ie) { 182 Thread.currentThread().interrupt(); 183 return; 184 } 185 } 186 187 if (!useExponentialBackOff) { 188 event.reconnectDelay = initialReconnectDelay; 189 } else { 190 // Exponential increment of reconnect delay. 191 event.reconnectDelay *= backOffMultiplier; 192 if (event.reconnectDelay > maxReconnectDelay) { 193 event.reconnectDelay = maxReconnectDelay; 194 } 195 } 196 197 } else { 198 event.connectFailures = 0; 199 event.reconnectDelay = initialReconnectDelay; 200 } 201 202 if (!running.get() || event.removed.get()) { 203 return; 204 } 205 206 event.connectTime = System.currentTimeMillis(); 207 event.failed.set(false); 208 discoveryListener.get().onServiceAdd(event); 209 } 210 }; 211 thread.setDaemon(true); 212 thread.start(); 213 } 214 } 215 } 216 217 218 public void setBrokerName(String brokerName) { 219 this.brokerName = brokerName; 220 } 221 222 public void setDiscoveryListener(DiscoveryListener discoveryListener) { 223 this.discoveryListener.set(discoveryListener); 224 } 225 226 public void setGroup(String group) { 227 } 228 229 public void start() throws Exception { 230 if( startCounter.addAndGet(1)==1 ) { 231 if( startEmbeddRegistry ) { 232 jetty = createEmbeddedJettyServer(); 233 Map props = new HashMap(); 234 props.put("agent", this); 235 IntrospectionSupport.setProperties(jetty, props); 236 jetty.start(); 237 } 238 239 running.set(true); 240 thread = new Thread("HTTPDiscovery Agent") { 241 @Override 242 public void run() { 243 while(running.get()) { 244 try { 245 update(); 246 Thread.sleep(updateInterval); 247 } catch (InterruptedException e) { 248 return; 249 } 250 } 251 } 252 }; 253 thread.setDaemon(true); 254 thread.start(); 255 } 256 } 257 258 /** 259 * Create the EmbeddedJettyServer instance via reflection so that we can avoid a hard runtime dependency on 260 * jetty. 261 * 262 * @return 263 * @throws Exception 264 */ 265 private Service createEmbeddedJettyServer() throws Exception { 266 Class clazz = HTTPDiscoveryAgent.class.getClassLoader().loadClass("org.apache.activemq.transport.discovery.http.EmbeddedJettyServer"); 267 return (Service)clazz.newInstance(); 268 } 269 270 private void update() { 271 // Register all our services... 272 synchronized(registeredServices) { 273 for (String service : registeredServices) { 274 doRegister(service); 275 } 276 } 277 278 // Find new registered services... 279 DiscoveryListener discoveryListener = this.discoveryListener.get(); 280 if(discoveryListener!=null) { 281 Set<String> activeServices = doLookup(updateInterval*3); 282 // If there is error talking the the central server, then activeServices == null 283 if( activeServices !=null ) { 284 synchronized(discoveredServices) { 285 286 HashSet<String> removedServices = new HashSet<String>(discoveredServices.keySet()); 287 removedServices.removeAll(activeServices); 288 289 HashSet<String> addedServices = new HashSet<String>(activeServices); 290 addedServices.removeAll(discoveredServices.keySet()); 291 addedServices.removeAll(removedServices); 292 293 for (String service : addedServices) { 294 SimpleDiscoveryEvent e = new SimpleDiscoveryEvent(service); 295 discoveredServices.put(service, e); 296 discoveryListener.onServiceAdd(e); 297 } 298 299 for (String service : removedServices) { 300 SimpleDiscoveryEvent e = discoveredServices.remove(service); 301 if( e !=null ) { 302 e.removed.set(true); 303 } 304 discoveryListener.onServiceRemove(e); 305 } 306 } 307 } 308 } 309 } 310 311 public void stop() throws Exception { 312 if( startCounter.decrementAndGet()==0 ) { 313 running.set(false); 314 if( thread!=null ) { 315 thread.join(updateInterval*3); 316 thread=null; 317 } 318 if( jetty!=null ) { 319 jetty.stop(); 320 jetty = null; 321 } 322 } 323 } 324 325 public String getRegistryURL() { 326 return registryURL; 327 } 328 329 public void setRegistryURL(String discoveryRegistryURL) { 330 this.registryURL = discoveryRegistryURL; 331 } 332 333 public long getUpdateInterval() { 334 return updateInterval; 335 } 336 337 public void setUpdateInterval(long updateInterval) { 338 this.updateInterval = updateInterval; 339 } 340 341 public boolean isStartEmbeddRegistry() { 342 return startEmbeddRegistry; 343 } 344 345 public void setStartEmbeddRegistry(boolean startEmbeddRegistry) { 346 this.startEmbeddRegistry = startEmbeddRegistry; 347 } 348 349 }