001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadaptor; 018 019 import java.io.IOException; 020 import java.util.Iterator; 021 import java.util.Map; 022 import java.util.concurrent.ConcurrentHashMap; 023 import org.apache.activemq.broker.ConnectionContext; 024 import org.apache.activemq.command.ActiveMQDestination; 025 import org.apache.activemq.command.Message; 026 import org.apache.activemq.command.MessageId; 027 import org.apache.activemq.command.SubscriptionInfo; 028 import org.apache.activemq.kaha.ListContainer; 029 import org.apache.activemq.kaha.MapContainer; 030 import org.apache.activemq.kaha.Marshaller; 031 import org.apache.activemq.kaha.Store; 032 import org.apache.activemq.kaha.StoreEntry; 033 import org.apache.activemq.store.MessageRecoveryListener; 034 import org.apache.activemq.store.TopicMessageStore; 035 036 /** 037 * @version $Revision: 1.5 $ 038 */ 039 public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore { 040 041 protected ListContainer<TopicSubAck> ackContainer; 042 protected Map<Object, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<Object, TopicSubContainer>(); 043 private Map<String, SubscriptionInfo> subscriberContainer; 044 private Store store; 045 046 public KahaTopicMessageStore(Store store, MapContainer<MessageId, Message> messageContainer, 047 ListContainer<TopicSubAck> ackContainer, MapContainer<String, SubscriptionInfo> subsContainer, 048 ActiveMQDestination destination) throws IOException { 049 super(messageContainer, destination); 050 this.store = store; 051 this.ackContainer = ackContainer; 052 subscriberContainer = subsContainer; 053 // load all the Ack containers 054 for (Iterator<String> i = subscriberContainer.keySet().iterator(); i.hasNext();) { 055 Object key = i.next(); 056 addSubscriberMessageContainer(key); 057 } 058 } 059 060 @Override 061 public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { 062 int subscriberCount = subscriberMessages.size(); 063 if (subscriberCount > 0) { 064 MessageId id = message.getMessageId(); 065 StoreEntry messageEntry = messageContainer.place(id, message); 066 TopicSubAck tsa = new TopicSubAck(); 067 tsa.setCount(subscriberCount); 068 tsa.setMessageEntry(messageEntry); 069 StoreEntry ackEntry = ackContainer.placeLast(tsa); 070 for (Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) { 071 TopicSubContainer container = i.next(); 072 ConsumerMessageRef ref = new ConsumerMessageRef(); 073 ref.setAckEntry(ackEntry); 074 ref.setMessageEntry(messageEntry); 075 ref.setMessageId(id); 076 container.add(ref); 077 } 078 } 079 } 080 081 public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 082 MessageId messageId) throws IOException { 083 String subcriberId = getSubscriptionKey(clientId, subscriptionName); 084 TopicSubContainer container = subscriberMessages.get(subcriberId); 085 if (container != null) { 086 ConsumerMessageRef ref = container.remove(messageId); 087 if (container.isEmpty()) { 088 container.reset(); 089 } 090 if (ref != null) { 091 TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); 092 if (tsa != null) { 093 if (tsa.decrementCount() <= 0) { 094 StoreEntry entry = ref.getAckEntry(); 095 entry = ackContainer.refresh(entry); 096 ackContainer.remove(entry); 097 entry = tsa.getMessageEntry(); 098 entry = messageContainer.refresh(entry); 099 messageContainer.remove(entry); 100 } else { 101 ackContainer.update(ref.getAckEntry(), tsa); 102 } 103 } 104 } 105 } 106 } 107 108 public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 109 return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName)); 110 } 111 112 public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException { 113 String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName()); 114 // if already exists - won't add it again as it causes data files 115 // to hang around 116 if (!subscriberContainer.containsKey(key)) { 117 subscriberContainer.put(key, info); 118 } 119 // add the subscriber 120 addSubscriberMessageContainer(key); 121 /* 122 * if(retroactive){ for(StoreEntry 123 * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){ 124 * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry); 125 * ConsumerMessageRef ref=new ConsumerMessageRef(); 126 * ref.setAckEntry(entry); ref.setMessageEntry(tsa.getMessageEntry()); 127 * container.add(ref); } } 128 */ 129 } 130 131 public synchronized void deleteSubscription(String clientId, String subscriptionName) throws IOException { 132 String key = getSubscriptionKey(clientId, subscriptionName); 133 removeSubscriberMessageContainer(key); 134 } 135 136 public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) 137 throws Exception { 138 String key = getSubscriptionKey(clientId, subscriptionName); 139 TopicSubContainer container = subscriberMessages.get(key); 140 if (container != null) { 141 for (Iterator i = container.iterator(); i.hasNext();) { 142 ConsumerMessageRef ref = (ConsumerMessageRef)i.next(); 143 Message msg = messageContainer.get(ref.getMessageEntry()); 144 if (msg != null) { 145 if (!recoverMessage(listener, msg)) { 146 break; 147 } 148 } 149 } 150 } 151 } 152 153 public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, 154 MessageRecoveryListener listener) throws Exception { 155 String key = getSubscriptionKey(clientId, subscriptionName); 156 TopicSubContainer container = subscriberMessages.get(key); 157 if (container != null) { 158 int count = 0; 159 StoreEntry entry = container.getBatchEntry(); 160 if (entry == null) { 161 entry = container.getEntry(); 162 } else { 163 entry = container.refreshEntry(entry); 164 if (entry != null) { 165 entry = container.getNextEntry(entry); 166 } 167 } 168 if (entry != null) { 169 do { 170 ConsumerMessageRef consumerRef = container.get(entry); 171 Message msg = messageContainer.getValue(consumerRef.getMessageEntry()); 172 if (msg != null) { 173 recoverMessage(listener, msg); 174 count++; 175 container.setBatchEntry(msg.getMessageId().toString(), entry); 176 } else { 177 container.reset(); 178 } 179 180 entry = container.getNextEntry(entry); 181 } while (entry != null && count < maxReturned && listener.hasSpace()); 182 } 183 } 184 } 185 186 public synchronized void delete() { 187 super.delete(); 188 ackContainer.clear(); 189 subscriberContainer.clear(); 190 } 191 192 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 193 return subscriberContainer.values() 194 .toArray(new SubscriptionInfo[subscriberContainer.size()]); 195 } 196 197 protected String getSubscriptionKey(String clientId, String subscriberName) { 198 String result = clientId + ":"; 199 result += subscriberName != null ? subscriberName : "NOT_SET"; 200 return result; 201 } 202 203 protected MapContainer addSubscriberMessageContainer(Object key) throws IOException { 204 MapContainer container = store.getMapContainer(key, "topic-subs"); 205 container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER); 206 Marshaller marshaller = new ConsumerMessageRefMarshaller(); 207 container.setValueMarshaller(marshaller); 208 TopicSubContainer tsc = new TopicSubContainer(container); 209 subscriberMessages.put(key, tsc); 210 return container; 211 } 212 213 protected synchronized void removeSubscriberMessageContainer(Object key) 214 throws IOException { 215 subscriberContainer.remove(key); 216 TopicSubContainer container = subscriberMessages.remove(key); 217 if (container != null) { 218 for (Iterator i = container.iterator(); i.hasNext();) { 219 ConsumerMessageRef ref = (ConsumerMessageRef) i.next(); 220 if (ref != null) { 221 TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); 222 if (tsa != null) { 223 if (tsa.decrementCount() <= 0) { 224 ackContainer.remove(ref.getAckEntry()); 225 messageContainer.remove(tsa.getMessageEntry()); 226 } else { 227 ackContainer.update(ref.getAckEntry(), tsa); 228 } 229 } 230 } 231 } 232 container.clear(); 233 } 234 store.deleteListContainer(key, "topic-subs"); 235 236 } 237 238 public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException { 239 String key = getSubscriptionKey(clientId, subscriberName); 240 TopicSubContainer container = subscriberMessages.get(key); 241 return container != null ? container.size() : 0; 242 } 243 244 /** 245 * @param context 246 * @throws IOException 247 * @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext) 248 */ 249 public synchronized void removeAllMessages(ConnectionContext context) throws IOException { 250 messageContainer.clear(); 251 ackContainer.clear(); 252 for (Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) { 253 TopicSubContainer container = i.next(); 254 container.clear(); 255 } 256 } 257 258 public synchronized void resetBatching(String clientId, String subscriptionName) { 259 String key = getSubscriptionKey(clientId, subscriptionName); 260 TopicSubContainer topicSubContainer = subscriberMessages.get(key); 261 if (topicSubContainer != null) { 262 topicSubContainer.reset(); 263 } 264 } 265 }