001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.OutputStream;
022    import java.net.URI;
023    import java.net.URISyntaxException;
024    import java.util.HashMap;
025    import java.util.Iterator;
026    import java.util.Map;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CopyOnWriteArrayList;
029    import java.util.concurrent.CountDownLatch;
030    import java.util.concurrent.LinkedBlockingQueue;
031    import java.util.concurrent.ThreadFactory;
032    import java.util.concurrent.ThreadPoolExecutor;
033    import java.util.concurrent.TimeUnit;
034    import java.util.concurrent.atomic.AtomicBoolean;
035    import java.util.concurrent.atomic.AtomicInteger;
036    
037    import javax.jms.Connection;
038    import javax.jms.ConnectionConsumer;
039    import javax.jms.ConnectionMetaData;
040    import javax.jms.DeliveryMode;
041    import javax.jms.Destination;
042    import javax.jms.ExceptionListener;
043    import javax.jms.IllegalStateException;
044    import javax.jms.JMSException;
045    import javax.jms.Queue;
046    import javax.jms.QueueConnection;
047    import javax.jms.QueueSession;
048    import javax.jms.ServerSessionPool;
049    import javax.jms.Session;
050    import javax.jms.Topic;
051    import javax.jms.TopicConnection;
052    import javax.jms.TopicSession;
053    import javax.jms.XAConnection;
054    import javax.jms.InvalidDestinationException;
055    
056    import org.apache.activemq.blob.BlobTransferPolicy;
057    import org.apache.activemq.command.ActiveMQDestination;
058    import org.apache.activemq.command.ActiveMQMessage;
059    import org.apache.activemq.command.ActiveMQTempDestination;
060    import org.apache.activemq.command.ActiveMQTempQueue;
061    import org.apache.activemq.command.ActiveMQTempTopic;
062    import org.apache.activemq.command.BrokerInfo;
063    import org.apache.activemq.command.Command;
064    import org.apache.activemq.command.CommandTypes;
065    import org.apache.activemq.command.ConnectionControl;
066    import org.apache.activemq.command.ConnectionError;
067    import org.apache.activemq.command.ConnectionId;
068    import org.apache.activemq.command.ConnectionInfo;
069    import org.apache.activemq.command.ConsumerControl;
070    import org.apache.activemq.command.ConsumerId;
071    import org.apache.activemq.command.ConsumerInfo;
072    import org.apache.activemq.command.ControlCommand;
073    import org.apache.activemq.command.DestinationInfo;
074    import org.apache.activemq.command.ExceptionResponse;
075    import org.apache.activemq.command.Message;
076    import org.apache.activemq.command.MessageDispatch;
077    import org.apache.activemq.command.MessageId;
078    import org.apache.activemq.command.ProducerAck;
079    import org.apache.activemq.command.ProducerId;
080    import org.apache.activemq.command.RemoveInfo;
081    import org.apache.activemq.command.RemoveSubscriptionInfo;
082    import org.apache.activemq.command.Response;
083    import org.apache.activemq.command.SessionId;
084    import org.apache.activemq.command.ShutdownInfo;
085    import org.apache.activemq.command.WireFormatInfo;
086    import org.apache.activemq.management.JMSConnectionStatsImpl;
087    import org.apache.activemq.management.JMSStatsImpl;
088    import org.apache.activemq.management.StatsCapable;
089    import org.apache.activemq.management.StatsImpl;
090    import org.apache.activemq.state.CommandVisitorAdapter;
091    import org.apache.activemq.thread.TaskRunnerFactory;
092    import org.apache.activemq.transport.Transport;
093    import org.apache.activemq.transport.TransportListener;
094    import org.apache.activemq.util.IdGenerator;
095    import org.apache.activemq.util.IntrospectionSupport;
096    import org.apache.activemq.util.JMSExceptionSupport;
097    import org.apache.activemq.util.LongSequenceGenerator;
098    import org.apache.activemq.util.ServiceSupport;
099    import org.apache.activemq.advisory.DestinationSource;
100    import org.apache.commons.logging.Log;
101    import org.apache.commons.logging.LogFactory;
102    
103    public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
104    
105        public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
106        public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
107        public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
108    
109        private static final Log LOG = LogFactory.getLog(ActiveMQConnection.class);
110        private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
111    
112        public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
113    
114        protected boolean dispatchAsync=true;
115        protected boolean alwaysSessionAsync = true;
116    
117        private TaskRunnerFactory sessionTaskRunner;
118        private final ThreadPoolExecutor asyncConnectionThread;
119    
120        // Connection state variables
121        private final ConnectionInfo info;
122        private ExceptionListener exceptionListener;
123        private ClientInternalExceptionListener clientInternalExceptionListener;
124        private boolean clientIDSet;
125        private boolean isConnectionInfoSentToBroker;
126        private boolean userSpecifiedClientID;
127    
128        // Configuration options variables
129        private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
130        private BlobTransferPolicy blobTransferPolicy;
131        private RedeliveryPolicy redeliveryPolicy;
132        private MessageTransformer transformer;
133    
134        private boolean disableTimeStampsByDefault;
135        private boolean optimizedMessageDispatch = true;
136        private boolean copyMessageOnSend = true;
137        private boolean useCompression;
138        private boolean objectMessageSerializationDefered;
139        private boolean useAsyncSend;
140        private boolean optimizeAcknowledge;
141        private boolean nestedMapAndListEnabled = true;
142        private boolean useRetroactiveConsumer;
143        private boolean exclusiveConsumer;
144        private boolean alwaysSyncSend;
145        private int closeTimeout = 15000;
146        private boolean watchTopicAdvisories = true;
147        private long warnAboutUnstartedConnectionTimeout = 500L;
148        private int sendTimeout =0;
149        private boolean sendAcksAsync=true;
150    
151        private final Transport transport;
152        private final IdGenerator clientIdGenerator;
153        private final JMSStatsImpl factoryStats;
154        private final JMSConnectionStatsImpl stats;
155    
156        private final AtomicBoolean started = new AtomicBoolean(false);
157        private final AtomicBoolean closing = new AtomicBoolean(false);
158        private final AtomicBoolean closed = new AtomicBoolean(false);
159        private final AtomicBoolean transportFailed = new AtomicBoolean(false);
160        private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
161        private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
162        private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
163        private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
164        private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
165    
166        // Maps ConsumerIds to ActiveMQConsumer objects
167        private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
168        private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
169        private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
170        private final SessionId connectionSessionId;
171        private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
172        private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
173        private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
174        private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
175    
176        private AdvisoryConsumer advisoryConsumer;
177        private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
178        private BrokerInfo brokerInfo;
179        private IOException firstFailureError;
180        private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
181    
182        // Assume that protocol is the latest. Change to the actual protocol
183        // version when a WireFormatInfo is received.
184        private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
185        private long timeCreated;
186        private ConnectionAudit connectionAudit = new ConnectionAudit();
187        private DestinationSource destinationSource;
188        private final Object ensureConnectionInfoSentMutex = new Object();
189        private boolean useDedicatedTaskRunner;
190        protected CountDownLatch transportInterruptionProcessingComplete;
191        private long consumerFailoverRedeliveryWaitPeriod;
192    
193        /**
194         * Construct an <code>ActiveMQConnection</code>
195         * 
196         * @param transport
197         * @param factoryStats
198         * @throws Exception
199         */
200        protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
201    
202            this.transport = transport;
203            this.clientIdGenerator = clientIdGenerator;
204            this.factoryStats = factoryStats;
205    
206            // Configure a single threaded executor who's core thread can timeout if
207            // idle
208            asyncConnectionThread = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
209                public Thread newThread(Runnable r) {
210                    Thread thread = new Thread(r, "ActiveMQ Connection Worker: " + transport);
211                    thread.setDaemon(true);
212                    return thread;
213                }
214            });
215            // asyncConnectionThread.allowCoreThreadTimeOut(true);
216    
217            this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId()));
218            this.info.setManageable(true);
219            this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
220    
221            this.transport.setTransportListener(this);
222    
223            this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
224            this.factoryStats.addConnection(this);
225            this.timeCreated = System.currentTimeMillis();
226            this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
227        }
228    
229        protected void setUserName(String userName) {
230            this.info.setUserName(userName);
231        }
232    
233        protected void setPassword(String password) {
234            this.info.setPassword(password);
235        }
236    
237        /**
238         * A static helper method to create a new connection
239         * 
240         * @return an ActiveMQConnection
241         * @throws JMSException
242         */
243        public static ActiveMQConnection makeConnection() throws JMSException {
244            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
245            return (ActiveMQConnection)factory.createConnection();
246        }
247    
248        /**
249         * A static helper method to create a new connection
250         * 
251         * @param uri
252         * @return and ActiveMQConnection
253         * @throws JMSException
254         */
255        public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
256            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
257            return (ActiveMQConnection)factory.createConnection();
258        }
259    
260        /**
261         * A static helper method to create a new connection
262         * 
263         * @param user
264         * @param password
265         * @param uri
266         * @return an ActiveMQConnection
267         * @throws JMSException
268         */
269        public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
270            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
271            return (ActiveMQConnection)factory.createConnection();
272        }
273    
274        /**
275         * @return a number unique for this connection
276         */
277        public JMSConnectionStatsImpl getConnectionStats() {
278            return stats;
279        }
280    
281        /**
282         * Creates a <CODE>Session</CODE> object.
283         * 
284         * @param transacted indicates whether the session is transacted
285         * @param acknowledgeMode indicates whether the consumer or the client will
286         *                acknowledge any messages it receives; ignored if the
287         *                session is transacted. Legal values are
288         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
289         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
290         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
291         * @return a newly created session
292         * @throws JMSException if the <CODE>Connection</CODE> object fails to
293         *                 create a session due to some internal error or lack of
294         *                 support for the specific transaction and acknowledgement
295         *                 mode.
296         * @see Session#AUTO_ACKNOWLEDGE
297         * @see Session#CLIENT_ACKNOWLEDGE
298         * @see Session#DUPS_OK_ACKNOWLEDGE
299         * @since 1.1
300         */
301        public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
302            checkClosedOrFailed();
303            ensureConnectionInfoSent();
304            if(!transacted) {
305                if (acknowledgeMode==Session.SESSION_TRANSACTED) {
306                    throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
307                } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
308                    throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
309                            "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
310                }
311            }
312            return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
313                ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
314        }
315    
316        /**
317         * @return sessionId
318         */
319        protected SessionId getNextSessionId() {
320            return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
321        }
322    
323        /**
324         * Gets the client identifier for this connection.
325         * <P>
326         * This value is specific to the JMS provider. It is either preconfigured by
327         * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
328         * dynamically by the application by calling the <code>setClientID</code>
329         * method.
330         * 
331         * @return the unique client identifier
332         * @throws JMSException if the JMS provider fails to return the client ID
333         *                 for this connection due to some internal error.
334         */
335        public String getClientID() throws JMSException {
336            checkClosedOrFailed();
337            return this.info.getClientId();
338        }
339    
340        /**
341         * Sets the client identifier for this connection.
342         * <P>
343         * The preferred way to assign a JMS client's client identifier is for it to
344         * be configured in a client-specific <CODE>ConnectionFactory</CODE>
345         * object and transparently assigned to the <CODE>Connection</CODE> object
346         * it creates.
347         * <P>
348         * Alternatively, a client can set a connection's client identifier using a
349         * provider-specific value. The facility to set a connection's client
350         * identifier explicitly is not a mechanism for overriding the identifier
351         * that has been administratively configured. It is provided for the case
352         * where no administratively specified identifier exists. If one does exist,
353         * an attempt to change it by setting it must throw an
354         * <CODE>IllegalStateException</CODE>. If a client sets the client
355         * identifier explicitly, it must do so immediately after it creates the
356         * connection and before any other action on the connection is taken. After
357         * this point, setting the client identifier is a programming error that
358         * should throw an <CODE>IllegalStateException</CODE>.
359         * <P>
360         * The purpose of the client identifier is to associate a connection and its
361         * objects with a state maintained on behalf of the client by a provider.
362         * The only such state identified by the JMS API is that required to support
363         * durable subscriptions.
364         * <P>
365         * If another connection with the same <code>clientID</code> is already
366         * running when this method is called, the JMS provider should detect the
367         * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
368         * 
369         * @param newClientID the unique client identifier
370         * @throws JMSException if the JMS provider fails to set the client ID for
371         *                 this connection due to some internal error.
372         * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
373         *                 invalid or duplicate client ID.
374         * @throws javax.jms.IllegalStateException if the JMS client attempts to set
375         *                 a connection's client ID at the wrong time or when it has
376         *                 been administratively configured.
377         */
378        public void setClientID(String newClientID) throws JMSException {
379            checkClosedOrFailed();
380    
381            if (this.clientIDSet) {
382                throw new IllegalStateException("The clientID has already been set");
383            }
384    
385            if (this.isConnectionInfoSentToBroker) {
386                throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
387            }
388    
389            this.info.setClientId(newClientID);
390            this.userSpecifiedClientID = true;
391            ensureConnectionInfoSent();
392        }
393    
394        /**
395         * Sets the default client id that the connection will use if explicitly not
396         * set with the setClientId() call.
397         */
398        public void setDefaultClientID(String clientID) throws JMSException {
399            this.info.setClientId(clientID);
400            this.userSpecifiedClientID = true;
401        }
402    
403        /**
404         * Gets the metadata for this connection.
405         * 
406         * @return the connection metadata
407         * @throws JMSException if the JMS provider fails to get the connection
408         *                 metadata for this connection.
409         * @see javax.jms.ConnectionMetaData
410         */
411        public ConnectionMetaData getMetaData() throws JMSException {
412            checkClosedOrFailed();
413            return ActiveMQConnectionMetaData.INSTANCE;
414        }
415    
416        /**
417         * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
418         * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
419         * associated with it.
420         * 
421         * @return the <CODE>ExceptionListener</CODE> for this connection, or
422         *         null, if no <CODE>ExceptionListener</CODE> is associated with
423         *         this connection.
424         * @throws JMSException if the JMS provider fails to get the
425         *                 <CODE>ExceptionListener</CODE> for this connection.
426         * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
427         */
428        public ExceptionListener getExceptionListener() throws JMSException {
429            checkClosedOrFailed();
430            return this.exceptionListener;
431        }
432    
433        /**
434         * Sets an exception listener for this connection.
435         * <P>
436         * If a JMS provider detects a serious problem with a connection, it informs
437         * the connection's <CODE> ExceptionListener</CODE>, if one has been
438         * registered. It does this by calling the listener's <CODE>onException
439         * </CODE>
440         * method, passing it a <CODE>JMSException</CODE> object describing the
441         * problem.
442         * <P>
443         * An exception listener allows a client to be notified of a problem
444         * asynchronously. Some connections only consume messages, so they would
445         * have no other way to learn their connection has failed.
446         * <P>
447         * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
448         * <P>
449         * A JMS provider should attempt to resolve connection problems itself
450         * before it notifies the client of them.
451         * 
452         * @param listener the exception listener
453         * @throws JMSException if the JMS provider fails to set the exception
454         *                 listener for this connection.
455         */
456        public void setExceptionListener(ExceptionListener listener) throws JMSException {
457            checkClosedOrFailed();
458            this.exceptionListener = listener;
459        }
460    
461        /**
462         * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
463         * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
464         * associated with it.
465         * 
466         * @return the listener or <code>null</code> if no listener is registered with the connection.
467         */
468        public ClientInternalExceptionListener getClientInternalExceptionListener()
469        {
470            return clientInternalExceptionListener;
471        }
472    
473        /**
474         * Sets a client internal exception listener for this connection.
475         * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
476         * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
477         * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
478         * describing the problem.
479         * 
480         * @param listener the exception listener
481         */
482        public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
483        {
484            this.clientInternalExceptionListener = listener;
485        }
486        
487        /**
488         * Starts (or restarts) a connection's delivery of incoming messages. A call
489         * to <CODE>start</CODE> on a connection that has already been started is
490         * ignored.
491         * 
492         * @throws JMSException if the JMS provider fails to start message delivery
493         *                 due to some internal error.
494         * @see javax.jms.Connection#stop()
495         */
496        public void start() throws JMSException {
497            checkClosedOrFailed();
498            ensureConnectionInfoSent();
499            if (started.compareAndSet(false, true)) {
500                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
501                    ActiveMQSession session = i.next();
502                    session.start();
503                }
504            }
505        }
506    
507        /**
508         * Temporarily stops a connection's delivery of incoming messages. Delivery
509         * can be restarted using the connection's <CODE>start</CODE> method. When
510         * the connection is stopped, delivery to all the connection's message
511         * consumers is inhibited: synchronous receives block, and messages are not
512         * delivered to message listeners.
513         * <P>
514         * This call blocks until receives and/or message listeners in progress have
515         * completed.
516         * <P>
517         * Stopping a connection has no effect on its ability to send messages. A
518         * call to <CODE>stop</CODE> on a connection that has already been stopped
519         * is ignored.
520         * <P>
521         * A call to <CODE>stop</CODE> must not return until delivery of messages
522         * has paused. This means that a client can rely on the fact that none of
523         * its message listeners will be called and that all threads of control
524         * waiting for <CODE>receive</CODE> calls to return will not return with a
525         * message until the connection is restarted. The receive timers for a
526         * stopped connection continue to advance, so receives may time out while
527         * the connection is stopped.
528         * <P>
529         * If message listeners are running when <CODE>stop</CODE> is invoked, the
530         * <CODE>stop</CODE> call must wait until all of them have returned before
531         * it may return. While these message listeners are completing, they must
532         * have the full services of the connection available to them.
533         * 
534         * @throws JMSException if the JMS provider fails to stop message delivery
535         *                 due to some internal error.
536         * @see javax.jms.Connection#start()
537         */
538        public void stop() throws JMSException {
539            checkClosedOrFailed();
540            if (started.compareAndSet(true, false)) {
541                synchronized(sessions) {
542                    for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
543                        ActiveMQSession s = i.next();
544                        s.stop();
545                    }
546                }
547            }
548        }
549    
550        /**
551         * Closes the connection.
552         * <P>
553         * Since a provider typically allocates significant resources outside the
554         * JVM on behalf of a connection, clients should close these resources when
555         * they are not needed. Relying on garbage collection to eventually reclaim
556         * these resources may not be timely enough.
557         * <P>
558         * There is no need to close the sessions, producers, and consumers of a
559         * closed connection.
560         * <P>
561         * Closing a connection causes all temporary destinations to be deleted.
562         * <P>
563         * When this method is invoked, it should not return until message
564         * processing has been shut down in an orderly fashion. This means that all
565         * message listeners that may have been running have returned, and that all
566         * pending receives have returned. A close terminates all pending message
567         * receives on the connection's sessions' consumers. The receives may return
568         * with a message or with null, depending on whether there was a message
569         * available at the time of the close. If one or more of the connection's
570         * sessions' message listeners is processing a message at the time when
571         * connection <CODE>close</CODE> is invoked, all the facilities of the
572         * connection and its sessions must remain available to those listeners
573         * until they return control to the JMS provider.
574         * <P>
575         * Closing a connection causes any of its sessions' transactions in progress
576         * to be rolled back. In the case where a session's work is coordinated by
577         * an external transaction manager, a session's <CODE>commit</CODE> and
578         * <CODE> rollback</CODE> methods are not used and the result of a closed
579         * session's work is determined later by the transaction manager. Closing a
580         * connection does NOT force an acknowledgment of client-acknowledged
581         * sessions.
582         * <P>
583         * Invoking the <CODE>acknowledge</CODE> method of a received message from
584         * a closed connection's session must throw an
585         * <CODE>IllegalStateException</CODE>. Closing a closed connection must
586         * NOT throw an exception.
587         * 
588         * @throws JMSException if the JMS provider fails to close the connection
589         *                 due to some internal error. For example, a failure to
590         *                 release resources or to close a socket connection can
591         *                 cause this exception to be thrown.
592         */
593        public void close() throws JMSException {
594            try {
595                // If we were running, lets stop first.
596                if (!closed.get() && !transportFailed.get()) {
597                    stop();
598                }
599    
600                synchronized (this) {
601                    if (!closed.get()) {
602                        closing.set(true);
603    
604                        if (destinationSource != null) {
605                            destinationSource.stop();
606                            destinationSource = null;
607                        }
608                        if (advisoryConsumer != null) {
609                            advisoryConsumer.dispose();
610                            advisoryConsumer = null;
611                        }
612    
613                        long lastDeliveredSequenceId = 0;
614                        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
615                            ActiveMQSession s = i.next();
616                            s.dispose();
617                            lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
618                        }
619                        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
620                            ActiveMQConnectionConsumer c = i.next();
621                            c.dispose();
622                        }
623                        for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
624                            ActiveMQInputStream c = i.next();
625                            c.dispose();
626                        }
627                        for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
628                            ActiveMQOutputStream c = i.next();
629                            c.dispose();
630                        }
631    
632                        if (isConnectionInfoSentToBroker) {
633                            // If we announced ourselfs to the broker.. Try to let
634                            // the broker
635                            // know that the connection is being shutdown.
636                            RemoveInfo removeCommand = info.createRemoveCommand();
637                            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
638                            doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
639                            doAsyncSendPacket(new ShutdownInfo());
640                        }
641    
642                        ServiceSupport.dispose(this.transport);
643    
644                        started.set(false);
645    
646                        // TODO if we move the TaskRunnerFactory to the connection
647                        // factory
648                        // then we may need to call
649                        // factory.onConnectionClose(this);
650                        if (sessionTaskRunner != null) {
651                            sessionTaskRunner.shutdown();
652                        }
653                        closed.set(true);
654                        closing.set(false);
655                    }
656                }
657            } finally {
658                try {
659                    if (asyncConnectionThread != null){
660                        asyncConnectionThread.shutdown();
661                    }
662                }catch(Throwable e) {
663                    LOG.error("Error shutting down thread pool " + e,e);
664                }
665                factoryStats.removeConnection(this);
666            }
667        }
668    
669        /**
670         * Tells the broker to terminate its VM. This can be used to cleanly
671         * terminate a broker running in a standalone java process. Server must have
672         * property enable.vm.shutdown=true defined to allow this to work.
673         */
674        // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
675        // implemented.
676        /*
677         * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
678         * command = new BrokerAdminCommand();
679         * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
680         * asyncSendPacket(command); }
681         */
682    
683        /**
684         * Create a durable connection consumer for this connection (optional
685         * operation). This is an expert facility not used by regular JMS clients.
686         * 
687         * @param topic topic to access
688         * @param subscriptionName durable subscription name
689         * @param messageSelector only messages with properties matching the message
690         *                selector expression are delivered. A value of null or an
691         *                empty string indicates that there is no message selector
692         *                for the message consumer.
693         * @param sessionPool the server session pool to associate with this durable
694         *                connection consumer
695         * @param maxMessages the maximum number of messages that can be assigned to
696         *                a server session at one time
697         * @return the durable connection consumer
698         * @throws JMSException if the <CODE>Connection</CODE> object fails to
699         *                 create a connection consumer due to some internal error
700         *                 or invalid arguments for <CODE>sessionPool</CODE> and
701         *                 <CODE>messageSelector</CODE>.
702         * @throws javax.jms.InvalidDestinationException if an invalid destination
703         *                 is specified.
704         * @throws javax.jms.InvalidSelectorException if the message selector is
705         *                 invalid.
706         * @see javax.jms.ConnectionConsumer
707         * @since 1.1
708         */
709        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
710            throws JMSException {
711            return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
712        }
713    
714        /**
715         * Create a durable connection consumer for this connection (optional
716         * operation). This is an expert facility not used by regular JMS clients.
717         * 
718         * @param topic topic to access
719         * @param subscriptionName durable subscription name
720         * @param messageSelector only messages with properties matching the message
721         *                selector expression are delivered. A value of null or an
722         *                empty string indicates that there is no message selector
723         *                for the message consumer.
724         * @param sessionPool the server session pool to associate with this durable
725         *                connection consumer
726         * @param maxMessages the maximum number of messages that can be assigned to
727         *                a server session at one time
728         * @param noLocal set true if you want to filter out messages published
729         *                locally
730         * @return the durable connection consumer
731         * @throws JMSException if the <CODE>Connection</CODE> object fails to
732         *                 create a connection consumer due to some internal error
733         *                 or invalid arguments for <CODE>sessionPool</CODE> and
734         *                 <CODE>messageSelector</CODE>.
735         * @throws javax.jms.InvalidDestinationException if an invalid destination
736         *                 is specified.
737         * @throws javax.jms.InvalidSelectorException if the message selector is
738         *                 invalid.
739         * @see javax.jms.ConnectionConsumer
740         * @since 1.1
741         */
742        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
743                                                                  boolean noLocal) throws JMSException {
744            checkClosedOrFailed();
745            ensureConnectionInfoSent();
746            SessionId sessionId = new SessionId(info.getConnectionId(), -1);
747            ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
748            info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
749            info.setSubscriptionName(subscriptionName);
750            info.setSelector(messageSelector);
751            info.setPrefetchSize(maxMessages);
752            info.setDispatchAsync(isDispatchAsync());
753    
754            // Allows the options on the destination to configure the consumerInfo
755            if (info.getDestination().getOptions() != null) {
756                Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
757                IntrospectionSupport.setProperties(this.info, options, "consumer.");
758            }
759    
760            return new ActiveMQConnectionConsumer(this, sessionPool, info);
761        }
762    
763        // Properties
764        // -------------------------------------------------------------------------
765    
766        /**
767         * Returns true if this connection has been started
768         * 
769         * @return true if this Connection is started
770         */
771        public boolean isStarted() {
772            return started.get();
773        }
774    
775        /**
776         * Returns true if the connection is closed
777         */
778        public boolean isClosed() {
779            return closed.get();
780        }
781    
782        /**
783         * Returns true if the connection is in the process of being closed
784         */
785        public boolean isClosing() {
786            return closing.get();
787        }
788    
789        /**
790         * Returns true if the underlying transport has failed
791         */
792        public boolean isTransportFailed() {
793            return transportFailed.get();
794        }
795    
796        /**
797         * @return Returns the prefetchPolicy.
798         */
799        public ActiveMQPrefetchPolicy getPrefetchPolicy() {
800            return prefetchPolicy;
801        }
802    
803        /**
804         * Sets the <a
805         * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
806         * policy</a> for consumers created by this connection.
807         */
808        public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
809            this.prefetchPolicy = prefetchPolicy;
810        }
811    
812        /**
813         */
814        public Transport getTransportChannel() {
815            return transport;
816        }
817    
818        /**
819         * @return Returns the clientID of the connection, forcing one to be
820         *         generated if one has not yet been configured.
821         */
822        public String getInitializedClientID() throws JMSException {
823            ensureConnectionInfoSent();
824            return info.getClientId();
825        }
826    
827        /**
828         * @return Returns the timeStampsDisableByDefault.
829         */
830        public boolean isDisableTimeStampsByDefault() {
831            return disableTimeStampsByDefault;
832        }
833    
834        /**
835         * Sets whether or not timestamps on messages should be disabled or not. If
836         * you disable them it adds a small performance boost.
837         */
838        public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
839            this.disableTimeStampsByDefault = timeStampsDisableByDefault;
840        }
841    
842        /**
843         * @return Returns the dispatchOptimizedMessage.
844         */
845        public boolean isOptimizedMessageDispatch() {
846            return optimizedMessageDispatch;
847        }
848    
849        /**
850         * If this flag is set then an larger prefetch limit is used - only
851         * applicable for durable topic subscribers.
852         */
853        public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
854            this.optimizedMessageDispatch = dispatchOptimizedMessage;
855        }
856    
857        /**
858         * @return Returns the closeTimeout.
859         */
860        public int getCloseTimeout() {
861            return closeTimeout;
862        }
863    
864        /**
865         * Sets the timeout before a close is considered complete. Normally a
866         * close() on a connection waits for confirmation from the broker; this
867         * allows that operation to timeout to save the client hanging if there is
868         * no broker
869         */
870        public void setCloseTimeout(int closeTimeout) {
871            this.closeTimeout = closeTimeout;
872        }
873    
874        /**
875         * @return ConnectionInfo
876         */
877        public ConnectionInfo getConnectionInfo() {
878            return this.info;
879        }
880    
881        public boolean isUseRetroactiveConsumer() {
882            return useRetroactiveConsumer;
883        }
884    
885        /**
886         * Sets whether or not retroactive consumers are enabled. Retroactive
887         * consumers allow non-durable topic subscribers to receive old messages
888         * that were published before the non-durable subscriber started.
889         */
890        public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
891            this.useRetroactiveConsumer = useRetroactiveConsumer;
892        }
893    
894        public boolean isNestedMapAndListEnabled() {
895            return nestedMapAndListEnabled;
896        }
897    
898        /**
899         * Enables/disables whether or not Message properties and MapMessage entries
900         * support <a
901         * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
902         * Structures</a> of Map and List objects
903         */
904        public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
905            this.nestedMapAndListEnabled = structuredMapsEnabled;
906        }
907    
908        public boolean isExclusiveConsumer() {
909            return exclusiveConsumer;
910        }
911    
912        /**
913         * Enables or disables whether or not queue consumers should be exclusive or
914         * not for example to preserve ordering when not using <a
915         * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
916         * 
917         * @param exclusiveConsumer
918         */
919        public void setExclusiveConsumer(boolean exclusiveConsumer) {
920            this.exclusiveConsumer = exclusiveConsumer;
921        }
922    
923        /**
924         * Adds a transport listener so that a client can be notified of events in
925         * the underlying transport
926         */
927        public void addTransportListener(TransportListener transportListener) {
928            transportListeners.add(transportListener);
929        }
930    
931        public void removeTransportListener(TransportListener transportListener) {
932            transportListeners.remove(transportListener);
933        }
934    
935        public boolean isUseDedicatedTaskRunner() {
936            return useDedicatedTaskRunner;
937        }
938        
939        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
940            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
941        }
942    
943        public TaskRunnerFactory getSessionTaskRunner() {
944            synchronized (this) {
945                if (sessionTaskRunner == null) {
946                    sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
947                }
948            }
949            return sessionTaskRunner;
950        }
951    
952        public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
953            this.sessionTaskRunner = sessionTaskRunner;
954        }
955    
956        public MessageTransformer getTransformer() {
957            return transformer;
958        }
959    
960        /**
961         * Sets the transformer used to transform messages before they are sent on
962         * to the JMS bus or when they are received from the bus but before they are
963         * delivered to the JMS client
964         */
965        public void setTransformer(MessageTransformer transformer) {
966            this.transformer = transformer;
967        }
968    
969        /**
970         * @return the statsEnabled
971         */
972        public boolean isStatsEnabled() {
973            return this.stats.isEnabled();
974        }
975    
976        /**
977         * @param statsEnabled the statsEnabled to set
978         */
979        public void setStatsEnabled(boolean statsEnabled) {
980            this.stats.setEnabled(statsEnabled);
981        }
982    
983        /**
984         * Returns the {@link DestinationSource} object which can be used to listen to destinations
985         * being created or destroyed or to enquire about the current destinations available on the broker
986         *
987         * @return a lazily created destination source
988         * @throws JMSException
989         */
990        public DestinationSource getDestinationSource() throws JMSException {
991            if (destinationSource == null) {
992                destinationSource = new DestinationSource(this);
993                destinationSource.start();
994            }
995            return destinationSource;
996        }
997    
998        // Implementation methods
999        // -------------------------------------------------------------------------
1000    
1001        /**
1002         * Used internally for adding Sessions to the Connection
1003         * 
1004         * @param session
1005         * @throws JMSException
1006         * @throws JMSException
1007         */
1008        protected void addSession(ActiveMQSession session) throws JMSException {
1009            this.sessions.add(session);
1010            if (sessions.size() > 1 || session.isTransacted()) {
1011                optimizedMessageDispatch = false;
1012            }
1013        }
1014    
1015        /**
1016         * Used interanlly for removing Sessions from a Connection
1017         * 
1018         * @param session
1019         */
1020        protected void removeSession(ActiveMQSession session) {
1021            this.sessions.remove(session);
1022            this.removeDispatcher(session);
1023        }
1024    
1025        /**
1026         * Add a ConnectionConsumer
1027         * 
1028         * @param connectionConsumer
1029         * @throws JMSException
1030         */
1031        protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1032            this.connectionConsumers.add(connectionConsumer);
1033        }
1034    
1035        /**
1036         * Remove a ConnectionConsumer
1037         * 
1038         * @param connectionConsumer
1039         */
1040        protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1041            this.connectionConsumers.remove(connectionConsumer);
1042            this.removeDispatcher(connectionConsumer);
1043        }
1044    
1045        /**
1046         * Creates a <CODE>TopicSession</CODE> object.
1047         * 
1048         * @param transacted indicates whether the session is transacted
1049         * @param acknowledgeMode indicates whether the consumer or the client will
1050         *                acknowledge any messages it receives; ignored if the
1051         *                session is transacted. Legal values are
1052         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1053         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1054         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1055         * @return a newly created topic session
1056         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1057         *                 to create a session due to some internal error or lack of
1058         *                 support for the specific transaction and acknowledgement
1059         *                 mode.
1060         * @see Session#AUTO_ACKNOWLEDGE
1061         * @see Session#CLIENT_ACKNOWLEDGE
1062         * @see Session#DUPS_OK_ACKNOWLEDGE
1063         */
1064        public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1065            return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1066        }
1067    
1068        /**
1069         * Creates a connection consumer for this connection (optional operation).
1070         * This is an expert facility not used by regular JMS clients.
1071         * 
1072         * @param topic the topic to access
1073         * @param messageSelector only messages with properties matching the message
1074         *                selector expression are delivered. A value of null or an
1075         *                empty string indicates that there is no message selector
1076         *                for the message consumer.
1077         * @param sessionPool the server session pool to associate with this
1078         *                connection consumer
1079         * @param maxMessages the maximum number of messages that can be assigned to
1080         *                a server session at one time
1081         * @return the connection consumer
1082         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1083         *                 to create a connection consumer due to some internal
1084         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1085         *                 and <CODE>messageSelector</CODE>.
1086         * @throws javax.jms.InvalidDestinationException if an invalid topic is
1087         *                 specified.
1088         * @throws javax.jms.InvalidSelectorException if the message selector is
1089         *                 invalid.
1090         * @see javax.jms.ConnectionConsumer
1091         */
1092        public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1093            return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1094        }
1095    
1096        /**
1097         * Creates a connection consumer for this connection (optional operation).
1098         * This is an expert facility not used by regular JMS clients.
1099         * 
1100         * @param queue the queue to access
1101         * @param messageSelector only messages with properties matching the message
1102         *                selector expression are delivered. A value of null or an
1103         *                empty string indicates that there is no message selector
1104         *                for the message consumer.
1105         * @param sessionPool the server session pool to associate with this
1106         *                connection consumer
1107         * @param maxMessages the maximum number of messages that can be assigned to
1108         *                a server session at one time
1109         * @return the connection consumer
1110         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1111         *                 to create a connection consumer due to some internal
1112         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1113         *                 and <CODE>messageSelector</CODE>.
1114         * @throws javax.jms.InvalidDestinationException if an invalid queue is
1115         *                 specified.
1116         * @throws javax.jms.InvalidSelectorException if the message selector is
1117         *                 invalid.
1118         * @see javax.jms.ConnectionConsumer
1119         */
1120        public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1121            return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1122        }
1123    
1124        /**
1125         * Creates a connection consumer for this connection (optional operation).
1126         * This is an expert facility not used by regular JMS clients.
1127         * 
1128         * @param destination the destination to access
1129         * @param messageSelector only messages with properties matching the message
1130         *                selector expression are delivered. A value of null or an
1131         *                empty string indicates that there is no message selector
1132         *                for the message consumer.
1133         * @param sessionPool the server session pool to associate with this
1134         *                connection consumer
1135         * @param maxMessages the maximum number of messages that can be assigned to
1136         *                a server session at one time
1137         * @return the connection consumer
1138         * @throws JMSException if the <CODE>Connection</CODE> object fails to
1139         *                 create a connection consumer due to some internal error
1140         *                 or invalid arguments for <CODE>sessionPool</CODE> and
1141         *                 <CODE>messageSelector</CODE>.
1142         * @throws javax.jms.InvalidDestinationException if an invalid destination
1143         *                 is specified.
1144         * @throws javax.jms.InvalidSelectorException if the message selector is
1145         *                 invalid.
1146         * @see javax.jms.ConnectionConsumer
1147         * @since 1.1
1148         */
1149        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1150            return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1151        }
1152    
1153        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1154            throws JMSException {
1155    
1156            checkClosedOrFailed();
1157            ensureConnectionInfoSent();
1158    
1159            ConsumerId consumerId = createConsumerId();
1160            ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1161            consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1162            consumerInfo.setSelector(messageSelector);
1163            consumerInfo.setPrefetchSize(maxMessages);
1164            consumerInfo.setNoLocal(noLocal);
1165            consumerInfo.setDispatchAsync(isDispatchAsync());
1166    
1167            // Allows the options on the destination to configure the consumerInfo
1168            if (consumerInfo.getDestination().getOptions() != null) {
1169                Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1170                IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1171            }
1172    
1173            return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1174        }
1175    
1176        /**
1177         * @return
1178         */
1179        private ConsumerId createConsumerId() {
1180            return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1181        }
1182    
1183        /**
1184         * @return
1185         */
1186        private ProducerId createProducerId() {
1187            return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1188        }
1189    
1190        /**
1191         * Creates a <CODE>QueueSession</CODE> object.
1192         * 
1193         * @param transacted indicates whether the session is transacted
1194         * @param acknowledgeMode indicates whether the consumer or the client will
1195         *                acknowledge any messages it receives; ignored if the
1196         *                session is transacted. Legal values are
1197         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1198         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1199         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1200         * @return a newly created queue session
1201         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1202         *                 to create a session due to some internal error or lack of
1203         *                 support for the specific transaction and acknowledgement
1204         *                 mode.
1205         * @see Session#AUTO_ACKNOWLEDGE
1206         * @see Session#CLIENT_ACKNOWLEDGE
1207         * @see Session#DUPS_OK_ACKNOWLEDGE
1208         */
1209        public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1210            return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1211        }
1212    
1213        /**
1214         * Ensures that the clientID was manually specified and not auto-generated.
1215         * If the clientID was not specified this method will throw an exception.
1216         * This method is used to ensure that the clientID + durableSubscriber name
1217         * are used correctly.
1218         * 
1219         * @throws JMSException
1220         */
1221        public void checkClientIDWasManuallySpecified() throws JMSException {
1222            if (!userSpecifiedClientID) {
1223                throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1224            }
1225        }
1226    
1227        /**
1228         * send a Packet through the Connection - for internal use only
1229         * 
1230         * @param command
1231         * @throws JMSException
1232         */
1233        public void asyncSendPacket(Command command) throws JMSException {
1234            if (isClosed()) {
1235                throw new ConnectionClosedException();
1236            } else {
1237                doAsyncSendPacket(command);
1238            }
1239        }
1240    
1241            private void doAsyncSendPacket(Command command) throws JMSException {
1242                    try {
1243                        this.transport.oneway(command);
1244                    } catch (IOException e) {
1245                        throw JMSExceptionSupport.create(e);
1246                    }
1247            }
1248    
1249        /**
1250         * Send a packet through a Connection - for internal use only
1251         * 
1252         * @param command
1253         * @return
1254         * @throws JMSException
1255         */
1256        public Response syncSendPacket(Command command) throws JMSException {
1257            if (isClosed()) {
1258                throw new ConnectionClosedException();
1259            } else {
1260    
1261                try {
1262                    Response response = (Response)this.transport.request(command);
1263                    if (response.isException()) {
1264                        ExceptionResponse er = (ExceptionResponse)response;
1265                        if (er.getException() instanceof JMSException) {
1266                            throw (JMSException)er.getException();
1267                        } else {
1268                            if (isClosed()||closing.get()) {
1269                                LOG.debug("Received an exception but connection is closing");
1270                            }
1271                            JMSException jmsEx = null;
1272                            try {
1273                             jmsEx = JMSExceptionSupport.create(er.getException());
1274                            }catch(Throwable e) {
1275                                LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1276                            }
1277                            if(jmsEx !=null) {
1278                                throw jmsEx;
1279                            }
1280                        }
1281                    }
1282                    return response;
1283                } catch (IOException e) {
1284                    throw JMSExceptionSupport.create(e);
1285                }
1286            }
1287        }
1288    
1289        /**
1290         * Send a packet through a Connection - for internal use only
1291         * 
1292         * @param command
1293         * @return
1294         * @throws JMSException
1295         */
1296        public Response syncSendPacket(Command command, int timeout) throws JMSException {
1297            if (isClosed() || closing.get()) {
1298                throw new ConnectionClosedException();
1299            } else {
1300                return doSyncSendPacket(command, timeout);
1301            }
1302        }
1303    
1304            private Response doSyncSendPacket(Command command, int timeout)
1305                            throws JMSException {
1306                    try {
1307                        Response response = (Response)this.transport.request(command, timeout);
1308                        if (response != null && response.isException()) {
1309                            ExceptionResponse er = (ExceptionResponse)response;
1310                            if (er.getException() instanceof JMSException) {
1311                                throw (JMSException)er.getException();
1312                            } else {
1313                                throw JMSExceptionSupport.create(er.getException());
1314                            }
1315                        }
1316                        return response;
1317                    } catch (IOException e) {
1318                        throw JMSExceptionSupport.create(e);
1319                    }
1320            }
1321    
1322        /**
1323         * @return statistics for this Connection
1324         */
1325        public StatsImpl getStats() {
1326            return stats;
1327        }
1328    
1329        /**
1330         * simply throws an exception if the Connection is already closed or the
1331         * Transport has failed
1332         * 
1333         * @throws JMSException
1334         */
1335        protected synchronized void checkClosedOrFailed() throws JMSException {
1336            checkClosed();
1337            if (transportFailed.get()) {
1338                throw new ConnectionFailedException(firstFailureError);
1339            }
1340        }
1341    
1342        /**
1343         * simply throws an exception if the Connection is already closed
1344         * 
1345         * @throws JMSException
1346         */
1347        protected synchronized void checkClosed() throws JMSException {
1348            if (closed.get()) {
1349                throw new ConnectionClosedException();
1350            }
1351        }
1352    
1353        /**
1354         * Send the ConnectionInfo to the Broker
1355         * 
1356         * @throws JMSException
1357         */
1358        protected void ensureConnectionInfoSent() throws JMSException {
1359            synchronized(this.ensureConnectionInfoSentMutex) {
1360                // Can we skip sending the ConnectionInfo packet??
1361                if (isConnectionInfoSentToBroker || closed.get()) {
1362                    return;
1363                }
1364                //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1365                if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1366                    info.setClientId(clientIdGenerator.generateId());
1367                }
1368                syncSendPacket(info.copy());
1369        
1370                this.isConnectionInfoSentToBroker = true;
1371                // Add a temp destination advisory consumer so that
1372                // We know what the valid temporary destinations are on the
1373                // broker without having to do an RPC to the broker.
1374        
1375                ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1376                if (watchTopicAdvisories) {
1377                    advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1378                }
1379            }
1380        }
1381    
1382        public synchronized boolean isWatchTopicAdvisories() {
1383            return watchTopicAdvisories;
1384        }
1385    
1386        public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1387            this.watchTopicAdvisories = watchTopicAdvisories;
1388        }
1389    
1390        /**
1391         * @return Returns the useAsyncSend.
1392         */
1393        public boolean isUseAsyncSend() {
1394            return useAsyncSend;
1395        }
1396    
1397        /**
1398         * Forces the use of <a
1399         * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1400         * adds a massive performance boost; but means that the send() method will
1401         * return immediately whether the message has been sent or not which could
1402         * lead to message loss.
1403         */
1404        public void setUseAsyncSend(boolean useAsyncSend) {
1405            this.useAsyncSend = useAsyncSend;
1406        }
1407    
1408        /**
1409         * @return true if always sync send messages
1410         */
1411        public boolean isAlwaysSyncSend() {
1412            return this.alwaysSyncSend;
1413        }
1414    
1415        /**
1416         * Set true if always require messages to be sync sent
1417         * 
1418         * @param alwaysSyncSend
1419         */
1420        public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1421            this.alwaysSyncSend = alwaysSyncSend;
1422        }
1423    
1424        /**
1425         * Cleans up this connection so that it's state is as if the connection was
1426         * just created. This allows the Resource Adapter to clean up a connection
1427         * so that it can be reused without having to close and recreate the
1428         * connection.
1429         */
1430        public void cleanup() throws JMSException {
1431    
1432            if (advisoryConsumer != null && !isTransportFailed()) {
1433                advisoryConsumer.dispose();
1434                advisoryConsumer = null;
1435            }
1436    
1437            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1438                ActiveMQSession s = i.next();
1439                s.dispose();
1440            }
1441            for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1442                ActiveMQConnectionConsumer c = i.next();
1443                c.dispose();
1444            }
1445            for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1446                ActiveMQInputStream c = i.next();
1447                c.dispose();
1448            }
1449            for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1450                ActiveMQOutputStream c = i.next();
1451                c.dispose();
1452            }
1453    
1454            if (isConnectionInfoSentToBroker) {
1455                if (!transportFailed.get() && !closing.get()) {
1456                    syncSendPacket(info.createRemoveCommand());
1457                }
1458                isConnectionInfoSentToBroker = false;
1459            }
1460            if (userSpecifiedClientID) {
1461                info.setClientId(null);
1462                userSpecifiedClientID = false;
1463            }
1464            clientIDSet = false;
1465    
1466            started.set(false);
1467        }
1468    
1469        /**
1470         * Changes the associated username/password that is associated with this
1471         * connection. If the connection has been used, you must called cleanup()
1472         * before calling this method.
1473         * 
1474         * @throws IllegalStateException if the connection is in used.
1475         */
1476        public void changeUserInfo(String userName, String password) throws JMSException {
1477            if (isConnectionInfoSentToBroker) {
1478                throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1479            }
1480            this.info.setUserName(userName);
1481            this.info.setPassword(password);
1482        }
1483    
1484        /**
1485         * @return Returns the resourceManagerId.
1486         * @throws JMSException
1487         */
1488        public String getResourceManagerId() throws JMSException {
1489            waitForBrokerInfo();
1490            if (brokerInfo == null) {
1491                throw new JMSException("Connection failed before Broker info was received.");
1492            }
1493            return brokerInfo.getBrokerId().getValue();
1494        }
1495    
1496        /**
1497         * Returns the broker name if one is available or null if one is not
1498         * available yet.
1499         */
1500        public String getBrokerName() {
1501            try {
1502                brokerInfoReceived.await(5, TimeUnit.SECONDS);
1503                if (brokerInfo == null) {
1504                    return null;
1505                }
1506                return brokerInfo.getBrokerName();
1507            } catch (InterruptedException e) {
1508                Thread.currentThread().interrupt();
1509                return null;
1510            }
1511        }
1512    
1513        /**
1514         * Returns the broker information if it is available or null if it is not
1515         * available yet.
1516         */
1517        public BrokerInfo getBrokerInfo() {
1518            return brokerInfo;
1519        }
1520    
1521        /**
1522         * @return Returns the RedeliveryPolicy.
1523         * @throws JMSException
1524         */
1525        public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1526            return redeliveryPolicy;
1527        }
1528    
1529        /**
1530         * Sets the redelivery policy to be used when messages are rolled back
1531         */
1532        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1533            this.redeliveryPolicy = redeliveryPolicy;
1534        }
1535    
1536        public BlobTransferPolicy getBlobTransferPolicy() {
1537            if (blobTransferPolicy == null) {
1538                blobTransferPolicy = createBlobTransferPolicy();
1539            }
1540            return blobTransferPolicy;
1541        }
1542    
1543        /**
1544         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1545         * OBjects) are transferred from producers to brokers to consumers
1546         */
1547        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1548            this.blobTransferPolicy = blobTransferPolicy;
1549        }
1550    
1551        /**
1552         * @return Returns the alwaysSessionAsync.
1553         */
1554        public boolean isAlwaysSessionAsync() {
1555            return alwaysSessionAsync;
1556        }
1557    
1558        /**
1559         * If this flag is set then a separate thread is not used for dispatching
1560         * messages for each Session in the Connection. However, a separate thread
1561         * is always used if there is more than one session, or the session isn't in
1562         * auto acknowledge or duplicates ok mode
1563         */
1564        public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1565            this.alwaysSessionAsync = alwaysSessionAsync;
1566        }
1567    
1568        /**
1569         * @return Returns the optimizeAcknowledge.
1570         */
1571        public boolean isOptimizeAcknowledge() {
1572            return optimizeAcknowledge;
1573        }
1574    
1575        /**
1576         * Enables an optimised acknowledgement mode where messages are acknowledged
1577         * in batches rather than individually
1578         * 
1579         * @param optimizeAcknowledge The optimizeAcknowledge to set.
1580         */
1581        public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1582            this.optimizeAcknowledge = optimizeAcknowledge;
1583        }
1584    
1585        public long getWarnAboutUnstartedConnectionTimeout() {
1586            return warnAboutUnstartedConnectionTimeout;
1587        }
1588    
1589        /**
1590         * Enables the timeout from a connection creation to when a warning is
1591         * generated if the connection is not properly started via {@link #start()}
1592         * and a message is received by a consumer. It is a very common gotcha to
1593         * forget to <a
1594         * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1595         * the connection</a> so this option makes the default case to create a
1596         * warning if the user forgets. To disable the warning just set the value to <
1597         * 0 (say -1).
1598         */
1599        public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1600            this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1601        }
1602        
1603        /**
1604         * @return the sendTimeout
1605         */
1606        public int getSendTimeout() {
1607            return sendTimeout;
1608        }
1609    
1610        /**
1611         * @param sendTimeout the sendTimeout to set
1612         */
1613        public void setSendTimeout(int sendTimeout) {
1614            this.sendTimeout = sendTimeout;
1615        }
1616        
1617        /**
1618         * @return the sendAcksAsync
1619         */
1620        public boolean isSendAcksAsync() {
1621            return sendAcksAsync;
1622        }
1623    
1624        /**
1625         * @param sendAcksAsync the sendAcksAsync to set
1626         */
1627        public void setSendAcksAsync(boolean sendAcksAsync) {
1628            this.sendAcksAsync = sendAcksAsync;
1629        }
1630    
1631    
1632        /**
1633         * Returns the time this connection was created
1634         */
1635        public long getTimeCreated() {
1636            return timeCreated;
1637        }
1638    
1639        private void waitForBrokerInfo() throws JMSException {
1640            try {
1641                brokerInfoReceived.await();
1642            } catch (InterruptedException e) {
1643                Thread.currentThread().interrupt();
1644                throw JMSExceptionSupport.create(e);
1645            }
1646        }
1647    
1648        // Package protected so that it can be used in unit tests
1649        public Transport getTransport() {
1650            return transport;
1651        }
1652    
1653        public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1654            producers.put(producerId, producer);
1655        }
1656    
1657        public void removeProducer(ProducerId producerId) {
1658            producers.remove(producerId);
1659        }
1660    
1661        public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1662            dispatchers.put(consumerId, dispatcher);
1663        }
1664    
1665        public void removeDispatcher(ConsumerId consumerId) {
1666            dispatchers.remove(consumerId);
1667        }
1668    
1669        /**
1670         * @param o - the command to consume
1671         */
1672        public void onCommand(final Object o) {
1673            final Command command = (Command)o;
1674            if (!closed.get() && command != null) {
1675                try {
1676                    command.visit(new CommandVisitorAdapter() {
1677                        @Override
1678                        public Response processMessageDispatch(MessageDispatch md) throws Exception {
1679                            waitForTransportInterruptionProcessing();
1680                            ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1681                            if (dispatcher != null) {
1682                                // Copy in case a embedded broker is dispatching via
1683                                // vm://
1684                                // md.getMessage() == null to signal end of queue
1685                                // browse.
1686                                Message msg = md.getMessage();
1687                                if (msg != null) {
1688                                    msg = msg.copy();
1689                                    msg.setReadOnlyBody(true);
1690                                    msg.setReadOnlyProperties(true);
1691                                    msg.setRedeliveryCounter(md.getRedeliveryCounter());
1692                                    msg.setConnection(ActiveMQConnection.this);
1693                                    md.setMessage(msg);
1694                                }
1695                                dispatcher.dispatch(md);
1696                            }
1697                            return null;
1698                        }
1699    
1700                        @Override
1701                        public Response processProducerAck(ProducerAck pa) throws Exception {
1702                            if (pa != null && pa.getProducerId() != null) {
1703                                ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1704                                if (producer != null) {
1705                                    producer.onProducerAck(pa);
1706                                }
1707                            }
1708                            return null;
1709                        }
1710    
1711                        @Override
1712                        public Response processBrokerInfo(BrokerInfo info) throws Exception {
1713                            brokerInfo = info;
1714                            brokerInfoReceived.countDown();
1715                            optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1716                            getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1717                            return null;
1718                        }
1719    
1720                        @Override
1721                        public Response processConnectionError(final ConnectionError error) throws Exception {
1722                            asyncConnectionThread.execute(new Runnable() {
1723                                public void run() {
1724                                    onAsyncException(error.getException());
1725                                }
1726                            });
1727                            return null;
1728                        }
1729    
1730                        @Override
1731                        public Response processControlCommand(ControlCommand command) throws Exception {
1732                            onControlCommand(command);
1733                            return null;
1734                        }
1735    
1736                        @Override
1737                        public Response processConnectionControl(ConnectionControl control) throws Exception {
1738                            onConnectionControl((ConnectionControl)command);
1739                            return null;
1740                        }
1741    
1742                        @Override
1743                        public Response processConsumerControl(ConsumerControl control) throws Exception {
1744                            onConsumerControl((ConsumerControl)command);
1745                            return null;
1746                        }
1747    
1748                        @Override
1749                        public Response processWireFormat(WireFormatInfo info) throws Exception {
1750                            onWireFormatInfo((WireFormatInfo)command);
1751                            return null;
1752                        }
1753                    });
1754                } catch (Exception e) {
1755                    onClientInternalException(e);
1756                }
1757    
1758            }
1759            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1760                TransportListener listener = iter.next();
1761                listener.onCommand(command);
1762            }
1763        }
1764    
1765        protected void onWireFormatInfo(WireFormatInfo info) {
1766            protocolVersion.set(info.getVersion());
1767        }
1768    
1769        /**
1770         * Handles async client internal exceptions.
1771         * A client internal exception is usually one that has been thrown
1772         * by a container runtime component during asynchronous processing of a
1773         * message that does not affect the connection itself.
1774         * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1775         * its <code>onException</code> method, if one has been registered with this connection.
1776         * 
1777         * @param error the exception that the problem
1778         */
1779        public void onClientInternalException(final Throwable error) {
1780            if ( !closed.get() && !closing.get() ) {
1781                if ( this.clientInternalExceptionListener != null ) {
1782                    asyncConnectionThread.execute(new Runnable() {
1783                        public void run() {
1784                            ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1785                        }
1786                    });
1787                } else {
1788                    LOG.debug("Async client internal exception occurred with no exception listener registered: " 
1789                            + error, error);
1790                }
1791            }
1792        }
1793        /**
1794         * Used for handling async exceptions
1795         * 
1796         * @param error
1797         */
1798        public void onAsyncException(Throwable error) {
1799            if (!closed.get() && !closing.get()) {
1800                if (this.exceptionListener != null) {
1801    
1802                    if (!(error instanceof JMSException)) {
1803                        error = JMSExceptionSupport.create(error);
1804                    }
1805                    final JMSException e = (JMSException)error;
1806    
1807                    asyncConnectionThread.execute(new Runnable() {
1808                        public void run() {
1809                            ActiveMQConnection.this.exceptionListener.onException(e);
1810                        }
1811                    });
1812    
1813                } else {
1814                    LOG.debug("Async exception with no exception listener: " + error, error);
1815                }
1816            }
1817        }
1818    
1819        public void onException(final IOException error) {
1820                    onAsyncException(error);
1821                    if (!closing.get() && !closed.get()) {
1822                            asyncConnectionThread.execute(new Runnable() {
1823                                    public void run() {
1824                                            transportFailed(error);
1825                                            ServiceSupport.dispose(ActiveMQConnection.this.transport);
1826                                            brokerInfoReceived.countDown();
1827                                            try {
1828                                                    cleanup();
1829                                            } catch (JMSException e) {
1830                                                    LOG.warn("Exception during connection cleanup, " + e, e);
1831                                            }
1832                                            for (Iterator<TransportListener> iter = transportListeners
1833                                                            .iterator(); iter.hasNext();) {
1834                                                    TransportListener listener = iter.next();
1835                                                    listener.onException(error);
1836                                            }
1837                                    }
1838                            });
1839                    }
1840            }
1841    
1842        public void transportInterupted() {
1843            transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
1844            if (LOG.isDebugEnabled()) {
1845                LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
1846            }
1847            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1848                ActiveMQSession s = i.next();
1849                s.clearMessagesInProgress();
1850            }
1851            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1852                TransportListener listener = iter.next();
1853                listener.transportInterupted();
1854            }
1855        }
1856    
1857        public void transportResumed() {
1858            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1859                TransportListener listener = iter.next();
1860                listener.transportResumed();
1861            }
1862        }
1863    
1864        /**
1865         * Create the DestinationInfo object for the temporary destination.
1866         * 
1867         * @param topic - if its true topic, else queue.
1868         * @return DestinationInfo
1869         * @throws JMSException
1870         */
1871        protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
1872    
1873            // Check if Destination info is of temporary type.
1874            ActiveMQTempDestination dest;
1875            if (topic) {
1876                dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1877            } else {
1878                dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1879            }
1880    
1881            DestinationInfo info = new DestinationInfo();
1882            info.setConnectionId(this.info.getConnectionId());
1883            info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
1884            info.setDestination(dest);
1885            syncSendPacket(info);
1886    
1887            dest.setConnection(this);
1888            activeTempDestinations.put(dest, dest);
1889            return dest;
1890        }
1891    
1892        /**
1893         * @param destination
1894         * @throws JMSException
1895         */
1896        public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
1897    
1898            checkClosedOrFailed();
1899    
1900            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1901                ActiveMQSession s = i.next();
1902                if (s.isInUse(destination)) {
1903                    throw new JMSException("A consumer is consuming from the temporary destination");
1904                }
1905            }
1906    
1907            activeTempDestinations.remove(destination);
1908    
1909            DestinationInfo info = new DestinationInfo();
1910            info.setConnectionId(this.info.getConnectionId());
1911            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
1912            info.setDestination(destination);
1913            info.setTimeout(0);
1914            syncSendPacket(info);
1915        }
1916    
1917        public boolean isDeleted(ActiveMQDestination dest) {
1918    
1919            // If we are not watching the advisories.. then
1920            // we will assume that the temp destination does exist.
1921            if (advisoryConsumer == null) {
1922                return false;
1923            }
1924    
1925            return !activeTempDestinations.contains(dest);
1926        }
1927    
1928        public boolean isCopyMessageOnSend() {
1929            return copyMessageOnSend;
1930        }
1931    
1932        public LongSequenceGenerator getLocalTransactionIdGenerator() {
1933            return localTransactionIdGenerator;
1934        }
1935    
1936        public boolean isUseCompression() {
1937            return useCompression;
1938        }
1939    
1940        /**
1941         * Enables the use of compression of the message bodies
1942         */
1943        public void setUseCompression(boolean useCompression) {
1944            this.useCompression = useCompression;
1945        }
1946    
1947        public void destroyDestination(ActiveMQDestination destination) throws JMSException {
1948    
1949            checkClosedOrFailed();
1950            ensureConnectionInfoSent();
1951    
1952            DestinationInfo info = new DestinationInfo();
1953            info.setConnectionId(this.info.getConnectionId());
1954            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
1955            info.setDestination(destination);
1956            info.setTimeout(0);
1957            syncSendPacket(info);
1958    
1959        }
1960    
1961        public boolean isDispatchAsync() {
1962            return dispatchAsync;
1963        }
1964    
1965        /**
1966         * Enables or disables the default setting of whether or not consumers have
1967         * their messages <a
1968         * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
1969         * synchronously or asynchronously by the broker</a>. For non-durable
1970         * topics for example we typically dispatch synchronously by default to
1971         * minimize context switches which boost performance. However sometimes its
1972         * better to go slower to ensure that a single blocked consumer socket does
1973         * not block delivery to other consumers.
1974         * 
1975         * @param asyncDispatch If true then consumers created on this connection
1976         *                will default to having their messages dispatched
1977         *                asynchronously. The default value is false.
1978         */
1979        public void setDispatchAsync(boolean asyncDispatch) {
1980            this.dispatchAsync = asyncDispatch;
1981        }
1982    
1983        public boolean isObjectMessageSerializationDefered() {
1984            return objectMessageSerializationDefered;
1985        }
1986    
1987        /**
1988         * When an object is set on an ObjectMessage, the JMS spec requires the
1989         * object to be serialized by that set method. Enabling this flag causes the
1990         * object to not get serialized. The object may subsequently get serialized
1991         * if the message needs to be sent over a socket or stored to disk.
1992         */
1993        public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
1994            this.objectMessageSerializationDefered = objectMessageSerializationDefered;
1995        }
1996    
1997        public InputStream createInputStream(Destination dest) throws JMSException {
1998            return createInputStream(dest, null);
1999        }
2000    
2001        public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2002            return createInputStream(dest, messageSelector, false);
2003        }
2004    
2005        public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2006            return doCreateInputStream(dest, messageSelector, noLocal, null);
2007        }
2008    
2009        public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2010            return createInputStream(dest, null, false);
2011        }
2012    
2013        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2014            return createDurableInputStream(dest, name, messageSelector, false);
2015        }
2016    
2017        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2018            return doCreateInputStream(dest, messageSelector, noLocal, name);
2019        }
2020    
2021        private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName) throws JMSException {
2022            checkClosedOrFailed();
2023            ensureConnectionInfoSent();
2024            return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch());
2025        }
2026    
2027        /**
2028         * Creates a persistent output stream; individual messages will be written
2029         * to disk/database by the broker
2030         */
2031        public OutputStream createOutputStream(Destination dest) throws JMSException {
2032            return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2033        }
2034    
2035        /**
2036         * Creates a non persistent output stream; messages will not be written to
2037         * disk
2038         */
2039        public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2040            return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2041        }
2042    
2043        /**
2044         * Creates an output stream allowing full control over the delivery mode,
2045         * the priority and time to live of the messages and the properties added to
2046         * messages on the stream.
2047         * 
2048         * @param streamProperties defines a map of key-value pairs where the keys
2049         *                are strings and the values are primitive values (numbers
2050         *                and strings) which are appended to the messages similarly
2051         *                to using the
2052         *                {@link javax.jms.Message#setObjectProperty(String, Object)}
2053         *                method
2054         */
2055        public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2056            checkClosedOrFailed();
2057            ensureConnectionInfoSent();
2058            return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2059        }
2060    
2061        /**
2062         * Unsubscribes a durable subscription that has been created by a client.
2063         * <P>
2064         * This method deletes the state being maintained on behalf of the
2065         * subscriber by its provider.
2066         * <P>
2067         * It is erroneous for a client to delete a durable subscription while there
2068         * is an active <CODE>MessageConsumer </CODE> or
2069         * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2070         * message is part of a pending transaction or has not been acknowledged in
2071         * the session.
2072         * 
2073         * @param name the name used to identify this subscription
2074         * @throws JMSException if the session fails to unsubscribe to the durable
2075         *                 subscription due to some internal error.
2076         * @throws InvalidDestinationException if an invalid subscription name is
2077         *                 specified.
2078         * @since 1.1
2079         */
2080        public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2081            checkClosedOrFailed();
2082            RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2083            rsi.setConnectionId(getConnectionInfo().getConnectionId());
2084            rsi.setSubscriptionName(name);
2085            rsi.setClientId(getConnectionInfo().getClientId());
2086            syncSendPacket(rsi);
2087        }
2088    
2089        /**
2090         * Internal send method optimized: - It does not copy the message - It can
2091         * only handle ActiveMQ messages. - You can specify if the send is async or
2092         * sync - Does not allow you to send /w a transaction.
2093         */
2094        void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2095            checkClosedOrFailed();
2096    
2097            if (destination.isTemporary() && isDeleted(destination)) {
2098                throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2099            }
2100    
2101            msg.setJMSDestination(destination);
2102            msg.setJMSDeliveryMode(deliveryMode);
2103            long expiration = 0L;
2104    
2105            if (!isDisableTimeStampsByDefault()) {
2106                long timeStamp = System.currentTimeMillis();
2107                msg.setJMSTimestamp(timeStamp);
2108                if (timeToLive > 0) {
2109                    expiration = timeToLive + timeStamp;
2110                }
2111            }
2112    
2113            msg.setJMSExpiration(expiration);
2114            msg.setJMSPriority(priority);
2115    
2116            msg.setJMSRedelivered(false);
2117            msg.setMessageId(messageId);
2118    
2119            msg.onSend();
2120    
2121            msg.setProducerId(msg.getMessageId().getProducerId());
2122    
2123            if (LOG.isDebugEnabled()) {
2124                LOG.debug("Sending message: " + msg);
2125            }
2126    
2127            if (async) {
2128                asyncSendPacket(msg);
2129            } else {
2130                syncSendPacket(msg);
2131            }
2132    
2133        }
2134    
2135        public void addOutputStream(ActiveMQOutputStream stream) {
2136            outputStreams.add(stream);
2137        }
2138    
2139        public void removeOutputStream(ActiveMQOutputStream stream) {
2140            outputStreams.remove(stream);
2141        }
2142    
2143        public void addInputStream(ActiveMQInputStream stream) {
2144            inputStreams.add(stream);
2145        }
2146    
2147        public void removeInputStream(ActiveMQInputStream stream) {
2148            inputStreams.remove(stream);
2149        }
2150    
2151        protected void onControlCommand(ControlCommand command) {
2152            String text = command.getCommand();
2153            if (text != null) {
2154                if (text.equals("shutdown")) {
2155                    LOG.info("JVM told to shutdown");
2156                    System.exit(0);
2157                }
2158            }
2159        }
2160    
2161        protected void onConnectionControl(ConnectionControl command) {
2162            if (command.isFaultTolerant()) {
2163                this.optimizeAcknowledge = false;
2164                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2165                    ActiveMQSession s = i.next();
2166                    s.setOptimizeAcknowledge(false);
2167                }
2168            }
2169        }
2170    
2171        protected void onConsumerControl(ConsumerControl command) {
2172            if (command.isClose()) {
2173                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2174                    ActiveMQSession s = i.next();
2175                    s.close(command.getConsumerId());
2176                }
2177            } else {
2178                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2179                    ActiveMQSession s = i.next();
2180                    s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2181                }
2182            }
2183        }
2184    
2185        protected void transportFailed(IOException error) {
2186            transportFailed.set(true);
2187            if (firstFailureError == null) {
2188                firstFailureError = error;
2189            }
2190        }
2191    
2192        /**
2193         * Should a JMS message be copied to a new JMS Message object as part of the
2194         * send() method in JMS. This is enabled by default to be compliant with the
2195         * JMS specification. You can disable it if you do not mutate JMS messages
2196         * after they are sent for a performance boost
2197         */
2198        public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2199            this.copyMessageOnSend = copyMessageOnSend;
2200        }
2201    
2202        public String toString() {
2203            return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2204        }
2205    
2206        protected BlobTransferPolicy createBlobTransferPolicy() {
2207            return new BlobTransferPolicy();
2208        }
2209    
2210        public int getProtocolVersion() {
2211            return protocolVersion.get();
2212        }
2213    
2214        public int getProducerWindowSize() {
2215            return producerWindowSize;
2216        }
2217    
2218        public void setProducerWindowSize(int producerWindowSize) {
2219            this.producerWindowSize = producerWindowSize;
2220        }
2221    
2222        public void setAuditDepth(int auditDepth) {
2223            connectionAudit.setAuditDepth(auditDepth);
2224            }
2225    
2226        public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2227            connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2228            }
2229    
2230        protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2231            connectionAudit.removeDispatcher(dispatcher);
2232        }
2233    
2234        protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2235            return connectionAudit.isDuplicate(dispatcher, message);
2236        }
2237    
2238        protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2239            connectionAudit.rollbackDuplicate(dispatcher, message);
2240        }
2241    
2242        public IOException getFirstFailureError() {
2243            return firstFailureError;
2244        }
2245    
2246        protected void waitForTransportInterruptionProcessing() throws InterruptedException {
2247            if (transportInterruptionProcessingComplete != null) {
2248                while (!closed.get() && !transportFailed.get() && !transportInterruptionProcessingComplete.await(15, TimeUnit.SECONDS)) {
2249                    LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + transportInterruptionProcessingComplete.getCount() + ") to complete..");
2250                }
2251                synchronized (this) {
2252                    transportInterruptionProcessingComplete = null;
2253                }
2254            }
2255        }
2256    
2257        protected synchronized void transportInterruptionProcessingComplete() {
2258            if (transportInterruptionProcessingComplete != null) {
2259               transportInterruptionProcessingComplete.countDown();
2260            }
2261        }
2262    
2263        /*
2264         * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2265         * will wait to receive re dispatched messages.
2266         * default value is 0 so there is no wait by default.
2267         */
2268        public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2269            this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2270        }
2271        
2272        public long getConsumerFailoverRedeliveryWaitPeriod() {
2273            return consumerFailoverRedeliveryWaitPeriod;
2274        }
2275    }