001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.discovery.simple; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.util.concurrent.SynchronousQueue; 022 import java.util.concurrent.ThreadFactory; 023 import java.util.concurrent.ThreadPoolExecutor; 024 import java.util.concurrent.TimeUnit; 025 import java.util.concurrent.atomic.AtomicBoolean; 026 027 import org.apache.activemq.command.DiscoveryEvent; 028 import org.apache.activemq.transport.discovery.DiscoveryAgent; 029 import org.apache.activemq.transport.discovery.DiscoveryListener; 030 import org.apache.commons.logging.Log; 031 import org.apache.commons.logging.LogFactory; 032 033 /** 034 * A simple DiscoveryAgent that allows static configuration of the discovered 035 * services. 036 * 037 * @version $Revision$ 038 */ 039 public class SimpleDiscoveryAgent implements DiscoveryAgent { 040 041 private final static Log LOG = LogFactory.getLog(SimpleDiscoveryAgent.class); 042 private static final ThreadPoolExecutor ASYNC_TASKS; 043 private long initialReconnectDelay = 1000; 044 private long maxReconnectDelay = 1000 * 30; 045 private long backOffMultiplier = 2; 046 private boolean useExponentialBackOff=true; 047 private int maxReconnectAttempts; 048 private final Object sleepMutex = new Object(); 049 private long minConnectTime = 5000; 050 private DiscoveryListener listener; 051 private String services[] = new String[] {}; 052 private final AtomicBoolean running = new AtomicBoolean(false); 053 054 class SimpleDiscoveryEvent extends DiscoveryEvent { 055 056 private int connectFailures; 057 private long reconnectDelay = initialReconnectDelay; 058 private long connectTime = System.currentTimeMillis(); 059 private AtomicBoolean failed = new AtomicBoolean(false); 060 061 public SimpleDiscoveryEvent(String service) { 062 super(service); 063 } 064 065 } 066 067 public void setDiscoveryListener(DiscoveryListener listener) { 068 this.listener = listener; 069 } 070 071 public void registerService(String name) throws IOException { 072 } 073 074 public void start() throws Exception { 075 running.set(true); 076 for (int i = 0; i < services.length; i++) { 077 listener.onServiceAdd(new SimpleDiscoveryEvent(services[i])); 078 } 079 } 080 081 public void stop() throws Exception { 082 running.set(false); 083 synchronized (sleepMutex) { 084 sleepMutex.notifyAll(); 085 } 086 } 087 088 public String[] getServices() { 089 return services; 090 } 091 092 public void setServices(String services) { 093 this.services = services.split(","); 094 } 095 096 public void setServices(String services[]) { 097 this.services = services; 098 } 099 100 public void setServices(URI services[]) { 101 this.services = new String[services.length]; 102 for (int i = 0; i < services.length; i++) { 103 this.services[i] = services[i].toString(); 104 } 105 } 106 107 public void serviceFailed(DiscoveryEvent devent) throws IOException { 108 109 final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent; 110 if (event.failed.compareAndSet(false, true)) { 111 112 listener.onServiceRemove(event); 113 ASYNC_TASKS.execute(new Runnable() { 114 public void run() { 115 116 // We detect a failed connection attempt because the service 117 // fails right 118 // away. 119 if (event.connectTime + minConnectTime > System.currentTimeMillis()) { 120 LOG.debug("Failure occured soon after the discovery event was generated. It will be clasified as a connection failure: "+event); 121 122 event.connectFailures++; 123 124 if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) { 125 LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled."); 126 return; 127 } 128 129 synchronized (sleepMutex) { 130 try { 131 if (!running.get()) { 132 return; 133 } 134 135 LOG.debug("Waiting "+event.reconnectDelay+" ms before attepting to reconnect."); 136 sleepMutex.wait(event.reconnectDelay); 137 } catch (InterruptedException ie) { 138 Thread.currentThread().interrupt(); 139 return; 140 } 141 } 142 143 if (!useExponentialBackOff) { 144 event.reconnectDelay = initialReconnectDelay; 145 } else { 146 // Exponential increment of reconnect delay. 147 event.reconnectDelay *= backOffMultiplier; 148 if (event.reconnectDelay > maxReconnectDelay) { 149 event.reconnectDelay = maxReconnectDelay; 150 } 151 } 152 153 } else { 154 event.connectFailures = 0; 155 event.reconnectDelay = initialReconnectDelay; 156 } 157 158 if (!running.get()) { 159 return; 160 } 161 162 event.connectTime = System.currentTimeMillis(); 163 event.failed.set(false); 164 listener.onServiceAdd(event); 165 } 166 }); 167 } 168 } 169 170 public long getBackOffMultiplier() { 171 return backOffMultiplier; 172 } 173 174 public void setBackOffMultiplier(long backOffMultiplier) { 175 this.backOffMultiplier = backOffMultiplier; 176 } 177 178 public long getInitialReconnectDelay() { 179 return initialReconnectDelay; 180 } 181 182 public void setInitialReconnectDelay(long initialReconnectDelay) { 183 this.initialReconnectDelay = initialReconnectDelay; 184 } 185 186 public int getMaxReconnectAttempts() { 187 return maxReconnectAttempts; 188 } 189 190 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 191 this.maxReconnectAttempts = maxReconnectAttempts; 192 } 193 194 public long getMaxReconnectDelay() { 195 return maxReconnectDelay; 196 } 197 198 public void setMaxReconnectDelay(long maxReconnectDelay) { 199 this.maxReconnectDelay = maxReconnectDelay; 200 } 201 202 public long getMinConnectTime() { 203 return minConnectTime; 204 } 205 206 public void setMinConnectTime(long minConnectTime) { 207 this.minConnectTime = minConnectTime; 208 } 209 210 public boolean isUseExponentialBackOff() { 211 return useExponentialBackOff; 212 } 213 214 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 215 this.useExponentialBackOff = useExponentialBackOff; 216 } 217 218 static { 219 ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { 220 public Thread newThread(Runnable runnable) { 221 Thread thread = new Thread(runnable, "Simple Discovery Agent: "+runnable); 222 thread.setDaemon(true); 223 return thread; 224 } 225 }); 226 } 227 228 229 }