001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.io.File; 020 import java.io.InputStream; 021 import java.io.Serializable; 022 import java.net.URL; 023 import java.util.Collections; 024 import java.util.Iterator; 025 import java.util.List; 026 import java.util.concurrent.CopyOnWriteArrayList; 027 import java.util.concurrent.atomic.AtomicBoolean; 028 029 import javax.jms.BytesMessage; 030 import javax.jms.Destination; 031 import javax.jms.IllegalStateException; 032 import javax.jms.InvalidDestinationException; 033 import javax.jms.InvalidSelectorException; 034 import javax.jms.JMSException; 035 import javax.jms.MapMessage; 036 import javax.jms.Message; 037 import javax.jms.MessageConsumer; 038 import javax.jms.MessageListener; 039 import javax.jms.MessageProducer; 040 import javax.jms.ObjectMessage; 041 import javax.jms.Queue; 042 import javax.jms.QueueBrowser; 043 import javax.jms.QueueReceiver; 044 import javax.jms.QueueSender; 045 import javax.jms.QueueSession; 046 import javax.jms.Session; 047 import javax.jms.StreamMessage; 048 import javax.jms.TemporaryQueue; 049 import javax.jms.TemporaryTopic; 050 import javax.jms.TextMessage; 051 import javax.jms.Topic; 052 import javax.jms.TopicPublisher; 053 import javax.jms.TopicSession; 054 import javax.jms.TopicSubscriber; 055 import javax.jms.TransactionRolledBackException; 056 057 import org.apache.activemq.blob.BlobDownloader; 058 import org.apache.activemq.blob.BlobTransferPolicy; 059 import org.apache.activemq.blob.BlobUploader; 060 import org.apache.activemq.command.ActiveMQBlobMessage; 061 import org.apache.activemq.command.ActiveMQBytesMessage; 062 import org.apache.activemq.command.ActiveMQDestination; 063 import org.apache.activemq.command.ActiveMQMapMessage; 064 import org.apache.activemq.command.ActiveMQMessage; 065 import org.apache.activemq.command.ActiveMQObjectMessage; 066 import org.apache.activemq.command.ActiveMQQueue; 067 import org.apache.activemq.command.ActiveMQStreamMessage; 068 import org.apache.activemq.command.ActiveMQTempDestination; 069 import org.apache.activemq.command.ActiveMQTempQueue; 070 import org.apache.activemq.command.ActiveMQTempTopic; 071 import org.apache.activemq.command.ActiveMQTextMessage; 072 import org.apache.activemq.command.ActiveMQTopic; 073 import org.apache.activemq.command.Command; 074 import org.apache.activemq.command.ConsumerId; 075 import org.apache.activemq.command.MessageAck; 076 import org.apache.activemq.command.MessageDispatch; 077 import org.apache.activemq.command.MessageId; 078 import org.apache.activemq.command.ProducerId; 079 import org.apache.activemq.command.RemoveInfo; 080 import org.apache.activemq.command.Response; 081 import org.apache.activemq.command.SessionId; 082 import org.apache.activemq.command.SessionInfo; 083 import org.apache.activemq.command.TransactionId; 084 import org.apache.activemq.management.JMSSessionStatsImpl; 085 import org.apache.activemq.management.StatsCapable; 086 import org.apache.activemq.management.StatsImpl; 087 import org.apache.activemq.thread.Scheduler; 088 import org.apache.activemq.transaction.Synchronization; 089 import org.apache.activemq.usage.MemoryUsage; 090 import org.apache.activemq.util.Callback; 091 import org.apache.activemq.util.LongSequenceGenerator; 092 import org.apache.commons.logging.Log; 093 import org.apache.commons.logging.LogFactory; 094 095 /** 096 * <P> 097 * A <CODE>Session</CODE> object is a single-threaded context for producing 098 * and consuming messages. Although it may allocate provider resources outside 099 * the Java virtual machine (JVM), it is considered a lightweight JMS object. 100 * <P> 101 * A session serves several purposes: 102 * <UL> 103 * <LI>It is a factory for its message producers and consumers. 104 * <LI>It supplies provider-optimized message factories. 105 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and 106 * <CODE>TemporaryQueues</CODE>. 107 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> 108 * objects for those clients that need to dynamically manipulate 109 * provider-specific destination names. 110 * <LI>It supports a single series of transactions that combine work spanning 111 * its producers and consumers into atomic units. 112 * <LI>It defines a serial order for the messages it consumes and the messages 113 * it produces. 114 * <LI>It retains messages it consumes until they have been acknowledged. 115 * <LI>It serializes execution of message listeners registered with its message 116 * consumers. 117 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>. 118 * </UL> 119 * <P> 120 * A session can create and service multiple message producers and consumers. 121 * <P> 122 * One typical use is to have a thread block on a synchronous 123 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then 124 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s. 125 * <P> 126 * If a client desires to have one thread produce messages while others consume 127 * them, the client should use a separate session for its producing thread. 128 * <P> 129 * Once a connection has been started, any session with one or more registered 130 * message listeners is dedicated to the thread of control that delivers 131 * messages to it. It is erroneous for client code to use this session or any of 132 * its constituent objects from another thread of control. The only exception to 133 * this rule is the use of the session or connection <CODE>close</CODE> 134 * method. 135 * <P> 136 * It should be easy for most clients to partition their work naturally into 137 * sessions. This model allows clients to start simply and incrementally add 138 * message processing complexity as their need for concurrency grows. 139 * <P> 140 * The <CODE>close</CODE> method is the only session method that can be called 141 * while some other session method is being executed in another thread. 142 * <P> 143 * A session may be specified as transacted. Each transacted session supports a 144 * single series of transactions. Each transaction groups a set of message sends 145 * and a set of message receives into an atomic unit of work. In effect, 146 * transactions organize a session's input message stream and output message 147 * stream into series of atomic units. When a transaction commits, its atomic 148 * unit of input is acknowledged and its associated atomic unit of output is 149 * sent. If a transaction rollback is done, the transaction's sent messages are 150 * destroyed and the session's input is automatically recovered. 151 * <P> 152 * The content of a transaction's input and output units is simply those 153 * messages that have been produced and consumed within the session's current 154 * transaction. 155 * <P> 156 * A transaction is completed using either its session's <CODE>commit</CODE> 157 * method or its session's <CODE>rollback </CODE> method. The completion of a 158 * session's current transaction automatically begins the next. The result is 159 * that a transacted session always has a current transaction within which its 160 * work is done. 161 * <P> 162 * The Java Transaction Service (JTS) or some other transaction monitor may be 163 * used to combine a session's transaction with transactions on other resources 164 * (databases, other JMS sessions, etc.). Since Java distributed transactions 165 * are controlled via the Java Transaction API (JTA), use of the session's 166 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is 167 * prohibited. 168 * <P> 169 * The JMS API does not require support for JTA; however, it does define how a 170 * provider supplies this support. 171 * <P> 172 * Although it is also possible for a JMS client to handle distributed 173 * transactions directly, it is unlikely that many JMS clients will do this. 174 * Support for JTA in the JMS API is targeted at systems vendors who will be 175 * integrating the JMS API into their application server products. 176 * 177 * @version $Revision: 1.34 $ 178 * @see javax.jms.Session 179 * @see javax.jms.QueueSession 180 * @see javax.jms.TopicSession 181 * @see javax.jms.XASession 182 */ 183 public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { 184 185 /** 186 * Only acknowledge an individual message - using message.acknowledge() 187 * as opposed to CLIENT_ACKNOWLEDGE which 188 * acknowledges all messages consumed by a session at when acknowledge() 189 * is called 190 */ 191 public static final int INDIVIDUAL_ACKNOWLEDGE = 4; 192 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; 193 194 public static interface DeliveryListener { 195 void beforeDelivery(ActiveMQSession session, Message msg); 196 197 void afterDelivery(ActiveMQSession session, Message msg); 198 } 199 200 private static final Log LOG = LogFactory.getLog(ActiveMQSession.class); 201 protected static final Scheduler scheduler = Scheduler.getInstance(); 202 203 protected int acknowledgementMode; 204 protected final ActiveMQConnection connection; 205 protected final SessionInfo info; 206 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 207 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 208 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator(); 209 protected final ActiveMQSessionExecutor executor = new ActiveMQSessionExecutor(this); 210 protected final AtomicBoolean started = new AtomicBoolean(false); 211 212 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>(); 213 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>(); 214 215 protected boolean closed; 216 private volatile boolean synchronizationRegistered; 217 protected boolean asyncDispatch; 218 protected boolean sessionAsyncDispatch; 219 protected final boolean debug; 220 protected Object sendMutex = new Object(); 221 222 private MessageListener messageListener; 223 private JMSSessionStatsImpl stats; 224 private TransactionContext transactionContext; 225 private DeliveryListener deliveryListener; 226 private MessageTransformer transformer; 227 private BlobTransferPolicy blobTransferPolicy; 228 private long lastDeliveredSequenceId; 229 230 /** 231 * Construct the Session 232 * 233 * @param connection 234 * @param sessionId 235 * @param acknowledgeMode n.b if transacted - the acknowledgeMode == 236 * Session.SESSION_TRANSACTED 237 * @param asyncDispatch 238 * @param sessionAsyncDispatch 239 * @throws JMSException on internal error 240 */ 241 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException { 242 this.debug = LOG.isDebugEnabled(); 243 this.connection = connection; 244 this.acknowledgementMode = acknowledgeMode; 245 this.asyncDispatch = asyncDispatch; 246 this.sessionAsyncDispatch = sessionAsyncDispatch; 247 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); 248 setTransactionContext(new TransactionContext(connection)); 249 connection.addSession(this); 250 stats = new JMSSessionStatsImpl(producers, consumers); 251 this.connection.asyncSendPacket(info); 252 setTransformer(connection.getTransformer()); 253 setBlobTransferPolicy(connection.getBlobTransferPolicy()); 254 255 if (connection.isStarted()) { 256 start(); 257 } 258 259 } 260 261 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException { 262 this(connection, sessionId, acknowledgeMode, asyncDispatch, true); 263 } 264 265 /** 266 * Sets the transaction context of the session. 267 * 268 * @param transactionContext - provides the means to control a JMS 269 * transaction. 270 */ 271 public void setTransactionContext(TransactionContext transactionContext) { 272 this.transactionContext = transactionContext; 273 } 274 275 /** 276 * Returns the transaction context of the session. 277 * 278 * @return transactionContext - session's transaction context. 279 */ 280 public TransactionContext getTransactionContext() { 281 return transactionContext; 282 } 283 284 /* 285 * (non-Javadoc) 286 * 287 * @see org.apache.activemq.management.StatsCapable#getStats() 288 */ 289 public StatsImpl getStats() { 290 return stats; 291 } 292 293 /** 294 * Returns the session's statistics. 295 * 296 * @return stats - session's statistics. 297 */ 298 public JMSSessionStatsImpl getSessionStats() { 299 return stats; 300 } 301 302 /** 303 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> 304 * object is used to send a message containing a stream of uninterpreted 305 * bytes. 306 * 307 * @return the an ActiveMQBytesMessage 308 * @throws JMSException if the JMS provider fails to create this message due 309 * to some internal error. 310 */ 311 public BytesMessage createBytesMessage() throws JMSException { 312 ActiveMQBytesMessage message = new ActiveMQBytesMessage(); 313 configureMessage(message); 314 return message; 315 } 316 317 /** 318 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> 319 * object is used to send a self-defining set of name-value pairs, where 320 * names are <CODE>String</CODE> objects and values are primitive values 321 * in the Java programming language. 322 * 323 * @return an ActiveMQMapMessage 324 * @throws JMSException if the JMS provider fails to create this message due 325 * to some internal error. 326 */ 327 public MapMessage createMapMessage() throws JMSException { 328 ActiveMQMapMessage message = new ActiveMQMapMessage(); 329 configureMessage(message); 330 return message; 331 } 332 333 /** 334 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> 335 * interface is the root interface of all JMS messages. A 336 * <CODE>Message</CODE> object holds all the standard message header 337 * information. It can be sent when a message containing only header 338 * information is sufficient. 339 * 340 * @return an ActiveMQMessage 341 * @throws JMSException if the JMS provider fails to create this message due 342 * to some internal error. 343 */ 344 public Message createMessage() throws JMSException { 345 ActiveMQMessage message = new ActiveMQMessage(); 346 configureMessage(message); 347 return message; 348 } 349 350 /** 351 * Creates an <CODE>ObjectMessage</CODE> object. An 352 * <CODE>ObjectMessage</CODE> object is used to send a message that 353 * contains a serializable Java object. 354 * 355 * @return an ActiveMQObjectMessage 356 * @throws JMSException if the JMS provider fails to create this message due 357 * to some internal error. 358 */ 359 public ObjectMessage createObjectMessage() throws JMSException { 360 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 361 configureMessage(message); 362 return message; 363 } 364 365 /** 366 * Creates an initialized <CODE>ObjectMessage</CODE> object. An 367 * <CODE>ObjectMessage</CODE> object is used to send a message that 368 * contains a serializable Java object. 369 * 370 * @param object the object to use to initialize this message 371 * @return an ActiveMQObjectMessage 372 * @throws JMSException if the JMS provider fails to create this message due 373 * to some internal error. 374 */ 375 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 376 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 377 configureMessage(message); 378 message.setObject(object); 379 return message; 380 } 381 382 /** 383 * Creates a <CODE>StreamMessage</CODE> object. A 384 * <CODE>StreamMessage</CODE> object is used to send a self-defining 385 * stream of primitive values in the Java programming language. 386 * 387 * @return an ActiveMQStreamMessage 388 * @throws JMSException if the JMS provider fails to create this message due 389 * to some internal error. 390 */ 391 public StreamMessage createStreamMessage() throws JMSException { 392 ActiveMQStreamMessage message = new ActiveMQStreamMessage(); 393 configureMessage(message); 394 return message; 395 } 396 397 /** 398 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> 399 * object is used to send a message containing a <CODE>String</CODE> 400 * object. 401 * 402 * @return an ActiveMQTextMessage 403 * @throws JMSException if the JMS provider fails to create this message due 404 * to some internal error. 405 */ 406 public TextMessage createTextMessage() throws JMSException { 407 ActiveMQTextMessage message = new ActiveMQTextMessage(); 408 configureMessage(message); 409 return message; 410 } 411 412 /** 413 * Creates an initialized <CODE>TextMessage</CODE> object. A 414 * <CODE>TextMessage</CODE> object is used to send a message containing a 415 * <CODE>String</CODE>. 416 * 417 * @param text the string used to initialize this message 418 * @return an ActiveMQTextMessage 419 * @throws JMSException if the JMS provider fails to create this message due 420 * to some internal error. 421 */ 422 public TextMessage createTextMessage(String text) throws JMSException { 423 ActiveMQTextMessage message = new ActiveMQTextMessage(); 424 message.setText(text); 425 configureMessage(message); 426 return message; 427 } 428 429 /** 430 * Creates an initialized <CODE>BlobMessage</CODE> object. A 431 * <CODE>BlobMessage</CODE> object is used to send a message containing a 432 * <CODE>URL</CODE> which points to some network addressible BLOB. 433 * 434 * @param url the network addressable URL used to pass directly to the 435 * consumer 436 * @return a BlobMessage 437 * @throws JMSException if the JMS provider fails to create this message due 438 * to some internal error. 439 */ 440 public BlobMessage createBlobMessage(URL url) throws JMSException { 441 return createBlobMessage(url, false); 442 } 443 444 /** 445 * Creates an initialized <CODE>BlobMessage</CODE> object. A 446 * <CODE>BlobMessage</CODE> object is used to send a message containing a 447 * <CODE>URL</CODE> which points to some network addressible BLOB. 448 * 449 * @param url the network addressable URL used to pass directly to the 450 * consumer 451 * @param deletedByBroker indicates whether or not the resource is deleted 452 * by the broker when the message is acknowledged 453 * @return a BlobMessage 454 * @throws JMSException if the JMS provider fails to create this message due 455 * to some internal error. 456 */ 457 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException { 458 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 459 configureMessage(message); 460 message.setURL(url); 461 message.setDeletedByBroker(deletedByBroker); 462 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 463 return message; 464 } 465 466 /** 467 * Creates an initialized <CODE>BlobMessage</CODE> object. A 468 * <CODE>BlobMessage</CODE> object is used to send a message containing 469 * the <CODE>File</CODE> content. Before the message is sent the file 470 * conent will be uploaded to the broker or some other remote repository 471 * depending on the {@link #getBlobTransferPolicy()}. 472 * 473 * @param file the file to be uploaded to some remote repo (or the broker) 474 * depending on the strategy 475 * @return a BlobMessage 476 * @throws JMSException if the JMS provider fails to create this message due 477 * to some internal error. 478 */ 479 public BlobMessage createBlobMessage(File file) throws JMSException { 480 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 481 configureMessage(message); 482 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); 483 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy()))); 484 message.setDeletedByBroker(true); 485 message.setName(file.getName()); 486 return message; 487 } 488 489 /** 490 * Creates an initialized <CODE>BlobMessage</CODE> object. A 491 * <CODE>BlobMessage</CODE> object is used to send a message containing 492 * the <CODE>File</CODE> content. Before the message is sent the file 493 * conent will be uploaded to the broker or some other remote repository 494 * depending on the {@link #getBlobTransferPolicy()}. 495 * 496 * @param in the stream to be uploaded to some remote repo (or the broker) 497 * depending on the strategy 498 * @return a BlobMessage 499 * @throws JMSException if the JMS provider fails to create this message due 500 * to some internal error. 501 */ 502 public BlobMessage createBlobMessage(InputStream in) throws JMSException { 503 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 504 configureMessage(message); 505 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); 506 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 507 message.setDeletedByBroker(true); 508 return message; 509 } 510 511 /** 512 * Indicates whether the session is in transacted mode. 513 * 514 * @return true if the session is in transacted mode 515 * @throws JMSException if there is some internal error. 516 */ 517 public boolean getTransacted() throws JMSException { 518 checkClosed(); 519 return isTransacted(); 520 } 521 522 /** 523 * Returns the acknowledgement mode of the session. The acknowledgement mode 524 * is set at the time that the session is created. If the session is 525 * transacted, the acknowledgement mode is ignored. 526 * 527 * @return If the session is not transacted, returns the current 528 * acknowledgement mode for the session. If the session is 529 * transacted, returns SESSION_TRANSACTED. 530 * @throws JMSException 531 * @see javax.jms.Connection#createSession(boolean,int) 532 * @since 1.1 exception JMSException if there is some internal error. 533 */ 534 public int getAcknowledgeMode() throws JMSException { 535 checkClosed(); 536 return this.acknowledgementMode; 537 } 538 539 /** 540 * Commits all messages done in this transaction and releases any locks 541 * currently held. 542 * 543 * @throws JMSException if the JMS provider fails to commit the transaction 544 * due to some internal error. 545 * @throws TransactionRolledBackException if the transaction is rolled back 546 * due to some internal error during commit. 547 * @throws javax.jms.IllegalStateException if the method is not called by a 548 * transacted session. 549 */ 550 public void commit() throws JMSException { 551 checkClosed(); 552 if (!getTransacted()) { 553 throw new javax.jms.IllegalStateException("Not a transacted session"); 554 } 555 if (LOG.isDebugEnabled()) { 556 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); 557 } 558 transactionContext.commit(); 559 } 560 561 /** 562 * Rolls back any messages done in this transaction and releases any locks 563 * currently held. 564 * 565 * @throws JMSException if the JMS provider fails to roll back the 566 * transaction due to some internal error. 567 * @throws javax.jms.IllegalStateException if the method is not called by a 568 * transacted session. 569 */ 570 public void rollback() throws JMSException { 571 checkClosed(); 572 if (!getTransacted()) { 573 throw new javax.jms.IllegalStateException("Not a transacted session"); 574 } 575 if (LOG.isDebugEnabled()) { 576 LOG.debug(getSessionId() + " Transaction Rollback"); 577 } 578 transactionContext.rollback(); 579 } 580 581 /** 582 * Closes the session. 583 * <P> 584 * Since a provider may allocate some resources on behalf of a session 585 * outside the JVM, clients should close the resources when they are not 586 * needed. Relying on garbage collection to eventually reclaim these 587 * resources may not be timely enough. 588 * <P> 589 * There is no need to close the producers and consumers of a closed 590 * session. 591 * <P> 592 * This call will block until a <CODE>receive</CODE> call or message 593 * listener in progress has completed. A blocked message consumer 594 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session 595 * is closed. 596 * <P> 597 * Closing a transacted session must roll back the transaction in progress. 598 * <P> 599 * This method is the only <CODE>Session</CODE> method that can be called 600 * concurrently. 601 * <P> 602 * Invoking any other <CODE>Session</CODE> method on a closed session must 603 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a 604 * closed session must <I>not </I> throw an exception. 605 * 606 * @throws JMSException if the JMS provider fails to close the session due 607 * to some internal error. 608 */ 609 public void close() throws JMSException { 610 if (!closed) { 611 if (getTransactionContext().isInXATransaction()) { 612 if (!synchronizationRegistered) { 613 synchronizationRegistered = true; 614 getTransactionContext().addSynchronization(new Synchronization() { 615 616 public void afterCommit() throws Exception { 617 doClose(); 618 synchronizationRegistered = false; 619 } 620 621 public void afterRollback() throws Exception { 622 doClose(); 623 synchronizationRegistered = false; 624 } 625 }); 626 } 627 628 } else { 629 doClose(); 630 } 631 } 632 } 633 634 private void doClose() throws JMSException { 635 dispose(); 636 RemoveInfo removeCommand = info.createRemoveCommand(); 637 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 638 connection.asyncSendPacket(removeCommand); 639 } 640 641 void clearMessagesInProgress() { 642 executor.clearMessagesInProgress(); 643 // we are called from inside the transport reconnection logic 644 // which involves us clearing all the connections' consumers 645 // dispatch and delivered lists. So rather than trying to 646 // grab a mutex (which could be already owned by the message 647 // listener calling the send or an ack) we allow it to complete in 648 // a separate thread via the scheduler and notify us via 649 // connection.transportInterruptionProcessingComplete() 650 // 651 for (final ActiveMQMessageConsumer consumer : consumers) { 652 scheduler.executeAfterDelay(new Runnable() { 653 public void run() { 654 consumer.clearMessagesInProgress(); 655 }}, 0l); 656 } 657 } 658 659 void deliverAcks() { 660 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 661 ActiveMQMessageConsumer consumer = iter.next(); 662 consumer.deliverAcks(); 663 } 664 } 665 666 public synchronized void dispose() throws JMSException { 667 if (!closed) { 668 669 try { 670 executor.stop(); 671 672 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 673 ActiveMQMessageConsumer consumer = iter.next(); 674 consumer.setFailureError(connection.getFirstFailureError()); 675 consumer.dispose(); 676 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); 677 } 678 consumers.clear(); 679 680 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) { 681 ActiveMQMessageProducer producer = iter.next(); 682 producer.dispose(); 683 } 684 producers.clear(); 685 686 try { 687 if (getTransactionContext().isInLocalTransaction()) { 688 rollback(); 689 } 690 } catch (JMSException e) { 691 } 692 693 } finally { 694 connection.removeSession(this); 695 this.transactionContext = null; 696 closed = true; 697 } 698 } 699 } 700 701 /** 702 * Checks that the session is not closed then configures the message 703 */ 704 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException { 705 checkClosed(); 706 message.setConnection(connection); 707 } 708 709 /** 710 * Check if the session is closed. It is used for ensuring that the session 711 * is open before performing various operations. 712 * 713 * @throws IllegalStateException if the Session is closed 714 */ 715 protected void checkClosed() throws IllegalStateException { 716 if (closed) { 717 throw new IllegalStateException("The Session is closed"); 718 } 719 } 720 721 /** 722 * Stops message delivery in this session, and restarts message delivery 723 * with the oldest unacknowledged message. 724 * <P> 725 * All consumers deliver messages in a serial order. Acknowledging a 726 * received message automatically acknowledges all messages that have been 727 * delivered to the client. 728 * <P> 729 * Restarting a session causes it to take the following actions: 730 * <UL> 731 * <LI>Stop message delivery 732 * <LI>Mark all messages that might have been delivered but not 733 * acknowledged as "redelivered" 734 * <LI>Restart the delivery sequence including all unacknowledged messages 735 * that had been previously delivered. Redelivered messages do not have to 736 * be delivered in exactly their original delivery order. 737 * </UL> 738 * 739 * @throws JMSException if the JMS provider fails to stop and restart 740 * message delivery due to some internal error. 741 * @throws IllegalStateException if the method is called by a transacted 742 * session. 743 */ 744 public void recover() throws JMSException { 745 746 checkClosed(); 747 if (getTransacted()) { 748 throw new IllegalStateException("This session is transacted"); 749 } 750 751 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 752 ActiveMQMessageConsumer c = iter.next(); 753 c.rollback(); 754 } 755 756 } 757 758 /** 759 * Returns the session's distinguished message listener (optional). 760 * 761 * @return the message listener associated with this session 762 * @throws JMSException if the JMS provider fails to get the message 763 * listener due to an internal error. 764 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) 765 * @see javax.jms.ServerSessionPool 766 * @see javax.jms.ServerSession 767 */ 768 public MessageListener getMessageListener() throws JMSException { 769 checkClosed(); 770 return this.messageListener; 771 } 772 773 /** 774 * Sets the session's distinguished message listener (optional). 775 * <P> 776 * When the distinguished message listener is set, no other form of message 777 * receipt in the session can be used; however, all forms of sending 778 * messages are still supported. 779 * <P> 780 * This is an expert facility not used by regular JMS clients. 781 * 782 * @param listener the message listener to associate with this session 783 * @throws JMSException if the JMS provider fails to set the message 784 * listener due to an internal error. 785 * @see javax.jms.Session#getMessageListener() 786 * @see javax.jms.ServerSessionPool 787 * @see javax.jms.ServerSession 788 */ 789 public void setMessageListener(MessageListener listener) throws JMSException { 790 checkClosed(); 791 this.messageListener = listener; 792 793 if (listener != null) { 794 executor.setDispatchedBySessionPool(true); 795 } 796 } 797 798 /** 799 * Optional operation, intended to be used only by Application Servers, not 800 * by ordinary JMS clients. 801 * 802 * @see javax.jms.ServerSession 803 */ 804 public void run() { 805 MessageDispatch messageDispatch; 806 while ((messageDispatch = executor.dequeueNoWait()) != null) { 807 final MessageDispatch md = messageDispatch; 808 ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); 809 if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) { 810 // TODO: Ack it without delivery to client 811 continue; 812 } 813 814 if (isClientAcknowledge()||isIndividualAcknowledge()) { 815 message.setAcknowledgeCallback(new Callback() { 816 public void execute() throws Exception { 817 } 818 }); 819 } 820 821 if (deliveryListener != null) { 822 deliveryListener.beforeDelivery(this, message); 823 } 824 825 md.setDeliverySequenceId(getNextDeliveryId()); 826 827 try { 828 messageListener.onMessage(message); 829 } catch (RuntimeException e) { 830 LOG.error("error dispatching message: ", e); 831 // A problem while invoking the MessageListener does not 832 // in general indicate a problem with the connection to the broker, i.e. 833 // it will usually be sufficient to let the afterDelivery() method either 834 // commit or roll back in order to deal with the exception. 835 // However, we notify any registered client internal exception listener 836 // of the problem. 837 connection.onClientInternalException(e); 838 } 839 840 try { 841 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); 842 ack.setFirstMessageId(md.getMessage().getMessageId()); 843 doStartTransaction(); 844 ack.setTransactionId(getTransactionContext().getTransactionId()); 845 if (ack.getTransactionId() != null) { 846 getTransactionContext().addSynchronization(new Synchronization() { 847 848 public void afterRollback() throws Exception { 849 md.getMessage().onMessageRolledBack(); 850 // ensure we don't filter this as a duplicate 851 connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); 852 RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); 853 int redeliveryCounter = md.getMessage().getRedeliveryCounter(); 854 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 855 && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) { 856 // We need to NACK the messages so that they get 857 // sent to the 858 // DLQ. 859 // Acknowledge the last message. 860 MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 861 ack.setFirstMessageId(md.getMessage().getMessageId()); 862 asyncSendPacket(ack); 863 } else { 864 865 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1); 866 ack.setFirstMessageId(md.getMessage().getMessageId()); 867 asyncSendPacket(ack); 868 869 // Figure out how long we should wait to resend 870 // this message. 871 long redeliveryDelay = 0; 872 for (int i = 0; i < redeliveryCounter; i++) { 873 redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); 874 } 875 scheduler.executeAfterDelay(new Runnable() { 876 877 public void run() { 878 ((ActiveMQDispatcher)md.getConsumer()).dispatch(md); 879 } 880 }, redeliveryDelay); 881 } 882 } 883 }); 884 } 885 asyncSendPacket(ack); 886 } catch (Throwable e) { 887 connection.onClientInternalException(e); 888 } 889 890 if (deliveryListener != null) { 891 deliveryListener.afterDelivery(this, message); 892 } 893 } 894 } 895 896 /** 897 * Creates a <CODE>MessageProducer</CODE> to send messages to the 898 * specified destination. 899 * <P> 900 * A client uses a <CODE>MessageProducer</CODE> object to send messages to 901 * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both 902 * inherit from <CODE>Destination</CODE>, they can be used in the 903 * destination parameter to create a <CODE>MessageProducer</CODE> object. 904 * 905 * @param destination the <CODE>Destination</CODE> to send to, or null if 906 * this is a producer which does not have a specified 907 * destination. 908 * @return the MessageProducer 909 * @throws JMSException if the session fails to create a MessageProducer due 910 * to some internal error. 911 * @throws InvalidDestinationException if an invalid destination is 912 * specified. 913 * @since 1.1 914 */ 915 public MessageProducer createProducer(Destination destination) throws JMSException { 916 checkClosed(); 917 if (destination instanceof CustomDestination) { 918 CustomDestination customDestination = (CustomDestination)destination; 919 return customDestination.createProducer(this); 920 } 921 int timeSendOut = connection.getSendTimeout(); 922 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut); 923 } 924 925 /** 926 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 927 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 928 * <CODE>Destination</CODE>, they can be used in the destination 929 * parameter to create a <CODE>MessageConsumer</CODE>. 930 * 931 * @param destination the <CODE>Destination</CODE> to access. 932 * @return the MessageConsumer 933 * @throws JMSException if the session fails to create a consumer due to 934 * some internal error. 935 * @throws InvalidDestinationException if an invalid destination is 936 * specified. 937 * @since 1.1 938 */ 939 public MessageConsumer createConsumer(Destination destination) throws JMSException { 940 return createConsumer(destination, (String) null); 941 } 942 943 /** 944 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 945 * using a message selector. Since <CODE> Queue</CODE> and 946 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 947 * can be used in the destination parameter to create a 948 * <CODE>MessageConsumer</CODE>. 949 * <P> 950 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 951 * that have been sent to a destination. 952 * 953 * @param destination the <CODE>Destination</CODE> to access 954 * @param messageSelector only messages with properties matching the message 955 * selector expression are delivered. A value of null or an 956 * empty string indicates that there is no message selector 957 * for the message consumer. 958 * @return the MessageConsumer 959 * @throws JMSException if the session fails to create a MessageConsumer due 960 * to some internal error. 961 * @throws InvalidDestinationException if an invalid destination is 962 * specified. 963 * @throws InvalidSelectorException if the message selector is invalid. 964 * @since 1.1 965 */ 966 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 967 return createConsumer(destination, messageSelector, false); 968 } 969 970 /** 971 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 972 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 973 * <CODE>Destination</CODE>, they can be used in the destination 974 * parameter to create a <CODE>MessageConsumer</CODE>. 975 * 976 * @param destination the <CODE>Destination</CODE> to access. 977 * @param messageListener the listener to use for async consumption of messages 978 * @return the MessageConsumer 979 * @throws JMSException if the session fails to create a consumer due to 980 * some internal error. 981 * @throws InvalidDestinationException if an invalid destination is 982 * specified. 983 * @since 1.1 984 */ 985 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException { 986 return createConsumer(destination, null, messageListener); 987 } 988 989 /** 990 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 991 * using a message selector. Since <CODE> Queue</CODE> and 992 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 993 * can be used in the destination parameter to create a 994 * <CODE>MessageConsumer</CODE>. 995 * <P> 996 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 997 * that have been sent to a destination. 998 * 999 * @param destination the <CODE>Destination</CODE> to access 1000 * @param messageSelector only messages with properties matching the message 1001 * selector expression are delivered. A value of null or an 1002 * empty string indicates that there is no message selector 1003 * for the message consumer. 1004 * @param messageListener the listener to use for async consumption of messages 1005 * @return the MessageConsumer 1006 * @throws JMSException if the session fails to create a MessageConsumer due 1007 * to some internal error. 1008 * @throws InvalidDestinationException if an invalid destination is 1009 * specified. 1010 * @throws InvalidSelectorException if the message selector is invalid. 1011 * @since 1.1 1012 */ 1013 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException { 1014 return createConsumer(destination, messageSelector, false, messageListener); 1015 } 1016 1017 /** 1018 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1019 * using a message selector. This method can specify whether messages 1020 * published by its own connection should be delivered to it, if the 1021 * destination is a topic. 1022 * <P> 1023 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1024 * <CODE>Destination</CODE>, they can be used in the destination 1025 * parameter to create a <CODE>MessageConsumer</CODE>. 1026 * <P> 1027 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1028 * that have been published to a destination. 1029 * <P> 1030 * In some cases, a connection may both publish and subscribe to a topic. 1031 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1032 * inhibit the delivery of messages published by its own connection. The 1033 * default value for this attribute is False. The <CODE>noLocal</CODE> 1034 * value must be supported by destinations that are topics. 1035 * 1036 * @param destination the <CODE>Destination</CODE> to access 1037 * @param messageSelector only messages with properties matching the message 1038 * selector expression are delivered. A value of null or an 1039 * empty string indicates that there is no message selector 1040 * for the message consumer. 1041 * @param noLocal - if true, and the destination is a topic, inhibits the 1042 * delivery of messages published by its own connection. The 1043 * behavior for <CODE>NoLocal</CODE> is not specified if 1044 * the destination is a queue. 1045 * @return the MessageConsumer 1046 * @throws JMSException if the session fails to create a MessageConsumer due 1047 * to some internal error. 1048 * @throws InvalidDestinationException if an invalid destination is 1049 * specified. 1050 * @throws InvalidSelectorException if the message selector is invalid. 1051 * @since 1.1 1052 */ 1053 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { 1054 return createConsumer(destination, messageSelector, noLocal, null); 1055 } 1056 1057 /** 1058 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1059 * using a message selector. This method can specify whether messages 1060 * published by its own connection should be delivered to it, if the 1061 * destination is a topic. 1062 * <P> 1063 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1064 * <CODE>Destination</CODE>, they can be used in the destination 1065 * parameter to create a <CODE>MessageConsumer</CODE>. 1066 * <P> 1067 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1068 * that have been published to a destination. 1069 * <P> 1070 * In some cases, a connection may both publish and subscribe to a topic. 1071 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1072 * inhibit the delivery of messages published by its own connection. The 1073 * default value for this attribute is False. The <CODE>noLocal</CODE> 1074 * value must be supported by destinations that are topics. 1075 * 1076 * @param destination the <CODE>Destination</CODE> to access 1077 * @param messageSelector only messages with properties matching the message 1078 * selector expression are delivered. A value of null or an 1079 * empty string indicates that there is no message selector 1080 * for the message consumer. 1081 * @param noLocal - if true, and the destination is a topic, inhibits the 1082 * delivery of messages published by its own connection. The 1083 * behavior for <CODE>NoLocal</CODE> is not specified if 1084 * the destination is a queue. 1085 * @param messageListener the listener to use for async consumption of messages 1086 * @return the MessageConsumer 1087 * @throws JMSException if the session fails to create a MessageConsumer due 1088 * to some internal error. 1089 * @throws InvalidDestinationException if an invalid destination is 1090 * specified. 1091 * @throws InvalidSelectorException if the message selector is invalid. 1092 * @since 1.1 1093 */ 1094 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { 1095 checkClosed(); 1096 1097 if (destination instanceof CustomDestination) { 1098 CustomDestination customDestination = (CustomDestination)destination; 1099 return customDestination.createConsumer(this, messageSelector, noLocal); 1100 } 1101 1102 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); 1103 int prefetch = 0; 1104 if (destination instanceof Topic) { 1105 prefetch = prefetchPolicy.getTopicPrefetch(); 1106 } else { 1107 prefetch = prefetchPolicy.getQueuePrefetch(); 1108 } 1109 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); 1110 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, 1111 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); 1112 } 1113 1114 /** 1115 * Creates a queue identity given a <CODE>Queue</CODE> name. 1116 * <P> 1117 * This facility is provided for the rare cases where clients need to 1118 * dynamically manipulate queue identity. It allows the creation of a queue 1119 * identity with a provider-specific name. Clients that depend on this 1120 * ability are not portable. 1121 * <P> 1122 * Note that this method is not for creating the physical queue. The 1123 * physical creation of queues is an administrative task and is not to be 1124 * initiated by the JMS API. The one exception is the creation of temporary 1125 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> 1126 * method. 1127 * 1128 * @param queueName the name of this <CODE>Queue</CODE> 1129 * @return a <CODE>Queue</CODE> with the given name 1130 * @throws JMSException if the session fails to create a queue due to some 1131 * internal error. 1132 * @since 1.1 1133 */ 1134 public Queue createQueue(String queueName) throws JMSException { 1135 checkClosed(); 1136 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1137 return new ActiveMQTempQueue(queueName); 1138 } 1139 return new ActiveMQQueue(queueName); 1140 } 1141 1142 /** 1143 * Creates a topic identity given a <CODE>Topic</CODE> name. 1144 * <P> 1145 * This facility is provided for the rare cases where clients need to 1146 * dynamically manipulate topic identity. This allows the creation of a 1147 * topic identity with a provider-specific name. Clients that depend on this 1148 * ability are not portable. 1149 * <P> 1150 * Note that this method is not for creating the physical topic. The 1151 * physical creation of topics is an administrative task and is not to be 1152 * initiated by the JMS API. The one exception is the creation of temporary 1153 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> 1154 * method. 1155 * 1156 * @param topicName the name of this <CODE>Topic</CODE> 1157 * @return a <CODE>Topic</CODE> with the given name 1158 * @throws JMSException if the session fails to create a topic due to some 1159 * internal error. 1160 * @since 1.1 1161 */ 1162 public Topic createTopic(String topicName) throws JMSException { 1163 checkClosed(); 1164 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1165 return new ActiveMQTempTopic(topicName); 1166 } 1167 return new ActiveMQTopic(topicName); 1168 } 1169 1170 /** 1171 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1172 * the specified queue. 1173 * 1174 * @param queue the <CODE>queue</CODE> to access 1175 * @exception InvalidDestinationException if an invalid destination is 1176 * specified 1177 * @since 1.1 1178 */ 1179 /** 1180 * Creates a durable subscriber to the specified topic. 1181 * <P> 1182 * If a client needs to receive all the messages published on a topic, 1183 * including the ones published while the subscriber is inactive, it uses a 1184 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1185 * record of this durable subscription and insures that all messages from 1186 * the topic's publishers are retained until they are acknowledged by this 1187 * durable subscriber or they have expired. 1188 * <P> 1189 * Sessions with durable subscribers must always provide the same client 1190 * identifier. In addition, each client must specify a name that uniquely 1191 * identifies (within client identifier) each durable subscription it 1192 * creates. Only one session at a time can have a 1193 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. 1194 * <P> 1195 * A client can change an existing durable subscription by creating a 1196 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1197 * and/or message selector. Changing a durable subscriber is equivalent to 1198 * unsubscribing (deleting) the old one and creating a new one. 1199 * <P> 1200 * In some cases, a connection may both publish and subscribe to a topic. 1201 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1202 * inhibit the delivery of messages published by its own connection. The 1203 * default value for this attribute is false. 1204 * 1205 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1206 * @param name the name used to identify this subscription 1207 * @return the TopicSubscriber 1208 * @throws JMSException if the session fails to create a subscriber due to 1209 * some internal error. 1210 * @throws InvalidDestinationException if an invalid topic is specified. 1211 * @since 1.1 1212 */ 1213 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 1214 checkClosed(); 1215 return createDurableSubscriber(topic, name, null, false); 1216 } 1217 1218 /** 1219 * Creates a durable subscriber to the specified topic, using a message 1220 * selector and specifying whether messages published by its own connection 1221 * should be delivered to it. 1222 * <P> 1223 * If a client needs to receive all the messages published on a topic, 1224 * including the ones published while the subscriber is inactive, it uses a 1225 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1226 * record of this durable subscription and insures that all messages from 1227 * the topic's publishers are retained until they are acknowledged by this 1228 * durable subscriber or they have expired. 1229 * <P> 1230 * Sessions with durable subscribers must always provide the same client 1231 * identifier. In addition, each client must specify a name which uniquely 1232 * identifies (within client identifier) each durable subscription it 1233 * creates. Only one session at a time can have a 1234 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An 1235 * inactive durable subscriber is one that exists but does not currently 1236 * have a message consumer associated with it. 1237 * <P> 1238 * A client can change an existing durable subscription by creating a 1239 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1240 * and/or message selector. Changing a durable subscriber is equivalent to 1241 * unsubscribing (deleting) the old one and creating a new one. 1242 * 1243 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1244 * @param name the name used to identify this subscription 1245 * @param messageSelector only messages with properties matching the message 1246 * selector expression are delivered. A value of null or an 1247 * empty string indicates that there is no message selector 1248 * for the message consumer. 1249 * @param noLocal if set, inhibits the delivery of messages published by its 1250 * own connection 1251 * @return the Queue Browser 1252 * @throws JMSException if the session fails to create a subscriber due to 1253 * some internal error. 1254 * @throws InvalidDestinationException if an invalid topic is specified. 1255 * @throws InvalidSelectorException if the message selector is invalid. 1256 * @since 1.1 1257 */ 1258 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { 1259 checkClosed(); 1260 1261 if (topic instanceof CustomDestination) { 1262 CustomDestination customDestination = (CustomDestination)topic; 1263 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); 1264 } 1265 1266 connection.checkClientIDWasManuallySpecified(); 1267 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1268 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch(); 1269 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit(); 1270 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit, 1271 noLocal, false, asyncDispatch); 1272 } 1273 1274 /** 1275 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1276 * the specified queue. 1277 * 1278 * @param queue the <CODE>queue</CODE> to access 1279 * @return the Queue Browser 1280 * @throws JMSException if the session fails to create a browser due to some 1281 * internal error. 1282 * @throws InvalidDestinationException if an invalid destination is 1283 * specified 1284 * @since 1.1 1285 */ 1286 public QueueBrowser createBrowser(Queue queue) throws JMSException { 1287 checkClosed(); 1288 return createBrowser(queue, null); 1289 } 1290 1291 /** 1292 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1293 * the specified queue using a message selector. 1294 * 1295 * @param queue the <CODE>queue</CODE> to access 1296 * @param messageSelector only messages with properties matching the message 1297 * selector expression are delivered. A value of null or an 1298 * empty string indicates that there is no message selector 1299 * for the message consumer. 1300 * @return the Queue Browser 1301 * @throws JMSException if the session fails to create a browser due to some 1302 * internal error. 1303 * @throws InvalidDestinationException if an invalid destination is 1304 * specified 1305 * @throws InvalidSelectorException if the message selector is invalid. 1306 * @since 1.1 1307 */ 1308 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 1309 checkClosed(); 1310 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch); 1311 } 1312 1313 /** 1314 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that 1315 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1316 * 1317 * @return a temporary queue identity 1318 * @throws JMSException if the session fails to create a temporary queue due 1319 * to some internal error. 1320 * @since 1.1 1321 */ 1322 public TemporaryQueue createTemporaryQueue() throws JMSException { 1323 checkClosed(); 1324 return (TemporaryQueue)connection.createTempDestination(false); 1325 } 1326 1327 /** 1328 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that 1329 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1330 * 1331 * @return a temporary topic identity 1332 * @throws JMSException if the session fails to create a temporary topic due 1333 * to some internal error. 1334 * @since 1.1 1335 */ 1336 public TemporaryTopic createTemporaryTopic() throws JMSException { 1337 checkClosed(); 1338 return (TemporaryTopic)connection.createTempDestination(true); 1339 } 1340 1341 /** 1342 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1343 * the specified queue. 1344 * 1345 * @param queue the <CODE>Queue</CODE> to access 1346 * @return 1347 * @throws JMSException if the session fails to create a receiver due to 1348 * some internal error. 1349 * @throws JMSException 1350 * @throws InvalidDestinationException if an invalid queue is specified. 1351 */ 1352 public QueueReceiver createReceiver(Queue queue) throws JMSException { 1353 checkClosed(); 1354 return createReceiver(queue, null); 1355 } 1356 1357 /** 1358 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1359 * the specified queue using a message selector. 1360 * 1361 * @param queue the <CODE>Queue</CODE> to access 1362 * @param messageSelector only messages with properties matching the message 1363 * selector expression are delivered. A value of null or an 1364 * empty string indicates that there is no message selector 1365 * for the message consumer. 1366 * @return QueueReceiver 1367 * @throws JMSException if the session fails to create a receiver due to 1368 * some internal error. 1369 * @throws InvalidDestinationException if an invalid queue is specified. 1370 * @throws InvalidSelectorException if the message selector is invalid. 1371 */ 1372 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 1373 checkClosed(); 1374 1375 if (queue instanceof CustomDestination) { 1376 CustomDestination customDestination = (CustomDestination)queue; 1377 return customDestination.createReceiver(this, messageSelector); 1378 } 1379 1380 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1381 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), 1382 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); 1383 } 1384 1385 /** 1386 * Creates a <CODE>QueueSender</CODE> object to send messages to the 1387 * specified queue. 1388 * 1389 * @param queue the <CODE>Queue</CODE> to access, or null if this is an 1390 * unidentified producer 1391 * @return QueueSender 1392 * @throws JMSException if the session fails to create a sender due to some 1393 * internal error. 1394 * @throws InvalidDestinationException if an invalid queue is specified. 1395 */ 1396 public QueueSender createSender(Queue queue) throws JMSException { 1397 checkClosed(); 1398 if (queue instanceof CustomDestination) { 1399 CustomDestination customDestination = (CustomDestination)queue; 1400 return customDestination.createSender(this); 1401 } 1402 int timeSendOut = connection.getSendTimeout(); 1403 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut); 1404 } 1405 1406 /** 1407 * Creates a nondurable subscriber to the specified topic. <p/> 1408 * <P> 1409 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1410 * that have been published to a topic. <p/> 1411 * <P> 1412 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1413 * receive only messages that are published while they are active. <p/> 1414 * <P> 1415 * In some cases, a connection may both publish and subscribe to a topic. 1416 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1417 * inhibit the delivery of messages published by its own connection. The 1418 * default value for this attribute is false. 1419 * 1420 * @param topic the <CODE>Topic</CODE> to subscribe to 1421 * @return TopicSubscriber 1422 * @throws JMSException if the session fails to create a subscriber due to 1423 * some internal error. 1424 * @throws InvalidDestinationException if an invalid topic is specified. 1425 */ 1426 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 1427 checkClosed(); 1428 return createSubscriber(topic, null, false); 1429 } 1430 1431 /** 1432 * Creates a nondurable subscriber to the specified topic, using a message 1433 * selector or specifying whether messages published by its own connection 1434 * should be delivered to it. <p/> 1435 * <P> 1436 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1437 * that have been published to a topic. <p/> 1438 * <P> 1439 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1440 * receive only messages that are published while they are active. <p/> 1441 * <P> 1442 * Messages filtered out by a subscriber's message selector will never be 1443 * delivered to the subscriber. From the subscriber's perspective, they do 1444 * not exist. <p/> 1445 * <P> 1446 * In some cases, a connection may both publish and subscribe to a topic. 1447 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1448 * inhibit the delivery of messages published by its own connection. The 1449 * default value for this attribute is false. 1450 * 1451 * @param topic the <CODE>Topic</CODE> to subscribe to 1452 * @param messageSelector only messages with properties matching the message 1453 * selector expression are delivered. A value of null or an 1454 * empty string indicates that there is no message selector 1455 * for the message consumer. 1456 * @param noLocal if set, inhibits the delivery of messages published by its 1457 * own connection 1458 * @return TopicSubscriber 1459 * @throws JMSException if the session fails to create a subscriber due to 1460 * some internal error. 1461 * @throws InvalidDestinationException if an invalid topic is specified. 1462 * @throws InvalidSelectorException if the message selector is invalid. 1463 */ 1464 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 1465 checkClosed(); 1466 1467 if (topic instanceof CustomDestination) { 1468 CustomDestination customDestination = (CustomDestination)topic; 1469 return customDestination.createSubscriber(this, messageSelector, noLocal); 1470 } 1471 1472 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1473 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy 1474 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); 1475 } 1476 1477 /** 1478 * Creates a publisher for the specified topic. <p/> 1479 * <P> 1480 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages 1481 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on 1482 * a topic, it defines a new sequence of messages that have no ordering 1483 * relationship with the messages it has previously sent. 1484 * 1485 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is 1486 * an unidentified producer 1487 * @return TopicPublisher 1488 * @throws JMSException if the session fails to create a publisher due to 1489 * some internal error. 1490 * @throws InvalidDestinationException if an invalid topic is specified. 1491 */ 1492 public TopicPublisher createPublisher(Topic topic) throws JMSException { 1493 checkClosed(); 1494 1495 if (topic instanceof CustomDestination) { 1496 CustomDestination customDestination = (CustomDestination)topic; 1497 return customDestination.createPublisher(this); 1498 } 1499 int timeSendOut = connection.getSendTimeout(); 1500 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut); 1501 } 1502 1503 /** 1504 * Unsubscribes a durable subscription that has been created by a client. 1505 * <P> 1506 * This method deletes the state being maintained on behalf of the 1507 * subscriber by its provider. 1508 * <P> 1509 * It is erroneous for a client to delete a durable subscription while there 1510 * is an active <CODE>MessageConsumer </CODE> or 1511 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 1512 * message is part of a pending transaction or has not been acknowledged in 1513 * the session. 1514 * 1515 * @param name the name used to identify this subscription 1516 * @throws JMSException if the session fails to unsubscribe to the durable 1517 * subscription due to some internal error. 1518 * @throws InvalidDestinationException if an invalid subscription name is 1519 * specified. 1520 * @since 1.1 1521 */ 1522 public void unsubscribe(String name) throws JMSException { 1523 checkClosed(); 1524 connection.unsubscribe(name); 1525 } 1526 1527 public void dispatch(MessageDispatch messageDispatch) { 1528 try { 1529 executor.execute(messageDispatch); 1530 } catch (InterruptedException e) { 1531 Thread.currentThread().interrupt(); 1532 connection.onClientInternalException(e); 1533 } 1534 } 1535 1536 /** 1537 * Acknowledges all consumed messages of the session of this consumed 1538 * message. 1539 * <P> 1540 * All consumed JMS messages support the <CODE>acknowledge</CODE> method 1541 * for use when a client has specified that its JMS session's consumed 1542 * messages are to be explicitly acknowledged. By invoking 1543 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges 1544 * all messages consumed by the session that the message was delivered to. 1545 * <P> 1546 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted 1547 * sessions and sessions specified to use implicit acknowledgement modes. 1548 * <P> 1549 * A client may individually acknowledge each message as it is consumed, or 1550 * it may choose to acknowledge messages as an application-defined group 1551 * (which is done by calling acknowledge on the last received message of the 1552 * group, thereby acknowledging all messages consumed by the session.) 1553 * <P> 1554 * Messages that have been received but not acknowledged may be redelivered. 1555 * 1556 * @throws JMSException if the JMS provider fails to acknowledge the 1557 * messages due to some internal error. 1558 * @throws javax.jms.IllegalStateException if this method is called on a 1559 * closed session. 1560 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE 1561 */ 1562 public void acknowledge() throws JMSException { 1563 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1564 ActiveMQMessageConsumer c = iter.next(); 1565 c.acknowledge(); 1566 } 1567 } 1568 1569 /** 1570 * Add a message consumer. 1571 * 1572 * @param consumer - message consumer. 1573 * @throws JMSException 1574 */ 1575 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1576 this.consumers.add(consumer); 1577 if (consumer.isDurableSubscriber()) { 1578 stats.onCreateDurableSubscriber(); 1579 } 1580 this.connection.addDispatcher(consumer.getConsumerId(), this); 1581 } 1582 1583 /** 1584 * Remove the message consumer. 1585 * 1586 * @param consumer - consumer to be removed. 1587 * @throws JMSException 1588 */ 1589 protected void removeConsumer(ActiveMQMessageConsumer consumer) { 1590 this.connection.removeDispatcher(consumer.getConsumerId()); 1591 if (consumer.isDurableSubscriber()) { 1592 stats.onRemoveDurableSubscriber(); 1593 } 1594 this.consumers.remove(consumer); 1595 this.connection.removeDispatcher(consumer); 1596 } 1597 1598 /** 1599 * Adds a message producer. 1600 * 1601 * @param producer - message producer to be added. 1602 * @throws JMSException 1603 */ 1604 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { 1605 this.producers.add(producer); 1606 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); 1607 } 1608 1609 /** 1610 * Removes a message producer. 1611 * 1612 * @param producer - message producer to be removed. 1613 * @throws JMSException 1614 */ 1615 protected void removeProducer(ActiveMQMessageProducer producer) { 1616 this.connection.removeProducer(producer.getProducerInfo().getProducerId()); 1617 this.producers.remove(producer); 1618 } 1619 1620 /** 1621 * Start this Session. 1622 * 1623 * @throws JMSException 1624 */ 1625 protected void start() throws JMSException { 1626 started.set(true); 1627 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1628 ActiveMQMessageConsumer c = iter.next(); 1629 c.start(); 1630 } 1631 executor.start(); 1632 } 1633 1634 /** 1635 * Stops this session. 1636 * 1637 * @throws JMSException 1638 */ 1639 protected void stop() throws JMSException { 1640 1641 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1642 ActiveMQMessageConsumer c = iter.next(); 1643 c.stop(); 1644 } 1645 1646 started.set(false); 1647 executor.stop(); 1648 } 1649 1650 /** 1651 * Returns the session id. 1652 * 1653 * @return value - session id. 1654 */ 1655 protected SessionId getSessionId() { 1656 return info.getSessionId(); 1657 } 1658 1659 /** 1660 * @return 1661 */ 1662 protected ConsumerId getNextConsumerId() { 1663 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId()); 1664 } 1665 1666 /** 1667 * @return 1668 */ 1669 protected ProducerId getNextProducerId() { 1670 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId()); 1671 } 1672 1673 /** 1674 * Sends the message for dispatch by the broker. 1675 * 1676 * @param producer - message producer. 1677 * @param destination - message destination. 1678 * @param message - message to be sent. 1679 * @param deliveryMode - JMS messsage delivery mode. 1680 * @param priority - message priority. 1681 * @param timeToLive - message expiration. 1682 * @param producerWindow 1683 * @throws JMSException 1684 */ 1685 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, 1686 MemoryUsage producerWindow, int sendTimeout) throws JMSException { 1687 1688 checkClosed(); 1689 if (destination.isTemporary() && connection.isDeleted(destination)) { 1690 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); 1691 } 1692 synchronized (sendMutex) { 1693 // tell the Broker we are about to start a new transaction 1694 doStartTransaction(); 1695 TransactionId txid = transactionContext.getTransactionId(); 1696 long sequenceNumber = producer.getMessageSequence(); 1697 1698 //Set the "JMS" header fields on the orriginal message, see 1.1 spec section 3.4.11 1699 message.setJMSDestination(destination); 1700 message.setJMSDeliveryMode(deliveryMode); 1701 long expiration = 0L; 1702 if (!producer.getDisableMessageTimestamp()) { 1703 long timeStamp = System.currentTimeMillis(); 1704 message.setJMSTimestamp(timeStamp); 1705 if (timeToLive > 0) { 1706 expiration = timeToLive + timeStamp; 1707 } 1708 } 1709 message.setJMSExpiration(expiration); 1710 message.setJMSPriority(priority); 1711 message.setJMSRedelivered(false); 1712 1713 // transform to our own message format here 1714 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); 1715 1716 // Set the message id. 1717 if (msg == message) { 1718 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1719 } else { 1720 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1721 message.setJMSMessageID(msg.getMessageId().toString()); 1722 } 1723 //clear the brokerPath in case we are re-sending this message 1724 msg.setBrokerPath(null); 1725 1726 1727 msg.setTransactionId(txid); 1728 if (connection.isCopyMessageOnSend()) { 1729 msg = (ActiveMQMessage)msg.copy(); 1730 } 1731 msg.setConnection(connection); 1732 msg.onSend(); 1733 msg.setProducerId(msg.getMessageId().getProducerId()); 1734 if (LOG.isTraceEnabled()) { 1735 LOG.trace(getSessionId() + " sending message: " + msg); 1736 } 1737 if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { 1738 this.connection.asyncSendPacket(msg); 1739 if (producerWindow != null) { 1740 // Since we defer lots of the marshaling till we hit the 1741 // wire, this might not 1742 // provide and accurate size. We may change over to doing 1743 // more aggressive marshaling, 1744 // to get more accurate sizes.. this is more important once 1745 // users start using producer window 1746 // flow control. 1747 int size = msg.getSize(); 1748 producerWindow.increaseUsage(size); 1749 } 1750 } else { 1751 if (sendTimeout > 0) { 1752 this.connection.syncSendPacket(msg,sendTimeout); 1753 }else { 1754 this.connection.syncSendPacket(msg); 1755 } 1756 } 1757 1758 } 1759 } 1760 1761 /** 1762 * Send TransactionInfo to indicate transaction has started 1763 * 1764 * @throws JMSException if some internal error occurs 1765 */ 1766 protected void doStartTransaction() throws JMSException { 1767 if (getTransacted() && !transactionContext.isInXATransaction()) { 1768 transactionContext.begin(); 1769 } 1770 } 1771 1772 /** 1773 * Checks whether the session has unconsumed messages. 1774 * 1775 * @return true - if there are unconsumed messages. 1776 */ 1777 public boolean hasUncomsumedMessages() { 1778 return executor.hasUncomsumedMessages(); 1779 } 1780 1781 /** 1782 * Checks whether the session uses transactions. 1783 * 1784 * @return true - if the session uses transactions. 1785 */ 1786 public boolean isTransacted() { 1787 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction()); 1788 } 1789 1790 /** 1791 * Checks whether the session used client acknowledgment. 1792 * 1793 * @return true - if the session uses client acknowledgment. 1794 */ 1795 protected boolean isClientAcknowledge() { 1796 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; 1797 } 1798 1799 /** 1800 * Checks whether the session used auto acknowledgment. 1801 * 1802 * @return true - if the session uses client acknowledgment. 1803 */ 1804 public boolean isAutoAcknowledge() { 1805 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE; 1806 } 1807 1808 /** 1809 * Checks whether the session used dup ok acknowledgment. 1810 * 1811 * @return true - if the session uses client acknowledgment. 1812 */ 1813 public boolean isDupsOkAcknowledge() { 1814 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; 1815 } 1816 1817 public boolean isIndividualAcknowledge(){ 1818 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; 1819 } 1820 1821 /** 1822 * Returns the message delivery listener. 1823 * 1824 * @return deliveryListener - message delivery listener. 1825 */ 1826 public DeliveryListener getDeliveryListener() { 1827 return deliveryListener; 1828 } 1829 1830 /** 1831 * Sets the message delivery listener. 1832 * 1833 * @param deliveryListener - message delivery listener. 1834 */ 1835 public void setDeliveryListener(DeliveryListener deliveryListener) { 1836 this.deliveryListener = deliveryListener; 1837 } 1838 1839 /** 1840 * Returns the SessionInfo bean. 1841 * 1842 * @return info - SessionInfo bean. 1843 * @throws JMSException 1844 */ 1845 protected SessionInfo getSessionInfo() throws JMSException { 1846 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue()); 1847 return info; 1848 } 1849 1850 /** 1851 * Send the asynchronus command. 1852 * 1853 * @param command - command to be executed. 1854 * @throws JMSException 1855 */ 1856 public void asyncSendPacket(Command command) throws JMSException { 1857 connection.asyncSendPacket(command); 1858 } 1859 1860 /** 1861 * Send the synchronus command. 1862 * 1863 * @param command - command to be executed. 1864 * @return Response 1865 * @throws JMSException 1866 */ 1867 public Response syncSendPacket(Command command) throws JMSException { 1868 return connection.syncSendPacket(command); 1869 } 1870 1871 public long getNextDeliveryId() { 1872 return deliveryIdGenerator.getNextSequenceId(); 1873 } 1874 1875 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException { 1876 1877 List<MessageDispatch> c = unconsumedMessages.removeAll(); 1878 for (MessageDispatch md : c) { 1879 this.connection.rollbackDuplicate(dispatcher, md.getMessage()); 1880 } 1881 Collections.reverse(c); 1882 1883 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) { 1884 MessageDispatch md = iter.next(); 1885 executor.executeFirst(md); 1886 } 1887 1888 } 1889 1890 public boolean isRunning() { 1891 return started.get(); 1892 } 1893 1894 public boolean isAsyncDispatch() { 1895 return asyncDispatch; 1896 } 1897 1898 public void setAsyncDispatch(boolean asyncDispatch) { 1899 this.asyncDispatch = asyncDispatch; 1900 } 1901 1902 /** 1903 * @return Returns the sessionAsyncDispatch. 1904 */ 1905 public boolean isSessionAsyncDispatch() { 1906 return sessionAsyncDispatch; 1907 } 1908 1909 /** 1910 * @param sessionAsyncDispatch The sessionAsyncDispatch to set. 1911 */ 1912 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) { 1913 this.sessionAsyncDispatch = sessionAsyncDispatch; 1914 } 1915 1916 public MessageTransformer getTransformer() { 1917 return transformer; 1918 } 1919 1920 public ActiveMQConnection getConnection() { 1921 return connection; 1922 } 1923 1924 /** 1925 * Sets the transformer used to transform messages before they are sent on 1926 * to the JMS bus or when they are received from the bus but before they are 1927 * delivered to the JMS client 1928 */ 1929 public void setTransformer(MessageTransformer transformer) { 1930 this.transformer = transformer; 1931 } 1932 1933 public BlobTransferPolicy getBlobTransferPolicy() { 1934 return blobTransferPolicy; 1935 } 1936 1937 /** 1938 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 1939 * OBjects) are transferred from producers to brokers to consumers 1940 */ 1941 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 1942 this.blobTransferPolicy = blobTransferPolicy; 1943 } 1944 1945 public List getUnconsumedMessages() { 1946 return executor.getUnconsumedMessages(); 1947 } 1948 1949 public String toString() { 1950 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}"; 1951 } 1952 1953 public void checkMessageListener() throws JMSException { 1954 if (messageListener != null) { 1955 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 1956 } 1957 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) { 1958 ActiveMQMessageConsumer consumer = i.next(); 1959 if (consumer.getMessageListener() != null) { 1960 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 1961 } 1962 } 1963 } 1964 1965 protected void setOptimizeAcknowledge(boolean value) { 1966 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1967 ActiveMQMessageConsumer c = iter.next(); 1968 c.setOptimizeAcknowledge(value); 1969 } 1970 } 1971 1972 protected void setPrefetchSize(ConsumerId id, int prefetch) { 1973 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1974 ActiveMQMessageConsumer c = iter.next(); 1975 if (c.getConsumerId().equals(id)) { 1976 c.setPrefetchSize(prefetch); 1977 break; 1978 } 1979 } 1980 } 1981 1982 protected void close(ConsumerId id) { 1983 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1984 ActiveMQMessageConsumer c = iter.next(); 1985 if (c.getConsumerId().equals(id)) { 1986 try { 1987 c.close(); 1988 } catch (JMSException e) { 1989 LOG.warn("Exception closing consumer", e); 1990 } 1991 LOG.warn("Closed consumer on Command"); 1992 break; 1993 } 1994 } 1995 } 1996 1997 public boolean isInUse(ActiveMQTempDestination destination) { 1998 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1999 ActiveMQMessageConsumer c = iter.next(); 2000 if (c.isInUse(destination)) { 2001 return true; 2002 } 2003 } 2004 return false; 2005 } 2006 2007 /** 2008 * highest sequence id of the last message delivered by this session. 2009 * Passed to the broker in the close command, maintained by dispose() 2010 * @return lastDeliveredSequenceId 2011 */ 2012 public long getLastDeliveredSequenceId() { 2013 return lastDeliveredSequenceId; 2014 } 2015 2016 protected void sendAck(MessageAck ack) throws JMSException { 2017 sendAck(ack,false); 2018 } 2019 2020 protected void sendAck(MessageAck ack, boolean lazy) throws JMSException { 2021 if (lazy || connection.isSendAcksAsync() || getTransacted()) { 2022 asyncSendPacket(ack); 2023 } else { 2024 syncSendPacket(ack); 2025 } 2026 } 2027 }