001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    
021    import org.apache.activemq.advisory.AdvisorySupport;
022    import org.apache.activemq.broker.Broker;
023    import org.apache.activemq.broker.BrokerService;
024    import org.apache.activemq.broker.ConnectionContext;
025    import org.apache.activemq.broker.ProducerBrokerExchange;
026    import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.command.ActiveMQTopic;
029    import org.apache.activemq.command.Message;
030    import org.apache.activemq.command.MessageDispatchNotification;
031    import org.apache.activemq.command.ProducerInfo;
032    import org.apache.activemq.state.ProducerState;
033    import org.apache.activemq.store.MessageStore;
034    import org.apache.activemq.usage.MemoryUsage;
035    import org.apache.activemq.usage.SystemUsage;
036    import org.apache.activemq.usage.Usage;
037    
038    /**
039     * @version $Revision: 1.12 $
040     */
041    public abstract class BaseDestination implements Destination {
042        /**
043         * The maximum number of messages to page in to the destination from
044         * persistent storage
045         */
046        public static final int MAX_PAGE_SIZE = 200;
047        public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
048        public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
049        protected final ActiveMQDestination destination;
050        protected final Broker broker;
051        protected final MessageStore store;
052        protected SystemUsage systemUsage;
053        protected MemoryUsage memoryUsage;
054        private boolean producerFlowControl = true;
055        protected boolean warnOnProducerFlowControl = true;
056        protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
057    
058        private int maxProducersToAudit = 1024;
059        private int maxAuditDepth = 2048;
060        private boolean enableAudit = true;
061        private int maxPageSize = MAX_PAGE_SIZE;
062        private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
063        private boolean useCache = true;
064        private int minimumMessageSize = 1024;
065        private boolean lazyDispatch = false;
066        private boolean advisoryForSlowConsumers;
067        private boolean advisdoryForFastProducers;
068        private boolean advisoryForDiscardingMessages;
069        private boolean advisoryWhenFull;
070        private boolean advisoryForDelivery;
071        private boolean advisoryForConsumed;
072        private boolean sendAdvisoryIfNoConsumers;
073        protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
074        protected final BrokerService brokerService;
075        protected final Broker regionBroker;
076        protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
077        protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
078        private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
079        protected int cursorMemoryHighWaterMark = 70;
080    
081        /**
082         * @param broker
083         * @param store
084         * @param destination
085         * @param parentStats
086         * @throws Exception
087         */
088        public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
089            this.brokerService = brokerService;
090            this.broker = brokerService.getBroker();
091            this.store = store;
092            this.destination = destination;
093            // let's copy the enabled property from the parent DestinationStatistics
094            this.destinationStatistics.setEnabled(parentStats.isEnabled());
095            this.destinationStatistics.setParent(parentStats);
096            this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
097            this.memoryUsage = this.systemUsage.getMemoryUsage();
098            this.memoryUsage.setUsagePortion(1.0f);
099            this.regionBroker = brokerService.getRegionBroker();
100        }
101    
102        /**
103         * initialize the destination
104         * 
105         * @throws Exception
106         */
107        public void initialize() throws Exception {
108            // Let the store know what usage manager we are using so that he can
109            // flush messages to disk when usage gets high.
110            if (store != null) {
111                store.setMemoryUsage(this.memoryUsage);
112            }
113        }
114    
115        /**
116         * @return the producerFlowControl
117         */
118        public boolean isProducerFlowControl() {
119            return producerFlowControl;
120        }
121    
122        /**
123         * @param producerFlowControl the producerFlowControl to set
124         */
125        public void setProducerFlowControl(boolean producerFlowControl) {
126            this.producerFlowControl = producerFlowControl;
127        }
128    
129        /**
130         * Set's the interval at which warnings about producers being blocked by
131         * resource usage will be triggered. Values of 0 or less will disable
132         * warnings
133         * 
134         * @param blockedProducerWarningInterval the interval at which warning about
135         *            blocked producers will be triggered.
136         */
137        public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
138            this.blockedProducerWarningInterval = blockedProducerWarningInterval;
139        }
140    
141        /**
142         * 
143         * @return the interval at which warning about blocked producers will be
144         *         triggered.
145         */
146        public long getBlockedProducerWarningInterval() {
147            return blockedProducerWarningInterval;
148        }
149    
150        /**
151         * @return the maxProducersToAudit
152         */
153        public int getMaxProducersToAudit() {
154            return maxProducersToAudit;
155        }
156    
157        /**
158         * @param maxProducersToAudit the maxProducersToAudit to set
159         */
160        public void setMaxProducersToAudit(int maxProducersToAudit) {
161            this.maxProducersToAudit = maxProducersToAudit;
162        }
163    
164        /**
165         * @return the maxAuditDepth
166         */
167        public int getMaxAuditDepth() {
168            return maxAuditDepth;
169        }
170    
171        /**
172         * @param maxAuditDepth the maxAuditDepth to set
173         */
174        public void setMaxAuditDepth(int maxAuditDepth) {
175            this.maxAuditDepth = maxAuditDepth;
176        }
177    
178        /**
179         * @return the enableAudit
180         */
181        public boolean isEnableAudit() {
182            return enableAudit;
183        }
184    
185        /**
186         * @param enableAudit the enableAudit to set
187         */
188        public void setEnableAudit(boolean enableAudit) {
189            this.enableAudit = enableAudit;
190        }
191    
192        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
193            destinationStatistics.getProducers().increment();
194        }
195    
196        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
197            destinationStatistics.getProducers().decrement();
198        }
199    
200        public final MemoryUsage getMemoryUsage() {
201            return memoryUsage;
202        }
203    
204        public DestinationStatistics getDestinationStatistics() {
205            return destinationStatistics;
206        }
207    
208        public ActiveMQDestination getActiveMQDestination() {
209            return destination;
210        }
211    
212        public final String getName() {
213            return getActiveMQDestination().getPhysicalName();
214        }
215    
216        public final MessageStore getMessageStore() {
217            return store;
218        }
219    
220        public final boolean isActive() {
221            return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0;
222        }
223    
224        public int getMaxPageSize() {
225            return maxPageSize;
226        }
227    
228        public void setMaxPageSize(int maxPageSize) {
229            this.maxPageSize = maxPageSize;
230        }
231    
232        public int getMaxBrowsePageSize() {
233            return this.maxBrowsePageSize;
234        }
235    
236        public void setMaxBrowsePageSize(int maxPageSize) {
237            this.maxBrowsePageSize = maxPageSize;
238        }
239    
240        public int getMaxExpirePageSize() {
241            return this.maxExpirePageSize;
242        }
243    
244        public void setMaxExpirePageSize(int maxPageSize) {
245            this.maxExpirePageSize = maxPageSize;
246        }
247    
248        public void setExpireMessagesPeriod(long expireMessagesPeriod) {
249            this.expireMessagesPeriod = expireMessagesPeriod;
250        }
251    
252        public long getExpireMessagesPeriod() {
253            return expireMessagesPeriod;
254        }
255    
256        public boolean isUseCache() {
257            return useCache;
258        }
259    
260        public void setUseCache(boolean useCache) {
261            this.useCache = useCache;
262        }
263    
264        public int getMinimumMessageSize() {
265            return minimumMessageSize;
266        }
267    
268        public void setMinimumMessageSize(int minimumMessageSize) {
269            this.minimumMessageSize = minimumMessageSize;
270        }
271    
272        public boolean isLazyDispatch() {
273            return lazyDispatch;
274        }
275    
276        public void setLazyDispatch(boolean lazyDispatch) {
277            this.lazyDispatch = lazyDispatch;
278        }
279    
280        protected long getDestinationSequenceId() {
281            return regionBroker.getBrokerSequenceId();
282        }
283    
284        /**
285         * @return the advisoryForSlowConsumers
286         */
287        public boolean isAdvisoryForSlowConsumers() {
288            return advisoryForSlowConsumers;
289        }
290    
291        /**
292         * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
293         */
294        public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
295            this.advisoryForSlowConsumers = advisoryForSlowConsumers;
296        }
297    
298        /**
299         * @return the advisoryForDiscardingMessages
300         */
301        public boolean isAdvisoryForDiscardingMessages() {
302            return advisoryForDiscardingMessages;
303        }
304    
305        /**
306         * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
307         *            set
308         */
309        public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
310            this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
311        }
312    
313        /**
314         * @return the advisoryWhenFull
315         */
316        public boolean isAdvisoryWhenFull() {
317            return advisoryWhenFull;
318        }
319    
320        /**
321         * @param advisoryWhenFull the advisoryWhenFull to set
322         */
323        public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
324            this.advisoryWhenFull = advisoryWhenFull;
325        }
326    
327        /**
328         * @return the advisoryForDelivery
329         */
330        public boolean isAdvisoryForDelivery() {
331            return advisoryForDelivery;
332        }
333    
334        /**
335         * @param advisoryForDelivery the advisoryForDelivery to set
336         */
337        public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
338            this.advisoryForDelivery = advisoryForDelivery;
339        }
340    
341        /**
342         * @return the advisoryForConsumed
343         */
344        public boolean isAdvisoryForConsumed() {
345            return advisoryForConsumed;
346        }
347    
348        /**
349         * @param advisoryForConsumed the advisoryForConsumed to set
350         */
351        public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
352            this.advisoryForConsumed = advisoryForConsumed;
353        }
354    
355        /**
356         * @return the advisdoryForFastProducers
357         */
358        public boolean isAdvisdoryForFastProducers() {
359            return advisdoryForFastProducers;
360        }
361    
362        /**
363         * @param advisdoryForFastProducers the advisdoryForFastProducers to set
364         */
365        public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
366            this.advisdoryForFastProducers = advisdoryForFastProducers;
367        }
368    
369        public boolean isSendAdvisoryIfNoConsumers() {
370            return sendAdvisoryIfNoConsumers;
371        }
372    
373        public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
374            this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
375        }
376    
377        /**
378         * @return the dead letter strategy
379         */
380        public DeadLetterStrategy getDeadLetterStrategy() {
381            return deadLetterStrategy;
382        }
383    
384        /**
385         * set the dead letter strategy
386         * 
387         * @param deadLetterStrategy
388         */
389        public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
390            this.deadLetterStrategy = deadLetterStrategy;
391        }
392    
393        public int getCursorMemoryHighWaterMark() {
394            return this.cursorMemoryHighWaterMark;
395        }
396    
397        public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
398            this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
399        }
400    
401        /**
402         * called when message is consumed
403         * 
404         * @param context
405         * @param messageReference
406         */
407        public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
408            if (advisoryForConsumed) {
409                broker.messageConsumed(context, messageReference);
410            }
411        }
412    
413        /**
414         * Called when message is delivered to the broker
415         * 
416         * @param context
417         * @param messageReference
418         */
419        public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
420            if (advisoryForDelivery) {
421                broker.messageDelivered(context, messageReference);
422            }
423        }
424    
425        /**
426         * Called when a message is discarded - e.g. running low on memory This will
427         * happen only if the policy is enabled - e.g. non durable topics
428         * 
429         * @param context
430         * @param messageReference
431         */
432        public void messageDiscarded(ConnectionContext context, MessageReference messageReference) {
433            if (advisoryForDiscardingMessages) {
434                broker.messageDiscarded(context, messageReference);
435            }
436        }
437    
438        /**
439         * Called when there is a slow consumer
440         * 
441         * @param context
442         * @param subs
443         */
444        public void slowConsumer(ConnectionContext context, Subscription subs) {
445            if (advisoryForSlowConsumers) {
446                broker.slowConsumer(context, this, subs);
447            }
448        }
449    
450        /**
451         * Called to notify a producer is too fast
452         * 
453         * @param context
454         * @param producerInfo
455         */
456        public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
457            if (advisdoryForFastProducers) {
458                broker.fastProducer(context, producerInfo);
459            }
460        }
461    
462        /**
463         * Called when a Usage reaches a limit
464         * 
465         * @param context
466         * @param usage
467         */
468        public void isFull(ConnectionContext context, Usage usage) {
469            if (advisoryWhenFull) {
470                broker.isFull(context, this, usage);
471            }
472        }
473    
474        public void dispose(ConnectionContext context) throws IOException {
475            if (this.store != null) {
476                this.store.removeAllMessages(context);
477                this.store.dispose(context);
478            }
479            this.destinationStatistics.setParent(null);
480            this.memoryUsage.stop();
481        }
482    
483        /**
484         * Provides a hook to allow messages with no consumer to be processed in
485         * some way - such as to send to a dead letter queue or something..
486         */
487        protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
488            if (!msg.isPersistent()) {
489                if (isSendAdvisoryIfNoConsumers()) {
490                    // allow messages with no consumers to be dispatched to a dead
491                    // letter queue
492                    if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
493    
494                        Message message = msg.copy();
495                        // The original destination and transaction id do not get
496                        // filled when the message is first sent,
497                        // it is only populated if the message is routed to another
498                        // destination like the DLQ
499                        if (message.getOriginalDestination() != null) {
500                            message.setOriginalDestination(message.getDestination());
501                        }
502                        if (message.getOriginalTransactionId() != null) {
503                            message.setOriginalTransactionId(message.getTransactionId());
504                        }
505    
506                        ActiveMQTopic advisoryTopic;
507                        if (destination.isQueue()) {
508                            advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
509                        } else {
510                            advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
511                        }
512                        message.setDestination(advisoryTopic);
513                        message.setTransactionId(null);
514    
515                        // Disable flow control for this since since we don't want
516                        // to block.
517                        boolean originalFlowControl = context.isProducerFlowControl();
518                        try {
519                            context.setProducerFlowControl(false);
520                            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
521                            producerExchange.setMutable(false);
522                            producerExchange.setConnectionContext(context);
523                            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
524                            context.getBroker().send(producerExchange, message);
525                        } finally {
526                            context.setProducerFlowControl(originalFlowControl);
527                        }
528    
529                    }
530                }
531            }
532        }
533    
534        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
535        }
536        
537        
538    }