/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.discovery.multicast;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A {@link DiscoveryAgent} using a multicast address and heartbeat packets
 * encoded using any wireformat, but openwire by default.
 * <p>
 * As implemented here, each heartbeat is a plain string of the form
 * <code>&lt;group&gt;.ActiveMQ-4.&lt;alive.|dead.&gt;%localhost%&lt;service&gt;</code>
 * sent as a single datagram. A background thread receives the datagrams of
 * other agents, tracks them per service name and notifies the configured
 * {@link DiscoveryListener} asynchronously when a service appears or
 * disappears.
 *
 * @version $Revision$
 */
public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {

    public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
    public static final String DEFAULT_HOST_STR = "default";
    public static final String DEFAULT_HOST_IP = System.getProperty("activemq.partition.discovery", "239.255.2.3");
    public static final int DEFAULT_PORT = 6155;

    private static final Log LOG = LogFactory.getLog(MulticastDiscoveryAgent.class);
    private static final String TYPE_SUFFIX = "ActiveMQ-4.";
    private static final String ALIVE = "alive.";
    private static final String DEAD = "dead.";
    private static final String DELIMITER = "%";
    private static final int BUFF_SIZE = 8192;
    private static final int DEFAULT_IDLE_TIME = 500;
    private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;

    private long initialReconnectDelay = 1000 * 5;
    private long maxReconnectDelay = 1000 * 30;
    private long backOffMultiplier = 2;
    private boolean useExponentialBackOff;
    private int maxReconnectAttempts;

    private int timeToLive = 1;
    private boolean loopBackMode;
    private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
    private String group = "default";
    private URI discoveryURI;
    private InetAddress inetAddress;
    private SocketAddress sockAddress;
    private DiscoveryListener discoveryListener;
    private String selfService;
    private MulticastSocket mcast;
    private Thread runner;
    private long keepAliveInterval = DEFAULT_IDLE_TIME;
    private String mcInterface;
    private String mcNetworkInterface;
    private long lastAdvertizeTime;
    private AtomicBoolean started = new AtomicBoolean(false);
    private boolean reportAdvertizeFailed = true;
    private ExecutorService executor = null;

    /**
     * Book-keeping for one remote service seen on the multicast group: when
     * it was last heard from and, if the listener reported it as failed, when
     * it may be advertised to the listener again.
     */
    class RemoteBrokerData {
        final String brokerName;
        final String service;
        long lastHeartBeat;
        long recoveryTime;
        int failureCount;
        boolean failed;

        public RemoteBrokerData(String brokerName, String service) {
            this.brokerName = brokerName;
            this.service = service;
            this.lastHeartBeat = System.currentTimeMillis();
        }

        /**
         * Records that a heartbeat was just received and clears the failure
         * history once the service has stayed healthy long enough.
         */
        public synchronized void updateHeartBeat() {
            lastHeartBeat = System.currentTimeMillis();

            // Consider that the broker recovery has succeeded if it has not
            // failed in 60 seconds.
            if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("I now think that the " + service + " service has recovered.");
                }
                failureCount = 0;
                recoveryTime = 0;
            }
        }

        /**
         * @return the time, in milliseconds, at which the last heartbeat was
         *         recorded.
         */
        public synchronized long getLastHeartBeat() {
            return lastHeartBeat;
        }

        /**
         * Marks the service as failed and computes the time before which it
         * will not be re-advertised to the listener.
         *
         * @return true if the state changed to failed, false if the service
         *         was already marked failed.
         */
        public synchronized boolean markFailed() {
            if (!failed) {
                failed = true;
                failureCount++;

                long reconnectDelay;
                if (!useExponentialBackOff) {
                    reconnectDelay = initialReconnectDelay;
                } else {
                    // NOTE(review): in back-off mode the delay is
                    // backOffMultiplier^failureCount milliseconds and does not
                    // incorporate initialReconnectDelay - confirm this is
                    // intended before changing it.
                    reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount);
                    if (reconnectDelay > maxReconnectDelay) {
                        reconnectDelay = maxReconnectDelay;
                    }
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Remote failure of " + service + " while still receiving multicast advertisements.  Advertising events will be suppressed for " + reconnectDelay
                              + " ms, the current failure count is: " + failureCount);
                }

                recoveryTime = System.currentTimeMillis() + reconnectDelay;
                return true;
            }
            return false;
        }

        /**
         * @return true if this broker is marked failed and it is now the right
         *         time to start recovery.
         */
        public synchronized boolean doRecovery() {
            if (!failed) {
                return false;
            }

            // Are we done trying to recover this guy?
            if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Max reconnect attempts of the " + service + " service has been reached.");
                }
                return false;
            }

            // Is it not yet time?
            if (System.currentTimeMillis() < recoveryTime) {
                return false;
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Resuming event advertisement of the " + service + " service.");
            }
            failed = false;
            return true;
        }

        /**
         * @return true if the service is currently marked failed. Synchronized
         *         like every other accessor of <code>failed</code> so that a
         *         value written by another thread is visible here.
         */
        public synchronized boolean isFailed() {
            return failed;
        }
    }

    /**
     * Set the discovery listener
     *
     * @param listener
     */
    public void setDiscoveryListener(DiscoveryListener listener) {
        this.discoveryListener = listener;
    }

    /**
     * register a service; if the agent is already started the service is
     * advertised immediately.
     */
    public void registerService(String name) throws IOException {
        this.selfService = name;
        if (started.get()) {
            doAdvertizeSelf();
        }
    }

    /**
     * @return Returns the loopBackMode.
     */
    public boolean isLoopBackMode() {
        return loopBackMode;
    }

    /**
     * @param loopBackMode The loopBackMode to set.
     */
    public void setLoopBackMode(boolean loopBackMode) {
        this.loopBackMode = loopBackMode;
    }

    /**
     * @return Returns the timeToLive.
     */
    public int getTimeToLive() {
        return timeToLive;
    }

    /**
     * @param timeToLive The timeToLive to set.
     */
    public void setTimeToLive(int timeToLive) {
        this.timeToLive = timeToLive;
    }

    /**
     * @return the discoveryURI
     */
    public URI getDiscoveryURI() {
        return discoveryURI;
    }

    /**
     * Set the discoveryURI
     *
     * @param discoveryURI
     */
    public void setDiscoveryURI(URI discoveryURI) {
        this.discoveryURI = discoveryURI;
    }

    /**
     * @return the keep-alive interval in milliseconds; it is used both as the
     *         socket receive timeout and as the re-advertise period.
     */
    public long getKeepAliveInterval() {
        return keepAliveInterval;
    }

    public void setKeepAliveInterval(long keepAliveInterval) {
        this.keepAliveInterval = keepAliveInterval;
    }

    /**
     * @param mcInterface address passed to
     *                {@link MulticastSocket#setInterface(InetAddress)} on start
     */
    public void setInterface(String mcInterface) {
        this.mcInterface = mcInterface;
    }

    /**
     * @param mcNetworkInterface interface name passed to
     *                {@link MulticastSocket#setNetworkInterface(NetworkInterface)}
     *                on start
     */
    public void setNetworkInterface(String mcNetworkInterface) {
        this.mcNetworkInterface = mcNetworkInterface;
    }

    /**
     * start the discovery agent: opens the multicast socket, joins the group,
     * starts the receiver thread and advertises the registered service.
     * Calling it on an already started agent is a no-op. If any step fails the
     * agent is rolled back to the stopped state so that it can be started
     * again and so that {@link #stop()} does not act on a half-built agent.
     *
     * @throws Exception if the group is empty or the socket cannot be set up
     */
    public void start() throws Exception {

        if (!started.compareAndSet(false, true)) {
            return;
        }

        boolean initialized = false;
        try {
            if (group == null || group.length() == 0) {
                throw new IOException("You must specify a group to discover");
            }
            String type = getType();
            if (!type.endsWith(".")) {
                LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
            }

            if (discoveryURI == null) {
                discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("start - discoveryURI = " + discoveryURI);
            }

            String myHost = discoveryURI.getHost();
            int myPort = discoveryURI.getPort();

            if (DEFAULT_HOST_STR.equals(myHost)) {
                myHost = DEFAULT_HOST_IP;
            }

            if (myPort < 0) {
                myPort = DEFAULT_PORT;
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("start - myHost = " + myHost);
                LOG.trace("start - myPort = " + myPort);
                LOG.trace("start - group  = " + group);
                LOG.trace("start - interface  = " + mcInterface);
                LOG.trace("start - network interface  = " + mcNetworkInterface);
            }

            this.inetAddress = InetAddress.getByName(myHost);
            this.sockAddress = new InetSocketAddress(this.inetAddress, myPort);
            mcast = new MulticastSocket(myPort);
            mcast.setLoopbackMode(loopBackMode);
            mcast.setTimeToLive(getTimeToLive());
            // NOTE(review): the group is joined before setInterface /
            // setNetworkInterface are applied below - confirm the join is
            // meant to happen on the default interface.
            mcast.joinGroup(inetAddress);
            mcast.setSoTimeout((int)keepAliveInterval);
            if (mcInterface != null) {
                mcast.setInterface(InetAddress.getByName(mcInterface));
            }
            if (mcNetworkInterface != null) {
                mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface));
            }
            runner = new Thread(this);
            runner.setName(this.toString() + ":" + runner.getName());
            runner.setDaemon(true);
            runner.start();
            doAdvertizeSelf();
            initialized = true;
        } finally {
            if (!initialized) {
                // Roll back: clear the flag first so a receiver thread that
                // may already be running exits quietly, then release the
                // socket if one was opened.
                started.set(false);
                if (mcast != null) {
                    mcast.close();
                }
            }
        }
    }

    /**
     * stop the channel: sends a final "dead" advertisement, closes the socket,
     * interrupts the receiver thread and shuts down the notifier executor if
     * one was created. Calling it on a stopped agent is a no-op.
     *
     * @throws Exception
     */
    public void stop() throws Exception {
        if (started.compareAndSet(true, false)) {
            // started is already false here, so this advertises DEAD.
            doAdvertizeSelf();
            if (mcast != null) {
                mcast.close();
            }
            if (runner != null) {
                runner.interrupt();
            }
            shutdownExecutor();
        }
    }

    /**
     * @return the prefix that identifies heartbeats of this agent's group:
     *         the group name followed by "." and the protocol suffix.
     */
    public String getType() {
        return group + "." + TYPE_SUFFIX;
    }

    /**
     * Receiver loop executed by the runner thread: performs the periodic
     * time-keeping work, then waits (at most the socket timeout) for one
     * datagram and processes it, until the agent is stopped.
     */
    public void run() {
        byte[] buf = new byte[BUFF_SIZE];
        DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
        while (started.get()) {
            doTimeKeepingServices();
            try {
                mcast.receive(packet);
                if (packet.getLength() > 0) {
                    String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
                    processData(str);
                }
            } catch (SocketTimeoutException se) {
                // ignore - the timeout only exists so that the time-keeping
                // work above runs regularly.
            } catch (IOException e) {
                // A closed socket during stop() is expected; only report
                // errors while we are supposed to be running.
                if (started.get()) {
                    LOG.error("failed to process packet: " + e);
                }
            }
        }
    }

    /**
     * Parses one received heartbeat and dispatches it to
     * {@link #processAlive(String, String)} or {@link #processDead(String)}.
     * Datagrams for another group, with an unknown state marker or without
     * the two delimiters around the broker name are ignored, so that stray or
     * malformed traffic cannot terminate the receiver thread with an
     * unchecked exception.
     *
     * @param str the datagram content decoded as a string
     */
    private void processData(String str) {
        if (discoveryListener == null) {
            return;
        }
        String type = getType();
        if (!str.startsWith(type)) {
            return;
        }

        String payload = str.substring(type.length());
        boolean alive;
        String body;
        if (payload.startsWith(ALIVE)) {
            alive = true;
            body = payload.substring(ALIVE.length());
        } else if (payload.startsWith(DEAD)) {
            alive = false;
            body = payload.substring(DEAD.length());
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ignoring discovery packet with unknown state: " + str);
            }
            return;
        }

        String brokerName = getBrokerName(body);
        if (brokerName == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ignoring malformed discovery packet: " + str);
            }
            return;
        }

        // The service follows the broker name and its two delimiters.
        String service = body.substring(brokerName.length() + 2);
        if (alive) {
            processAlive(brokerName, service);
        } else {
            processDead(service);
        }
    }

    /**
     * Re-advertises this agent's service when the keep-alive interval has
     * elapsed (or the clock went backwards) and expires silent remote
     * services. Does nothing once the agent is stopped.
     */
    private void doTimeKeepingServices() {
        if (started.get()) {
            long currentTime = System.currentTimeMillis();
            if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) {
                doAdvertizeSelf();
                lastAdvertizeTime = currentTime;
            }
            doExpireOldServices();
        }
    }

    /**
     * Sends one heartbeat for the registered service, flagged alive or dead
     * depending on whether the agent is started. Does nothing when no service
     * is registered. A send failure is logged only the first time.
     */
    private void doAdvertizeSelf() {
        if (selfService != null) {
            String payload = getType();
            payload += started.get() ? ALIVE : DEAD;
            payload += DELIMITER + "localhost" + DELIMITER;
            payload += selfService;
            try {
                byte[] data = payload.getBytes();
                DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress);
                mcast.send(packet);
            } catch (IOException e) {
                // If a send fails, chances are all subsequent sends will fail
                // too.. No need to keep reporting the
                // same error over and over.
                if (reportAdvertizeFailed) {
                    reportAdvertizeFailed = false;
                    LOG.error("Failed to advertise our service: " + payload, e);
                    if ("Operation not permitted".equals(e.getMessage())) {
                        LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup.  "
                                  + "Please make sure that the OS is properly configured to allow multicast traffic over: " + mcast.getLocalAddress());
                    }
                }
            }
        }
    }

    /**
     * Handles an "alive" heartbeat of a remote service. A service seen for
     * the first time is recorded, reported to the listener and answered with
     * our own advertisement; a known service gets its heartbeat refreshed and
     * is reported again if its failure back-off has elapsed. Heartbeats for
     * our own service are ignored.
     */
    private void processAlive(String brokerName, String service) {
        if (selfService == null || !service.equals(selfService)) {
            RemoteBrokerData data = brokersByService.get(service);
            if (data == null) {
                data = new RemoteBrokerData(brokerName, service);
                brokersByService.put(service, data);
                fireServiceAddEvent(data);
                doAdvertizeSelf();
            } else {
                data.updateHeartBeat();
                if (data.doRecovery()) {
                    fireServiceAddEvent(data);
                }
            }
        }
    }

    /**
     * Forgets a remote service and reports its removal to the listener,
     * unless it was already reported as removed when it was marked failed.
     */
    private void processDead(String service) {
        if (!service.equals(selfService)) {
            RemoteBrokerData data = brokersByService.remove(service);
            if (data != null && !data.isFailed()) {
                fireServiceRemovedEvent(data);
            }
        }
    }

    /**
     * Treats every service whose last heartbeat is older than
     * HEARTBEAT_MISS_BEFORE_DEATH keep-alive intervals as dead.
     */
    private void doExpireOldServices() {
        long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
        for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
            RemoteBrokerData data = i.next();
            if (data.getLastHeartBeat() < expireTime) {
                processDead(data.service);
            }
        }
    }

    /**
     * Extracts the text between the first two delimiters of the given string.
     *
     * @return the broker name, or null if the string does not contain two
     *         delimiters
     */
    private String getBrokerName(String str) {
        int start = str.indexOf(DELIMITER);
        if (start < 0) {
            return null;
        }
        int end = str.indexOf(DELIMITER, start + 1);
        if (end < 0) {
            return null;
        }
        return str.substring(start + 1, end);
    }

    /**
     * Called when the service named by the event could not be used. If the
     * service is known and was not already marked failed, it is marked failed
     * and its removal is reported to the listener.
     */
    public void serviceFailed(DiscoveryEvent event) throws IOException {
        RemoteBrokerData data = brokersByService.get(event.getServiceName());
        if (data != null && data.markFailed()) {
            fireServiceRemovedEvent(data);
        }
    }

    private void fireServiceRemovedEvent(RemoteBrokerData data) {
        if (discoveryListener != null) {
            final DiscoveryEvent event = new DiscoveryEvent(data.service);
            event.setBrokerName(data.brokerName);

            // Have the listener process the event async so that
            // he does not block this thread since we are doing time sensitive
            // processing of events.
            getExecutor().execute(new Runnable() {
                public void run() {
                    DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
                    if (discoveryListener != null) {
                        discoveryListener.onServiceRemove(event);
                    }
                }
            });
        }
    }

    private void fireServiceAddEvent(RemoteBrokerData data) {
        if (discoveryListener != null) {
            final DiscoveryEvent event = new DiscoveryEvent(data.service);
            event.setBrokerName(data.brokerName);

            // Have the listener process the event async so that
            // he does not block this thread since we are doing time sensitive
            // processing of events.
            getExecutor().execute(new Runnable() {
                public void run() {
                    DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
                    if (discoveryListener != null) {
                        discoveryListener.onServiceAdd(event);
                    }
                }
            });
        }
    }

    /**
     * Lazily creates the single-threaded executor that delivers listener
     * notifications. Synchronized because it is reached both from the
     * receiver thread and from callers of {@link #serviceFailed(DiscoveryEvent)}.
     */
    private synchronized ExecutorService getExecutor() {
        if (executor == null) {
            final String threadName = "Notifier-" + this.toString();
            executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
                public Thread newThread(Runnable runable) {
                    Thread t = new Thread(runable, threadName);
                    t.setDaemon(true);
                    return t;
                }
            });
        }
        return executor;
    }

    /**
     * Shuts down the notifier executor if one exists and forgets it, so that
     * stopping never creates an executor just to discard it and a later
     * restart gets a fresh, usable one.
     */
    private synchronized void shutdownExecutor() {
        if (executor != null) {
            executor.shutdownNow();
            executor = null;
        }
    }

    public long getBackOffMultiplier() {
        return backOffMultiplier;
    }

    public void setBackOffMultiplier(long backOffMultiplier) {
        this.backOffMultiplier = backOffMultiplier;
    }

    public long getInitialReconnectDelay() {
        return initialReconnectDelay;
    }

    public void setInitialReconnectDelay(long initialReconnectDelay) {
        this.initialReconnectDelay = initialReconnectDelay;
    }

    public int getMaxReconnectAttempts() {
        return maxReconnectAttempts;
    }

    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
        this.maxReconnectAttempts = maxReconnectAttempts;
    }

    public long getMaxReconnectDelay() {
        return maxReconnectDelay;
    }

    public void setMaxReconnectDelay(long maxReconnectDelay) {
        this.maxReconnectDelay = maxReconnectDelay;
    }

    public boolean isUseExponentialBackOff() {
        return useExponentialBackOff;
    }

    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
        this.useExponentialBackOff = useExponentialBackOff;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    @Override
    public String toString() {
        return "MulticastDiscoveryAgent-"
            + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
    }
}