001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import java.util.LinkedList; 021 import java.util.concurrent.atomic.AtomicLong; 022 023 import javax.jms.JMSException; 024 025 import org.apache.activemq.broker.Broker; 026 import org.apache.activemq.broker.ConnectionContext; 027 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; 028 import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 029 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 030 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; 031 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; 032 import org.apache.activemq.command.ConsumerControl; 033 import org.apache.activemq.command.ConsumerInfo; 034 import org.apache.activemq.command.Message; 035 import org.apache.activemq.command.MessageAck; 036 import org.apache.activemq.command.MessageDispatch; 037 import org.apache.activemq.command.MessageDispatchNotification; 038 import org.apache.activemq.command.MessagePull; 039 import org.apache.activemq.command.Response; 040 import org.apache.activemq.transaction.Synchronization; 041 import org.apache.activemq.usage.SystemUsage; 042 import org.apache.commons.logging.Log; 043 import org.apache.commons.logging.LogFactory; 044 045 public class TopicSubscription extends AbstractSubscription { 046 047 private static final Log LOG = LogFactory.getLog(TopicSubscription.class); 048 private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0); 049 050 protected PendingMessageCursor matched; 051 protected final SystemUsage usageManager; 052 protected AtomicLong dispatchedCounter = new AtomicLong(); 053 054 boolean singleDestination = true; 055 Destination destination; 056 057 private int maximumPendingMessages = -1; 058 private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); 059 private int discarded; 060 private final Object matchedListMutex = new Object(); 061 private final AtomicLong enqueueCounter = new AtomicLong(0); 062 private final AtomicLong dequeueCounter = new AtomicLong(0); 063 private int memoryUsageHighWaterMark = 95; 064 private boolean slowConsumer; 065 066 public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { 067 super(broker, context, info); 068 this.usageManager = usageManager; 069 String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]"; 070 if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) { 071 this.matched = new VMPendingMessageCursor(); 072 } else { 073 this.matched = new FilePendingMessageCursor(broker,matchedName); 074 } 075 } 076 077 public void init() throws Exception { 078 this.matched.setSystemUsage(usageManager); 079 this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 080 this.matched.start(); 081 } 082 083 public void add(MessageReference node) throws Exception { 084 enqueueCounter.incrementAndGet(); 085 if (!isFull() && matched.isEmpty() && !isSlave()) { 086 // if maximumPendingMessages is set we will only discard messages which 087 // have not been dispatched (i.e. we allow the prefetch buffer to be filled) 088 dispatch(node); 089 slowConsumer=false; 090 } else { 091 //we are slow 092 if(!slowConsumer) { 093 slowConsumer=true; 094 for (Destination dest: destinations) { 095 dest.slowConsumer(getContext(), this); 096 } 097 } 098 if (maximumPendingMessages != 0) { 099 synchronized(matchedListMutex){ 100 while (matched.isFull()){ 101 if (getContext().getStopping().get()) { 102 LOG.warn("stopped waiting for space in pendingMessage cursor for: " + node.getMessageId()); 103 enqueueCounter.decrementAndGet(); 104 return; 105 } 106 matchedListMutex.wait(20); 107 } 108 matched.addMessageLast(node); 109 } 110 synchronized (matchedListMutex) { 111 112 // NOTE - be careful about the slaveBroker! 113 if (maximumPendingMessages > 0) { 114 // calculate the high water mark from which point we 115 // will eagerly evict expired messages 116 int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark(); 117 if (maximumPendingMessages > 0 && maximumPendingMessages < max) { 118 max = maximumPendingMessages; 119 } 120 if (!matched.isEmpty() && matched.size() > max) { 121 removeExpiredMessages(); 122 } 123 // lets discard old messages as we are a slow consumer 124 while (!matched.isEmpty() && matched.size() > maximumPendingMessages) { 125 int pageInSize = matched.size() - maximumPendingMessages; 126 // only page in a 1000 at a time - else we could 127 // blow da memory 128 pageInSize = Math.max(1000, pageInSize); 129 LinkedList<MessageReference> list = null; 130 MessageReference[] oldMessages=null; 131 synchronized(matched){ 132 list = matched.pageInList(pageInSize); 133 oldMessages = messageEvictionStrategy.evictMessages(list); 134 for (MessageReference ref : list) { 135 ref.decrementReferenceCount(); 136 } 137 } 138 int messagesToEvict = 0; 139 if (oldMessages != null){ 140 messagesToEvict = oldMessages.length; 141 for (int i = 0; i < messagesToEvict; i++) { 142 MessageReference oldMessage = oldMessages[i]; 143 discard(oldMessage); 144 } 145 } 146 // lets avoid an infinite loop if we are given a bad 147 // eviction strategy 148 // for a bad strategy lets just not evict 149 if (messagesToEvict == 0) { 150 LOG.warn("No messages to evict returned from eviction strategy: " + messageEvictionStrategy); 151 break; 152 } 153 } 154 } 155 } 156 dispatchMatched(); 157 } 158 } 159 } 160 161 /** 162 * Discard any expired messages from the matched list. Called from a 163 * synchronized block. 164 * 165 * @throws IOException 166 */ 167 protected void removeExpiredMessages() throws IOException { 168 try { 169 matched.reset(); 170 while (matched.hasNext()) { 171 MessageReference node = matched.next(); 172 node.decrementReferenceCount(); 173 if (broker.isExpired(node)) { 174 matched.remove(); 175 dispatchedCounter.incrementAndGet(); 176 node.decrementReferenceCount(); 177 node.getRegionDestination().getDestinationStatistics().getExpired().increment(); 178 broker.messageExpired(getContext(), node); 179 break; 180 } 181 } 182 } finally { 183 matched.release(); 184 } 185 } 186 187 public void processMessageDispatchNotification(MessageDispatchNotification mdn) { 188 synchronized (matchedListMutex) { 189 try { 190 matched.reset(); 191 while (matched.hasNext()) { 192 MessageReference node = matched.next(); 193 node.decrementReferenceCount(); 194 if (node.getMessageId().equals(mdn.getMessageId())) { 195 matched.remove(); 196 dispatchedCounter.incrementAndGet(); 197 node.decrementReferenceCount(); 198 break; 199 } 200 } 201 } finally { 202 matched.release(); 203 } 204 } 205 } 206 207 public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { 208 // Handle the standard acknowledgment case. 209 if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) { 210 if (context.isInTransaction()) { 211 context.getTransaction().addSynchronization(new Synchronization() { 212 213 public void afterCommit() throws Exception { 214 synchronized (TopicSubscription.this) { 215 if (singleDestination && destination != null) { 216 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 217 } 218 } 219 dequeueCounter.addAndGet(ack.getMessageCount()); 220 dispatchMatched(); 221 } 222 }); 223 } else { 224 if (singleDestination && destination != null) { 225 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 226 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); 227 } 228 dequeueCounter.addAndGet(ack.getMessageCount()); 229 } 230 dispatchMatched(); 231 return; 232 } else if (ack.isDeliveredAck()) { 233 // Message was delivered but not acknowledged: update pre-fetch 234 // counters. 235 // also. get these for a consumer expired message. 236 if (destination != null && !ack.isInTransaction()) { 237 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 238 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); 239 } 240 dequeueCounter.addAndGet(ack.getMessageCount()); 241 dispatchMatched(); 242 return; 243 } 244 throw new JMSException("Invalid acknowledgment: " + ack); 245 } 246 247 public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { 248 // not supported for topics 249 return null; 250 } 251 252 public int getPendingQueueSize() { 253 return matched(); 254 } 255 256 public int getDispatchedQueueSize() { 257 return (int)(dispatchedCounter.get() - dequeueCounter.get()); 258 } 259 260 public int getMaximumPendingMessages() { 261 return maximumPendingMessages; 262 } 263 264 public long getDispatchedCounter() { 265 return dispatchedCounter.get(); 266 } 267 268 public long getEnqueueCounter() { 269 return enqueueCounter.get(); 270 } 271 272 public long getDequeueCounter() { 273 return dequeueCounter.get(); 274 } 275 276 /** 277 * @return the number of messages discarded due to being a slow consumer 278 */ 279 public int discarded() { 280 synchronized (matchedListMutex) { 281 return discarded; 282 } 283 } 284 285 /** 286 * @return the number of matched messages (messages targeted for the 287 * subscription but not yet able to be dispatched due to the 288 * prefetch buffer being full). 289 */ 290 public int matched() { 291 synchronized (matchedListMutex) { 292 return matched.size(); 293 } 294 } 295 296 /** 297 * Sets the maximum number of pending messages that can be matched against 298 * this consumer before old messages are discarded. 299 */ 300 public void setMaximumPendingMessages(int maximumPendingMessages) { 301 this.maximumPendingMessages = maximumPendingMessages; 302 } 303 304 public MessageEvictionStrategy getMessageEvictionStrategy() { 305 return messageEvictionStrategy; 306 } 307 308 /** 309 * Sets the eviction strategy used to decide which message to evict when the 310 * slow consumer needs to discard messages 311 */ 312 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) { 313 this.messageEvictionStrategy = messageEvictionStrategy; 314 } 315 316 // Implementation methods 317 // ------------------------------------------------------------------------- 318 public boolean isFull() { 319 return getDispatchedQueueSize() >= info.getPrefetchSize(); 320 } 321 322 public int getInFlightSize() { 323 return getDispatchedQueueSize(); 324 } 325 326 327 /** 328 * @return true when 60% or more room is left for dispatching messages 329 */ 330 public boolean isLowWaterMark() { 331 return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4); 332 } 333 334 /** 335 * @return true when 10% or less room is left for dispatching messages 336 */ 337 public boolean isHighWaterMark() { 338 return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9); 339 } 340 341 /** 342 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set 343 */ 344 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 345 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark; 346 } 347 348 /** 349 * @return the memoryUsageHighWaterMark 350 */ 351 public int getMemoryUsageHighWaterMark() { 352 return this.memoryUsageHighWaterMark; 353 } 354 355 /** 356 * @return the usageManager 357 */ 358 public SystemUsage getUsageManager() { 359 return this.usageManager; 360 } 361 362 /** 363 * @return the matched 364 */ 365 public PendingMessageCursor getMatched() { 366 return this.matched; 367 } 368 369 /** 370 * @param matched the matched to set 371 */ 372 public void setMatched(PendingMessageCursor matched) { 373 this.matched = matched; 374 } 375 376 /** 377 * inform the MessageConsumer on the client to change it's prefetch 378 * 379 * @param newPrefetch 380 */ 381 public void updateConsumerPrefetch(int newPrefetch) { 382 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { 383 ConsumerControl cc = new ConsumerControl(); 384 cc.setConsumerId(info.getConsumerId()); 385 cc.setPrefetch(newPrefetch); 386 context.getConnection().dispatchAsync(cc); 387 } 388 } 389 390 private void dispatchMatched() throws IOException { 391 synchronized (matchedListMutex) { 392 if (!matched.isEmpty() && !isFull()) { 393 try { 394 matched.reset(); 395 396 while (matched.hasNext() && !isFull()) { 397 MessageReference message = (MessageReference) matched.next(); 398 message.decrementReferenceCount(); 399 matched.remove(); 400 // Message may have been sitting in the matched list a 401 // while 402 // waiting for the consumer to ak the message. 403 if (message.isExpired()) { 404 discard(message); 405 continue; // just drop it. 406 } 407 dispatch(message); 408 } 409 } finally { 410 matched.release(); 411 } 412 } 413 } 414 } 415 416 private void dispatch(final MessageReference node) throws IOException { 417 Message message = (Message)node; 418 node.incrementReferenceCount(); 419 // Make sure we can dispatch a message. 420 MessageDispatch md = new MessageDispatch(); 421 md.setMessage(message); 422 md.setConsumerId(info.getConsumerId()); 423 md.setDestination(node.getRegionDestination().getActiveMQDestination()); 424 dispatchedCounter.incrementAndGet(); 425 // Keep track if this subscription is receiving messages from a single 426 // destination. 427 if (singleDestination) { 428 if (destination == null) { 429 destination = node.getRegionDestination(); 430 } else { 431 if (destination != node.getRegionDestination()) { 432 singleDestination = false; 433 } 434 } 435 } 436 if (info.isDispatchAsync()) { 437 md.setTransmitCallback(new Runnable() { 438 439 public void run() { 440 node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); 441 node.getRegionDestination().getDestinationStatistics().getInflight().increment(); 442 node.decrementReferenceCount(); 443 } 444 }); 445 context.getConnection().dispatchAsync(md); 446 } else { 447 context.getConnection().dispatchSync(md); 448 node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); 449 node.getRegionDestination().getDestinationStatistics().getInflight().increment(); 450 node.decrementReferenceCount(); 451 } 452 } 453 454 private void discard(MessageReference message) { 455 message.decrementReferenceCount(); 456 matched.remove(message); 457 discarded++; 458 if(destination != null) { 459 destination.getDestinationStatistics().getDequeues().increment(); 460 } 461 if (LOG.isDebugEnabled()) { 462 LOG.debug("Discarding message " + message); 463 } 464 Destination dest = message.getRegionDestination(); 465 if (dest != null) { 466 dest.messageDiscarded(getContext(), message); 467 } 468 broker.getRoot().sendToDeadLetterQueue(getContext(), message); 469 } 470 471 public String toString() { 472 return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" 473 + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded(); 474 } 475 476 public void destroy() { 477 synchronized (matchedListMutex) { 478 try { 479 matched.destroy(); 480 } catch (Exception e) { 481 LOG.warn("Failed to destroy cursor", e); 482 } 483 } 484 } 485 486 public int getPrefetchSize() { 487 return (int)info.getPrefetchSize(); 488 } 489 490 }