001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.memory;
018    
019    import java.io.IOException;
020    import java.util.Collections;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.Map.Entry;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.command.ActiveMQDestination;
027    import org.apache.activemq.command.Message;
028    import org.apache.activemq.command.MessageId;
029    import org.apache.activemq.command.SubscriptionInfo;
030    import org.apache.activemq.store.MessageRecoveryListener;
031    import org.apache.activemq.store.TopicMessageStore;
032    import org.apache.activemq.util.LRUCache;
033    import org.apache.activemq.util.SubscriptionKey;
034    
035    /**
036     * @version $Revision: 1.5 $
037     */
038    public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
039    
040        private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
041        private Map<SubscriptionKey, MemoryTopicSub> topicSubMap;
042    
043        public MemoryTopicMessageStore(ActiveMQDestination destination) {
044            this(destination, new LRUCache<MessageId, Message>(100, 100, 0.75f, false), makeSubscriptionInfoMap());
045        }
046    
047        public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
048            super(destination, messageTable);
049            this.subscriberDatabase = subscriberDatabase;
050            this.topicSubMap = makeSubMap();
051        }
052    
053        protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
054            return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>());
055        }
056        
057        protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
058            return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
059        }
060    
061        public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
062            super.addMessage(context, message);
063            for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) {
064                MemoryTopicSub sub = i.next();
065                sub.addMessage(message.getMessageId(), message);
066            }
067        }
068    
069        public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
070            SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
071            MemoryTopicSub sub = topicSubMap.get(key);
072            if (sub != null) {
073                sub.removeMessage(messageId);
074            }
075        }
076    
077        public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
078            return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
079        }
080    
081        public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
082            SubscriptionKey key = new SubscriptionKey(info);
083            MemoryTopicSub sub = new MemoryTopicSub();
084            topicSubMap.put(key, sub);
085            if (retroactive) {
086                for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) {
087                    Map.Entry entry = (Entry)i.next();
088                    sub.addMessage((MessageId)entry.getKey(), (Message)entry.getValue());
089                }
090            }
091            subscriberDatabase.put(key, info);
092        }
093    
094        public synchronized void deleteSubscription(String clientId, String subscriptionName) {
095            org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
096            subscriberDatabase.remove(key);
097            topicSubMap.remove(key);
098        }
099    
100        public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
101            MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
102            if (sub != null) {
103                sub.recoverSubscription(listener);
104            }
105        }
106    
107        public synchronized void delete() {
108            super.delete();
109            subscriberDatabase.clear();
110            topicSubMap.clear();
111        }
112    
113        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
114            return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
115        }
116    
117        public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
118            int result = 0;
119            MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
120            if (sub != null) {
121                result = sub.size();
122            }
123            return result;
124        }
125    
126        public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
127            MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
128            if (sub != null) {
129                sub.recoverNextMessages(maxReturned, listener);
130            }
131        }
132    
133        public void resetBatching(String clientId, String subscriptionName) {
134            MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
135            if (sub != null) {
136                sub.resetBatching();
137            }
138        }
139    }