001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import org.apache.activemq.broker.BrokerService; 020 import org.apache.activemq.broker.ConnectionContext; 021 import org.apache.activemq.broker.ProducerBrokerExchange; 022 import org.apache.activemq.broker.region.policy.DispatchPolicy; 023 import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy; 024 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; 025 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; 026 import org.apache.activemq.command.ActiveMQDestination; 027 import org.apache.activemq.command.ExceptionResponse; 028 import org.apache.activemq.command.Message; 029 import org.apache.activemq.command.MessageAck; 030 import org.apache.activemq.command.MessageId; 031 import org.apache.activemq.command.ProducerAck; 032 import org.apache.activemq.command.ProducerInfo; 033 import org.apache.activemq.command.Response; 034 import org.apache.activemq.command.SubscriptionInfo; 035 import org.apache.activemq.filter.MessageEvaluationContext; 036 import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 037 import org.apache.activemq.store.MessageRecoveryListener; 038 import org.apache.activemq.store.TopicMessageStore; 039 import org.apache.activemq.thread.Task; 040 import org.apache.activemq.thread.TaskRunner; 041 import org.apache.activemq.thread.TaskRunnerFactory; 042 import org.apache.activemq.thread.Valve; 043 import org.apache.activemq.transaction.Synchronization; 044 import org.apache.activemq.usage.Usage; 045 import org.apache.activemq.util.SubscriptionKey; 046 import org.apache.commons.logging.Log; 047 import org.apache.commons.logging.LogFactory; 048 import java.io.IOException; 049 import java.util.ArrayList; 050 import java.util.LinkedList; 051 import java.util.List; 052 import java.util.Set; 053 import java.util.concurrent.ConcurrentHashMap; 054 import java.util.concurrent.CopyOnWriteArrayList; 055 import java.util.concurrent.CopyOnWriteArraySet; 056 057 /** 058 * The Topic is a destination that sends a copy of a message to every active 059 * Subscription registered. 060 * 061 * @version $Revision: 1.21 $ 062 */ 063 public class Topic extends BaseDestination implements Task { 064 protected static final Log LOG = LogFactory.getLog(Topic.class); 065 private final TopicMessageStore topicStore; 066 protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); 067 protected final Valve dispatchValve = new Valve(true); 068 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); 069 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 070 private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 071 private final TaskRunner taskRunner; 072 private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); 073 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { 074 public void run() { 075 try { 076 Topic.this.taskRunner.wakeup(); 077 } catch (InterruptedException e) { 078 } 079 }; 080 }; 081 082 public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { 083 super(brokerService, store, destination, parentStats); 084 this.topicStore = store; 085 //set default subscription recovery policy 086 subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy(); 087 this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); 088 } 089 090 public void initialize() throws Exception { 091 super.initialize(); 092 if (store != null) { 093 // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics. 094 // int messageCount = store.getMessageCount(); 095 // destinationStatistics.getMessages().setCount(messageCount); 096 } 097 } 098 099 public List<Subscription> getConsumers() { 100 synchronized (consumers) { 101 return new ArrayList<Subscription>(consumers); 102 } 103 } 104 105 public boolean lock(MessageReference node, LockOwner sub) { 106 return true; 107 } 108 109 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { 110 111 destinationStatistics.getConsumers().increment(); 112 113 if (!sub.getConsumerInfo().isDurable()) { 114 115 // Do a retroactive recovery if needed. 116 if (sub.getConsumerInfo().isRetroactive()) { 117 118 // synchronize with dispatch method so that no new messages are 119 // sent 120 // while we are recovering a subscription to avoid out of order 121 // messages. 122 dispatchValve.turnOff(); 123 try { 124 125 synchronized (consumers) { 126 sub.add(context, this); 127 consumers.add(sub); 128 } 129 subscriptionRecoveryPolicy.recover(context, this, sub); 130 131 } finally { 132 dispatchValve.turnOn(); 133 } 134 135 } else { 136 synchronized (consumers) { 137 sub.add(context, this); 138 consumers.add(sub); 139 } 140 } 141 } else { 142 sub.add(context, this); 143 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 144 durableSubcribers.put(dsub.getSubscriptionKey(), dsub); 145 } 146 } 147 148 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { 149 if (!sub.getConsumerInfo().isDurable()) { 150 destinationStatistics.getConsumers().decrement(); 151 synchronized (consumers) { 152 consumers.remove(sub); 153 } 154 } 155 sub.remove(context, this); 156 } 157 158 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException { 159 if (topicStore != null) { 160 topicStore.deleteSubscription(key.clientId, key.subscriptionName); 161 Object removed = durableSubcribers.remove(key); 162 if (removed != null) { 163 destinationStatistics.getConsumers().decrement(); 164 } 165 } 166 } 167 168 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception { 169 // synchronize with dispatch method so that no new messages are sent 170 // while 171 // we are recovering a subscription to avoid out of order messages. 172 dispatchValve.turnOff(); 173 try { 174 175 if (topicStore == null) { 176 return; 177 } 178 179 // Recover the durable subscription. 180 String clientId = subscription.getSubscriptionKey().getClientId(); 181 String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName(); 182 String selector = subscription.getConsumerInfo().getSelector(); 183 SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName); 184 if (info != null) { 185 // Check to see if selector changed. 186 String s1 = info.getSelector(); 187 if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) { 188 // Need to delete the subscription 189 topicStore.deleteSubscription(clientId, subscriptionName); 190 info = null; 191 } else { 192 synchronized (consumers) { 193 consumers.add(subscription); 194 } 195 } 196 } 197 // Do we need to create the subscription? 198 if (info == null) { 199 info = new SubscriptionInfo(); 200 info.setClientId(clientId); 201 info.setSelector(selector); 202 info.setSubscriptionName(subscriptionName); 203 info.setDestination(getActiveMQDestination()); 204 // This destination is an actual destination id. 205 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); 206 // This destination might be a pattern 207 synchronized (consumers) { 208 consumers.add(subscription); 209 topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive()); 210 } 211 } 212 213 final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 214 msgContext.setDestination(destination); 215 if (subscription.isRecoveryRequired()) { 216 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { 217 public boolean recoverMessage(Message message) throws Exception { 218 message.setRegionDestination(Topic.this); 219 try { 220 msgContext.setMessageReference(message); 221 if (subscription.matches(message, msgContext)) { 222 subscription.add(message); 223 } 224 } catch (IOException e) { 225 LOG.error("Failed to recover this message " + message); 226 } 227 return true; 228 } 229 230 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 231 throw new RuntimeException("Should not be called."); 232 } 233 234 public boolean hasSpace() { 235 return true; 236 } 237 238 public boolean isDuplicate(MessageId id) { 239 return false; 240 } 241 }); 242 } 243 } finally { 244 dispatchValve.turnOn(); 245 } 246 } 247 248 public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception { 249 synchronized (consumers) { 250 consumers.remove(sub); 251 } 252 sub.remove(context, this); 253 } 254 255 protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception { 256 if (subscription.getConsumerInfo().isRetroactive()) { 257 subscriptionRecoveryPolicy.recover(context, this, subscription); 258 } 259 } 260 261 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 262 final ConnectionContext context = producerExchange.getConnectionContext(); 263 264 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 265 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode(); 266 267 // There is delay between the client sending it and it arriving at the 268 // destination.. it may have expired. 269 if (message.isExpired()) { 270 broker.messageExpired(context, message); 271 getDestinationStatistics().getExpired().increment(); 272 if (sendProducerAck) { 273 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 274 context.getConnection().dispatchAsync(ack); 275 } 276 return; 277 } 278 279 if (memoryUsage.isFull()) { 280 isFull(context, memoryUsage); 281 fastProducer(context, producerInfo); 282 283 if (isProducerFlowControl() && context.isProducerFlowControl()) { 284 285 if (warnOnProducerFlowControl) { 286 warnOnProducerFlowControl = false; 287 LOG.info("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached for " + getActiveMQDestination().getQualifiedName() 288 + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." 289 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 290 } 291 292 if (systemUsage.isSendFailIfNoSpace()) { 293 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " 294 + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); 295 } 296 297 // We can avoid blocking due to low usage if the producer is sending 298 // a sync message or 299 // if it is using a producer window 300 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { 301 synchronized (messagesWaitingForSpace) { 302 messagesWaitingForSpace.add(new Runnable() { 303 public void run() { 304 305 try { 306 307 // While waiting for space to free up... the 308 // message may have expired. 309 if (message.isExpired()) { 310 broker.messageExpired(context, message); 311 getDestinationStatistics().getExpired().increment(); 312 } else { 313 doMessageSend(producerExchange, message); 314 } 315 316 if (sendProducerAck) { 317 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 318 context.getConnection().dispatchAsync(ack); 319 } else { 320 Response response = new Response(); 321 response.setCorrelationId(message.getCommandId()); 322 context.getConnection().dispatchAsync(response); 323 } 324 325 } catch (Exception e) { 326 if (!sendProducerAck && !context.isInRecoveryMode()) { 327 ExceptionResponse response = new ExceptionResponse(e); 328 response.setCorrelationId(message.getCommandId()); 329 context.getConnection().dispatchAsync(response); 330 } 331 } 332 333 } 334 }); 335 336 // If the user manager is not full, then the task will not 337 // get called.. 338 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { 339 // so call it directly here. 340 sendMessagesWaitingForSpaceTask.run(); 341 } 342 context.setDontSendReponse(true); 343 return; 344 } 345 346 } else { 347 // Producer flow control cannot be used, so we have do the flow 348 // control at the broker 349 // by blocking this thread until there is space available. 350 351 if (memoryUsage.isFull()) { 352 if (context.isInTransaction()) { 353 354 int count = 0; 355 while (!memoryUsage.waitForSpace(1000)) { 356 if (context.getStopping().get()) { 357 throw new IOException("Connection closed, send aborted."); 358 } 359 if (count > 2 && context.isInTransaction()) { 360 count = 0; 361 int size = context.getTransaction().size(); 362 LOG.warn("Waiting for space to send transacted message - transaction elements = " + size + " need more space to commit. Message = " + message); 363 } 364 } 365 } else { 366 waitForSpace(context, memoryUsage, "Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " 367 + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); 368 } 369 } 370 371 // The usage manager could have delayed us by the time 372 // we unblock the message could have expired.. 373 if (message.isExpired()) { 374 getDestinationStatistics().getExpired().increment(); 375 if (LOG.isDebugEnabled()) { 376 LOG.debug("Expired message: " + message); 377 } 378 return; 379 } 380 } 381 } 382 } 383 384 doMessageSend(producerExchange, message); 385 messageDelivered(context, message); 386 if (sendProducerAck) { 387 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 388 context.getConnection().dispatchAsync(ack); 389 } 390 } 391 392 /** 393 * do send the message - this needs to be synchronized to ensure messages 394 * are stored AND dispatched in the right order 395 * 396 * @param producerExchange 397 * @param message 398 * @throws IOException 399 * @throws Exception 400 */ 401 synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { 402 final ConnectionContext context = producerExchange.getConnectionContext(); 403 message.setRegionDestination(this); 404 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 405 406 if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { 407 if (systemUsage.getStoreUsage().isFull()) { 408 final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 409 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 410 if (systemUsage.isSendFailIfNoSpace()) { 411 throw new javax.jms.ResourceAllocationException(logMessage); 412 } 413 414 waitForSpace(context, systemUsage.getStoreUsage(), logMessage); 415 } 416 topicStore.addMessage(context, message); 417 } 418 419 message.incrementReferenceCount(); 420 421 if (context.isInTransaction()) { 422 context.getTransaction().addSynchronization(new Synchronization() { 423 public void afterCommit() throws Exception { 424 // It could take while before we receive the commit 425 // operration.. by that time the message could have 426 // expired.. 427 if (broker.isExpired(message)) { 428 getDestinationStatistics().getExpired().increment(); 429 broker.messageExpired(context, message); 430 message.decrementReferenceCount(); 431 return; 432 } 433 try { 434 dispatch(context, message); 435 } finally { 436 message.decrementReferenceCount(); 437 } 438 } 439 }); 440 441 } else { 442 try { 443 dispatch(context, message); 444 } finally { 445 message.decrementReferenceCount(); 446 } 447 } 448 449 } 450 451 private boolean canOptimizeOutPersistence() { 452 return durableSubcribers.size() == 0; 453 } 454 455 public String toString() { 456 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); 457 } 458 459 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException { 460 if (topicStore != null && node.isPersistent()) { 461 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 462 SubscriptionKey key = dsub.getSubscriptionKey(); 463 topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId()); 464 } 465 messageConsumed(context, node); 466 } 467 468 public void gc() { 469 } 470 471 public Message loadMessage(MessageId messageId) throws IOException { 472 return topicStore != null ? topicStore.getMessage(messageId) : null; 473 } 474 475 public void start() throws Exception { 476 this.subscriptionRecoveryPolicy.start(); 477 if (memoryUsage != null) { 478 memoryUsage.start(); 479 } 480 481 } 482 483 public void stop() throws Exception { 484 if (taskRunner != null) { 485 taskRunner.shutdown(); 486 } 487 this.subscriptionRecoveryPolicy.stop(); 488 if (memoryUsage != null) { 489 memoryUsage.stop(); 490 } 491 if (this.topicStore != null) { 492 this.topicStore.stop(); 493 } 494 } 495 496 public Message[] browse() { 497 final Set<Message> result = new CopyOnWriteArraySet<Message>(); 498 try { 499 if (topicStore != null) { 500 topicStore.recover(new MessageRecoveryListener() { 501 public boolean recoverMessage(Message message) throws Exception { 502 result.add(message); 503 return true; 504 } 505 506 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 507 return true; 508 } 509 510 public boolean hasSpace() { 511 return true; 512 } 513 514 public boolean isDuplicate(MessageId id) { 515 return false; 516 } 517 }); 518 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination()); 519 if (msgs != null) { 520 for (int i = 0; i < msgs.length; i++) { 521 result.add(msgs[i]); 522 } 523 } 524 } 525 } catch (Throwable e) { 526 LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e); 527 } 528 return result.toArray(new Message[result.size()]); 529 } 530 531 public boolean iterate() { 532 synchronized (messagesWaitingForSpace) { 533 while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) { 534 Runnable op = messagesWaitingForSpace.removeFirst(); 535 op.run(); 536 } 537 } 538 return false; 539 } 540 541 // Properties 542 // ------------------------------------------------------------------------- 543 544 public DispatchPolicy getDispatchPolicy() { 545 return dispatchPolicy; 546 } 547 548 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 549 this.dispatchPolicy = dispatchPolicy; 550 } 551 552 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 553 return subscriptionRecoveryPolicy; 554 } 555 556 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) { 557 this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy; 558 } 559 560 // Implementation methods 561 // ------------------------------------------------------------------------- 562 563 public final void wakeup() { 564 } 565 566 protected void dispatch(final ConnectionContext context, Message message) throws Exception { 567 // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics. 568 // destinationStatistics.getMessages().increment(); 569 destinationStatistics.getEnqueues().increment(); 570 dispatchValve.increment(); 571 MessageEvaluationContext msgContext = null; 572 try { 573 if (!subscriptionRecoveryPolicy.add(context, message)) { 574 return; 575 } 576 synchronized (consumers) { 577 if (consumers.isEmpty()) { 578 onMessageWithNoConsumers(context, message); 579 return; 580 } 581 } 582 msgContext = context.getMessageEvaluationContext(); 583 msgContext.setDestination(destination); 584 msgContext.setMessageReference(message); 585 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { 586 onMessageWithNoConsumers(context, message); 587 } 588 589 } finally { 590 dispatchValve.decrement(); 591 if (msgContext != null) { 592 msgContext.clear(); 593 } 594 } 595 } 596 597 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 598 broker.messageExpired(context, reference); 599 // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics. 600 // destinationStatistics.getMessages().decrement(); 601 destinationStatistics.getEnqueues().decrement(); 602 destinationStatistics.getExpired().increment(); 603 MessageAck ack = new MessageAck(); 604 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 605 ack.setDestination(destination); 606 ack.setMessageID(reference.getMessageId()); 607 try { 608 acknowledge(context, subs, ack, reference); 609 } catch (IOException e) { 610 LOG.error("Failed to remove expired Message from the store ", e); 611 } 612 } 613 614 private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException { 615 long start = System.currentTimeMillis(); 616 long nextWarn = start + blockedProducerWarningInterval; 617 while (!usage.waitForSpace(1000)) { 618 if (context.getStopping().get()) { 619 throw new IOException("Connection closed, send aborted."); 620 } 621 622 long now = System.currentTimeMillis(); 623 if (now >= nextWarn) { 624 LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)"); 625 nextWarn = now + blockedProducerWarningInterval; 626 } 627 } 628 } 629 630 }