001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 021 import org.apache.activemq.advisory.AdvisorySupport; 022 import org.apache.activemq.broker.Broker; 023 import org.apache.activemq.broker.BrokerService; 024 import org.apache.activemq.broker.ConnectionContext; 025 import org.apache.activemq.broker.ProducerBrokerExchange; 026 import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 027 import org.apache.activemq.command.ActiveMQDestination; 028 import org.apache.activemq.command.ActiveMQTopic; 029 import org.apache.activemq.command.Message; 030 import org.apache.activemq.command.MessageDispatchNotification; 031 import org.apache.activemq.command.ProducerInfo; 032 import org.apache.activemq.state.ProducerState; 033 import org.apache.activemq.store.MessageStore; 034 import org.apache.activemq.usage.MemoryUsage; 035 import org.apache.activemq.usage.SystemUsage; 036 import org.apache.activemq.usage.Usage; 037 038 /** 039 * @version $Revision: 1.12 $ 040 */ 041 public abstract class BaseDestination implements Destination { 042 /** 043 * The maximum number of messages to page in to the destination from 044 * persistent storage 045 */ 046 public static final int MAX_PAGE_SIZE = 200; 047 public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; 048 public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; 049 protected final ActiveMQDestination destination; 050 protected final Broker broker; 051 protected final MessageStore store; 052 protected SystemUsage systemUsage; 053 protected MemoryUsage memoryUsage; 054 private boolean producerFlowControl = true; 055 protected boolean warnOnProducerFlowControl = true; 056 protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; 057 058 private int maxProducersToAudit = 1024; 059 private int maxAuditDepth = 2048; 060 private boolean enableAudit = true; 061 private int maxPageSize = MAX_PAGE_SIZE; 062 private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE; 063 private boolean useCache = true; 064 private int minimumMessageSize = 1024; 065 private boolean lazyDispatch = false; 066 private boolean advisoryForSlowConsumers; 067 private boolean advisdoryForFastProducers; 068 private boolean advisoryForDiscardingMessages; 069 private boolean advisoryWhenFull; 070 private boolean advisoryForDelivery; 071 private boolean advisoryForConsumed; 072 private boolean sendAdvisoryIfNoConsumers; 073 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 074 protected final BrokerService brokerService; 075 protected final Broker regionBroker; 076 protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY; 077 protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD; 078 private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; 079 protected int cursorMemoryHighWaterMark = 70; 080 081 /** 082 * @param broker 083 * @param store 084 * @param destination 085 * @param parentStats 086 * @throws Exception 087 */ 088 public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception { 089 this.brokerService = brokerService; 090 this.broker = brokerService.getBroker(); 091 this.store = store; 092 this.destination = destination; 093 // let's copy the enabled property from the parent DestinationStatistics 094 this.destinationStatistics.setEnabled(parentStats.isEnabled()); 095 this.destinationStatistics.setParent(parentStats); 096 this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString()); 097 this.memoryUsage = this.systemUsage.getMemoryUsage(); 098 this.memoryUsage.setUsagePortion(1.0f); 099 this.regionBroker = brokerService.getRegionBroker(); 100 } 101 102 /** 103 * initialize the destination 104 * 105 * @throws Exception 106 */ 107 public void initialize() throws Exception { 108 // Let the store know what usage manager we are using so that he can 109 // flush messages to disk when usage gets high. 110 if (store != null) { 111 store.setMemoryUsage(this.memoryUsage); 112 } 113 } 114 115 /** 116 * @return the producerFlowControl 117 */ 118 public boolean isProducerFlowControl() { 119 return producerFlowControl; 120 } 121 122 /** 123 * @param producerFlowControl the producerFlowControl to set 124 */ 125 public void setProducerFlowControl(boolean producerFlowControl) { 126 this.producerFlowControl = producerFlowControl; 127 } 128 129 /** 130 * Set's the interval at which warnings about producers being blocked by 131 * resource usage will be triggered. Values of 0 or less will disable 132 * warnings 133 * 134 * @param blockedProducerWarningInterval the interval at which warning about 135 * blocked producers will be triggered. 136 */ 137 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 138 this.blockedProducerWarningInterval = blockedProducerWarningInterval; 139 } 140 141 /** 142 * 143 * @return the interval at which warning about blocked producers will be 144 * triggered. 145 */ 146 public long getBlockedProducerWarningInterval() { 147 return blockedProducerWarningInterval; 148 } 149 150 /** 151 * @return the maxProducersToAudit 152 */ 153 public int getMaxProducersToAudit() { 154 return maxProducersToAudit; 155 } 156 157 /** 158 * @param maxProducersToAudit the maxProducersToAudit to set 159 */ 160 public void setMaxProducersToAudit(int maxProducersToAudit) { 161 this.maxProducersToAudit = maxProducersToAudit; 162 } 163 164 /** 165 * @return the maxAuditDepth 166 */ 167 public int getMaxAuditDepth() { 168 return maxAuditDepth; 169 } 170 171 /** 172 * @param maxAuditDepth the maxAuditDepth to set 173 */ 174 public void setMaxAuditDepth(int maxAuditDepth) { 175 this.maxAuditDepth = maxAuditDepth; 176 } 177 178 /** 179 * @return the enableAudit 180 */ 181 public boolean isEnableAudit() { 182 return enableAudit; 183 } 184 185 /** 186 * @param enableAudit the enableAudit to set 187 */ 188 public void setEnableAudit(boolean enableAudit) { 189 this.enableAudit = enableAudit; 190 } 191 192 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 193 destinationStatistics.getProducers().increment(); 194 } 195 196 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 197 destinationStatistics.getProducers().decrement(); 198 } 199 200 public final MemoryUsage getMemoryUsage() { 201 return memoryUsage; 202 } 203 204 public DestinationStatistics getDestinationStatistics() { 205 return destinationStatistics; 206 } 207 208 public ActiveMQDestination getActiveMQDestination() { 209 return destination; 210 } 211 212 public final String getName() { 213 return getActiveMQDestination().getPhysicalName(); 214 } 215 216 public final MessageStore getMessageStore() { 217 return store; 218 } 219 220 public final boolean isActive() { 221 return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0; 222 } 223 224 public int getMaxPageSize() { 225 return maxPageSize; 226 } 227 228 public void setMaxPageSize(int maxPageSize) { 229 this.maxPageSize = maxPageSize; 230 } 231 232 public int getMaxBrowsePageSize() { 233 return this.maxBrowsePageSize; 234 } 235 236 public void setMaxBrowsePageSize(int maxPageSize) { 237 this.maxBrowsePageSize = maxPageSize; 238 } 239 240 public int getMaxExpirePageSize() { 241 return this.maxExpirePageSize; 242 } 243 244 public void setMaxExpirePageSize(int maxPageSize) { 245 this.maxExpirePageSize = maxPageSize; 246 } 247 248 public void setExpireMessagesPeriod(long expireMessagesPeriod) { 249 this.expireMessagesPeriod = expireMessagesPeriod; 250 } 251 252 public long getExpireMessagesPeriod() { 253 return expireMessagesPeriod; 254 } 255 256 public boolean isUseCache() { 257 return useCache; 258 } 259 260 public void setUseCache(boolean useCache) { 261 this.useCache = useCache; 262 } 263 264 public int getMinimumMessageSize() { 265 return minimumMessageSize; 266 } 267 268 public void setMinimumMessageSize(int minimumMessageSize) { 269 this.minimumMessageSize = minimumMessageSize; 270 } 271 272 public boolean isLazyDispatch() { 273 return lazyDispatch; 274 } 275 276 public void setLazyDispatch(boolean lazyDispatch) { 277 this.lazyDispatch = lazyDispatch; 278 } 279 280 protected long getDestinationSequenceId() { 281 return regionBroker.getBrokerSequenceId(); 282 } 283 284 /** 285 * @return the advisoryForSlowConsumers 286 */ 287 public boolean isAdvisoryForSlowConsumers() { 288 return advisoryForSlowConsumers; 289 } 290 291 /** 292 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set 293 */ 294 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { 295 this.advisoryForSlowConsumers = advisoryForSlowConsumers; 296 } 297 298 /** 299 * @return the advisoryForDiscardingMessages 300 */ 301 public boolean isAdvisoryForDiscardingMessages() { 302 return advisoryForDiscardingMessages; 303 } 304 305 /** 306 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to 307 * set 308 */ 309 public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) { 310 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; 311 } 312 313 /** 314 * @return the advisoryWhenFull 315 */ 316 public boolean isAdvisoryWhenFull() { 317 return advisoryWhenFull; 318 } 319 320 /** 321 * @param advisoryWhenFull the advisoryWhenFull to set 322 */ 323 public void setAdvisoryWhenFull(boolean advisoryWhenFull) { 324 this.advisoryWhenFull = advisoryWhenFull; 325 } 326 327 /** 328 * @return the advisoryForDelivery 329 */ 330 public boolean isAdvisoryForDelivery() { 331 return advisoryForDelivery; 332 } 333 334 /** 335 * @param advisoryForDelivery the advisoryForDelivery to set 336 */ 337 public void setAdvisoryForDelivery(boolean advisoryForDelivery) { 338 this.advisoryForDelivery = advisoryForDelivery; 339 } 340 341 /** 342 * @return the advisoryForConsumed 343 */ 344 public boolean isAdvisoryForConsumed() { 345 return advisoryForConsumed; 346 } 347 348 /** 349 * @param advisoryForConsumed the advisoryForConsumed to set 350 */ 351 public void setAdvisoryForConsumed(boolean advisoryForConsumed) { 352 this.advisoryForConsumed = advisoryForConsumed; 353 } 354 355 /** 356 * @return the advisdoryForFastProducers 357 */ 358 public boolean isAdvisdoryForFastProducers() { 359 return advisdoryForFastProducers; 360 } 361 362 /** 363 * @param advisdoryForFastProducers the advisdoryForFastProducers to set 364 */ 365 public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) { 366 this.advisdoryForFastProducers = advisdoryForFastProducers; 367 } 368 369 public boolean isSendAdvisoryIfNoConsumers() { 370 return sendAdvisoryIfNoConsumers; 371 } 372 373 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { 374 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; 375 } 376 377 /** 378 * @return the dead letter strategy 379 */ 380 public DeadLetterStrategy getDeadLetterStrategy() { 381 return deadLetterStrategy; 382 } 383 384 /** 385 * set the dead letter strategy 386 * 387 * @param deadLetterStrategy 388 */ 389 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 390 this.deadLetterStrategy = deadLetterStrategy; 391 } 392 393 public int getCursorMemoryHighWaterMark() { 394 return this.cursorMemoryHighWaterMark; 395 } 396 397 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 398 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; 399 } 400 401 /** 402 * called when message is consumed 403 * 404 * @param context 405 * @param messageReference 406 */ 407 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 408 if (advisoryForConsumed) { 409 broker.messageConsumed(context, messageReference); 410 } 411 } 412 413 /** 414 * Called when message is delivered to the broker 415 * 416 * @param context 417 * @param messageReference 418 */ 419 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 420 if (advisoryForDelivery) { 421 broker.messageDelivered(context, messageReference); 422 } 423 } 424 425 /** 426 * Called when a message is discarded - e.g. running low on memory This will 427 * happen only if the policy is enabled - e.g. non durable topics 428 * 429 * @param context 430 * @param messageReference 431 */ 432 public void messageDiscarded(ConnectionContext context, MessageReference messageReference) { 433 if (advisoryForDiscardingMessages) { 434 broker.messageDiscarded(context, messageReference); 435 } 436 } 437 438 /** 439 * Called when there is a slow consumer 440 * 441 * @param context 442 * @param subs 443 */ 444 public void slowConsumer(ConnectionContext context, Subscription subs) { 445 if (advisoryForSlowConsumers) { 446 broker.slowConsumer(context, this, subs); 447 } 448 } 449 450 /** 451 * Called to notify a producer is too fast 452 * 453 * @param context 454 * @param producerInfo 455 */ 456 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { 457 if (advisdoryForFastProducers) { 458 broker.fastProducer(context, producerInfo); 459 } 460 } 461 462 /** 463 * Called when a Usage reaches a limit 464 * 465 * @param context 466 * @param usage 467 */ 468 public void isFull(ConnectionContext context, Usage usage) { 469 if (advisoryWhenFull) { 470 broker.isFull(context, this, usage); 471 } 472 } 473 474 public void dispose(ConnectionContext context) throws IOException { 475 if (this.store != null) { 476 this.store.removeAllMessages(context); 477 this.store.dispose(context); 478 } 479 this.destinationStatistics.setParent(null); 480 this.memoryUsage.stop(); 481 } 482 483 /** 484 * Provides a hook to allow messages with no consumer to be processed in 485 * some way - such as to send to a dead letter queue or something.. 486 */ 487 protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { 488 if (!msg.isPersistent()) { 489 if (isSendAdvisoryIfNoConsumers()) { 490 // allow messages with no consumers to be dispatched to a dead 491 // letter queue 492 if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) { 493 494 Message message = msg.copy(); 495 // The original destination and transaction id do not get 496 // filled when the message is first sent, 497 // it is only populated if the message is routed to another 498 // destination like the DLQ 499 if (message.getOriginalDestination() != null) { 500 message.setOriginalDestination(message.getDestination()); 501 } 502 if (message.getOriginalTransactionId() != null) { 503 message.setOriginalTransactionId(message.getTransactionId()); 504 } 505 506 ActiveMQTopic advisoryTopic; 507 if (destination.isQueue()) { 508 advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); 509 } else { 510 advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); 511 } 512 message.setDestination(advisoryTopic); 513 message.setTransactionId(null); 514 515 // Disable flow control for this since since we don't want 516 // to block. 517 boolean originalFlowControl = context.isProducerFlowControl(); 518 try { 519 context.setProducerFlowControl(false); 520 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 521 producerExchange.setMutable(false); 522 producerExchange.setConnectionContext(context); 523 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 524 context.getBroker().send(producerExchange, message); 525 } finally { 526 context.setProducerFlowControl(originalFlowControl); 527 } 528 529 } 530 } 531 } 532 } 533 534 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 535 } 536 537 538 }