001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.net.URI; 020 import java.net.URISyntaxException; 021 import java.util.HashMap; 022 import java.util.Map; 023 import java.util.Properties; 024 import java.util.concurrent.Executor; 025 import java.util.concurrent.ScheduledThreadPoolExecutor; 026 import java.util.concurrent.ThreadFactory; 027 028 import javax.jms.Connection; 029 import javax.jms.ConnectionFactory; 030 import javax.jms.ExceptionListener; 031 import javax.jms.JMSException; 032 import javax.jms.QueueConnection; 033 import javax.jms.QueueConnectionFactory; 034 import javax.jms.TopicConnection; 035 import javax.jms.TopicConnectionFactory; 036 import javax.naming.Context; 037 038 import org.apache.activemq.blob.BlobTransferPolicy; 039 import org.apache.activemq.jndi.JNDIBaseStorable; 040 import org.apache.activemq.management.JMSStatsImpl; 041 import org.apache.activemq.management.StatsCapable; 042 import org.apache.activemq.management.StatsImpl; 043 import org.apache.activemq.transport.Transport; 044 import org.apache.activemq.transport.TransportFactory; 045 import org.apache.activemq.transport.TransportListener; 046 import org.apache.activemq.util.IdGenerator; 047 import org.apache.activemq.util.IntrospectionSupport; 048 import org.apache.activemq.util.JMSExceptionSupport; 049 import org.apache.activemq.util.URISupport; 050 import org.apache.activemq.util.URISupport.CompositeData; 051 052 /** 053 * A ConnectionFactory is an an Administered object, and is used for creating 054 * Connections. <p/> This class also implements QueueConnectionFactory and 055 * TopicConnectionFactory. You can use this connection to create both 056 * QueueConnections and TopicConnections. 057 * 058 * @version $Revision: 1.9 $ 059 * @see javax.jms.ConnectionFactory 060 */ 061 public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable { 062 063 public static final String DEFAULT_BROKER_BIND_URL = "tcp://localhost:61616"; 064 public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL; 065 public static final String DEFAULT_USER = null; 066 public static final String DEFAULT_PASSWORD = null; 067 public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0; 068 069 protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { 070 public Thread newThread(Runnable run) { 071 Thread thread = new Thread(run); 072 thread.setPriority(ThreadPriorities.INBOUND_CLIENT_CONNECTION); 073 return thread; 074 } 075 }); 076 077 protected URI brokerURL; 078 protected String userName; 079 protected String password; 080 protected String clientID; 081 protected boolean dispatchAsync=true; 082 protected boolean alwaysSessionAsync=true; 083 084 JMSStatsImpl factoryStats = new JMSStatsImpl(); 085 086 private IdGenerator clientIdGenerator; 087 private String clientIDPrefix; 088 089 // client policies 090 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 091 private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 092 private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 093 private MessageTransformer transformer; 094 095 private boolean disableTimeStampsByDefault; 096 private boolean optimizedMessageDispatch = true; 097 private boolean copyMessageOnSend = true; 098 private boolean useCompression; 099 private boolean objectMessageSerializationDefered; 100 private boolean useAsyncSend; 101 private boolean optimizeAcknowledge; 102 private int closeTimeout = 15000; 103 private boolean useRetroactiveConsumer; 104 private boolean exclusiveConsumer; 105 private boolean nestedMapAndListEnabled = true; 106 private boolean alwaysSyncSend; 107 private boolean watchTopicAdvisories = true; 108 private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE; 109 private long warnAboutUnstartedConnectionTimeout = 500L; 110 private int sendTimeout = 0; 111 private boolean sendAcksAsync=true; 112 private TransportListener transportListener; 113 private ExceptionListener exceptionListener; 114 private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; 115 private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; 116 private boolean useDedicatedTaskRunner; 117 private long consumerFailoverRedeliveryWaitPeriod = 0; 118 private ClientInternalExceptionListener clientInternalExceptionListener; 119 120 // ///////////////////////////////////////////// 121 // 122 // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods 123 // 124 // ///////////////////////////////////////////// 125 126 public ActiveMQConnectionFactory() { 127 this(DEFAULT_BROKER_URL); 128 } 129 130 public ActiveMQConnectionFactory(String brokerURL) { 131 this(createURI(brokerURL)); 132 } 133 134 public ActiveMQConnectionFactory(URI brokerURL) { 135 setBrokerURL(brokerURL.toString()); 136 } 137 138 public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) { 139 setUserName(userName); 140 setPassword(password); 141 setBrokerURL(brokerURL.toString()); 142 } 143 144 public ActiveMQConnectionFactory(String userName, String password, String brokerURL) { 145 setUserName(userName); 146 setPassword(password); 147 setBrokerURL(brokerURL); 148 } 149 150 /** 151 * Returns a copy of the given connection factory 152 */ 153 public ActiveMQConnectionFactory copy() { 154 try { 155 return (ActiveMQConnectionFactory)super.clone(); 156 } catch (CloneNotSupportedException e) { 157 throw new RuntimeException("This should never happen: " + e, e); 158 } 159 } 160 161 /** 162 * @param brokerURL 163 * @return 164 * @throws URISyntaxException 165 */ 166 private static URI createURI(String brokerURL) { 167 try { 168 return new URI(brokerURL); 169 } catch (URISyntaxException e) { 170 throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e); 171 } 172 } 173 174 /** 175 * @return Returns the Connection. 176 */ 177 public Connection createConnection() throws JMSException { 178 return createActiveMQConnection(); 179 } 180 181 /** 182 * @return Returns the Connection. 183 */ 184 public Connection createConnection(String userName, String password) throws JMSException { 185 return createActiveMQConnection(userName, password); 186 } 187 188 /** 189 * @return Returns the QueueConnection. 190 * @throws JMSException 191 */ 192 public QueueConnection createQueueConnection() throws JMSException { 193 return createActiveMQConnection(); 194 } 195 196 /** 197 * @return Returns the QueueConnection. 198 */ 199 public QueueConnection createQueueConnection(String userName, String password) throws JMSException { 200 return createActiveMQConnection(userName, password); 201 } 202 203 /** 204 * @return Returns the TopicConnection. 205 * @throws JMSException 206 */ 207 public TopicConnection createTopicConnection() throws JMSException { 208 return createActiveMQConnection(); 209 } 210 211 /** 212 * @return Returns the TopicConnection. 213 */ 214 public TopicConnection createTopicConnection(String userName, String password) throws JMSException { 215 return createActiveMQConnection(userName, password); 216 } 217 218 public StatsImpl getStats() { 219 // TODO 220 return null; 221 } 222 223 // ///////////////////////////////////////////// 224 // 225 // Implementation methods. 226 // 227 // ///////////////////////////////////////////// 228 229 protected ActiveMQConnection createActiveMQConnection() throws JMSException { 230 return createActiveMQConnection(userName, password); 231 } 232 233 /** 234 * Creates a Transport based on this object's connection settings. Separated 235 * from createActiveMQConnection to allow for subclasses to override. 236 * 237 * @return The newly created Transport. 238 * @throws JMSException If unable to create trasnport. 239 * @author sepandm@gmail.com 240 */ 241 protected Transport createTransport() throws JMSException { 242 try { 243 return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR); 244 } catch (Exception e) { 245 throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); 246 } 247 } 248 249 /** 250 * @return Returns the Connection. 251 */ 252 protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { 253 if (brokerURL == null) { 254 throw new ConfigurationException("brokerURL not set."); 255 } 256 ActiveMQConnection connection = null; 257 try { 258 Transport transport = createTransport(); 259 connection = createActiveMQConnection(transport, factoryStats); 260 261 connection.setUserName(userName); 262 connection.setPassword(password); 263 264 configureConnection(connection); 265 266 transport.start(); 267 268 if (clientID != null) { 269 connection.setDefaultClientID(clientID); 270 } 271 272 return connection; 273 } catch (JMSException e) { 274 // Clean up! 275 try { 276 connection.close(); 277 } catch (Throwable ignore) { 278 } 279 throw e; 280 } catch (Exception e) { 281 // Clean up! 282 try { 283 connection.close(); 284 } catch (Throwable ignore) { 285 } 286 throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e); 287 } 288 } 289 290 protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { 291 ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), stats); 292 return connection; 293 } 294 295 protected void configureConnection(ActiveMQConnection connection) throws JMSException { 296 connection.setPrefetchPolicy(getPrefetchPolicy()); 297 connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault()); 298 connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch()); 299 connection.setCopyMessageOnSend(isCopyMessageOnSend()); 300 connection.setUseCompression(isUseCompression()); 301 connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered()); 302 connection.setDispatchAsync(isDispatchAsync()); 303 connection.setUseAsyncSend(isUseAsyncSend()); 304 connection.setAlwaysSyncSend(isAlwaysSyncSend()); 305 connection.setAlwaysSessionAsync(isAlwaysSessionAsync()); 306 connection.setOptimizeAcknowledge(isOptimizeAcknowledge()); 307 connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer()); 308 connection.setExclusiveConsumer(isExclusiveConsumer()); 309 connection.setRedeliveryPolicy(getRedeliveryPolicy()); 310 connection.setTransformer(getTransformer()); 311 connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); 312 connection.setWatchTopicAdvisories(isWatchTopicAdvisories()); 313 connection.setProducerWindowSize(getProducerWindowSize()); 314 connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout()); 315 connection.setSendTimeout(getSendTimeout()); 316 connection.setSendAcksAsync(isSendAcksAsync()); 317 connection.setAuditDepth(getAuditDepth()); 318 connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber()); 319 connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); 320 connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod()); 321 if (transportListener != null) { 322 connection.addTransportListener(transportListener); 323 } 324 if (exceptionListener != null) { 325 connection.setExceptionListener(exceptionListener); 326 } 327 if (clientInternalExceptionListener != null) { 328 connection.setClientInternalExceptionListener(clientInternalExceptionListener); 329 } 330 } 331 332 // ///////////////////////////////////////////// 333 // 334 // Property Accessors 335 // 336 // ///////////////////////////////////////////// 337 338 public String getBrokerURL() { 339 return brokerURL == null ? null : brokerURL.toString(); 340 } 341 342 /** 343 * Sets the <a 344 * href="http://activemq.apache.org/configuring-transports.html">connection 345 * URL</a> used to connect to the ActiveMQ broker. 346 */ 347 public void setBrokerURL(String brokerURL) { 348 this.brokerURL = createURI(brokerURL); 349 350 // Use all the properties prefixed with 'jms.' to set the connection 351 // factory 352 // options. 353 if (this.brokerURL.getQuery() != null) { 354 // It might be a standard URI or... 355 try { 356 357 Map map = URISupport.parseQuery(this.brokerURL.getQuery()); 358 if (buildFromMap(IntrospectionSupport.extractProperties(map, "jms."))) { 359 this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map); 360 } 361 362 } catch (URISyntaxException e) { 363 } 364 365 } else { 366 367 // It might be a composite URI. 368 try { 369 CompositeData data = URISupport.parseComposite(this.brokerURL); 370 if (buildFromMap(IntrospectionSupport.extractProperties(data.getParameters(), "jms."))) { 371 this.brokerURL = data.toURI(); 372 } 373 } catch (URISyntaxException e) { 374 } 375 } 376 } 377 378 public String getClientID() { 379 return clientID; 380 } 381 382 /** 383 * Sets the JMS clientID to use for the created connection. Note that this 384 * can only be used by one connection at once so generally its a better idea 385 * to set the clientID on a Connection 386 */ 387 public void setClientID(String clientID) { 388 this.clientID = clientID; 389 } 390 391 public boolean isCopyMessageOnSend() { 392 return copyMessageOnSend; 393 } 394 395 /** 396 * Should a JMS message be copied to a new JMS Message object as part of the 397 * send() method in JMS. This is enabled by default to be compliant with the 398 * JMS specification. You can disable it if you do not mutate JMS messages 399 * after they are sent for a performance boost 400 */ 401 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 402 this.copyMessageOnSend = copyMessageOnSend; 403 } 404 405 public boolean isDisableTimeStampsByDefault() { 406 return disableTimeStampsByDefault; 407 } 408 409 /** 410 * Sets whether or not timestamps on messages should be disabled or not. If 411 * you disable them it adds a small performance boost. 412 */ 413 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) { 414 this.disableTimeStampsByDefault = disableTimeStampsByDefault; 415 } 416 417 public boolean isOptimizedMessageDispatch() { 418 return optimizedMessageDispatch; 419 } 420 421 /** 422 * If this flag is set then an larger prefetch limit is used - only 423 * applicable for durable topic subscribers. 424 */ 425 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) { 426 this.optimizedMessageDispatch = optimizedMessageDispatch; 427 } 428 429 public String getPassword() { 430 return password; 431 } 432 433 /** 434 * Sets the JMS password used for connections created from this factory 435 */ 436 public void setPassword(String password) { 437 this.password = password; 438 } 439 440 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 441 return prefetchPolicy; 442 } 443 444 /** 445 * Sets the <a 446 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 447 * policy</a> for consumers created by this connection. 448 */ 449 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 450 this.prefetchPolicy = prefetchPolicy; 451 } 452 453 public boolean isUseAsyncSend() { 454 return useAsyncSend; 455 } 456 457 public BlobTransferPolicy getBlobTransferPolicy() { 458 return blobTransferPolicy; 459 } 460 461 /** 462 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 463 * OBjects) are transferred from producers to brokers to consumers 464 */ 465 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 466 this.blobTransferPolicy = blobTransferPolicy; 467 } 468 469 /** 470 * Forces the use of <a 471 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 472 * adds a massive performance boost; but means that the send() method will 473 * return immediately whether the message has been sent or not which could 474 * lead to message loss. 475 */ 476 public void setUseAsyncSend(boolean useAsyncSend) { 477 this.useAsyncSend = useAsyncSend; 478 } 479 480 public synchronized boolean isWatchTopicAdvisories() { 481 return watchTopicAdvisories; 482 } 483 484 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 485 this.watchTopicAdvisories = watchTopicAdvisories; 486 } 487 488 /** 489 * @return true if always sync send messages 490 */ 491 public boolean isAlwaysSyncSend() { 492 return this.alwaysSyncSend; 493 } 494 495 /** 496 * Set true if always require messages to be sync sent 497 * 498 * @param alwaysSyncSend 499 */ 500 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 501 this.alwaysSyncSend = alwaysSyncSend; 502 } 503 504 public String getUserName() { 505 return userName; 506 } 507 508 /** 509 * Sets the JMS userName used by connections created by this factory 510 */ 511 public void setUserName(String userName) { 512 this.userName = userName; 513 } 514 515 public boolean isUseRetroactiveConsumer() { 516 return useRetroactiveConsumer; 517 } 518 519 /** 520 * Sets whether or not retroactive consumers are enabled. Retroactive 521 * consumers allow non-durable topic subscribers to receive old messages 522 * that were published before the non-durable subscriber started. 523 */ 524 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 525 this.useRetroactiveConsumer = useRetroactiveConsumer; 526 } 527 528 public boolean isExclusiveConsumer() { 529 return exclusiveConsumer; 530 } 531 532 /** 533 * Enables or disables whether or not queue consumers should be exclusive or 534 * not for example to preserve ordering when not using <a 535 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 536 * 537 * @param exclusiveConsumer 538 */ 539 public void setExclusiveConsumer(boolean exclusiveConsumer) { 540 this.exclusiveConsumer = exclusiveConsumer; 541 } 542 543 public RedeliveryPolicy getRedeliveryPolicy() { 544 return redeliveryPolicy; 545 } 546 547 /** 548 * Sets the global redelivery policy to be used when a message is delivered 549 * but the session is rolled back 550 */ 551 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 552 this.redeliveryPolicy = redeliveryPolicy; 553 } 554 555 public MessageTransformer getTransformer() { 556 return transformer; 557 } 558 559 /** 560 * @return the sendTimeout 561 */ 562 public int getSendTimeout() { 563 return sendTimeout; 564 } 565 566 /** 567 * @param sendTimeout the sendTimeout to set 568 */ 569 public void setSendTimeout(int sendTimeout) { 570 this.sendTimeout = sendTimeout; 571 } 572 573 /** 574 * @return the sendAcksAsync 575 */ 576 public boolean isSendAcksAsync() { 577 return sendAcksAsync; 578 } 579 580 /** 581 * @param sendAcksAsync the sendAcksAsync to set 582 */ 583 public void setSendAcksAsync(boolean sendAcksAsync) { 584 this.sendAcksAsync = sendAcksAsync; 585 } 586 587 588 /** 589 * Sets the transformer used to transform messages before they are sent on 590 * to the JMS bus or when they are received from the bus but before they are 591 * delivered to the JMS client 592 */ 593 public void setTransformer(MessageTransformer transformer) { 594 this.transformer = transformer; 595 } 596 597 public void buildFromProperties(Properties properties) { 598 599 if (properties == null) { 600 properties = new Properties(); 601 } 602 603 String temp = properties.getProperty(Context.PROVIDER_URL); 604 if (temp == null || temp.length() == 0) { 605 temp = properties.getProperty("brokerURL"); 606 } 607 if (temp != null && temp.length() > 0) { 608 setBrokerURL(temp); 609 } 610 611 Map<String, Object> p = new HashMap(properties); 612 buildFromMap(p); 613 } 614 615 public boolean buildFromMap(Map<String, Object> properties) { 616 boolean rc = false; 617 618 ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy(); 619 if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) { 620 setPrefetchPolicy(p); 621 rc = true; 622 } 623 624 RedeliveryPolicy rp = new RedeliveryPolicy(); 625 if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) { 626 setRedeliveryPolicy(rp); 627 rc = true; 628 } 629 630 BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 631 if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) { 632 setBlobTransferPolicy(blobTransferPolicy); 633 rc = true; 634 } 635 636 rc |= IntrospectionSupport.setProperties(this, properties); 637 638 return rc; 639 } 640 641 public void populateProperties(Properties props) { 642 props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync())); 643 644 if (getBrokerURL() != null) { 645 props.setProperty(Context.PROVIDER_URL, getBrokerURL()); 646 props.setProperty("brokerURL", getBrokerURL()); 647 } 648 649 if (getClientID() != null) { 650 props.setProperty("clientID", getClientID()); 651 } 652 653 IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy."); 654 IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy."); 655 IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy."); 656 657 props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend())); 658 props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault())); 659 props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered())); 660 props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch())); 661 662 if (getPassword() != null) { 663 props.setProperty("password", getPassword()); 664 } 665 666 props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend())); 667 props.setProperty("useCompression", Boolean.toString(isUseCompression())); 668 props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer())); 669 props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories())); 670 671 if (getUserName() != null) { 672 props.setProperty("userName", getUserName()); 673 } 674 675 props.setProperty("closeTimeout", Integer.toString(getCloseTimeout())); 676 props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync())); 677 props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge())); 678 props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled())); 679 props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend())); 680 props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize())); 681 props.setProperty("sendTimeout", Integer.toString(getSendTimeout())); 682 props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync())); 683 props.setProperty("auditDepth", Integer.toString(getAuditDepth())); 684 props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber())); 685 } 686 687 public boolean isUseCompression() { 688 return useCompression; 689 } 690 691 /** 692 * Enables the use of compression of the message bodies 693 */ 694 public void setUseCompression(boolean useCompression) { 695 this.useCompression = useCompression; 696 } 697 698 public boolean isObjectMessageSerializationDefered() { 699 return objectMessageSerializationDefered; 700 } 701 702 /** 703 * When an object is set on an ObjectMessage, the JMS spec requires the 704 * object to be serialized by that set method. Enabling this flag causes the 705 * object to not get serialized. The object may subsequently get serialized 706 * if the message needs to be sent over a socket or stored to disk. 707 */ 708 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 709 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 710 } 711 712 public boolean isDispatchAsync() { 713 return dispatchAsync; 714 } 715 716 /** 717 * Enables or disables the default setting of whether or not consumers have 718 * their messages <a 719 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 720 * synchronously or asynchronously by the broker</a>. For non-durable 721 * topics for example we typically dispatch synchronously by default to 722 * minimize context switches which boost performance. However sometimes its 723 * better to go slower to ensure that a single blocked consumer socket does 724 * not block delivery to other consumers. 725 * 726 * @param asyncDispatch If true then consumers created on this connection 727 * will default to having their messages dispatched 728 * asynchronously. The default value is false. 729 */ 730 public void setDispatchAsync(boolean asyncDispatch) { 731 this.dispatchAsync = asyncDispatch; 732 } 733 734 /** 735 * @return Returns the closeTimeout. 736 */ 737 public int getCloseTimeout() { 738 return closeTimeout; 739 } 740 741 /** 742 * Sets the timeout before a close is considered complete. Normally a 743 * close() on a connection waits for confirmation from the broker; this 744 * allows that operation to timeout to save the client hanging if there is 745 * no broker 746 */ 747 public void setCloseTimeout(int closeTimeout) { 748 this.closeTimeout = closeTimeout; 749 } 750 751 /** 752 * @return Returns the alwaysSessionAsync. 753 */ 754 public boolean isAlwaysSessionAsync() { 755 return alwaysSessionAsync; 756 } 757 758 /** 759 * If this flag is set then a separate thread is not used for dispatching 760 * messages for each Session in the Connection. However, a separate thread 761 * is always used if there is more than one session, or the session isn't in 762 * auto acknowledge or duplicates ok mode 763 */ 764 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 765 this.alwaysSessionAsync = alwaysSessionAsync; 766 } 767 768 /** 769 * @return Returns the optimizeAcknowledge. 770 */ 771 public boolean isOptimizeAcknowledge() { 772 return optimizeAcknowledge; 773 } 774 775 /** 776 * @param optimizeAcknowledge The optimizeAcknowledge to set. 777 */ 778 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 779 this.optimizeAcknowledge = optimizeAcknowledge; 780 } 781 782 public boolean isNestedMapAndListEnabled() { 783 return nestedMapAndListEnabled; 784 } 785 786 /** 787 * Enables/disables whether or not Message properties and MapMessage entries 788 * support <a 789 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 790 * Structures</a> of Map and List objects 791 */ 792 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 793 this.nestedMapAndListEnabled = structuredMapsEnabled; 794 } 795 796 public String getClientIDPrefix() { 797 return clientIDPrefix; 798 } 799 800 /** 801 * Sets the prefix used by autogenerated JMS Client ID values which are used 802 * if the JMS client does not explicitly specify on. 803 * 804 * @param clientIDPrefix 805 */ 806 public void setClientIDPrefix(String clientIDPrefix) { 807 this.clientIDPrefix = clientIDPrefix; 808 } 809 810 protected synchronized IdGenerator getClientIdGenerator() { 811 if (clientIdGenerator == null) { 812 if (clientIDPrefix != null) { 813 clientIdGenerator = new IdGenerator(clientIDPrefix); 814 } else { 815 clientIdGenerator = new IdGenerator(); 816 } 817 } 818 return clientIdGenerator; 819 } 820 821 protected void setClientIdGenerator(IdGenerator clientIdGenerator) { 822 this.clientIdGenerator = clientIdGenerator; 823 } 824 825 /** 826 * @return the statsEnabled 827 */ 828 public boolean isStatsEnabled() { 829 return this.factoryStats.isEnabled(); 830 } 831 832 /** 833 * @param statsEnabled the statsEnabled to set 834 */ 835 public void setStatsEnabled(boolean statsEnabled) { 836 this.factoryStats.setEnabled(statsEnabled); 837 } 838 839 public synchronized int getProducerWindowSize() { 840 return producerWindowSize; 841 } 842 843 public synchronized void setProducerWindowSize(int producerWindowSize) { 844 this.producerWindowSize = producerWindowSize; 845 } 846 847 public long getWarnAboutUnstartedConnectionTimeout() { 848 return warnAboutUnstartedConnectionTimeout; 849 } 850 851 /** 852 * Enables the timeout from a connection creation to when a warning is 853 * generated if the connection is not properly started via 854 * {@link Connection#start()} and a message is received by a consumer. It is 855 * a very common gotcha to forget to <a 856 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start 857 * the connection</a> so this option makes the default case to create a 858 * warning if the user forgets. To disable the warning just set the value to < 859 * 0 (say -1). 860 */ 861 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { 862 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; 863 } 864 865 public TransportListener getTransportListener() { 866 return transportListener; 867 } 868 869 /** 870 * Allows a listener to be configured on the ConnectionFactory so that when this factory is used 871 * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register 872 * a transport listener. 873 * 874 * @param transportListener sets the listener to be registered on all connections 875 * created by this factory 876 */ 877 public void setTransportListener(TransportListener transportListener) { 878 this.transportListener = transportListener; 879 } 880 881 882 public ExceptionListener getExceptionListener() { 883 return exceptionListener; 884 } 885 886 /** 887 * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory 888 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register 889 * an exception listener. 890 * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than 891 * on connection (as it will be if more than one connection is subsequently created by this connection factory) 892 * @param exceptionListener sets the exception listener to be registered on all connections 893 * created by this factory 894 */ 895 public void setExceptionListener(ExceptionListener exceptionListener) { 896 this.exceptionListener = exceptionListener; 897 } 898 899 public int getAuditDepth() { 900 return auditDepth; 901 } 902 903 public void setAuditDepth(int auditDepth) { 904 this.auditDepth = auditDepth; 905 } 906 907 public int getAuditMaximumProducerNumber() { 908 return auditMaximumProducerNumber; 909 } 910 911 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { 912 this.auditMaximumProducerNumber = auditMaximumProducerNumber; 913 } 914 915 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 916 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 917 } 918 919 public boolean isUseDedicatedTaskRunner() { 920 return useDedicatedTaskRunner; 921 } 922 923 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { 924 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; 925 } 926 927 public long getConsumerFailoverRedeliveryWaitPeriod() { 928 return consumerFailoverRedeliveryWaitPeriod; 929 } 930 931 public ClientInternalExceptionListener getClientInternalExceptionListener() { 932 return clientInternalExceptionListener; 933 } 934 935 /** 936 * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory 937 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register 938 * an exception listener. 939 * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than 940 * on connection (as it will be if more than one connection is subsequently created by this connection factory) 941 * @param clientInternalExceptionListener sets the exception listener to be registered on all connections 942 * created by this factory 943 */ 944 public void setClientInternalExceptionListener( 945 ClientInternalExceptionListener clientInternalExceptionListener) { 946 this.clientInternalExceptionListener = clientInternalExceptionListener; 947 } 948 }