001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.Map;
022    import java.util.concurrent.ConcurrentHashMap;
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.command.ActiveMQDestination;
025    import org.apache.activemq.command.Message;
026    import org.apache.activemq.command.MessageId;
027    import org.apache.activemq.command.SubscriptionInfo;
028    import org.apache.activemq.kaha.ListContainer;
029    import org.apache.activemq.kaha.MapContainer;
030    import org.apache.activemq.kaha.Marshaller;
031    import org.apache.activemq.kaha.Store;
032    import org.apache.activemq.kaha.StoreEntry;
033    import org.apache.activemq.store.MessageRecoveryListener;
034    import org.apache.activemq.store.TopicMessageStore;
035    
036    /**
037     * @version $Revision: 1.5 $
038     */
039    public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore {
040    
041        protected ListContainer<TopicSubAck> ackContainer;
042        protected Map<Object, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<Object, TopicSubContainer>();
043        private Map<String, SubscriptionInfo> subscriberContainer;
044        private Store store;
045    
046        public KahaTopicMessageStore(Store store, MapContainer<MessageId, Message> messageContainer,
047                                     ListContainer<TopicSubAck> ackContainer, MapContainer<String, SubscriptionInfo> subsContainer,
048                                     ActiveMQDestination destination) throws IOException {
049            super(messageContainer, destination);
050            this.store = store;
051            this.ackContainer = ackContainer;
052            subscriberContainer = subsContainer;
053            // load all the Ack containers
054            for (Iterator<String> i = subscriberContainer.keySet().iterator(); i.hasNext();) {
055                Object key = i.next();
056                addSubscriberMessageContainer(key);
057            }
058        }
059    
060        @Override
061        public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
062            int subscriberCount = subscriberMessages.size();
063            if (subscriberCount > 0) {
064                MessageId id = message.getMessageId();
065                StoreEntry messageEntry = messageContainer.place(id, message);
066                TopicSubAck tsa = new TopicSubAck();
067                tsa.setCount(subscriberCount);
068                tsa.setMessageEntry(messageEntry);
069                StoreEntry ackEntry = ackContainer.placeLast(tsa);
070                for (Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
071                    TopicSubContainer container = i.next();
072                    ConsumerMessageRef ref = new ConsumerMessageRef();
073                    ref.setAckEntry(ackEntry);
074                    ref.setMessageEntry(messageEntry);
075                    ref.setMessageId(id);
076                    container.add(ref);
077                }
078            }
079        }
080    
081        public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
082                                             MessageId messageId) throws IOException {
083            String subcriberId = getSubscriptionKey(clientId, subscriptionName);
084            TopicSubContainer container = subscriberMessages.get(subcriberId);
085            if (container != null) {
086                ConsumerMessageRef ref = container.remove(messageId);
087                if (container.isEmpty()) {
088                    container.reset();
089                }
090                if (ref != null) {
091                    TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
092                    if (tsa != null) {
093                        if (tsa.decrementCount() <= 0) {
094                            StoreEntry entry = ref.getAckEntry();
095                            entry = ackContainer.refresh(entry);
096                            ackContainer.remove(entry);
097                            entry = tsa.getMessageEntry();
098                            entry = messageContainer.refresh(entry);
099                            messageContainer.remove(entry);
100                        } else {
101                            ackContainer.update(ref.getAckEntry(), tsa);
102                        }
103                    }
104                }
105            }
106        }
107    
108        public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
109            return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
110        }
111    
112        public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
113            String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
114            // if already exists - won't add it again as it causes data files
115            // to hang around
116            if (!subscriberContainer.containsKey(key)) {
117                subscriberContainer.put(key, info);
118            }
119            // add the subscriber
120            addSubscriberMessageContainer(key);
121            /*
122             * if(retroactive){ for(StoreEntry
123             * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
124             * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
125             * ConsumerMessageRef ref=new ConsumerMessageRef();
126             * ref.setAckEntry(entry); ref.setMessageEntry(tsa.getMessageEntry());
127             * container.add(ref); } }
128             */
129        }
130    
131        public synchronized void deleteSubscription(String clientId, String subscriptionName) throws IOException {
132            String key = getSubscriptionKey(clientId, subscriptionName);
133            removeSubscriberMessageContainer(key);
134        }
135    
136        public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
137            throws Exception {
138            String key = getSubscriptionKey(clientId, subscriptionName);
139            TopicSubContainer container = subscriberMessages.get(key);
140            if (container != null) {
141                for (Iterator i = container.iterator(); i.hasNext();) {
142                    ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
143                    Message msg = messageContainer.get(ref.getMessageEntry());
144                    if (msg != null) {
145                        if (!recoverMessage(listener, msg)) {
146                            break;
147                        }
148                    }
149                }
150            }
151        }
152    
153        public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
154                                        MessageRecoveryListener listener) throws Exception {
155            String key = getSubscriptionKey(clientId, subscriptionName);
156            TopicSubContainer container = subscriberMessages.get(key);
157            if (container != null) {
158                int count = 0;
159                StoreEntry entry = container.getBatchEntry();
160                if (entry == null) {
161                    entry = container.getEntry();
162                } else {
163                    entry = container.refreshEntry(entry);
164                    if (entry != null) {
165                        entry = container.getNextEntry(entry);
166                    }
167                }
168                if (entry != null) {
169                    do {
170                        ConsumerMessageRef consumerRef = container.get(entry);
171                        Message msg = messageContainer.getValue(consumerRef.getMessageEntry());
172                        if (msg != null) {
173                            recoverMessage(listener, msg);
174                            count++;
175                            container.setBatchEntry(msg.getMessageId().toString(), entry);
176                        } else {
177                            container.reset();
178                        }
179    
180                        entry = container.getNextEntry(entry);
181                    } while (entry != null && count < maxReturned && listener.hasSpace());
182                }
183            }
184        }
185    
186        public synchronized void delete() {
187            super.delete();
188            ackContainer.clear();
189            subscriberContainer.clear();
190        }
191    
192        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
193            return subscriberContainer.values()
194                .toArray(new SubscriptionInfo[subscriberContainer.size()]);
195        }
196    
197        protected String getSubscriptionKey(String clientId, String subscriberName) {
198            String result = clientId + ":";
199            result += subscriberName != null ? subscriberName : "NOT_SET";
200            return result;
201        }
202    
203        protected MapContainer addSubscriberMessageContainer(Object key) throws IOException {
204            MapContainer container = store.getMapContainer(key, "topic-subs");
205            container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
206            Marshaller marshaller = new ConsumerMessageRefMarshaller();
207            container.setValueMarshaller(marshaller);
208            TopicSubContainer tsc = new TopicSubContainer(container);
209            subscriberMessages.put(key, tsc);
210            return container;
211        }
212    
213        protected synchronized void removeSubscriberMessageContainer(Object key)
214                throws IOException {
215            subscriberContainer.remove(key);
216            TopicSubContainer container = subscriberMessages.remove(key);
217            if (container != null) {
218                for (Iterator i = container.iterator(); i.hasNext();) {
219                    ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
220                    if (ref != null) {
221                        TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
222                        if (tsa != null) {
223                            if (tsa.decrementCount() <= 0) {
224                                ackContainer.remove(ref.getAckEntry());
225                                messageContainer.remove(tsa.getMessageEntry());
226                            } else {
227                                ackContainer.update(ref.getAckEntry(), tsa);
228                            }
229                        }
230                    }
231                }
232                container.clear();
233            }
234            store.deleteListContainer(key, "topic-subs");
235    
236        }
237    
238        public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
239            String key = getSubscriptionKey(clientId, subscriberName);
240            TopicSubContainer container = subscriberMessages.get(key);
241            return container != null ? container.size() : 0;
242        }
243    
244        /**
245         * @param context
246         * @throws IOException
247         * @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
248         */
249        public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
250            messageContainer.clear();
251            ackContainer.clear();
252            for (Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
253                TopicSubContainer container = i.next();
254                container.clear();
255            }
256        }
257    
258        public synchronized void resetBatching(String clientId, String subscriptionName) {
259            String key = getSubscriptionKey(clientId, subscriptionName);
260            TopicSubContainer topicSubContainer = subscriberMessages.get(key);
261            if (topicSubContainer != null) {
262                topicSubContainer.reset();
263            }
264        }
265    }