001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.failover;
019    
020    import java.io.IOException;
021    import java.io.InterruptedIOException;
022    import java.net.URI;
023    import java.util.ArrayList;
024    import java.util.Iterator;
025    import java.util.LinkedHashMap;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.concurrent.CopyOnWriteArrayList;
029    import java.util.concurrent.atomic.AtomicReference;
030    import org.apache.activemq.command.BrokerInfo;
031    import org.apache.activemq.command.Command;
032    import org.apache.activemq.command.ConnectionControl;
033    import org.apache.activemq.command.RemoveInfo;
034    import org.apache.activemq.command.Response;
035    import org.apache.activemq.state.ConnectionStateTracker;
036    import org.apache.activemq.state.Tracked;
037    import org.apache.activemq.thread.DefaultThreadPools;
038    import org.apache.activemq.thread.Task;
039    import org.apache.activemq.thread.TaskRunner;
040    import org.apache.activemq.transport.CompositeTransport;
041    import org.apache.activemq.transport.DefaultTransportListener;
042    import org.apache.activemq.transport.FutureResponse;
043    import org.apache.activemq.transport.ResponseCallback;
044    import org.apache.activemq.transport.Transport;
045    import org.apache.activemq.transport.TransportFactory;
046    import org.apache.activemq.transport.TransportListener;
047    import org.apache.activemq.util.IOExceptionSupport;
048    import org.apache.activemq.util.ServiceSupport;
049    import org.apache.commons.logging.Log;
050    import org.apache.commons.logging.LogFactory;
051    
052    /**
053     * A Transport that is made reliable by being able to fail over to another
054     * transport when a transport failure is detected.
055     * 
056     * @version $Revision$
057     */
058    public class FailoverTransport implements CompositeTransport {
059    
060        private static final Log LOG = LogFactory.getLog(FailoverTransport.class);
061    
062        private TransportListener transportListener;
063        private boolean disposed;
064        private boolean connected;
065        private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
066    
067        private final Object reconnectMutex = new Object();
068        private final Object backupMutex = new Object();
069        private final Object sleepMutex = new Object();
070        private final Object listenerMutex = new Object();
071        private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
072        private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();
073    
074        private URI connectedTransportURI;
075        private URI failedConnectTransportURI;
076        private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
077        private final TaskRunner reconnectTask;
078        private boolean started;
079    
080        private long initialReconnectDelay = 10;
081        private long maxReconnectDelay = 1000 * 30;
082        private double backOffMultiplier = 2d;
083        private long timeout = -1;
084        private boolean useExponentialBackOff = true;
085        private boolean randomize = true;
086        private boolean initialized;
087        private int maxReconnectAttempts;
088        private int startupMaxReconnectAttempts;
089        private int connectFailures;
090        private long reconnectDelay = this.initialReconnectDelay;
091        private Exception connectionFailure;
092        private boolean firstConnection = true;
093        //optionally always have a backup created
094        private boolean backup=false;
095        private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
096        private int backupPoolSize=1;
097        private boolean trackMessages = false;
098        private boolean trackTransactionProducers = true;
099        private int maxCacheSize = 128 * 1024;
100        private TransportListener disposedListener = new DefaultTransportListener() {};
101        
102    
103        private final TransportListener myTransportListener = createTransportListener();
104    
/**
 * Creates the failover transport and the background task that either
 * reconnects (when no transport is connected) or builds backup transports.
 *
 * @throws InterruptedIOException declared by the signature; which call can
 *         raise it is not evident from this constructor alone
 */
public FailoverTransport() throws InterruptedIOException {

    stateTracker.setTrackTransactions(true);
    // Set up the task that re-establishes the connection asynchronously.
    reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
        public boolean iterate() {
            boolean result = false;
            boolean buildBackup = true;
            synchronized (backupMutex) {
                if (connectedTransport.get() == null && !disposed) {
                    result = doReconnect();
                    buildBackup = false;
                }
            }
            if (buildBackup) {
                buildBackups();
            } else {
                // A reconnect was attempted in this pass: ask for another
                // iteration so the backups get built next time. This
                // overwrites the value returned by doReconnect().
                result = true;
                try {
                    reconnectTask.wakeup();
                } catch (InterruptedException e) {
                    LOG.debug("Reconnect task has been interrupted.", e);
                }
            }
            return result;
        }

    }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
}
136    
137        TransportListener createTransportListener() {
138            return new TransportListener() {
139                public void onCommand(Object o) {
140                    Command command = (Command)o;
141                    if (command == null) {
142                        return;
143                    }
144                    if (command.isResponse()) {
145                        Object object = null;
146                        synchronized(requestMap) {
147                         object = requestMap.remove(Integer.valueOf(((Response)command).getCorrelationId()));
148                        }
149                        if (object != null && object.getClass() == Tracked.class) {
150                            ((Tracked)object).onResponses();
151                        }
152                    }
153                    if (!initialized) {
154                        if (command.isBrokerInfo()) {
155                            BrokerInfo info = (BrokerInfo)command;
156                            BrokerInfo[] peers = info.getPeerBrokerInfos();
157                            if (peers != null) {
158                                for (int i = 0; i < peers.length; i++) {
159                                    String brokerString = peers[i].getBrokerURL();
160                                    add(brokerString);
161                                }
162                            }
163                            initialized = true;
164                        }
165    
166                    }
167                    if (transportListener != null) {
168                        transportListener.onCommand(command);
169                    }
170                }
171    
172                public void onException(IOException error) {
173                    try {
174                        handleTransportFailure(error);
175                    } catch (InterruptedException e) {
176                        Thread.currentThread().interrupt();
177                        transportListener.onException(new InterruptedIOException());
178                    }
179                }
180    
181                public void transportInterupted() {
182                    if (transportListener != null) {
183                        transportListener.transportInterupted();
184                    }
185                }
186    
187                public void transportResumed() {
188                    if (transportListener != null) {
189                        transportListener.transportResumed();
190                    }
191                }
192            };
193        }
194    
195    
196        public final void handleTransportFailure(IOException e) throws InterruptedException {
197            
198            Transport transport = connectedTransport.getAndSet(null);
199            if( transport!=null ) {
200                
201                transport.setTransportListener(disposedListener);
202                ServiceSupport.dispose(transport);
203                
204                boolean reconnectOk = false;
205                synchronized (reconnectMutex) {
206                    if(started) {
207                        LOG.warn("Transport failed to " + connectedTransportURI+ " , attempting to automatically reconnect due to: " + e);
208                        LOG.debug("Transport failed with the following exception:", e);
209                        reconnectOk = true;
210                    }          
211                    initialized = false;
212                    failedConnectTransportURI=connectedTransportURI;
213                    connectedTransportURI = null;
214                    connected=false;
215                
216                    // notify before any reconnect attempt so ack state can be whacked
217                    if (transportListener != null) {
218                        transportListener.transportInterupted();
219                    }
220                
221                    if (reconnectOk) {
222                        reconnectTask.wakeup();
223                    }
224                }
225            }
226    
227        }
228    
229        public void start() throws Exception {
230            synchronized (reconnectMutex) {
231                LOG.debug("Started.");
232                if (started) {
233                    return;
234                }
235                started = true;
236                stateTracker.setMaxCacheSize(getMaxCacheSize());
237                stateTracker.setTrackMessages(isTrackMessages());
238                stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
239                if (connectedTransport.get() != null) {
240                    stateTracker.restore(connectedTransport.get());
241                } else {
242                    reconnect();
243                }
244            }
245        }
246    
247        public void stop() throws Exception {
248            Transport transportToStop=null;
249            synchronized (reconnectMutex) {
250                LOG.debug("Stopped.");
251                if (!started) {
252                    return;
253                }
254                started = false;
255                disposed = true;
256                connected = false;
257                for (BackupTransport t:backups) {
258                    t.setDisposed(true);
259                }
260                backups.clear();
261    
262                if (connectedTransport.get() != null) {
263                    transportToStop = connectedTransport.getAndSet(null);
264                }
265                reconnectMutex.notifyAll();
266            }
267            synchronized (sleepMutex) {
268                sleepMutex.notifyAll();
269            }
270            reconnectTask.shutdown();
271            if( transportToStop!=null ) {
272                transportToStop.stop();
273            }
274        }
275    
276        public long getInitialReconnectDelay() {
277            return initialReconnectDelay;
278        }
279    
280        public void setInitialReconnectDelay(long initialReconnectDelay) {
281            this.initialReconnectDelay = initialReconnectDelay;
282        }
283    
284        public long getMaxReconnectDelay() {
285            return maxReconnectDelay;
286        }
287    
288        public void setMaxReconnectDelay(long maxReconnectDelay) {
289            this.maxReconnectDelay = maxReconnectDelay;
290        }
291    
292        public long getReconnectDelay() {
293            return reconnectDelay;
294        }
295    
296        public void setReconnectDelay(long reconnectDelay) {
297            this.reconnectDelay = reconnectDelay;
298        }
299    
300        public double getReconnectDelayExponent() {
301            return backOffMultiplier;
302        }
303    
304        public void setReconnectDelayExponent(double reconnectDelayExponent) {
305            this.backOffMultiplier = reconnectDelayExponent;
306        }
307    
308        public Transport getConnectedTransport() {
309            return connectedTransport.get();
310        }
311    
312        public URI getConnectedTransportURI() {
313            return connectedTransportURI;
314        }
315    
316        public int getMaxReconnectAttempts() {
317            return maxReconnectAttempts;
318        }
319    
320        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
321            this.maxReconnectAttempts = maxReconnectAttempts;
322        }
323        
324        public int getStartupMaxReconnectAttempts() {
325            return this.startupMaxReconnectAttempts;
326        }
327    
328        public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
329            this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
330        }
331    
332        public long getTimeout() {
333                    return timeout;
334            }
335    
336            public void setTimeout(long timeout) {
337                    this.timeout = timeout;
338            }
339    
340            /**
341         * @return Returns the randomize.
342         */
343        public boolean isRandomize() {
344            return randomize;
345        }
346    
347        /**
348         * @param randomize The randomize to set.
349         */
350        public void setRandomize(boolean randomize) {
351            this.randomize = randomize;
352        }
353        
354        public boolean isBackup() {
355                    return backup;
356            }
357    
358            public void setBackup(boolean backup) {
359                    this.backup = backup;
360            }
361    
362            public int getBackupPoolSize() {
363                    return backupPoolSize;
364            }
365    
366            public void setBackupPoolSize(int backupPoolSize) {
367                    this.backupPoolSize = backupPoolSize;
368            }
369            
370            public boolean isTrackMessages() {
371            return trackMessages;
372        }
373    
374        public void setTrackMessages(boolean trackMessages) {
375            this.trackMessages = trackMessages;
376        }
377    
378        public boolean isTrackTransactionProducers() {
379            return this.trackTransactionProducers;
380        }
381    
382        public void setTrackTransactionProducers(boolean trackTransactionProducers) {
383            this.trackTransactionProducers = trackTransactionProducers;
384        }
385    
386        public int getMaxCacheSize() {
387            return maxCacheSize;
388        }
389    
390        public void setMaxCacheSize(int maxCacheSize) {
391            this.maxCacheSize = maxCacheSize;
392        }
393            
394        /**
395         * @return Returns true if the command is one sent when a connection
396         * is being closed.
397         */
398        private boolean isShutdownCommand(Command command) {
399            return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
400        }
401             
402    
403        public void oneway(Object o) throws IOException {
404            
405            Command command = (Command)o;
406            Exception error = null;
407            try {
408    
409                synchronized (reconnectMutex) {
410                    
411                    if (isShutdownCommand(command) && connectedTransport.get() == null) {
412                        if(command.isShutdownInfo()) {
413                            // Skipping send of ShutdownInfo command when not connected.
414                            return;
415                        }
416                        if(command instanceof RemoveInfo || command.isMessageAck()) {
417                            // Simulate response to RemoveInfo command or ack (as it will be stale)
418                            stateTracker.track(command);
419                            Response response = new Response();
420                            response.setCorrelationId(command.getCommandId());
421                            myTransportListener.onCommand(response);
422                            return;
423                        }
424                    }
425                    // Keep trying until the message is sent.
426                    for (int i = 0; !disposed; i++) {
427                        try {
428    
429                            // Wait for transport to be connected.
430                            Transport transport = connectedTransport.get();
431                            long start = System.currentTimeMillis();
432                            boolean timedout = false;
433                            while (transport == null && !disposed
434                                    && connectionFailure == null
435                                    && !Thread.currentThread().isInterrupted()) {
436                                LOG.trace("Waiting for transport to reconnect..: " + command);
437                                long end = System.currentTimeMillis();
438                                if (timeout > 0 && (end - start > timeout)) {
439                                    timedout = true;
440                                    LOG.info("Failover timed out after " + (end - start) + "ms");
441                                    break;
442                                }
443                                try {
444                                    reconnectMutex.wait(100);
445                                } catch (InterruptedException e) {
446                                    Thread.currentThread().interrupt();
447                                    LOG.debug("Interupted: " + e, e);
448                                }
449                                transport = connectedTransport.get();
450                            }
451    
452                            if (transport == null) {
453                                // Previous loop may have exited due to use being
454                                // disposed.
455                                if (disposed) {
456                                    error = new IOException("Transport disposed.");
457                                } else if (connectionFailure != null) {
458                                    error = connectionFailure;
459                                } else if (timedout == true) {
460                                    error = new IOException("Failover timeout of " + timeout + " ms reached.");
461                                }else {
462                                    error = new IOException("Unexpected failure.");
463                                }
464                                break;
465                            }
466    
467                            // If it was a request and it was not being tracked by
468                            // the state tracker,
469                            // then hold it in the requestMap so that we can replay
470                            // it later.
471                            Tracked tracked = stateTracker.track(command);
472                            synchronized(requestMap) {
473                                if (tracked != null && tracked.isWaitingForResponse()) {
474                                    requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
475                                } else if (tracked == null && command.isResponseRequired()) {
476                                    requestMap.put(Integer.valueOf(command.getCommandId()), command);
477                                }
478                            }
479    
480                            // Send the message.
481                            try {
482                                transport.oneway(command);
483                                stateTracker.trackBack(command);
484                            } catch (IOException e) {
485    
486                                // If the command was not tracked.. we will retry in
487                                // this method
488                                if (tracked == null) {
489    
490                                    // since we will retry in this method.. take it
491                                    // out of the request
492                                    // map so that it is not sent 2 times on
493                                    // recovery
494                                    if (command.isResponseRequired()) {
495                                        requestMap.remove(Integer.valueOf(command.getCommandId()));
496                                    }
497    
498                                    // Rethrow the exception so it will handled by
499                                    // the outer catch
500                                    throw e;
501                                }
502    
503                            }
504    
505                            return;
506    
507                        } catch (IOException e) {
508                            if (LOG.isDebugEnabled()) {
509                                LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
510                            }
511                            handleTransportFailure(e);
512                        }
513                    }
514                }
515            } catch (InterruptedException e) {
516                // Some one may be trying to stop our thread.
517                Thread.currentThread().interrupt();
518                throw new InterruptedIOException();
519            }
520            if (!disposed) {
521                if (error != null) {
522                    if (error instanceof IOException) {
523                        throw (IOException)error;
524                    }
525                    throw IOExceptionSupport.create(error);
526                }
527            }
528        }
529    
530        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
531            throw new AssertionError("Unsupported Method");
532        }
533    
534        public Object request(Object command) throws IOException {
535            throw new AssertionError("Unsupported Method");
536        }
537    
538        public Object request(Object command, int timeout) throws IOException {
539            throw new AssertionError("Unsupported Method");
540        }
541    
542        public void add(URI u[]) {
543            for (int i = 0; i < u.length; i++) {
544                if (!uris.contains(u[i])) {
545                    uris.add(u[i]);
546                }
547            }
548            reconnect();
549        }
550    
551        public void remove(URI u[]) {
552            for (int i = 0; i < u.length; i++) {
553                uris.remove(u[i]);
554            }
555            reconnect();
556        }
557    
558        public void add(String u) {
559            try {
560                URI uri = new URI(u);
561                if (!uris.contains(uri)) {
562                    uris.add(uri);
563                }
564    
565                reconnect();
566            } catch (Exception e) {
567                LOG.error("Failed to parse URI: " + u);
568            }
569        }
570    
571        public void reconnect() {
572            synchronized (reconnectMutex) {
573                if (started) {
574                    LOG.debug("Waking up reconnect task");
575                    try {
576                        reconnectTask.wakeup();
577                    } catch (InterruptedException e) {
578                        Thread.currentThread().interrupt();
579                    }
580                } else {
581                    LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
582                }
583            }
584        }
585    
586        private List<URI> getConnectList() {
587            ArrayList<URI> l = new ArrayList<URI>(uris);
588            boolean removed = false;
589            if (failedConnectTransportURI != null) {
590                removed = l.remove(failedConnectTransportURI);
591            }
592            if (randomize) {
593                // Randomly, reorder the list by random swapping
594                for (int i = 0; i < l.size(); i++) {
595                    int p = (int) (Math.random()*100 % l.size());
596                    URI t = l.get(p);
597                    l.set(p, l.get(i));
598                    l.set(i, t);
599                }
600            }
601            if (removed) {
602                l.add(failedConnectTransportURI);
603            }
604            LOG.debug("urlList connectionList:" + l);
605            return l;
606        }
607    
608        public TransportListener getTransportListener() {
609            return transportListener;
610        }
611    
612        public void setTransportListener(TransportListener commandListener) {
613            synchronized(listenerMutex) {
614                this.transportListener = commandListener;
615                listenerMutex.notifyAll();
616            }
617        }
618    
619        public <T> T narrow(Class<T> target) {
620    
621            if (target.isAssignableFrom(getClass())) {
622                return target.cast(this);
623            }
624            Transport transport = connectedTransport.get();
625            if ( transport != null) {
626                return transport.narrow(target);
627            }
628            return null;
629    
630        }
631    
632        protected void restoreTransport(Transport t) throws Exception, IOException {
633            t.start();
634            //send information to the broker - informing it we are an ft client
635            ConnectionControl cc = new ConnectionControl();
636            cc.setFaultTolerant(true);
637            t.oneway(cc);
638            stateTracker.restore(t);
639            Map tmpMap = null;
640            synchronized(requestMap) {
641                tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
642            }
643            for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
644                Command command = iter2.next();
645                if (LOG.isTraceEnabled()) {
646                    LOG.trace("restore, replay: " + command);
647                }
648                t.oneway(command);
649            }
650        }
651    
652        public boolean isUseExponentialBackOff() {
653            return useExponentialBackOff;
654        }
655    
656        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
657            this.useExponentialBackOff = useExponentialBackOff;
658        }
659    
660        public String toString() {
661            return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
662        }
663    
664        public String getRemoteAddress() {
665            Transport transport = connectedTransport.get();
666            if ( transport != null) {
667                return transport.getRemoteAddress();
668            }
669            return null;
670        }
671    
672        public boolean isFaultTolerant() {
673            return true;
674        }
675        
676       final boolean doReconnect() {
677            Exception failure = null;
678            synchronized (reconnectMutex) {
679    
680                if (disposed || connectionFailure != null) {
681                    reconnectMutex.notifyAll();
682                }
683    
684                if (connectedTransport.get() != null || disposed || connectionFailure != null) {
685                    return false;
686                } else {
687                    List<URI> connectList = getConnectList();
688                    if (connectList.isEmpty()) {
689                        failure = new IOException("No uris available to connect to.");
690                    } else {
691                        if (!useExponentialBackOff) {
692                            reconnectDelay = initialReconnectDelay;
693                        }
694                        synchronized(backupMutex) {
695                            if (backup && !backups.isEmpty()) {
696                                    BackupTransport bt = backups.remove(0);
697                                Transport t = bt.getTransport();
698                                URI uri = bt.getUri();
699                                t.setTransportListener(myTransportListener);
700                                try {
701                                    if (started) { 
702                                        restoreTransport(t);  
703                                    }
704                                    reconnectDelay = initialReconnectDelay;
705                                    failedConnectTransportURI=null;
706                                    connectedTransportURI = uri;
707                                    connectedTransport.set(t);
708                                    reconnectMutex.notifyAll();
709                                    connectFailures = 0;
710                                    LOG.info("Successfully reconnected to backup " + uri);
711                                    return false;
712                                }catch (Exception e) {
713                                    LOG.debug("Backup transport failed",e);
714                                 }
715                            }
716                        }
717                        
718                        Iterator<URI> iter = connectList.iterator();
719                        while(iter.hasNext() && connectedTransport.get() == null && !disposed) {
720                            URI uri = iter.next();
721                            Transport t = null;
722                            try {
723                                LOG.debug("Attempting connect to: " + uri);
724                                t = TransportFactory.compositeConnect(uri);
725                                t.setTransportListener(myTransportListener);
726                                t.start();
727                                
728                                if (started) {
729                                    restoreTransport(t);
730                                }
731    
732                                LOG.debug("Connection established");
733                                reconnectDelay = initialReconnectDelay;
734                                connectedTransportURI = uri;
735                                connectedTransport.set(t);
736                                reconnectMutex.notifyAll();
737                                connectFailures = 0;
738                             // Make sure on initial startup, that the transportListener 
739                             // has been initialized for this instance.
740                                synchronized(listenerMutex) {
741                                    if (transportListener==null) {
742                                        try {
743                                            //if it isn't set after 2secs - it
744                                            //probably never will be
745                                            listenerMutex.wait(2000);
746                                        }catch(InterruptedException ex) {}
747                                    }
748                                }
749                                if (transportListener != null) {
750                                    transportListener.transportResumed();
751                                }else {
752                                    LOG.debug("transport resumed by transport listener not set");
753                                }
754                                if (firstConnection) {
755                                    firstConnection=false;
756                                    LOG.info("Successfully connected to " + uri);
757                                }else {
758                                    LOG.info("Successfully reconnected to " + uri);
759                                }
760                                connected=true;
761                                return false;
762                            } catch (Exception e) {
763                                failure = e;
764                                LOG.debug("Connect fail to: " + uri + ", reason: " + e);
765                                if (t!=null) {
766                                    try {
767                                        t.stop();       
768                                    } catch (Exception ee) {
769                                        LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
770                                    }
771                                }
772                            }
773                        }
774                    }
775                }
776                int reconnectAttempts = 0;
777                if (firstConnection) {
778                    if (this.startupMaxReconnectAttempts != 0) {
779                        reconnectAttempts = this.startupMaxReconnectAttempts;
780                    }
781                }
782                if (reconnectAttempts==0) {
783                    reconnectAttempts = this.maxReconnectAttempts;
784                }            
785                if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
786                    LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
787                    connectionFailure = failure;
788            
789                    // Make sure on initial startup, that the transportListener has been initialized
790                    // for this instance.
791                    synchronized(listenerMutex) {
792                        if (transportListener==null) {
793                            try {
794                                listenerMutex.wait(2000);
795                            }catch(InterruptedException ex) {}
796                        }
797                    }
798    
799              
800                    if(transportListener != null) {
801                        if (connectionFailure instanceof IOException) {
802                            transportListener.onException((IOException)connectionFailure);
803                        } else {
804                            transportListener.onException(IOExceptionSupport.create(connectionFailure));
805                        }
806                    }        
807                    reconnectMutex.notifyAll();
808                    return false;
809                }
810            }
811            if (!disposed) {
812    
813                LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
814                synchronized (sleepMutex) {
815                    try {
816                        sleepMutex.wait(reconnectDelay);
817                    } catch (InterruptedException e) {
818                        Thread.currentThread().interrupt();
819                    }
820                }
821    
822                if (useExponentialBackOff) {
823                    // Exponential increment of reconnect delay.
824                    reconnectDelay *= backOffMultiplier;
825                    if (reconnectDelay > maxReconnectDelay) {
826                        reconnectDelay = maxReconnectDelay;
827                    }
828                }
829            }
830            return !disposed;
831        }
832    
833       
834       final boolean buildBackups() {
835               synchronized (backupMutex) {
836                       if (!disposed && backup && backups.size() < backupPoolSize) {
837                               List<URI> connectList = getConnectList();
838                               //removed disposed backups
839                               List<BackupTransport>disposedList = new ArrayList<BackupTransport>();
840                               for (BackupTransport bt:backups) {
841                                       if (bt.isDisposed()) {
842                                               disposedList.add(bt);
843                                       }
844                               }
845                               backups.removeAll(disposedList);
846                               disposedList.clear();
847                               for (Iterator<URI>iter = connectList.iterator();iter.hasNext() && backups.size() < backupPoolSize;) {
848                                       URI uri = iter.next();
849                                       if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
850                                               try {
851                                                       BackupTransport bt = new BackupTransport(this);
852                                                       bt.setUri(uri);
853                                                       if (!backups.contains(bt)) {
854                                                               Transport t = TransportFactory.compositeConnect(uri);
855                                           t.setTransportListener(bt);
856                                           t.start();
857                                           bt.setTransport(t);
858                                           backups.add(bt);
859                                                       }
860                                               } catch(Exception e) {
861                                                       LOG.debug("Failed to build backup ",e);
862                                               }
863                                       }
864                               }
865                       }
866               }
867               return false;
868       }
869    
870        public boolean isDisposed() {
871            return disposed;
872        }
873        
874        
875        public boolean isConnected() {
876            return connected;
877        }
878        
879        public void reconnect(URI uri) throws IOException {
880            add(new URI[] {uri});
881        }
882    
883        public int getReceiveCounter() {
884            Transport transport = connectedTransport.get();
885            if( transport == null ) {
886                return 0;
887            }
888            return transport.getReceiveCounter();
889        }
890    }