001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.transport.failover; 019 020 import java.io.IOException; 021 import java.io.InterruptedIOException; 022 import java.net.URI; 023 import java.util.ArrayList; 024 import java.util.Iterator; 025 import java.util.LinkedHashMap; 026 import java.util.List; 027 import java.util.Map; 028 import java.util.concurrent.CopyOnWriteArrayList; 029 import java.util.concurrent.atomic.AtomicReference; 030 import org.apache.activemq.command.BrokerInfo; 031 import org.apache.activemq.command.Command; 032 import org.apache.activemq.command.ConnectionControl; 033 import org.apache.activemq.command.RemoveInfo; 034 import org.apache.activemq.command.Response; 035 import org.apache.activemq.state.ConnectionStateTracker; 036 import org.apache.activemq.state.Tracked; 037 import org.apache.activemq.thread.DefaultThreadPools; 038 import org.apache.activemq.thread.Task; 039 import org.apache.activemq.thread.TaskRunner; 040 import org.apache.activemq.transport.CompositeTransport; 041 import org.apache.activemq.transport.DefaultTransportListener; 042 import org.apache.activemq.transport.FutureResponse; 043 import org.apache.activemq.transport.ResponseCallback; 044 import org.apache.activemq.transport.Transport; 045 import org.apache.activemq.transport.TransportFactory; 046 import org.apache.activemq.transport.TransportListener; 047 import org.apache.activemq.util.IOExceptionSupport; 048 import org.apache.activemq.util.ServiceSupport; 049 import org.apache.commons.logging.Log; 050 import org.apache.commons.logging.LogFactory; 051 052 /** 053 * A Transport that is made reliable by being able to fail over to another 054 * transport when a transport failure is detected. 055 * 056 * @version $Revision$ 057 */ 058 public class FailoverTransport implements CompositeTransport { 059 060 private static final Log LOG = LogFactory.getLog(FailoverTransport.class); 061 062 private TransportListener transportListener; 063 private boolean disposed; 064 private boolean connected; 065 private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>(); 066 067 private final Object reconnectMutex = new Object(); 068 private final Object backupMutex = new Object(); 069 private final Object sleepMutex = new Object(); 070 private final Object listenerMutex = new Object(); 071 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 072 private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>(); 073 074 private URI connectedTransportURI; 075 private URI failedConnectTransportURI; 076 private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>(); 077 private final TaskRunner reconnectTask; 078 private boolean started; 079 080 private long initialReconnectDelay = 10; 081 private long maxReconnectDelay = 1000 * 30; 082 private double backOffMultiplier = 2d; 083 private long timeout = -1; 084 private boolean useExponentialBackOff = true; 085 private boolean randomize = true; 086 private boolean initialized; 087 private int maxReconnectAttempts; 088 private int startupMaxReconnectAttempts; 089 private int connectFailures; 090 private long reconnectDelay = this.initialReconnectDelay; 091 private Exception connectionFailure; 092 private boolean firstConnection = true; 093 //optionally always have a backup created 094 private boolean backup=false; 095 private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>(); 096 private int backupPoolSize=1; 097 private boolean trackMessages = false; 098 private boolean trackTransactionProducers = true; 099 private int maxCacheSize = 128 * 1024; 100 private TransportListener disposedListener = new DefaultTransportListener() {}; 101 102 103 private final TransportListener myTransportListener = createTransportListener(); 104 105 public FailoverTransport() throws InterruptedIOException { 106 107 stateTracker.setTrackTransactions(true); 108 // Setup a task that is used to reconnect the a connection async. 109 reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { 110 public boolean iterate() { 111 boolean result=false; 112 boolean buildBackup=true; 113 boolean doReconnect = !disposed; 114 synchronized(backupMutex) { 115 if (connectedTransport.get()==null && !disposed) { 116 result=doReconnect(); 117 buildBackup=false; 118 } 119 } 120 if(buildBackup) { 121 buildBackups(); 122 }else { 123 //build backups on the next iteration 124 result=true; 125 try { 126 reconnectTask.wakeup(); 127 } catch (InterruptedException e) { 128 LOG.debug("Reconnect task has been interrupted.", e); 129 } 130 } 131 return result; 132 } 133 134 }, "ActiveMQ Failover Worker: " + System.identityHashCode(this)); 135 } 136 137 TransportListener createTransportListener() { 138 return new TransportListener() { 139 public void onCommand(Object o) { 140 Command command = (Command)o; 141 if (command == null) { 142 return; 143 } 144 if (command.isResponse()) { 145 Object object = null; 146 synchronized(requestMap) { 147 object = requestMap.remove(Integer.valueOf(((Response)command).getCorrelationId())); 148 } 149 if (object != null && object.getClass() == Tracked.class) { 150 ((Tracked)object).onResponses(); 151 } 152 } 153 if (!initialized) { 154 if (command.isBrokerInfo()) { 155 BrokerInfo info = (BrokerInfo)command; 156 BrokerInfo[] peers = info.getPeerBrokerInfos(); 157 if (peers != null) { 158 for (int i = 0; i < peers.length; i++) { 159 String brokerString = peers[i].getBrokerURL(); 160 add(brokerString); 161 } 162 } 163 initialized = true; 164 } 165 166 } 167 if (transportListener != null) { 168 transportListener.onCommand(command); 169 } 170 } 171 172 public void onException(IOException error) { 173 try { 174 handleTransportFailure(error); 175 } catch (InterruptedException e) { 176 Thread.currentThread().interrupt(); 177 transportListener.onException(new InterruptedIOException()); 178 } 179 } 180 181 public void transportInterupted() { 182 if (transportListener != null) { 183 transportListener.transportInterupted(); 184 } 185 } 186 187 public void transportResumed() { 188 if (transportListener != null) { 189 transportListener.transportResumed(); 190 } 191 } 192 }; 193 } 194 195 196 public final void handleTransportFailure(IOException e) throws InterruptedException { 197 198 Transport transport = connectedTransport.getAndSet(null); 199 if( transport!=null ) { 200 201 transport.setTransportListener(disposedListener); 202 ServiceSupport.dispose(transport); 203 204 boolean reconnectOk = false; 205 synchronized (reconnectMutex) { 206 if(started) { 207 LOG.warn("Transport failed to " + connectedTransportURI+ " , attempting to automatically reconnect due to: " + e); 208 LOG.debug("Transport failed with the following exception:", e); 209 reconnectOk = true; 210 } 211 initialized = false; 212 failedConnectTransportURI=connectedTransportURI; 213 connectedTransportURI = null; 214 connected=false; 215 216 // notify before any reconnect attempt so ack state can be whacked 217 if (transportListener != null) { 218 transportListener.transportInterupted(); 219 } 220 221 if (reconnectOk) { 222 reconnectTask.wakeup(); 223 } 224 } 225 } 226 227 } 228 229 public void start() throws Exception { 230 synchronized (reconnectMutex) { 231 LOG.debug("Started."); 232 if (started) { 233 return; 234 } 235 started = true; 236 stateTracker.setMaxCacheSize(getMaxCacheSize()); 237 stateTracker.setTrackMessages(isTrackMessages()); 238 stateTracker.setTrackTransactionProducers(isTrackTransactionProducers()); 239 if (connectedTransport.get() != null) { 240 stateTracker.restore(connectedTransport.get()); 241 } else { 242 reconnect(); 243 } 244 } 245 } 246 247 public void stop() throws Exception { 248 Transport transportToStop=null; 249 synchronized (reconnectMutex) { 250 LOG.debug("Stopped."); 251 if (!started) { 252 return; 253 } 254 started = false; 255 disposed = true; 256 connected = false; 257 for (BackupTransport t:backups) { 258 t.setDisposed(true); 259 } 260 backups.clear(); 261 262 if (connectedTransport.get() != null) { 263 transportToStop = connectedTransport.getAndSet(null); 264 } 265 reconnectMutex.notifyAll(); 266 } 267 synchronized (sleepMutex) { 268 sleepMutex.notifyAll(); 269 } 270 reconnectTask.shutdown(); 271 if( transportToStop!=null ) { 272 transportToStop.stop(); 273 } 274 } 275 276 public long getInitialReconnectDelay() { 277 return initialReconnectDelay; 278 } 279 280 public void setInitialReconnectDelay(long initialReconnectDelay) { 281 this.initialReconnectDelay = initialReconnectDelay; 282 } 283 284 public long getMaxReconnectDelay() { 285 return maxReconnectDelay; 286 } 287 288 public void setMaxReconnectDelay(long maxReconnectDelay) { 289 this.maxReconnectDelay = maxReconnectDelay; 290 } 291 292 public long getReconnectDelay() { 293 return reconnectDelay; 294 } 295 296 public void setReconnectDelay(long reconnectDelay) { 297 this.reconnectDelay = reconnectDelay; 298 } 299 300 public double getReconnectDelayExponent() { 301 return backOffMultiplier; 302 } 303 304 public void setReconnectDelayExponent(double reconnectDelayExponent) { 305 this.backOffMultiplier = reconnectDelayExponent; 306 } 307 308 public Transport getConnectedTransport() { 309 return connectedTransport.get(); 310 } 311 312 public URI getConnectedTransportURI() { 313 return connectedTransportURI; 314 } 315 316 public int getMaxReconnectAttempts() { 317 return maxReconnectAttempts; 318 } 319 320 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 321 this.maxReconnectAttempts = maxReconnectAttempts; 322 } 323 324 public int getStartupMaxReconnectAttempts() { 325 return this.startupMaxReconnectAttempts; 326 } 327 328 public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) { 329 this.startupMaxReconnectAttempts = startupMaxReconnectAttempts; 330 } 331 332 public long getTimeout() { 333 return timeout; 334 } 335 336 public void setTimeout(long timeout) { 337 this.timeout = timeout; 338 } 339 340 /** 341 * @return Returns the randomize. 342 */ 343 public boolean isRandomize() { 344 return randomize; 345 } 346 347 /** 348 * @param randomize The randomize to set. 349 */ 350 public void setRandomize(boolean randomize) { 351 this.randomize = randomize; 352 } 353 354 public boolean isBackup() { 355 return backup; 356 } 357 358 public void setBackup(boolean backup) { 359 this.backup = backup; 360 } 361 362 public int getBackupPoolSize() { 363 return backupPoolSize; 364 } 365 366 public void setBackupPoolSize(int backupPoolSize) { 367 this.backupPoolSize = backupPoolSize; 368 } 369 370 public boolean isTrackMessages() { 371 return trackMessages; 372 } 373 374 public void setTrackMessages(boolean trackMessages) { 375 this.trackMessages = trackMessages; 376 } 377 378 public boolean isTrackTransactionProducers() { 379 return this.trackTransactionProducers; 380 } 381 382 public void setTrackTransactionProducers(boolean trackTransactionProducers) { 383 this.trackTransactionProducers = trackTransactionProducers; 384 } 385 386 public int getMaxCacheSize() { 387 return maxCacheSize; 388 } 389 390 public void setMaxCacheSize(int maxCacheSize) { 391 this.maxCacheSize = maxCacheSize; 392 } 393 394 /** 395 * @return Returns true if the command is one sent when a connection 396 * is being closed. 397 */ 398 private boolean isShutdownCommand(Command command) { 399 return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo)); 400 } 401 402 403 public void oneway(Object o) throws IOException { 404 405 Command command = (Command)o; 406 Exception error = null; 407 try { 408 409 synchronized (reconnectMutex) { 410 411 if (isShutdownCommand(command) && connectedTransport.get() == null) { 412 if(command.isShutdownInfo()) { 413 // Skipping send of ShutdownInfo command when not connected. 414 return; 415 } 416 if(command instanceof RemoveInfo || command.isMessageAck()) { 417 // Simulate response to RemoveInfo command or ack (as it will be stale) 418 stateTracker.track(command); 419 Response response = new Response(); 420 response.setCorrelationId(command.getCommandId()); 421 myTransportListener.onCommand(response); 422 return; 423 } 424 } 425 // Keep trying until the message is sent. 426 for (int i = 0; !disposed; i++) { 427 try { 428 429 // Wait for transport to be connected. 430 Transport transport = connectedTransport.get(); 431 long start = System.currentTimeMillis(); 432 boolean timedout = false; 433 while (transport == null && !disposed 434 && connectionFailure == null 435 && !Thread.currentThread().isInterrupted()) { 436 LOG.trace("Waiting for transport to reconnect..: " + command); 437 long end = System.currentTimeMillis(); 438 if (timeout > 0 && (end - start > timeout)) { 439 timedout = true; 440 LOG.info("Failover timed out after " + (end - start) + "ms"); 441 break; 442 } 443 try { 444 reconnectMutex.wait(100); 445 } catch (InterruptedException e) { 446 Thread.currentThread().interrupt(); 447 LOG.debug("Interupted: " + e, e); 448 } 449 transport = connectedTransport.get(); 450 } 451 452 if (transport == null) { 453 // Previous loop may have exited due to use being 454 // disposed. 455 if (disposed) { 456 error = new IOException("Transport disposed."); 457 } else if (connectionFailure != null) { 458 error = connectionFailure; 459 } else if (timedout == true) { 460 error = new IOException("Failover timeout of " + timeout + " ms reached."); 461 }else { 462 error = new IOException("Unexpected failure."); 463 } 464 break; 465 } 466 467 // If it was a request and it was not being tracked by 468 // the state tracker, 469 // then hold it in the requestMap so that we can replay 470 // it later. 471 Tracked tracked = stateTracker.track(command); 472 synchronized(requestMap) { 473 if (tracked != null && tracked.isWaitingForResponse()) { 474 requestMap.put(Integer.valueOf(command.getCommandId()), tracked); 475 } else if (tracked == null && command.isResponseRequired()) { 476 requestMap.put(Integer.valueOf(command.getCommandId()), command); 477 } 478 } 479 480 // Send the message. 481 try { 482 transport.oneway(command); 483 stateTracker.trackBack(command); 484 } catch (IOException e) { 485 486 // If the command was not tracked.. we will retry in 487 // this method 488 if (tracked == null) { 489 490 // since we will retry in this method.. take it 491 // out of the request 492 // map so that it is not sent 2 times on 493 // recovery 494 if (command.isResponseRequired()) { 495 requestMap.remove(Integer.valueOf(command.getCommandId())); 496 } 497 498 // Rethrow the exception so it will handled by 499 // the outer catch 500 throw e; 501 } 502 503 } 504 505 return; 506 507 } catch (IOException e) { 508 if (LOG.isDebugEnabled()) { 509 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command); 510 } 511 handleTransportFailure(e); 512 } 513 } 514 } 515 } catch (InterruptedException e) { 516 // Some one may be trying to stop our thread. 517 Thread.currentThread().interrupt(); 518 throw new InterruptedIOException(); 519 } 520 if (!disposed) { 521 if (error != null) { 522 if (error instanceof IOException) { 523 throw (IOException)error; 524 } 525 throw IOExceptionSupport.create(error); 526 } 527 } 528 } 529 530 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 531 throw new AssertionError("Unsupported Method"); 532 } 533 534 public Object request(Object command) throws IOException { 535 throw new AssertionError("Unsupported Method"); 536 } 537 538 public Object request(Object command, int timeout) throws IOException { 539 throw new AssertionError("Unsupported Method"); 540 } 541 542 public void add(URI u[]) { 543 for (int i = 0; i < u.length; i++) { 544 if (!uris.contains(u[i])) { 545 uris.add(u[i]); 546 } 547 } 548 reconnect(); 549 } 550 551 public void remove(URI u[]) { 552 for (int i = 0; i < u.length; i++) { 553 uris.remove(u[i]); 554 } 555 reconnect(); 556 } 557 558 public void add(String u) { 559 try { 560 URI uri = new URI(u); 561 if (!uris.contains(uri)) { 562 uris.add(uri); 563 } 564 565 reconnect(); 566 } catch (Exception e) { 567 LOG.error("Failed to parse URI: " + u); 568 } 569 } 570 571 public void reconnect() { 572 synchronized (reconnectMutex) { 573 if (started) { 574 LOG.debug("Waking up reconnect task"); 575 try { 576 reconnectTask.wakeup(); 577 } catch (InterruptedException e) { 578 Thread.currentThread().interrupt(); 579 } 580 } else { 581 LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport."); 582 } 583 } 584 } 585 586 private List<URI> getConnectList() { 587 ArrayList<URI> l = new ArrayList<URI>(uris); 588 boolean removed = false; 589 if (failedConnectTransportURI != null) { 590 removed = l.remove(failedConnectTransportURI); 591 } 592 if (randomize) { 593 // Randomly, reorder the list by random swapping 594 for (int i = 0; i < l.size(); i++) { 595 int p = (int) (Math.random()*100 % l.size()); 596 URI t = l.get(p); 597 l.set(p, l.get(i)); 598 l.set(i, t); 599 } 600 } 601 if (removed) { 602 l.add(failedConnectTransportURI); 603 } 604 LOG.debug("urlList connectionList:" + l); 605 return l; 606 } 607 608 public TransportListener getTransportListener() { 609 return transportListener; 610 } 611 612 public void setTransportListener(TransportListener commandListener) { 613 synchronized(listenerMutex) { 614 this.transportListener = commandListener; 615 listenerMutex.notifyAll(); 616 } 617 } 618 619 public <T> T narrow(Class<T> target) { 620 621 if (target.isAssignableFrom(getClass())) { 622 return target.cast(this); 623 } 624 Transport transport = connectedTransport.get(); 625 if ( transport != null) { 626 return transport.narrow(target); 627 } 628 return null; 629 630 } 631 632 protected void restoreTransport(Transport t) throws Exception, IOException { 633 t.start(); 634 //send information to the broker - informing it we are an ft client 635 ConnectionControl cc = new ConnectionControl(); 636 cc.setFaultTolerant(true); 637 t.oneway(cc); 638 stateTracker.restore(t); 639 Map tmpMap = null; 640 synchronized(requestMap) { 641 tmpMap = new LinkedHashMap<Integer, Command>(requestMap); 642 } 643 for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) { 644 Command command = iter2.next(); 645 if (LOG.isTraceEnabled()) { 646 LOG.trace("restore, replay: " + command); 647 } 648 t.oneway(command); 649 } 650 } 651 652 public boolean isUseExponentialBackOff() { 653 return useExponentialBackOff; 654 } 655 656 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 657 this.useExponentialBackOff = useExponentialBackOff; 658 } 659 660 public String toString() { 661 return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString(); 662 } 663 664 public String getRemoteAddress() { 665 Transport transport = connectedTransport.get(); 666 if ( transport != null) { 667 return transport.getRemoteAddress(); 668 } 669 return null; 670 } 671 672 public boolean isFaultTolerant() { 673 return true; 674 } 675 676 final boolean doReconnect() { 677 Exception failure = null; 678 synchronized (reconnectMutex) { 679 680 if (disposed || connectionFailure != null) { 681 reconnectMutex.notifyAll(); 682 } 683 684 if (connectedTransport.get() != null || disposed || connectionFailure != null) { 685 return false; 686 } else { 687 List<URI> connectList = getConnectList(); 688 if (connectList.isEmpty()) { 689 failure = new IOException("No uris available to connect to."); 690 } else { 691 if (!useExponentialBackOff) { 692 reconnectDelay = initialReconnectDelay; 693 } 694 synchronized(backupMutex) { 695 if (backup && !backups.isEmpty()) { 696 BackupTransport bt = backups.remove(0); 697 Transport t = bt.getTransport(); 698 URI uri = bt.getUri(); 699 t.setTransportListener(myTransportListener); 700 try { 701 if (started) { 702 restoreTransport(t); 703 } 704 reconnectDelay = initialReconnectDelay; 705 failedConnectTransportURI=null; 706 connectedTransportURI = uri; 707 connectedTransport.set(t); 708 reconnectMutex.notifyAll(); 709 connectFailures = 0; 710 LOG.info("Successfully reconnected to backup " + uri); 711 return false; 712 }catch (Exception e) { 713 LOG.debug("Backup transport failed",e); 714 } 715 } 716 } 717 718 Iterator<URI> iter = connectList.iterator(); 719 while(iter.hasNext() && connectedTransport.get() == null && !disposed) { 720 URI uri = iter.next(); 721 Transport t = null; 722 try { 723 LOG.debug("Attempting connect to: " + uri); 724 t = TransportFactory.compositeConnect(uri); 725 t.setTransportListener(myTransportListener); 726 t.start(); 727 728 if (started) { 729 restoreTransport(t); 730 } 731 732 LOG.debug("Connection established"); 733 reconnectDelay = initialReconnectDelay; 734 connectedTransportURI = uri; 735 connectedTransport.set(t); 736 reconnectMutex.notifyAll(); 737 connectFailures = 0; 738 // Make sure on initial startup, that the transportListener 739 // has been initialized for this instance. 740 synchronized(listenerMutex) { 741 if (transportListener==null) { 742 try { 743 //if it isn't set after 2secs - it 744 //probably never will be 745 listenerMutex.wait(2000); 746 }catch(InterruptedException ex) {} 747 } 748 } 749 if (transportListener != null) { 750 transportListener.transportResumed(); 751 }else { 752 LOG.debug("transport resumed by transport listener not set"); 753 } 754 if (firstConnection) { 755 firstConnection=false; 756 LOG.info("Successfully connected to " + uri); 757 }else { 758 LOG.info("Successfully reconnected to " + uri); 759 } 760 connected=true; 761 return false; 762 } catch (Exception e) { 763 failure = e; 764 LOG.debug("Connect fail to: " + uri + ", reason: " + e); 765 if (t!=null) { 766 try { 767 t.stop(); 768 } catch (Exception ee) { 769 LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee); 770 } 771 } 772 } 773 } 774 } 775 } 776 int reconnectAttempts = 0; 777 if (firstConnection) { 778 if (this.startupMaxReconnectAttempts != 0) { 779 reconnectAttempts = this.startupMaxReconnectAttempts; 780 } 781 } 782 if (reconnectAttempts==0) { 783 reconnectAttempts = this.maxReconnectAttempts; 784 } 785 if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) { 786 LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)"); 787 connectionFailure = failure; 788 789 // Make sure on initial startup, that the transportListener has been initialized 790 // for this instance. 791 synchronized(listenerMutex) { 792 if (transportListener==null) { 793 try { 794 listenerMutex.wait(2000); 795 }catch(InterruptedException ex) {} 796 } 797 } 798 799 800 if(transportListener != null) { 801 if (connectionFailure instanceof IOException) { 802 transportListener.onException((IOException)connectionFailure); 803 } else { 804 transportListener.onException(IOExceptionSupport.create(connectionFailure)); 805 } 806 } 807 reconnectMutex.notifyAll(); 808 return false; 809 } 810 } 811 if (!disposed) { 812 813 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); 814 synchronized (sleepMutex) { 815 try { 816 sleepMutex.wait(reconnectDelay); 817 } catch (InterruptedException e) { 818 Thread.currentThread().interrupt(); 819 } 820 } 821 822 if (useExponentialBackOff) { 823 // Exponential increment of reconnect delay. 824 reconnectDelay *= backOffMultiplier; 825 if (reconnectDelay > maxReconnectDelay) { 826 reconnectDelay = maxReconnectDelay; 827 } 828 } 829 } 830 return !disposed; 831 } 832 833 834 final boolean buildBackups() { 835 synchronized (backupMutex) { 836 if (!disposed && backup && backups.size() < backupPoolSize) { 837 List<URI> connectList = getConnectList(); 838 //removed disposed backups 839 List<BackupTransport>disposedList = new ArrayList<BackupTransport>(); 840 for (BackupTransport bt:backups) { 841 if (bt.isDisposed()) { 842 disposedList.add(bt); 843 } 844 } 845 backups.removeAll(disposedList); 846 disposedList.clear(); 847 for (Iterator<URI>iter = connectList.iterator();iter.hasNext() && backups.size() < backupPoolSize;) { 848 URI uri = iter.next(); 849 if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { 850 try { 851 BackupTransport bt = new BackupTransport(this); 852 bt.setUri(uri); 853 if (!backups.contains(bt)) { 854 Transport t = TransportFactory.compositeConnect(uri); 855 t.setTransportListener(bt); 856 t.start(); 857 bt.setTransport(t); 858 backups.add(bt); 859 } 860 } catch(Exception e) { 861 LOG.debug("Failed to build backup ",e); 862 } 863 } 864 } 865 } 866 } 867 return false; 868 } 869 870 public boolean isDisposed() { 871 return disposed; 872 } 873 874 875 public boolean isConnected() { 876 return connected; 877 } 878 879 public void reconnect(URI uri) throws IOException { 880 add(new URI[] {uri}); 881 } 882 883 public int getReceiveCounter() { 884 Transport transport = connectedTransport.get(); 885 if( transport == null ) { 886 return 0; 887 } 888 return transport.getReceiveCounter(); 889 } 890 }