001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.io.IOException; 020 import java.util.ArrayList; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.LinkedList; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Map.Entry; 027 import java.util.concurrent.ExecutorService; 028 import java.util.concurrent.Executors; 029 import java.util.concurrent.TimeUnit; 030 import java.util.concurrent.atomic.AtomicBoolean; 031 import java.util.concurrent.atomic.AtomicReference; 032 033 import javax.jms.IllegalStateException; 034 import javax.jms.InvalidDestinationException; 035 import javax.jms.JMSException; 036 import javax.jms.Message; 037 import javax.jms.MessageConsumer; 038 import javax.jms.MessageListener; 039 import javax.jms.TransactionRolledBackException; 040 041 import org.apache.activemq.blob.BlobDownloader; 042 import org.apache.activemq.command.ActiveMQBlobMessage; 043 import org.apache.activemq.command.ActiveMQDestination; 044 import org.apache.activemq.command.ActiveMQMessage; 045 import org.apache.activemq.command.ActiveMQTempDestination; 046 import org.apache.activemq.command.CommandTypes; 047 import org.apache.activemq.command.ConsumerId; 048 import org.apache.activemq.command.ConsumerInfo; 049 import org.apache.activemq.command.MessageAck; 050 import org.apache.activemq.command.MessageDispatch; 051 import org.apache.activemq.command.MessageId; 052 import org.apache.activemq.command.MessagePull; 053 import org.apache.activemq.command.RemoveInfo; 054 import org.apache.activemq.command.TransactionId; 055 import org.apache.activemq.management.JMSConsumerStatsImpl; 056 import org.apache.activemq.management.StatsCapable; 057 import org.apache.activemq.management.StatsImpl; 058 import org.apache.activemq.selector.SelectorParser; 059 import org.apache.activemq.thread.Scheduler; 060 import org.apache.activemq.transaction.Synchronization; 061 import org.apache.activemq.util.Callback; 062 import org.apache.activemq.util.IntrospectionSupport; 063 import org.apache.activemq.util.JMSExceptionSupport; 064 import org.apache.commons.logging.Log; 065 import org.apache.commons.logging.LogFactory; 066 067 /** 068 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 069 * from a destination. A <CODE> MessageConsumer</CODE> object is created by 070 * passing a <CODE>Destination</CODE> object to a message-consumer creation 071 * method supplied by a session. 072 * <P> 073 * <CODE>MessageConsumer</CODE> is the parent interface for all message 074 * consumers. 075 * <P> 076 * A message consumer can be created with a message selector. A message selector 077 * allows the client to restrict the messages delivered to the message consumer 078 * to those that match the selector. 079 * <P> 080 * A client may either synchronously receive a message consumer's messages or 081 * have the consumer asynchronously deliver them as they arrive. 082 * <P> 083 * For synchronous receipt, a client can request the next message from a message 084 * consumer using one of its <CODE> receive</CODE> methods. There are several 085 * variations of <CODE>receive</CODE> that allow a client to poll or wait for 086 * the next message. 087 * <P> 088 * For asynchronous delivery, a client can register a 089 * <CODE>MessageListener</CODE> object with a message consumer. As messages 090 * arrive at the message consumer, it delivers them by calling the 091 * <CODE>MessageListener</CODE>'s<CODE> 092 * onMessage</CODE> method. 093 * <P> 094 * It is a client programming error for a <CODE>MessageListener</CODE> to 095 * throw an exception. 096 * 097 * @version $Revision: 1.22 $ 098 * @see javax.jms.MessageConsumer 099 * @see javax.jms.QueueReceiver 100 * @see javax.jms.TopicSubscriber 101 * @see javax.jms.Session 102 */ 103 public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher { 104 105 @SuppressWarnings("serial") 106 class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> { 107 final TransactionId transactionId; 108 public PreviouslyDeliveredMap(TransactionId transactionId) { 109 this.transactionId = transactionId; 110 } 111 } 112 113 private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class); 114 protected static final Scheduler scheduler = Scheduler.getInstance(); 115 protected final ActiveMQSession session; 116 protected final ConsumerInfo info; 117 118 // These are the messages waiting to be delivered to the client 119 protected final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel(); 120 121 // The are the messages that were delivered to the consumer but that have 122 // not been acknowledged. It's kept in reverse order since we 123 // Always walk list in reverse order. 124 private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>(); 125 // track duplicate deliveries in a transaction such that the tx integrity can be validated 126 private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages; 127 private int deliveredCounter; 128 private int additionalWindowSize; 129 private long redeliveryDelay; 130 private int ackCounter; 131 private int dispatchedCount; 132 private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>(); 133 private JMSConsumerStatsImpl stats; 134 135 private final String selector; 136 private boolean synchronizationRegistered; 137 private AtomicBoolean started = new AtomicBoolean(false); 138 139 private MessageAvailableListener availableListener; 140 141 private RedeliveryPolicy redeliveryPolicy; 142 private boolean optimizeAcknowledge; 143 private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); 144 private ExecutorService executorService; 145 private MessageTransformer transformer; 146 private boolean clearDispatchList; 147 148 private MessageAck pendingAck; 149 private long lastDeliveredSequenceId; 150 151 private IOException failureError; 152 153 private long optimizeAckTimestamp = System.currentTimeMillis(); 154 private long optimizeAckTimeout = 300; 155 private long failoverRedeliveryWaitPeriod = 0; 156 157 /** 158 * Create a MessageConsumer 159 * 160 * @param session 161 * @param dest 162 * @param name 163 * @param selector 164 * @param prefetch 165 * @param maximumPendingMessageCount 166 * @param noLocal 167 * @param browser 168 * @param dispatchAsync 169 * @param messageListener 170 * @throws JMSException 171 */ 172 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, 173 String name, String selector, int prefetch, 174 int maximumPendingMessageCount, boolean noLocal, boolean browser, 175 boolean dispatchAsync, MessageListener messageListener) throws JMSException { 176 if (dest == null) { 177 throw new InvalidDestinationException("Don't understand null destinations"); 178 } else if (dest.getPhysicalName() == null) { 179 throw new InvalidDestinationException("The destination object was not given a physical name."); 180 } else if (dest.isTemporary()) { 181 String physicalName = dest.getPhysicalName(); 182 183 if (physicalName == null) { 184 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest); 185 } 186 187 String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue(); 188 189 if (physicalName.indexOf(connectionID) < 0) { 190 throw new InvalidDestinationException( 191 "Cannot use a Temporary destination from another Connection"); 192 } 193 194 if (session.connection.isDeleted(dest)) { 195 throw new InvalidDestinationException( 196 "Cannot use a Temporary destination that has been deleted"); 197 } 198 if (prefetch < 0) { 199 throw new JMSException("Cannot have a prefetch size less than zero"); 200 } 201 } 202 203 this.session = session; 204 this.redeliveryPolicy = session.connection.getRedeliveryPolicy(); 205 setTransformer(session.getTransformer()); 206 207 this.info = new ConsumerInfo(consumerId); 208 this.info.setExclusive(this.session.connection.isExclusiveConsumer()); 209 this.info.setSubscriptionName(name); 210 this.info.setPrefetchSize(prefetch); 211 this.info.setCurrentPrefetchSize(prefetch); 212 this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount); 213 this.info.setNoLocal(noLocal); 214 this.info.setDispatchAsync(dispatchAsync); 215 this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer()); 216 this.info.setSelector(null); 217 218 // Allows the options on the destination to configure the consumerInfo 219 if (dest.getOptions() != null) { 220 Map<String, String> options = new HashMap<String, String>(dest.getOptions()); 221 IntrospectionSupport.setProperties(this.info, options, "consumer."); 222 } 223 224 this.info.setDestination(dest); 225 this.info.setBrowser(browser); 226 if (selector != null && selector.trim().length() != 0) { 227 // Validate the selector 228 SelectorParser.parse(selector); 229 this.info.setSelector(selector); 230 this.selector = selector; 231 } else if (info.getSelector() != null) { 232 // Validate the selector 233 SelectorParser.parse(this.info.getSelector()); 234 this.selector = this.info.getSelector(); 235 } else { 236 this.selector = null; 237 } 238 239 this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest); 240 this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() 241 && !info.isBrowser(); 242 this.info.setOptimizedAcknowledge(this.optimizeAcknowledge); 243 this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod(); 244 if (messageListener != null) { 245 setMessageListener(messageListener); 246 } 247 try { 248 this.session.addConsumer(this); 249 this.session.syncSendPacket(info); 250 } catch (JMSException e) { 251 this.session.removeConsumer(this); 252 throw e; 253 } 254 255 if (session.connection.isStarted()) { 256 start(); 257 } 258 } 259 260 private boolean isAutoAcknowledgeEach() { 261 return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() ); 262 } 263 264 private boolean isAutoAcknowledgeBatch() { 265 return session.isDupsOkAcknowledge() && !getDestination().isQueue() ; 266 } 267 268 public StatsImpl getStats() { 269 return stats; 270 } 271 272 public JMSConsumerStatsImpl getConsumerStats() { 273 return stats; 274 } 275 276 public RedeliveryPolicy getRedeliveryPolicy() { 277 return redeliveryPolicy; 278 } 279 280 /** 281 * Sets the redelivery policy used when messages are redelivered 282 */ 283 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 284 this.redeliveryPolicy = redeliveryPolicy; 285 } 286 287 public MessageTransformer getTransformer() { 288 return transformer; 289 } 290 291 /** 292 * Sets the transformer used to transform messages before they are sent on 293 * to the JMS bus 294 */ 295 public void setTransformer(MessageTransformer transformer) { 296 this.transformer = transformer; 297 } 298 299 /** 300 * @return Returns the value. 301 */ 302 public ConsumerId getConsumerId() { 303 return info.getConsumerId(); 304 } 305 306 /** 307 * @return the consumer name - used for durable consumers 308 */ 309 public String getConsumerName() { 310 return this.info.getSubscriptionName(); 311 } 312 313 /** 314 * @return true if this consumer does not accept locally produced messages 315 */ 316 protected boolean isNoLocal() { 317 return info.isNoLocal(); 318 } 319 320 /** 321 * Retrieve is a browser 322 * 323 * @return true if a browser 324 */ 325 protected boolean isBrowser() { 326 return info.isBrowser(); 327 } 328 329 /** 330 * @return ActiveMQDestination 331 */ 332 protected ActiveMQDestination getDestination() { 333 return info.getDestination(); 334 } 335 336 /** 337 * @return Returns the prefetchNumber. 338 */ 339 public int getPrefetchNumber() { 340 return info.getPrefetchSize(); 341 } 342 343 /** 344 * @return true if this is a durable topic subscriber 345 */ 346 public boolean isDurableSubscriber() { 347 return info.getSubscriptionName() != null && info.getDestination().isTopic(); 348 } 349 350 /** 351 * Gets this message consumer's message selector expression. 352 * 353 * @return this message consumer's message selector, or null if no message 354 * selector exists for the message consumer (that is, if the message 355 * selector was not set or was set to null or the empty string) 356 * @throws JMSException if the JMS provider fails to receive the next 357 * message due to some internal error. 358 */ 359 public String getMessageSelector() throws JMSException { 360 checkClosed(); 361 return selector; 362 } 363 364 /** 365 * Gets the message consumer's <CODE>MessageListener</CODE>. 366 * 367 * @return the listener for the message consumer, or null if no listener is 368 * set 369 * @throws JMSException if the JMS provider fails to get the message 370 * listener due to some internal error. 371 * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener) 372 */ 373 public MessageListener getMessageListener() throws JMSException { 374 checkClosed(); 375 return this.messageListener.get(); 376 } 377 378 /** 379 * Sets the message consumer's <CODE>MessageListener</CODE>. 380 * <P> 381 * Setting the message listener to null is the equivalent of unsetting the 382 * message listener for the message consumer. 383 * <P> 384 * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> 385 * while messages are being consumed by an existing listener or the consumer 386 * is being used to consume messages synchronously is undefined. 387 * 388 * @param listener the listener to which the messages are to be delivered 389 * @throws JMSException if the JMS provider fails to receive the next 390 * message due to some internal error. 391 * @see javax.jms.MessageConsumer#getMessageListener 392 */ 393 public void setMessageListener(MessageListener listener) throws JMSException { 394 checkClosed(); 395 if (info.getPrefetchSize() == 0) { 396 throw new JMSException( 397 "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); 398 } 399 if (listener != null) { 400 boolean wasRunning = session.isRunning(); 401 if (wasRunning) { 402 session.stop(); 403 } 404 405 this.messageListener.set(listener); 406 session.redispatch(this, unconsumedMessages); 407 408 if (wasRunning) { 409 session.start(); 410 } 411 } else { 412 this.messageListener.set(null); 413 } 414 } 415 416 public MessageAvailableListener getAvailableListener() { 417 return availableListener; 418 } 419 420 /** 421 * Sets the listener used to notify synchronous consumers that there is a 422 * message available so that the {@link MessageConsumer#receiveNoWait()} can 423 * be called. 424 */ 425 public void setAvailableListener(MessageAvailableListener availableListener) { 426 this.availableListener = availableListener; 427 } 428 429 /** 430 * Used to get an enqueued message from the unconsumedMessages list. The 431 * amount of time this method blocks is based on the timeout value. - if 432 * timeout==-1 then it blocks until a message is received. - if timeout==0 433 * then it it tries to not block at all, it returns a message if it is 434 * available - if timeout>0 then it blocks up to timeout amount of time. 435 * Expired messages will consumed by this method. 436 * 437 * @throws JMSException 438 * @return null if we timeout or if the consumer is closed. 439 */ 440 private MessageDispatch dequeue(long timeout) throws JMSException { 441 try { 442 long deadline = 0; 443 if (timeout > 0) { 444 deadline = System.currentTimeMillis() + timeout; 445 } 446 while (true) { 447 MessageDispatch md = unconsumedMessages.dequeue(timeout); 448 if (md == null) { 449 if (timeout > 0 && !unconsumedMessages.isClosed()) { 450 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 451 } else { 452 if (failureError != null) { 453 throw JMSExceptionSupport.create(failureError); 454 } else { 455 return null; 456 } 457 } 458 } else if (md.getMessage() == null) { 459 return null; 460 } else if (md.getMessage().isExpired()) { 461 if (LOG.isDebugEnabled()) { 462 LOG.debug(getConsumerId() + " received expired message: " + md); 463 } 464 beforeMessageIsConsumed(md); 465 afterMessageIsConsumed(md, true); 466 if (timeout > 0) { 467 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 468 } 469 } else { 470 if (LOG.isTraceEnabled()) { 471 LOG.trace(getConsumerId() + " received message: " + md); 472 } 473 return md; 474 } 475 } 476 } catch (InterruptedException e) { 477 Thread.currentThread().interrupt(); 478 throw JMSExceptionSupport.create(e); 479 } 480 } 481 482 /** 483 * Receives the next message produced for this message consumer. 484 * <P> 485 * This call blocks indefinitely until a message is produced or until this 486 * message consumer is closed. 487 * <P> 488 * If this <CODE>receive</CODE> is done within a transaction, the consumer 489 * retains the message until the transaction commits. 490 * 491 * @return the next message produced for this message consumer, or null if 492 * this message consumer is concurrently closed 493 */ 494 public Message receive() throws JMSException { 495 checkClosed(); 496 checkMessageListener(); 497 498 sendPullCommand(0); 499 MessageDispatch md = dequeue(-1); 500 if (md == null) { 501 return null; 502 } 503 504 beforeMessageIsConsumed(md); 505 afterMessageIsConsumed(md, false); 506 507 return createActiveMQMessage(md); 508 } 509 510 /** 511 * @param md 512 * @return 513 */ 514 private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException { 515 ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy(); 516 if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) { 517 ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy())); 518 } 519 if (transformer != null) { 520 Message transformedMessage = transformer.consumerTransform(session, this, m); 521 if (transformedMessage != null) { 522 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection); 523 } 524 } 525 if (session.isClientAcknowledge()) { 526 m.setAcknowledgeCallback(new Callback() { 527 public void execute() throws Exception { 528 session.checkClosed(); 529 session.acknowledge(); 530 } 531 }); 532 }else if (session.isIndividualAcknowledge()) { 533 m.setAcknowledgeCallback(new Callback() { 534 public void execute() throws Exception { 535 session.checkClosed(); 536 acknowledge(md); 537 } 538 }); 539 } 540 return m; 541 } 542 543 /** 544 * Receives the next message that arrives within the specified timeout 545 * interval. 546 * <P> 547 * This call blocks until a message arrives, the timeout expires, or this 548 * message consumer is closed. A <CODE>timeout</CODE> of zero never 549 * expires, and the call blocks indefinitely. 550 * 551 * @param timeout the timeout value (in milliseconds), a time out of zero 552 * never expires. 553 * @return the next message produced for this message consumer, or null if 554 * the timeout expires or this message consumer is concurrently 555 * closed 556 */ 557 public Message receive(long timeout) throws JMSException { 558 checkClosed(); 559 checkMessageListener(); 560 if (timeout == 0) { 561 return this.receive(); 562 563 } 564 565 sendPullCommand(timeout); 566 while (timeout > 0) { 567 568 MessageDispatch md; 569 if (info.getPrefetchSize() == 0) { 570 md = dequeue(-1); // We let the broker let us know when we timeout. 571 } else { 572 md = dequeue(timeout); 573 } 574 575 if (md == null) { 576 return null; 577 } 578 579 beforeMessageIsConsumed(md); 580 afterMessageIsConsumed(md, false); 581 return createActiveMQMessage(md); 582 } 583 return null; 584 } 585 586 /** 587 * Receives the next message if one is immediately available. 588 * 589 * @return the next message produced for this message consumer, or null if 590 * one is not available 591 * @throws JMSException if the JMS provider fails to receive the next 592 * message due to some internal error. 593 */ 594 public Message receiveNoWait() throws JMSException { 595 checkClosed(); 596 checkMessageListener(); 597 sendPullCommand(-1); 598 599 MessageDispatch md; 600 if (info.getPrefetchSize() == 0) { 601 md = dequeue(-1); // We let the broker let us know when we 602 // timeout. 603 } else { 604 md = dequeue(0); 605 } 606 607 if (md == null) { 608 return null; 609 } 610 611 beforeMessageIsConsumed(md); 612 afterMessageIsConsumed(md, false); 613 return createActiveMQMessage(md); 614 } 615 616 /** 617 * Closes the message consumer. 618 * <P> 619 * Since a provider may allocate some resources on behalf of a <CODE> 620 * MessageConsumer</CODE> 621 * outside the Java virtual machine, clients should close them when they are 622 * not needed. Relying on garbage collection to eventually reclaim these 623 * resources may not be timely enough. 624 * <P> 625 * This call blocks until a <CODE>receive</CODE> or message listener in 626 * progress has completed. A blocked message consumer <CODE>receive </CODE> 627 * call returns null when this message consumer is closed. 628 * 629 * @throws JMSException if the JMS provider fails to close the consumer due 630 * to some internal error. 631 */ 632 public void close() throws JMSException { 633 if (!unconsumedMessages.isClosed()) { 634 if (session.getTransactionContext().isInTransaction()) { 635 session.getTransactionContext().addSynchronization(new Synchronization() { 636 public void afterCommit() throws Exception { 637 doClose(); 638 } 639 640 public void afterRollback() throws Exception { 641 doClose(); 642 } 643 }); 644 } else { 645 doClose(); 646 } 647 } 648 } 649 650 void doClose() throws JMSException { 651 dispose(); 652 RemoveInfo removeCommand = info.createRemoveCommand(); 653 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 654 this.session.asyncSendPacket(removeCommand); 655 } 656 657 void clearMessagesInProgress() { 658 // deal with delivered messages async to avoid lock contention with in progress acks 659 clearDispatchList = true; 660 synchronized (unconsumedMessages.getMutex()) { 661 if (LOG.isDebugEnabled()) { 662 LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt"); 663 } 664 // ensure unconsumed are rolledback up front as they may get redelivered to another consumer 665 List<MessageDispatch> list = unconsumedMessages.removeAll(); 666 if (!this.info.isBrowser()) { 667 for (MessageDispatch old : list) { 668 session.connection.rollbackDuplicate(this, old.getMessage()); 669 } 670 } 671 } 672 // allow dispatch on this connection to resume 673 session.connection.transportInterruptionProcessingComplete(); 674 } 675 676 void deliverAcks() { 677 MessageAck ack = null; 678 if (deliveryingAcknowledgements.compareAndSet(false, true)) { 679 if (isAutoAcknowledgeEach()) { 680 synchronized(deliveredMessages) { 681 ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 682 if (ack != null) { 683 deliveredMessages.clear(); 684 ackCounter = 0; 685 } else { 686 ack = pendingAck; 687 pendingAck = null; 688 } 689 } 690 } else if (pendingAck != null && pendingAck.isStandardAck()) { 691 ack = pendingAck; 692 pendingAck = null; 693 } 694 if (ack != null) { 695 final MessageAck ackToSend = ack; 696 697 if (executorService == null) { 698 executorService = Executors.newSingleThreadExecutor(); 699 } 700 executorService.submit(new Runnable() { 701 public void run() { 702 try { 703 session.sendAck(ackToSend,true); 704 } catch (JMSException e) { 705 LOG.error(getConsumerId() + " failed to delivered acknowledgements", e); 706 } finally { 707 deliveryingAcknowledgements.set(false); 708 } 709 } 710 }); 711 } else { 712 deliveryingAcknowledgements.set(false); 713 } 714 } 715 } 716 717 public void dispose() throws JMSException { 718 if (!unconsumedMessages.isClosed()) { 719 720 // Do we have any acks we need to send out before closing? 721 // Ack any delivered messages now. 722 if (!session.getTransacted()) { 723 deliverAcks(); 724 if (isAutoAcknowledgeBatch()) { 725 acknowledge(); 726 } 727 } 728 if (executorService != null) { 729 executorService.shutdown(); 730 try { 731 executorService.awaitTermination(60, TimeUnit.SECONDS); 732 } catch (InterruptedException e) { 733 Thread.currentThread().interrupt(); 734 } 735 } 736 737 if (session.isClientAcknowledge()) { 738 if (!this.info.isBrowser()) { 739 // rollback duplicates that aren't acknowledged 740 List<MessageDispatch> tmp = null; 741 synchronized (this.deliveredMessages) { 742 tmp = new ArrayList<MessageDispatch>(this.deliveredMessages); 743 } 744 for (MessageDispatch old : tmp) { 745 this.session.connection.rollbackDuplicate(this, old.getMessage()); 746 } 747 tmp.clear(); 748 } 749 } 750 if (!session.isTransacted()) { 751 synchronized(deliveredMessages) { 752 deliveredMessages.clear(); 753 } 754 } 755 unconsumedMessages.close(); 756 this.session.removeConsumer(this); 757 List<MessageDispatch> list = unconsumedMessages.removeAll(); 758 if (!this.info.isBrowser()) { 759 for (MessageDispatch old : list) { 760 // ensure we don't filter this as a duplicate 761 session.connection.rollbackDuplicate(this, old.getMessage()); 762 } 763 } 764 } 765 } 766 767 /** 768 * @throws IllegalStateException 769 */ 770 protected void checkClosed() throws IllegalStateException { 771 if (unconsumedMessages.isClosed()) { 772 throw new IllegalStateException("The Consumer is closed"); 773 } 774 } 775 776 /** 777 * If we have a zero prefetch specified then send a pull command to the 778 * broker to pull a message we are about to receive 779 */ 780 protected void sendPullCommand(long timeout) throws JMSException { 781 clearDispatchList(); 782 if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { 783 MessagePull messagePull = new MessagePull(); 784 messagePull.configure(info); 785 messagePull.setTimeout(timeout); 786 session.asyncSendPacket(messagePull); 787 } 788 } 789 790 protected void checkMessageListener() throws JMSException { 791 session.checkMessageListener(); 792 } 793 794 protected void setOptimizeAcknowledge(boolean value) { 795 if (optimizeAcknowledge && !value) { 796 deliverAcks(); 797 } 798 optimizeAcknowledge = value; 799 } 800 801 protected void setPrefetchSize(int prefetch) { 802 deliverAcks(); 803 this.info.setCurrentPrefetchSize(prefetch); 804 } 805 806 private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { 807 md.setDeliverySequenceId(session.getNextDeliveryId()); 808 lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); 809 if (!isAutoAcknowledgeBatch()) { 810 synchronized(deliveredMessages) { 811 deliveredMessages.addFirst(md); 812 } 813 if (session.getTransacted()) { 814 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 815 } 816 } 817 } 818 819 private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { 820 if (unconsumedMessages.isClosed()) { 821 return; 822 } 823 if (messageExpired) { 824 synchronized (deliveredMessages) { 825 deliveredMessages.remove(md); 826 } 827 stats.getExpiredMessageCount().increment(); 828 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 829 } else { 830 stats.onMessage(); 831 if (session.getTransacted()) { 832 // Do nothing. 833 } else if (isAutoAcknowledgeEach()) { 834 if (deliveryingAcknowledgements.compareAndSet(false, true)) { 835 synchronized (deliveredMessages) { 836 if (!deliveredMessages.isEmpty()) { 837 if (optimizeAcknowledge) { 838 ackCounter++; 839 if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) { 840 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 841 if (ack != null) { 842 deliveredMessages.clear(); 843 ackCounter = 0; 844 session.sendAck(ack); 845 optimizeAckTimestamp = System.currentTimeMillis(); 846 } 847 } 848 } else { 849 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 850 if (ack!=null) { 851 deliveredMessages.clear(); 852 session.sendAck(ack); 853 } 854 } 855 } 856 } 857 deliveryingAcknowledgements.set(false); 858 } 859 } else if (isAutoAcknowledgeBatch()) { 860 ackLater(md, MessageAck.STANDARD_ACK_TYPE); 861 } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { 862 boolean messageUnackedByConsumer = false; 863 synchronized (deliveredMessages) { 864 messageUnackedByConsumer = deliveredMessages.contains(md); 865 } 866 if (messageUnackedByConsumer) { 867 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 868 } 869 } 870 else { 871 throw new IllegalStateException("Invalid session state."); 872 } 873 } 874 } 875 876 /** 877 * Creates a MessageAck for all messages contained in deliveredMessages. 878 * Caller should hold the lock for deliveredMessages. 879 * 880 * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE) 881 * @return <code>null</code> if nothing to ack. 882 */ 883 private MessageAck makeAckForAllDeliveredMessages(byte type) { 884 synchronized (deliveredMessages) { 885 if (deliveredMessages.isEmpty()) 886 return null; 887 888 MessageDispatch md = deliveredMessages.getFirst(); 889 MessageAck ack = new MessageAck(md, type, deliveredMessages.size()); 890 ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId()); 891 return ack; 892 } 893 } 894 895 private void ackLater(MessageDispatch md, byte ackType) throws JMSException { 896 897 // Don't acknowledge now, but we may need to let the broker know the 898 // consumer got the message to expand the pre-fetch window 899 if (session.getTransacted()) { 900 session.doStartTransaction(); 901 if (!synchronizationRegistered) { 902 synchronizationRegistered = true; 903 session.getTransactionContext().addSynchronization(new Synchronization() { 904 public void beforeEnd() throws Exception { 905 acknowledge(); 906 synchronizationRegistered = false; 907 } 908 909 public void afterCommit() throws Exception { 910 commit(); 911 synchronizationRegistered = false; 912 } 913 914 public void afterRollback() throws Exception { 915 rollback(); 916 synchronizationRegistered = false; 917 } 918 }); 919 } 920 } 921 922 deliveredCounter++; 923 924 MessageAck oldPendingAck = pendingAck; 925 pendingAck = new MessageAck(md, ackType, deliveredCounter); 926 pendingAck.setTransactionId(session.getTransactionContext().getTransactionId()); 927 if( oldPendingAck==null ) { 928 pendingAck.setFirstMessageId(pendingAck.getLastMessageId()); 929 } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) { 930 pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId()); 931 } else { 932 // old pending ack being superseded by ack of another type, if is is not a delivered 933 // ack and hence important, send it now so it is not lost. 934 if ( !oldPendingAck.isDeliveredAck()) { 935 if (LOG.isDebugEnabled()) { 936 LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck); 937 } 938 session.sendAck(oldPendingAck); 939 } else { 940 if (LOG.isDebugEnabled()) { 941 LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck); 942 } 943 } 944 } 945 946 if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) { 947 session.sendAck(pendingAck); 948 pendingAck=null; 949 deliveredCounter = 0; 950 additionalWindowSize = 0; 951 } 952 } 953 954 /** 955 * Acknowledge all the messages that have been delivered to the client up to 956 * this point. 957 * 958 * @throws JMSException 959 */ 960 public void acknowledge() throws JMSException { 961 clearDispatchList(); 962 waitForRedeliveries(); 963 synchronized(deliveredMessages) { 964 // Acknowledge all messages so far. 965 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 966 if (ack == null) 967 return; // no msgs 968 969 if (session.getTransacted()) { 970 rollbackOnFailedRecoveryRedelivery(); 971 session.doStartTransaction(); 972 ack.setTransactionId(session.getTransactionContext().getTransactionId()); 973 } 974 session.sendAck(ack); 975 pendingAck = null; 976 977 // Adjust the counters 978 deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size()); 979 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); 980 981 if (!session.getTransacted()) { 982 deliveredMessages.clear(); 983 } 984 } 985 } 986 987 private void waitForRedeliveries() { 988 if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) { 989 long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod; 990 int numberNotReplayed; 991 do { 992 numberNotReplayed = 0; 993 synchronized(deliveredMessages) { 994 if (previouslyDeliveredMessages != null) { 995 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { 996 if (!entry.getValue()) { 997 numberNotReplayed++; 998 } 999 } 1000 } 1001 } 1002 if (numberNotReplayed > 0) { 1003 LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: " 1004 + previouslyDeliveredMessages.transactionId + ", to consumer :" + this.getConsumerId()); 1005 try { 1006 Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4)); 1007 } catch (InterruptedException outOfhere) { 1008 break; 1009 } 1010 } 1011 } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis()); 1012 } 1013 } 1014 1015 /* 1016 * called with deliveredMessages locked 1017 */ 1018 private void rollbackOnFailedRecoveryRedelivery() throws JMSException { 1019 if (previouslyDeliveredMessages != null) { 1020 // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback 1021 // as messages have been dispatched else where. 1022 int numberNotReplayed = 0; 1023 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { 1024 if (!entry.getValue()) { 1025 numberNotReplayed++; 1026 if (LOG.isDebugEnabled()) { 1027 LOG.debug("previously delivered message has not been replayed in transaction: " 1028 + previouslyDeliveredMessages.transactionId 1029 + " , messageId: " + entry.getKey()); 1030 } 1031 } 1032 } 1033 if (numberNotReplayed > 0) { 1034 String message = "rolling back transaction (" 1035 + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed 1036 + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId(); 1037 LOG.warn(message); 1038 throw new TransactionRolledBackException(message); 1039 } 1040 } 1041 } 1042 1043 void acknowledge(MessageDispatch md) throws JMSException { 1044 MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1); 1045 session.sendAck(ack); 1046 synchronized(deliveredMessages){ 1047 deliveredMessages.remove(md); 1048 } 1049 } 1050 1051 public void commit() throws JMSException { 1052 synchronized (deliveredMessages) { 1053 deliveredMessages.clear(); 1054 clearPreviouslyDelivered(); 1055 } 1056 redeliveryDelay = 0; 1057 } 1058 1059 public void rollback() throws JMSException { 1060 synchronized (unconsumedMessages.getMutex()) { 1061 if (optimizeAcknowledge) { 1062 // remove messages read but not acked at the broker yet through 1063 // optimizeAcknowledge 1064 if (!this.info.isBrowser()) { 1065 synchronized(deliveredMessages) { 1066 for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) { 1067 // ensure we don't filter this as a duplicate 1068 MessageDispatch md = deliveredMessages.removeLast(); 1069 session.connection.rollbackDuplicate(this, md.getMessage()); 1070 } 1071 } 1072 } 1073 } 1074 synchronized(deliveredMessages) { 1075 rollbackPreviouslyDeliveredAndNotRedelivered(); 1076 if (deliveredMessages.isEmpty()) { 1077 return; 1078 } 1079 1080 // Only increase the redelivery delay after the first redelivery.. 1081 MessageDispatch lastMd = deliveredMessages.getFirst(); 1082 final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter(); 1083 if (currentRedeliveryCount > 0) { 1084 redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); 1085 } 1086 MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId(); 1087 1088 for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) { 1089 MessageDispatch md = iter.next(); 1090 md.getMessage().onMessageRolledBack(); 1091 // ensure we don't filter this as a duplicate 1092 session.connection.rollbackDuplicate(this, md.getMessage()); 1093 } 1094 1095 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 1096 && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) { 1097 // We need to NACK the messages so that they get sent to the 1098 // DLQ. 1099 // Acknowledge the last message. 1100 1101 MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); 1102 ack.setFirstMessageId(firstMsgId); 1103 session.sendAck(ack,true); 1104 // Adjust the window size. 1105 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); 1106 redeliveryDelay = 0; 1107 } else { 1108 1109 // only redelivery_ack after first delivery 1110 if (currentRedeliveryCount > 0) { 1111 MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size()); 1112 ack.setFirstMessageId(firstMsgId); 1113 session.sendAck(ack,true); 1114 } 1115 1116 // stop the delivery of messages. 1117 unconsumedMessages.stop(); 1118 1119 for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) { 1120 MessageDispatch md = iter.next(); 1121 unconsumedMessages.enqueueFirst(md); 1122 } 1123 1124 if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) { 1125 // Start up the delivery again a little later. 1126 scheduler.executeAfterDelay(new Runnable() { 1127 public void run() { 1128 try { 1129 if (started.get()) { 1130 start(); 1131 } 1132 } catch (JMSException e) { 1133 session.connection.onAsyncException(e); 1134 } 1135 } 1136 }, redeliveryDelay); 1137 } else { 1138 start(); 1139 } 1140 1141 } 1142 deliveredCounter -= deliveredMessages.size(); 1143 deliveredMessages.clear(); 1144 } 1145 } 1146 if (messageListener.get() != null) { 1147 session.redispatch(this, unconsumedMessages); 1148 } 1149 } 1150 1151 /* 1152 * called with unconsumedMessages && deliveredMessages locked 1153 * remove any message not re-delivered as they can't be replayed to this 1154 * consumer on rollback 1155 */ 1156 private void rollbackPreviouslyDeliveredAndNotRedelivered() { 1157 if (previouslyDeliveredMessages != null) { 1158 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { 1159 if (!entry.getValue()) { 1160 removeFromDeliveredMessages(entry.getKey()); 1161 } 1162 } 1163 clearPreviouslyDelivered(); 1164 } 1165 } 1166 1167 /* 1168 * called with deliveredMessages locked 1169 */ 1170 private void removeFromDeliveredMessages(MessageId key) { 1171 Iterator<MessageDispatch> iterator = deliveredMessages.iterator(); 1172 while (iterator.hasNext()) { 1173 MessageDispatch candidate = iterator.next(); 1174 if (key.equals(candidate.getMessage().getMessageId())) { 1175 session.connection.rollbackDuplicate(this, candidate.getMessage()); 1176 iterator.remove(); 1177 break; 1178 } 1179 } 1180 } 1181 /* 1182 * called with deliveredMessages locked 1183 */ 1184 private void clearPreviouslyDelivered() { 1185 if (previouslyDeliveredMessages != null) { 1186 previouslyDeliveredMessages.clear(); 1187 previouslyDeliveredMessages = null; 1188 } 1189 } 1190 1191 public void dispatch(MessageDispatch md) { 1192 MessageListener listener = this.messageListener.get(); 1193 try { 1194 clearDispatchList(); 1195 synchronized (unconsumedMessages.getMutex()) { 1196 if (!unconsumedMessages.isClosed()) { 1197 if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { 1198 if (listener != null && unconsumedMessages.isRunning()) { 1199 ActiveMQMessage message = createActiveMQMessage(md); 1200 beforeMessageIsConsumed(md); 1201 try { 1202 boolean expired = message.isExpired(); 1203 if (!expired) { 1204 listener.onMessage(message); 1205 } 1206 afterMessageIsConsumed(md, expired); 1207 } catch (RuntimeException e) { 1208 if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) { 1209 // Redeliver the message 1210 } else { 1211 // Transacted or Client ack: Deliver the 1212 // next message. 1213 afterMessageIsConsumed(md, false); 1214 } 1215 LOG.error(getConsumerId() + " Exception while processing message: " + e, e); 1216 } 1217 } else { 1218 unconsumedMessages.enqueue(md); 1219 if (availableListener != null) { 1220 availableListener.onMessageAvailable(this); 1221 } 1222 } 1223 } else { 1224 if (!session.isTransacted()) { 1225 if (LOG.isDebugEnabled()) { 1226 LOG.debug(getConsumerId() + " ignoring (auto acking) duplicate: " + md.getMessage()); 1227 } 1228 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); 1229 session.sendAck(ack); 1230 } else { 1231 if (LOG.isDebugEnabled()) { 1232 LOG.debug(getConsumerId() + " tracking transacted redlivery of duplicate: " + md.getMessage()); 1233 } 1234 boolean needsPoisonAck = false; 1235 synchronized (deliveredMessages) { 1236 if (previouslyDeliveredMessages != null) { 1237 previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true); 1238 } else { 1239 // delivery while pending redelivery to another consumer on the same connection 1240 // not waiting for redelivery will help here 1241 needsPoisonAck = true; 1242 } 1243 } 1244 if (needsPoisonAck) { 1245 LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another" 1246 + " consumer on this connection, failoverRedeliveryWaitPeriod=" 1247 + failoverRedeliveryWaitPeriod + ". Message: " + md); 1248 MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 1249 poisonAck.setFirstMessageId(md.getMessage().getMessageId()); 1250 session.sendAck(poisonAck); 1251 } else { 1252 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 1253 } 1254 } 1255 } 1256 } 1257 } 1258 if (++dispatchedCount % 1000 == 0) { 1259 dispatchedCount = 0; 1260 Thread.yield(); 1261 } 1262 } catch (Exception e) { 1263 session.connection.onClientInternalException(e); 1264 } 1265 } 1266 1267 // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again 1268 private void clearDispatchList() { 1269 if (clearDispatchList) { 1270 synchronized (deliveredMessages) { 1271 if (clearDispatchList) { 1272 if (!deliveredMessages.isEmpty()) { 1273 if (session.isTransacted()) { 1274 if (LOG.isDebugEnabled()) { 1275 LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt"); 1276 } 1277 if (previouslyDeliveredMessages == null) { 1278 previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId()); 1279 } 1280 for (MessageDispatch delivered : deliveredMessages) { 1281 previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false); 1282 } 1283 } else { 1284 if (LOG.isDebugEnabled()) { 1285 LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt"); 1286 } 1287 deliveredMessages.clear(); 1288 pendingAck = null; 1289 } 1290 } 1291 clearDispatchList = false; 1292 } 1293 } 1294 } 1295 } 1296 1297 public int getMessageSize() { 1298 return unconsumedMessages.size(); 1299 } 1300 1301 public void start() throws JMSException { 1302 if (unconsumedMessages.isClosed()) { 1303 return; 1304 } 1305 started.set(true); 1306 unconsumedMessages.start(); 1307 session.executor.wakeup(); 1308 } 1309 1310 public void stop() { 1311 started.set(false); 1312 unconsumedMessages.stop(); 1313 } 1314 1315 public String toString() { 1316 return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get() 1317 + " }"; 1318 } 1319 1320 /** 1321 * Delivers a message to the message listener. 1322 * 1323 * @return 1324 * @throws JMSException 1325 */ 1326 public boolean iterate() { 1327 MessageListener listener = this.messageListener.get(); 1328 if (listener != null) { 1329 MessageDispatch md = unconsumedMessages.dequeueNoWait(); 1330 if (md != null) { 1331 try { 1332 ActiveMQMessage message = createActiveMQMessage(md); 1333 beforeMessageIsConsumed(md); 1334 listener.onMessage(message); 1335 afterMessageIsConsumed(md, false); 1336 } catch (JMSException e) { 1337 session.connection.onClientInternalException(e); 1338 } 1339 return true; 1340 } 1341 } 1342 return false; 1343 } 1344 1345 public boolean isInUse(ActiveMQTempDestination destination) { 1346 return info.getDestination().equals(destination); 1347 } 1348 1349 public long getLastDeliveredSequenceId() { 1350 return lastDeliveredSequenceId; 1351 } 1352 1353 public IOException getFailureError() { 1354 return failureError; 1355 } 1356 1357 public void setFailureError(IOException failureError) { 1358 this.failureError = failureError; 1359 } 1360 }