001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import java.util.Iterator; 021 import java.util.concurrent.ConcurrentHashMap; 022 023 import javax.jms.InvalidSelectorException; 024 import javax.jms.JMSException; 025 026 import org.apache.activemq.broker.Broker; 027 import org.apache.activemq.broker.ConnectionContext; 028 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; 029 import org.apache.activemq.command.ActiveMQDestination; 030 import org.apache.activemq.command.ConsumerInfo; 031 import org.apache.activemq.command.Message; 032 import org.apache.activemq.command.MessageAck; 033 import org.apache.activemq.command.MessageDispatch; 034 import org.apache.activemq.command.MessageId; 035 import org.apache.activemq.store.TopicMessageStore; 036 import org.apache.activemq.usage.SystemUsage; 037 import org.apache.activemq.usage.Usage; 038 import org.apache.activemq.usage.UsageListener; 039 import org.apache.activemq.util.SubscriptionKey; 040 import org.apache.commons.logging.Log; 041 import org.apache.commons.logging.LogFactory; 042 043 public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener { 044 045 private static final Log LOG = LogFactory.getLog(DurableTopicSubscription.class); 046 private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>(); 047 private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 048 private final SubscriptionKey subscriptionKey; 049 private final boolean keepDurableSubsActive; 050 private boolean active; 051 052 public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) 053 throws JMSException { 054 super(broker,usageManager, context, info); 055 this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this); 056 this.pending.setSystemUsage(usageManager); 057 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 058 this.keepDurableSubsActive = keepDurableSubsActive; 059 subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 060 061 } 062 063 public boolean isActive() { 064 return active; 065 } 066 067 public boolean isFull() { 068 return !active || super.isFull(); 069 } 070 071 public void gc() { 072 } 073 074 public void add(ConnectionContext context, Destination destination) throws Exception { 075 super.add(context, destination); 076 destinations.put(destination.getActiveMQDestination(), destination); 077 if (destination.getMessageStore() != null) { 078 TopicMessageStore store = (TopicMessageStore)destination.getMessageStore(); 079 try { 080 this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName()); 081 } catch (IOException e) { 082 JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e); 083 jmsEx.setLinkedException(e); 084 throw jmsEx; 085 } 086 } 087 if (active || keepDurableSubsActive) { 088 Topic topic = (Topic)destination; 089 topic.activate(context, this); 090 if (pending.isEmpty(topic)) { 091 topic.recoverRetroactiveMessages(context, this); 092 } 093 } 094 dispatchPending(); 095 } 096 097 public void activate(SystemUsage memoryManager, ConnectionContext context, 098 ConsumerInfo info) throws Exception { 099 LOG.debug("Activating " + this); 100 if (!active) { 101 this.active = true; 102 this.context = context; 103 this.info = info; 104 int prefetch = info.getPrefetchSize(); 105 if (prefetch>0) { 106 prefetch += prefetch/2; 107 } 108 int depth = Math.max(prefetch, this.pending.getMaxAuditDepth()); 109 this.pending.setMaxAuditDepth(depth); 110 if (!keepDurableSubsActive) { 111 for (Iterator<Destination> iter = destinations.values() 112 .iterator(); iter.hasNext();) { 113 Topic topic = (Topic) iter.next(); 114 topic.activate(context, this); 115 } 116 } 117 synchronized (pending) { 118 pending.setSystemUsage(memoryManager); 119 pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 120 pending.start(); 121 122 // If nothing was in the persistent store, then try to use the 123 // recovery policy. 124 if (pending.isEmpty()) { 125 for (Iterator<Destination> iter = destinations.values() 126 .iterator(); iter.hasNext();) { 127 Topic topic = (Topic) iter.next(); 128 topic.recoverRetroactiveMessages(context, this); 129 } 130 } 131 } 132 dispatchPending(); 133 this.usageManager.getMemoryUsage().addUsageListener(this); 134 } 135 } 136 137 public void deactivate(boolean keepDurableSubsActive) throws Exception { 138 active = false; 139 this.usageManager.getMemoryUsage().removeUsageListener(this); 140 synchronized (pending) { 141 pending.stop(); 142 } 143 if (!keepDurableSubsActive) { 144 for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) { 145 Topic topic = (Topic)iter.next(); 146 topic.deactivate(context, this); 147 } 148 } 149 for (final MessageReference node : dispatched) { 150 // Mark the dispatched messages as redelivered for next time. 151 Integer count = redeliveredMessages.get(node.getMessageId()); 152 if (count != null) { 153 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1)); 154 } else { 155 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1)); 156 } 157 if (keepDurableSubsActive&& pending.isTransient()) { 158 synchronized (pending) { 159 pending.addMessageFirst(node); 160 } 161 } else { 162 node.decrementReferenceCount(); 163 } 164 } 165 synchronized(dispatched) { 166 dispatched.clear(); 167 } 168 if (!keepDurableSubsActive && pending.isTransient()) { 169 synchronized (pending) { 170 try { 171 pending.reset(); 172 while (pending.hasNext()) { 173 MessageReference node = pending.next(); 174 node.decrementReferenceCount(); 175 pending.remove(); 176 } 177 } finally { 178 pending.release(); 179 } 180 } 181 } 182 prefetchExtension = 0; 183 } 184 185 186 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { 187 MessageDispatch md = super.createMessageDispatch(node, message); 188 Integer count = redeliveredMessages.get(node.getMessageId()); 189 if (count != null) { 190 md.setRedeliveryCounter(count.intValue()); 191 } 192 return md; 193 } 194 195 public void add(MessageReference node) throws Exception { 196 if (!active && !keepDurableSubsActive) { 197 return; 198 } 199 super.add(node); 200 } 201 202 protected void doAddRecoveredMessage(MessageReference message) throws Exception { 203 synchronized(pending) { 204 pending.addRecoveredMessage(message); 205 } 206 } 207 208 public int getPendingQueueSize() { 209 if (active || keepDurableSubsActive) { 210 return super.getPendingQueueSize(); 211 } 212 // TODO: need to get from store 213 return 0; 214 } 215 216 public void setSelector(String selector) throws InvalidSelectorException { 217 throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); 218 } 219 220 protected boolean canDispatch(MessageReference node) { 221 return active; 222 } 223 224 protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { 225 node.getRegionDestination().acknowledge(context, this, ack, node); 226 redeliveredMessages.remove(node.getMessageId()); 227 node.decrementReferenceCount(); 228 } 229 230 231 public synchronized String toString() { 232 return "DurableTopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending=" 233 + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension; 234 } 235 236 public SubscriptionKey getSubscriptionKey() { 237 return subscriptionKey; 238 } 239 240 /** 241 * Release any references that we are holding. 242 */ 243 public void destroy() { 244 synchronized (pending) { 245 try { 246 247 pending.reset(); 248 while (pending.hasNext()) { 249 MessageReference node = pending.next(); 250 node.decrementReferenceCount(); 251 } 252 253 } finally { 254 pending.release(); 255 pending.clear(); 256 } 257 } 258 synchronized(dispatched) { 259 for (Iterator iter = dispatched.iterator(); iter.hasNext();) { 260 MessageReference node = (MessageReference) iter.next(); 261 node.decrementReferenceCount(); 262 } 263 dispatched.clear(); 264 } 265 } 266 267 /** 268 * @param usageManager 269 * @param oldPercentUsage 270 * @param newPercentUsage 271 * @see org.apache.activemq.usage.UsageListener#onMemoryUseChanged(org.apache.activemq.usage.SystemUsage, 272 * int, int) 273 */ 274 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 275 if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) { 276 try { 277 dispatchPending(); 278 } catch (IOException e) { 279 LOG.warn("problem calling dispatchMatched", e); 280 } 281 } 282 } 283 284 protected boolean isDropped(MessageReference node) { 285 return false; 286 } 287 }