001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.net.URI;
020    import java.net.URISyntaxException;
021    import java.util.HashMap;
022    import java.util.Map;
023    import java.util.Properties;
024    import java.util.concurrent.Executor;
025    import java.util.concurrent.ScheduledThreadPoolExecutor;
026    import java.util.concurrent.ThreadFactory;
027    
028    import javax.jms.Connection;
029    import javax.jms.ConnectionFactory;
030    import javax.jms.ExceptionListener;
031    import javax.jms.JMSException;
032    import javax.jms.QueueConnection;
033    import javax.jms.QueueConnectionFactory;
034    import javax.jms.TopicConnection;
035    import javax.jms.TopicConnectionFactory;
036    import javax.naming.Context;
037    
038    import org.apache.activemq.blob.BlobTransferPolicy;
039    import org.apache.activemq.jndi.JNDIBaseStorable;
040    import org.apache.activemq.management.JMSStatsImpl;
041    import org.apache.activemq.management.StatsCapable;
042    import org.apache.activemq.management.StatsImpl;
043    import org.apache.activemq.transport.Transport;
044    import org.apache.activemq.transport.TransportFactory;
045    import org.apache.activemq.transport.TransportListener;
046    import org.apache.activemq.util.IdGenerator;
047    import org.apache.activemq.util.IntrospectionSupport;
048    import org.apache.activemq.util.JMSExceptionSupport;
049    import org.apache.activemq.util.URISupport;
050    import org.apache.activemq.util.URISupport.CompositeData;
051    
052    /**
053     * A ConnectionFactory is an an Administered object, and is used for creating
054     * Connections. <p/> This class also implements QueueConnectionFactory and
055     * TopicConnectionFactory. You can use this connection to create both
056     * QueueConnections and TopicConnections.
057     * 
058     * @version $Revision: 1.9 $
059     * @see javax.jms.ConnectionFactory
060     */
061    public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
062    
063        public static final String DEFAULT_BROKER_BIND_URL = "tcp://localhost:61616";
064        public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
065        public static final String DEFAULT_USER = null;
066        public static final String DEFAULT_PASSWORD = null;
067        public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
068    
069        protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
070            public Thread newThread(Runnable run) {
071                Thread thread = new Thread(run);
072                thread.setPriority(ThreadPriorities.INBOUND_CLIENT_CONNECTION);
073                return thread;
074            }
075        });
076    
077        protected URI brokerURL;
078        protected String userName;
079        protected String password;
080        protected String clientID;
081        protected boolean dispatchAsync=true;
082        protected boolean alwaysSessionAsync=true;
083    
084        JMSStatsImpl factoryStats = new JMSStatsImpl();
085    
086        private IdGenerator clientIdGenerator;
087        private String clientIDPrefix;
088    
089        // client policies
090        private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
091        private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
092        private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
093        private MessageTransformer transformer;
094    
095        private boolean disableTimeStampsByDefault;
096        private boolean optimizedMessageDispatch = true;
097        private boolean copyMessageOnSend = true;
098        private boolean useCompression;
099        private boolean objectMessageSerializationDefered;
100        private boolean useAsyncSend;
101        private boolean optimizeAcknowledge;
102        private int closeTimeout = 15000;
103        private boolean useRetroactiveConsumer;
104        private boolean exclusiveConsumer;
105        private boolean nestedMapAndListEnabled = true;
106        private boolean alwaysSyncSend;
107        private boolean watchTopicAdvisories = true;
108        private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
109        private long warnAboutUnstartedConnectionTimeout = 500L;
110        private int sendTimeout = 0;
111        private boolean sendAcksAsync=true;
112        private TransportListener transportListener;
113        private ExceptionListener exceptionListener;
114        private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
115        private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
116        private boolean useDedicatedTaskRunner;
117        private long consumerFailoverRedeliveryWaitPeriod = 0;
118        private ClientInternalExceptionListener clientInternalExceptionListener;
119    
120        // /////////////////////////////////////////////
121        //
122        // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
123        //
124        // /////////////////////////////////////////////
125    
126        public ActiveMQConnectionFactory() {
127            this(DEFAULT_BROKER_URL);
128        }
129    
130        public ActiveMQConnectionFactory(String brokerURL) {
131            this(createURI(brokerURL));
132        }
133    
134        public ActiveMQConnectionFactory(URI brokerURL) {
135            setBrokerURL(brokerURL.toString());
136        }
137    
138        public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
139            setUserName(userName);
140            setPassword(password);
141            setBrokerURL(brokerURL.toString());
142        }
143    
144        public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
145            setUserName(userName);
146            setPassword(password);
147            setBrokerURL(brokerURL);
148        }
149    
150        /**
151         * Returns a copy of the given connection factory
152         */
153        public ActiveMQConnectionFactory copy() {
154            try {
155                return (ActiveMQConnectionFactory)super.clone();
156            } catch (CloneNotSupportedException e) {
157                throw new RuntimeException("This should never happen: " + e, e);
158            }
159        }
160    
161        /**
162         * @param brokerURL
163         * @return
164         * @throws URISyntaxException
165         */
166        private static URI createURI(String brokerURL) {
167            try {
168                return new URI(brokerURL);
169            } catch (URISyntaxException e) {
170                throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
171            }
172        }
173    
174        /**
175         * @return Returns the Connection.
176         */
177        public Connection createConnection() throws JMSException {
178            return createActiveMQConnection();
179        }
180    
181        /**
182         * @return Returns the Connection.
183         */
184        public Connection createConnection(String userName, String password) throws JMSException {
185            return createActiveMQConnection(userName, password);
186        }
187    
188        /**
189         * @return Returns the QueueConnection.
190         * @throws JMSException
191         */
192        public QueueConnection createQueueConnection() throws JMSException {
193            return createActiveMQConnection();
194        }
195    
196        /**
197         * @return Returns the QueueConnection.
198         */
199        public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
200            return createActiveMQConnection(userName, password);
201        }
202    
203        /**
204         * @return Returns the TopicConnection.
205         * @throws JMSException
206         */
207        public TopicConnection createTopicConnection() throws JMSException {
208            return createActiveMQConnection();
209        }
210    
211        /**
212         * @return Returns the TopicConnection.
213         */
214        public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
215            return createActiveMQConnection(userName, password);
216        }
217    
218        public StatsImpl getStats() {
219            // TODO
220            return null;
221        }
222    
223        // /////////////////////////////////////////////
224        //
225        // Implementation methods.
226        //
227        // /////////////////////////////////////////////
228    
229        protected ActiveMQConnection createActiveMQConnection() throws JMSException {
230            return createActiveMQConnection(userName, password);
231        }
232    
233        /**
234         * Creates a Transport based on this object's connection settings. Separated
235         * from createActiveMQConnection to allow for subclasses to override.
236         * 
237         * @return The newly created Transport.
238         * @throws JMSException If unable to create trasnport.
239         * @author sepandm@gmail.com
240         */
241        protected Transport createTransport() throws JMSException {
242            try {
243                return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR);
244            } catch (Exception e) {
245                throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
246            }
247        }
248    
249        /**
250         * @return Returns the Connection.
251         */
252        protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
253            if (brokerURL == null) {
254                throw new ConfigurationException("brokerURL not set.");
255            }
256            ActiveMQConnection connection = null;
257            try {
258                Transport transport = createTransport();
259                connection = createActiveMQConnection(transport, factoryStats);
260    
261                connection.setUserName(userName);
262                connection.setPassword(password);
263    
264                configureConnection(connection);
265    
266                transport.start();
267    
268                if (clientID != null) {
269                    connection.setDefaultClientID(clientID);
270                }
271    
272                return connection;
273            } catch (JMSException e) {
274                // Clean up!
275                try {
276                    connection.close();
277                } catch (Throwable ignore) {
278                }
279                throw e;
280            } catch (Exception e) {
281                // Clean up!
282                try {
283                    connection.close();
284                } catch (Throwable ignore) {
285                }
286                throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
287            }
288        }
289    
290        protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
291            ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), stats);
292            return connection;
293        }
294    
295        protected void configureConnection(ActiveMQConnection connection) throws JMSException {
296            connection.setPrefetchPolicy(getPrefetchPolicy());
297            connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
298            connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
299            connection.setCopyMessageOnSend(isCopyMessageOnSend());
300            connection.setUseCompression(isUseCompression());
301            connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
302            connection.setDispatchAsync(isDispatchAsync());
303            connection.setUseAsyncSend(isUseAsyncSend());
304            connection.setAlwaysSyncSend(isAlwaysSyncSend());
305            connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
306            connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
307            connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
308            connection.setExclusiveConsumer(isExclusiveConsumer());
309            connection.setRedeliveryPolicy(getRedeliveryPolicy());
310            connection.setTransformer(getTransformer());
311            connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
312            connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
313            connection.setProducerWindowSize(getProducerWindowSize());
314            connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
315            connection.setSendTimeout(getSendTimeout());
316            connection.setSendAcksAsync(isSendAcksAsync());
317            connection.setAuditDepth(getAuditDepth());
318            connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
319            connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
320            connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
321            if (transportListener != null) {
322                connection.addTransportListener(transportListener);
323            }
324            if (exceptionListener != null) {
325                    connection.setExceptionListener(exceptionListener);
326            }
327            if (clientInternalExceptionListener != null) {
328                connection.setClientInternalExceptionListener(clientInternalExceptionListener);
329            }
330        }
331    
332        // /////////////////////////////////////////////
333        //
334        // Property Accessors
335        //
336        // /////////////////////////////////////////////
337    
338        public String getBrokerURL() {
339            return brokerURL == null ? null : brokerURL.toString();
340        }
341    
342        /**
343         * Sets the <a
344         * href="http://activemq.apache.org/configuring-transports.html">connection
345         * URL</a> used to connect to the ActiveMQ broker.
346         */
347        public void setBrokerURL(String brokerURL) {
348            this.brokerURL = createURI(brokerURL);
349    
350            // Use all the properties prefixed with 'jms.' to set the connection
351            // factory
352            // options.
353            if (this.brokerURL.getQuery() != null) {
354                // It might be a standard URI or...
355                try {
356    
357                    Map map = URISupport.parseQuery(this.brokerURL.getQuery());
358                    if (buildFromMap(IntrospectionSupport.extractProperties(map, "jms."))) {
359                        this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
360                    }
361    
362                } catch (URISyntaxException e) {
363                }
364    
365            } else {
366    
367                // It might be a composite URI.
368                try {
369                    CompositeData data = URISupport.parseComposite(this.brokerURL);
370                    if (buildFromMap(IntrospectionSupport.extractProperties(data.getParameters(), "jms."))) {
371                        this.brokerURL = data.toURI();
372                    }
373                } catch (URISyntaxException e) {
374                }
375            }
376        }
377    
378        public String getClientID() {
379            return clientID;
380        }
381    
382        /**
383         * Sets the JMS clientID to use for the created connection. Note that this
384         * can only be used by one connection at once so generally its a better idea
385         * to set the clientID on a Connection
386         */
387        public void setClientID(String clientID) {
388            this.clientID = clientID;
389        }
390    
391        public boolean isCopyMessageOnSend() {
392            return copyMessageOnSend;
393        }
394    
395        /**
396         * Should a JMS message be copied to a new JMS Message object as part of the
397         * send() method in JMS. This is enabled by default to be compliant with the
398         * JMS specification. You can disable it if you do not mutate JMS messages
399         * after they are sent for a performance boost
400         */
401        public void setCopyMessageOnSend(boolean copyMessageOnSend) {
402            this.copyMessageOnSend = copyMessageOnSend;
403        }
404    
405        public boolean isDisableTimeStampsByDefault() {
406            return disableTimeStampsByDefault;
407        }
408    
409        /**
410         * Sets whether or not timestamps on messages should be disabled or not. If
411         * you disable them it adds a small performance boost.
412         */
413        public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
414            this.disableTimeStampsByDefault = disableTimeStampsByDefault;
415        }
416    
417        public boolean isOptimizedMessageDispatch() {
418            return optimizedMessageDispatch;
419        }
420    
421        /**
422         * If this flag is set then an larger prefetch limit is used - only
423         * applicable for durable topic subscribers.
424         */
425        public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
426            this.optimizedMessageDispatch = optimizedMessageDispatch;
427        }
428    
429        public String getPassword() {
430            return password;
431        }
432    
433        /**
434         * Sets the JMS password used for connections created from this factory
435         */
436        public void setPassword(String password) {
437            this.password = password;
438        }
439    
440        public ActiveMQPrefetchPolicy getPrefetchPolicy() {
441            return prefetchPolicy;
442        }
443    
444        /**
445         * Sets the <a
446         * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
447         * policy</a> for consumers created by this connection.
448         */
449        public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
450            this.prefetchPolicy = prefetchPolicy;
451        }
452    
453        public boolean isUseAsyncSend() {
454            return useAsyncSend;
455        }
456    
457        public BlobTransferPolicy getBlobTransferPolicy() {
458            return blobTransferPolicy;
459        }
460    
461        /**
462         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
463         * OBjects) are transferred from producers to brokers to consumers
464         */
465        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
466            this.blobTransferPolicy = blobTransferPolicy;
467        }
468    
469        /**
470         * Forces the use of <a
471         * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
472         * adds a massive performance boost; but means that the send() method will
473         * return immediately whether the message has been sent or not which could
474         * lead to message loss.
475         */
476        public void setUseAsyncSend(boolean useAsyncSend) {
477            this.useAsyncSend = useAsyncSend;
478        }
479    
480        public synchronized boolean isWatchTopicAdvisories() {
481            return watchTopicAdvisories;
482        }
483    
484        public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
485            this.watchTopicAdvisories = watchTopicAdvisories;
486        }
487    
488        /**
489         * @return true if always sync send messages
490         */
491        public boolean isAlwaysSyncSend() {
492            return this.alwaysSyncSend;
493        }
494    
495        /**
496         * Set true if always require messages to be sync sent
497         * 
498         * @param alwaysSyncSend
499         */
500        public void setAlwaysSyncSend(boolean alwaysSyncSend) {
501            this.alwaysSyncSend = alwaysSyncSend;
502        }
503    
504        public String getUserName() {
505            return userName;
506        }
507    
508        /**
509         * Sets the JMS userName used by connections created by this factory
510         */
511        public void setUserName(String userName) {
512            this.userName = userName;
513        }
514    
515        public boolean isUseRetroactiveConsumer() {
516            return useRetroactiveConsumer;
517        }
518    
519        /**
520         * Sets whether or not retroactive consumers are enabled. Retroactive
521         * consumers allow non-durable topic subscribers to receive old messages
522         * that were published before the non-durable subscriber started.
523         */
524        public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
525            this.useRetroactiveConsumer = useRetroactiveConsumer;
526        }
527    
528        public boolean isExclusiveConsumer() {
529            return exclusiveConsumer;
530        }
531    
532        /**
533         * Enables or disables whether or not queue consumers should be exclusive or
534         * not for example to preserve ordering when not using <a
535         * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
536         * 
537         * @param exclusiveConsumer
538         */
539        public void setExclusiveConsumer(boolean exclusiveConsumer) {
540            this.exclusiveConsumer = exclusiveConsumer;
541        }
542    
543        public RedeliveryPolicy getRedeliveryPolicy() {
544            return redeliveryPolicy;
545        }
546    
547        /**
548         * Sets the global redelivery policy to be used when a message is delivered
549         * but the session is rolled back
550         */
551        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
552            this.redeliveryPolicy = redeliveryPolicy;
553        }
554    
555        public MessageTransformer getTransformer() {
556            return transformer;
557        }
558        
559        /**
560         * @return the sendTimeout
561         */
562        public int getSendTimeout() {
563            return sendTimeout;
564        }
565    
566        /**
567         * @param sendTimeout the sendTimeout to set
568         */
569        public void setSendTimeout(int sendTimeout) {
570            this.sendTimeout = sendTimeout;
571        }
572        
573        /**
574         * @return the sendAcksAsync
575         */
576        public boolean isSendAcksAsync() {
577            return sendAcksAsync;
578        }
579    
580        /**
581         * @param sendAcksAsync the sendAcksAsync to set
582         */
583        public void setSendAcksAsync(boolean sendAcksAsync) {
584            this.sendAcksAsync = sendAcksAsync;
585        }
586    
587    
588        /**
589         * Sets the transformer used to transform messages before they are sent on
590         * to the JMS bus or when they are received from the bus but before they are
591         * delivered to the JMS client
592         */
593        public void setTransformer(MessageTransformer transformer) {
594            this.transformer = transformer;
595        }
596    
597        public void buildFromProperties(Properties properties) {
598    
599            if (properties == null) {
600                properties = new Properties();
601            }
602    
603            String temp = properties.getProperty(Context.PROVIDER_URL);
604            if (temp == null || temp.length() == 0) {
605                temp = properties.getProperty("brokerURL");
606            }
607            if (temp != null && temp.length() > 0) {
608                setBrokerURL(temp);
609            }
610    
611            Map<String, Object> p = new HashMap(properties);
612            buildFromMap(p);
613        }
614    
615        public boolean buildFromMap(Map<String, Object> properties) {
616            boolean rc = false;
617    
618            ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
619            if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
620                setPrefetchPolicy(p);
621                rc = true;
622            }
623    
624            RedeliveryPolicy rp = new RedeliveryPolicy();
625            if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
626                setRedeliveryPolicy(rp);
627                rc = true;
628            }
629    
630            BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
631            if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
632                setBlobTransferPolicy(blobTransferPolicy);
633                rc = true;
634            }
635    
636            rc |= IntrospectionSupport.setProperties(this, properties);
637    
638            return rc;
639        }
640    
641        public void populateProperties(Properties props) {
642            props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
643    
644            if (getBrokerURL() != null) {
645                props.setProperty(Context.PROVIDER_URL, getBrokerURL());
646                props.setProperty("brokerURL", getBrokerURL());
647            }
648    
649            if (getClientID() != null) {
650                props.setProperty("clientID", getClientID());
651            }
652    
653            IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
654            IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
655            IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
656    
657            props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
658            props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
659            props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
660            props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
661    
662            if (getPassword() != null) {
663                props.setProperty("password", getPassword());
664            }
665    
666            props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
667            props.setProperty("useCompression", Boolean.toString(isUseCompression()));
668            props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
669            props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
670    
671            if (getUserName() != null) {
672                props.setProperty("userName", getUserName());
673            }
674    
675            props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
676            props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
677            props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
678            props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
679            props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
680            props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
681            props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
682            props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
683            props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
684            props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
685        }
686    
687        public boolean isUseCompression() {
688            return useCompression;
689        }
690    
691        /**
692         * Enables the use of compression of the message bodies
693         */
694        public void setUseCompression(boolean useCompression) {
695            this.useCompression = useCompression;
696        }
697    
698        public boolean isObjectMessageSerializationDefered() {
699            return objectMessageSerializationDefered;
700        }
701    
702        /**
703         * When an object is set on an ObjectMessage, the JMS spec requires the
704         * object to be serialized by that set method. Enabling this flag causes the
705         * object to not get serialized. The object may subsequently get serialized
706         * if the message needs to be sent over a socket or stored to disk.
707         */
708        public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
709            this.objectMessageSerializationDefered = objectMessageSerializationDefered;
710        }
711    
712        public boolean isDispatchAsync() {
713            return dispatchAsync;
714        }
715    
716        /**
717         * Enables or disables the default setting of whether or not consumers have
718         * their messages <a
719         * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
720         * synchronously or asynchronously by the broker</a>. For non-durable
721         * topics for example we typically dispatch synchronously by default to
722         * minimize context switches which boost performance. However sometimes its
723         * better to go slower to ensure that a single blocked consumer socket does
724         * not block delivery to other consumers.
725         * 
726         * @param asyncDispatch If true then consumers created on this connection
727         *                will default to having their messages dispatched
728         *                asynchronously. The default value is false.
729         */
730        public void setDispatchAsync(boolean asyncDispatch) {
731            this.dispatchAsync = asyncDispatch;
732        }
733    
734        /**
735         * @return Returns the closeTimeout.
736         */
737        public int getCloseTimeout() {
738            return closeTimeout;
739        }
740    
741        /**
742         * Sets the timeout before a close is considered complete. Normally a
743         * close() on a connection waits for confirmation from the broker; this
744         * allows that operation to timeout to save the client hanging if there is
745         * no broker
746         */
747        public void setCloseTimeout(int closeTimeout) {
748            this.closeTimeout = closeTimeout;
749        }
750    
751        /**
752         * @return Returns the alwaysSessionAsync.
753         */
754        public boolean isAlwaysSessionAsync() {
755            return alwaysSessionAsync;
756        }
757    
758        /**
759         * If this flag is set then a separate thread is not used for dispatching
760         * messages for each Session in the Connection. However, a separate thread
761         * is always used if there is more than one session, or the session isn't in
762         * auto acknowledge or duplicates ok mode
763         */
764        public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
765            this.alwaysSessionAsync = alwaysSessionAsync;
766        }
767    
768        /**
769         * @return Returns the optimizeAcknowledge.
770         */
771        public boolean isOptimizeAcknowledge() {
772            return optimizeAcknowledge;
773        }
774    
775        /**
776         * @param optimizeAcknowledge The optimizeAcknowledge to set.
777         */
778        public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
779            this.optimizeAcknowledge = optimizeAcknowledge;
780        }
781    
782        public boolean isNestedMapAndListEnabled() {
783            return nestedMapAndListEnabled;
784        }
785    
786        /**
787         * Enables/disables whether or not Message properties and MapMessage entries
788         * support <a
789         * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
790         * Structures</a> of Map and List objects
791         */
792        public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
793            this.nestedMapAndListEnabled = structuredMapsEnabled;
794        }
795    
796        public String getClientIDPrefix() {
797            return clientIDPrefix;
798        }
799    
800        /**
801         * Sets the prefix used by autogenerated JMS Client ID values which are used
802         * if the JMS client does not explicitly specify on.
803         * 
804         * @param clientIDPrefix
805         */
806        public void setClientIDPrefix(String clientIDPrefix) {
807            this.clientIDPrefix = clientIDPrefix;
808        }
809    
810        protected synchronized IdGenerator getClientIdGenerator() {
811            if (clientIdGenerator == null) {
812                if (clientIDPrefix != null) {
813                    clientIdGenerator = new IdGenerator(clientIDPrefix);
814                } else {
815                    clientIdGenerator = new IdGenerator();
816                }
817            }
818            return clientIdGenerator;
819        }
820    
821        protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
822            this.clientIdGenerator = clientIdGenerator;
823        }
824    
825        /**
826         * @return the statsEnabled
827         */
828        public boolean isStatsEnabled() {
829            return this.factoryStats.isEnabled();
830        }
831    
832        /**
833         * @param statsEnabled the statsEnabled to set
834         */
835        public void setStatsEnabled(boolean statsEnabled) {
836            this.factoryStats.setEnabled(statsEnabled);
837        }
838    
839        public synchronized int getProducerWindowSize() {
840            return producerWindowSize;
841        }
842    
843        public synchronized void setProducerWindowSize(int producerWindowSize) {
844            this.producerWindowSize = producerWindowSize;
845        }
846    
847        public long getWarnAboutUnstartedConnectionTimeout() {
848            return warnAboutUnstartedConnectionTimeout;
849        }
850    
851        /**
852         * Enables the timeout from a connection creation to when a warning is
853         * generated if the connection is not properly started via
854         * {@link Connection#start()} and a message is received by a consumer. It is
855         * a very common gotcha to forget to <a
856         * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
857         * the connection</a> so this option makes the default case to create a
858         * warning if the user forgets. To disable the warning just set the value to <
859         * 0 (say -1).
860         */
861        public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
862            this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
863        }
864    
865        public TransportListener getTransportListener() {
866            return transportListener;
867        }
868    
869        /**
870         * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
871         * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
872         * a transport listener.
873         *
874         * @param transportListener sets the listener to be registered on all connections
875         * created by this factory
876         */
877        public void setTransportListener(TransportListener transportListener) {
878            this.transportListener = transportListener;
879        }
880        
881        
882        public ExceptionListener getExceptionListener() {
883            return exceptionListener;
884        }
885        
886        /**
887         * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
888         * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
889         * an exception listener.
890         * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
891         * on connection (as it will be if more than one connection is subsequently created by this connection factory)
892         * @param exceptionListener sets the exception listener to be registered on all connections
893         * created by this factory
894         */
895        public void setExceptionListener(ExceptionListener exceptionListener) {
896            this.exceptionListener = exceptionListener;
897        }
898    
899            public int getAuditDepth() {
900                    return auditDepth;
901            }
902    
903            public void setAuditDepth(int auditDepth) {
904                    this.auditDepth = auditDepth;
905            }
906    
907            public int getAuditMaximumProducerNumber() {
908                    return auditMaximumProducerNumber;
909            }
910    
911            public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
912                    this.auditMaximumProducerNumber = auditMaximumProducerNumber;
913            }
914    
915        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
916            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
917        }
918        
919        public boolean isUseDedicatedTaskRunner() {
920            return useDedicatedTaskRunner;
921        }
922        
923        public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
924            this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
925        }
926        
927        public long getConsumerFailoverRedeliveryWaitPeriod() {
928            return consumerFailoverRedeliveryWaitPeriod;
929        }
930    
931        public ClientInternalExceptionListener getClientInternalExceptionListener() {
932            return clientInternalExceptionListener;
933        }
934        
935        /**
936         * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
937         * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
938         * an exception listener.
939         * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
940         * on connection (as it will be if more than one connection is subsequently created by this connection factory)
941         * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
942         * created by this factory
943         */
944        public void setClientInternalExceptionListener(
945                ClientInternalExceptionListener clientInternalExceptionListener) {
946            this.clientInternalExceptionListener = clientInternalExceptionListener;
947        }
948    }