/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.memory;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.SubscriptionKey;

/**
 * In-memory {@link TopicMessageStore}. On top of the message table inherited
 * from {@link MemoryMessageStore} it keeps two maps keyed by
 * {@link SubscriptionKey}: the registered {@link SubscriptionInfo} objects and
 * one {@link MemoryTopicSub} per subscription holding that subscription's
 * messages.
 * <p>
 * Every method that touches these maps is {@code synchronized} on the store
 * instance, except {@link #getAllSubscriptions()} and
 * {@link #resetBatching(String, String)}.
 *
 * @version $Revision: 1.5 $
 */
public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {

    /** Registered subscriptions, keyed by client id / subscription name. */
    private final Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;

    /** Per-subscription message holders, keyed like {@link #subscriberDatabase}. */
    private final Map<SubscriptionKey, MemoryTopicSub> topicSubMap;

    /**
     * Creates a store for the destination using an {@link LRUCache} built with
     * the arguments {@code (100, 100, 0.75f, false)} as the message table and a
     * fresh map from {@link #makeSubscriptionInfoMap()} for the subscriptions.
     *
     * @param destination the destination this store belongs to
     */
    public MemoryTopicMessageStore(ActiveMQDestination destination) {
        this(destination, new LRUCache<MessageId, Message>(100, 100, 0.75f, false), makeSubscriptionInfoMap());
    }

    /**
     * Creates a store on top of caller-supplied maps.
     *
     * @param destination the destination this store belongs to
     * @param messageTable the message table handed to the superclass constructor
     * @param subscriberDatabase the map used to hold subscription information;
     *        it is used as-is, not copied
     */
    public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable,
                                   Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
        super(destination, messageTable);
        this.subscriberDatabase = subscriberDatabase;
        this.topicSubMap = makeSubMap();
    }

    /**
     * @return a new, empty synchronized map for subscription information
     */
    protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
        return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>());
    }

    /**
     * @return a new, empty synchronized map for per-subscription message holders
     */
    protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
        return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
    }

    /**
     * Adds the message through the superclass and then hands it to every
     * subscription currently present in this store.
     *
     * @param context the connection context, passed through to the superclass
     * @param message the message to add
     * @throws IOException declared by the store contract
     */
    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
        super.addMessage(context, message);
        for (MemoryTopicSub sub : topicSubMap.values()) {
            sub.addMessage(message.getMessageId(), message);
        }
    }

    /**
     * Removes the message from the given subscription. Does nothing when the
     * subscription is not known to this store.
     *
     * @param context the connection context (not used by this implementation)
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     * @param messageId id of the acknowledged message
     * @throws IOException declared by the store contract
     */
    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                                         MessageId messageId) throws IOException {
        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
        MemoryTopicSub sub = topicSubMap.get(key);
        if (sub != null) {
            sub.removeMessage(messageId);
        }
    }

    /**
     * Looks up the subscription information registered for the given client id
     * and subscription name.
     *
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     * @return the value stored for that key, or {@code null} when the map
     *         holds no mapping for it
     * @throws IOException declared by the store contract
     */
    public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
        return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
    }

    /**
     * Registers a subscription. A fresh {@link MemoryTopicSub} is always
     * created and stored under the subscription's key, replacing any holder
     * already stored under that key.
     *
     * @param info the subscription to register
     * @param retroactive when {@code true}, every entry currently in the
     *        message table is added to the new subscription
     * @throws IOException declared by the store contract
     */
    public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
        SubscriptionKey key = new SubscriptionKey(info);
        MemoryTopicSub sub = new MemoryTopicSub();
        topicSubMap.put(key, sub);
        if (retroactive) {
            // The explicit casts keep this loop valid regardless of how the
            // inherited messageTable field is parameterized.
            for (Object o : messageTable.entrySet()) {
                Entry<?, ?> entry = (Entry<?, ?>) o;
                sub.addMessage((MessageId) entry.getKey(), (Message) entry.getValue());
            }
        }
        subscriberDatabase.put(key, info);
    }

    /**
     * Removes both the subscription information and the per-subscription
     * message holder stored for the given client id and subscription name.
     *
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     */
    public synchronized void deleteSubscription(String clientId, String subscriptionName) {
        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
        subscriberDatabase.remove(key);
        topicSubMap.remove(key);
    }

    /**
     * Delegates to {@link MemoryTopicSub#recoverSubscription} for the given
     * subscription. Does nothing when the subscription is not known.
     *
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     * @param listener the listener handed to the subscription
     * @throws Exception propagated from the subscription
     */
    public synchronized void recoverSubscription(String clientId, String subscriptionName,
                                                 MessageRecoveryListener listener) throws Exception {
        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
        if (sub != null) {
            sub.recoverSubscription(listener);
        }
    }

    /**
     * Calls the superclass {@code delete()} and then clears both subscription maps.
     */
    public synchronized void delete() {
        super.delete();
        subscriberDatabase.clear();
        topicSubMap.clear();
    }

    /**
     * Returns the registered subscriptions as an array.
     * <p>
     * A zero-length array is passed to {@code toArray} so that the result is
     * sized by that single call. Presizing it from a separate {@code size()}
     * call could leave a trailing {@code null} element if a subscription were
     * removed between the two calls, because this method does not hold the
     * store's monitor.
     *
     * @return a new array containing the values of the subscription map
     * @throws IOException declared by the store contract
     */
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        return subscriberDatabase.values().toArray(new SubscriptionInfo[0]);
    }

    /**
     * Returns the size reported by the given subscription's message holder.
     *
     * @param clientId client id of the subscription
     * @param subscriberName name of the subscription
     * @return {@code sub.size()} for the matching subscription, or {@code 0}
     *         when the subscription is not known
     * @throws IOException declared by the store contract
     */
    public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
        int result = 0;
        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
        if (sub != null) {
            result = sub.size();
        }
        return result;
    }

    /**
     * Delegates to {@link MemoryTopicSub#recoverNextMessages} for the given
     * subscription. Does nothing when the subscription is not known.
     *
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     * @param maxReturned limit passed through to the subscription
     * @param listener the listener handed to the subscription
     * @throws Exception propagated from the subscription
     */
    public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
                                                 MessageRecoveryListener listener) throws Exception {
        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
        if (sub != null) {
            sub.recoverNextMessages(maxReturned, listener);
        }
    }

    /**
     * Delegates to {@link MemoryTopicSub#resetBatching} for the given
     * subscription. Does nothing when the subscription is not known. This
     * method does not synchronize on the store.
     *
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     */
    public void resetBatching(String clientId, String subscriptionName) {
        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
        if (sub != null) {
            sub.resetBatching();
        }
    }
}