001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.concurrent.ConcurrentHashMap;
022    
023    import javax.jms.InvalidSelectorException;
024    import javax.jms.JMSException;
025    
026    import org.apache.activemq.broker.Broker;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ConsumerInfo;
031    import org.apache.activemq.command.Message;
032    import org.apache.activemq.command.MessageAck;
033    import org.apache.activemq.command.MessageDispatch;
034    import org.apache.activemq.command.MessageId;
035    import org.apache.activemq.store.TopicMessageStore;
036    import org.apache.activemq.usage.SystemUsage;
037    import org.apache.activemq.usage.Usage;
038    import org.apache.activemq.usage.UsageListener;
039    import org.apache.activemq.util.SubscriptionKey;
040    import org.apache.commons.logging.Log;
041    import org.apache.commons.logging.LogFactory;
042    
043    public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
044    
045        private static final Log LOG = LogFactory.getLog(DurableTopicSubscription.class);
046        private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
047        private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
048        private final SubscriptionKey subscriptionKey;
049        private final boolean keepDurableSubsActive;
050        private boolean active;
051    
052        public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
053            throws JMSException {
054            super(broker,usageManager, context, info);
055            this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
056            this.pending.setSystemUsage(usageManager);
057            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
058            this.keepDurableSubsActive = keepDurableSubsActive;
059            subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
060            
061        }
062    
063        public boolean isActive() {
064            return active;
065        }
066    
067        public boolean isFull() {
068            return !active || super.isFull();
069        }
070    
071        public void gc() {
072        }
073    
074        public void add(ConnectionContext context, Destination destination) throws Exception {
075            super.add(context, destination);
076            destinations.put(destination.getActiveMQDestination(), destination);
077            if (destination.getMessageStore() != null) {
078                TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
079                try {
080                    this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
081                } catch (IOException e) {
082                    JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
083                    jmsEx.setLinkedException(e);
084                    throw jmsEx;
085                }
086            }
087            if (active || keepDurableSubsActive) {
088                Topic topic = (Topic)destination;
089                topic.activate(context, this);
090                if (pending.isEmpty(topic)) {
091                    topic.recoverRetroactiveMessages(context, this);
092                }
093            }
094            dispatchPending();
095        }
096    
097        public void activate(SystemUsage memoryManager, ConnectionContext context,
098                ConsumerInfo info) throws Exception {
099            LOG.debug("Activating " + this);
100            if (!active) {
101                this.active = true;
102                this.context = context;
103                this.info = info;
104                int prefetch = info.getPrefetchSize();
105                if (prefetch>0) {
106                prefetch += prefetch/2;
107                }
108                int depth = Math.max(prefetch, this.pending.getMaxAuditDepth());
109                this.pending.setMaxAuditDepth(depth);
110                if (!keepDurableSubsActive) {
111                    for (Iterator<Destination> iter = destinations.values()
112                            .iterator(); iter.hasNext();) {
113                        Topic topic = (Topic) iter.next();
114                        topic.activate(context, this);
115                    }
116                }
117                synchronized (pending) {
118                    pending.setSystemUsage(memoryManager);
119                    pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
120                    pending.start();
121    
122                    // If nothing was in the persistent store, then try to use the
123                    // recovery policy.
124                    if (pending.isEmpty()) {
125                        for (Iterator<Destination> iter = destinations.values()
126                                .iterator(); iter.hasNext();) {
127                            Topic topic = (Topic) iter.next();
128                            topic.recoverRetroactiveMessages(context, this);
129                        }
130                    }
131                }
132                dispatchPending();
133                this.usageManager.getMemoryUsage().addUsageListener(this);
134            }
135        }
136    
137        public void deactivate(boolean keepDurableSubsActive) throws Exception {
138            active = false;
139            this.usageManager.getMemoryUsage().removeUsageListener(this);
140            synchronized (pending) {
141                pending.stop();
142            }
143            if (!keepDurableSubsActive) {
144                for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
145                    Topic topic = (Topic)iter.next();
146                    topic.deactivate(context, this);
147                }
148            }
149            for (final MessageReference node : dispatched) {
150                // Mark the dispatched messages as redelivered for next time.
151                Integer count = redeliveredMessages.get(node.getMessageId());
152                if (count != null) {
153                    redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
154                } else {
155                    redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
156                }
157                if (keepDurableSubsActive&& pending.isTransient()) {
158                    synchronized (pending) {
159                        pending.addMessageFirst(node);
160                    }
161                } else {
162                    node.decrementReferenceCount();
163                }
164            }
165            synchronized(dispatched) {
166                dispatched.clear();
167            }
168            if (!keepDurableSubsActive && pending.isTransient()) {
169                synchronized (pending) {
170                    try {
171                        pending.reset();
172                        while (pending.hasNext()) {
173                            MessageReference node = pending.next();
174                            node.decrementReferenceCount();
175                            pending.remove();
176                        }
177                    } finally {
178                        pending.release();
179                    }
180                }
181            }
182            prefetchExtension = 0;
183        }
184        
185        
186        protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
187            MessageDispatch md = super.createMessageDispatch(node, message);
188            Integer count = redeliveredMessages.get(node.getMessageId());
189            if (count != null) {
190                md.setRedeliveryCounter(count.intValue());
191            }
192            return md;
193        }
194    
195        public void add(MessageReference node) throws Exception {
196            if (!active && !keepDurableSubsActive) {
197                return;
198            }
199            super.add(node);
200        }
201    
202        protected void doAddRecoveredMessage(MessageReference message) throws Exception {
203            synchronized(pending) {
204                pending.addRecoveredMessage(message);
205            }
206        }
207    
208        public int getPendingQueueSize() {
209            if (active || keepDurableSubsActive) {
210                return super.getPendingQueueSize();
211            }
212            // TODO: need to get from store
213            return 0;
214        }
215    
216        public void setSelector(String selector) throws InvalidSelectorException {
217            throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
218        }
219    
220        protected boolean canDispatch(MessageReference node) {
221            return active;
222        }
223    
224        protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
225            node.getRegionDestination().acknowledge(context, this, ack, node);
226            redeliveredMessages.remove(node.getMessageId());
227            node.decrementReferenceCount();
228        }
229    
230        
231        public synchronized String toString() {
232            return "DurableTopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
233                   + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
234        }
235    
236        public SubscriptionKey getSubscriptionKey() {
237            return subscriptionKey;
238        }
239    
240        /**
241         * Release any references that we are holding.
242         */
243        public void destroy() {
244            synchronized (pending) {
245                try {
246    
247                    pending.reset();
248                    while (pending.hasNext()) {
249                        MessageReference node = pending.next();
250                        node.decrementReferenceCount();
251                    }
252    
253                } finally {
254                    pending.release();
255                    pending.clear();
256                }
257            }
258            synchronized(dispatched) {
259                for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
260                    MessageReference node = (MessageReference) iter.next();
261                    node.decrementReferenceCount();
262                }
263                dispatched.clear();
264            }
265        }
266    
267        /**
268         * @param usageManager
269         * @param oldPercentUsage
270         * @param newPercentUsage
271         * @see org.apache.activemq.usage.UsageListener#onMemoryUseChanged(org.apache.activemq.usage.SystemUsage,
272         *      int, int)
273         */
274        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
275            if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
276                try {
277                    dispatchPending();
278                } catch (IOException e) {
279                    LOG.warn("problem calling dispatchMatched", e);
280                }
281            }
282        }
283        
284        protected boolean isDropped(MessageReference node) {
285           return false;
286         }
287    }