001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.IOException;
020    import java.util.HashSet;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.Set;
025    import java.util.Map.Entry;
026    import java.util.concurrent.ConcurrentHashMap;
027    
028    import org.apache.activemq.broker.ConnectionContext;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.Message;
031    import org.apache.activemq.command.MessageId;
032    import org.apache.activemq.command.SubscriptionInfo;
033    import org.apache.activemq.kaha.ListContainer;
034    import org.apache.activemq.kaha.MapContainer;
035    import org.apache.activemq.kaha.Marshaller;
036    import org.apache.activemq.kaha.Store;
037    import org.apache.activemq.kaha.StoreEntry;
038    import org.apache.activemq.store.MessageRecoveryListener;
039    import org.apache.activemq.store.TopicReferenceStore;
040    import org.apache.commons.logging.Log;
041    import org.apache.commons.logging.LogFactory;
042    
043    public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
044        private static final Log LOG = LogFactory.getLog(KahaTopicReferenceStore.class);
045        protected ListContainer<TopicSubAck> ackContainer;
046        protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
047        private MapContainer<String, SubscriptionInfo> subscriberContainer;
048        private Store store;
049        private static final String TOPIC_SUB_NAME = "tsn";
050    
051        public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter,
052                                       MapContainer<MessageId, ReferenceRecord> messageContainer, ListContainer<TopicSubAck> ackContainer,
053                                       MapContainer<String, SubscriptionInfo> subsContainer, ActiveMQDestination destination)
054            throws IOException {
055            super(adapter, messageContainer, destination);
056            this.store = store;
057            this.ackContainer = ackContainer;
058            subscriberContainer = subsContainer;
059            // load all the Ack containers
060            for (Iterator<SubscriptionInfo> i = subscriberContainer.values().iterator(); i.hasNext();) {
061                SubscriptionInfo info = i.next();
062                addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
063            }
064        }
065    
066        public void dispose(ConnectionContext context) {
067            super.dispose(context);
068            subscriberContainer.delete();
069        }
070    
071        protected MessageId getMessageId(Object object) {
072            return new MessageId(((ReferenceRecord)object).getMessageId());
073        }
074    
075        public void addMessage(ConnectionContext context, Message message) throws IOException {
076            throw new RuntimeException("Use addMessageReference instead");
077        }
078    
079        public Message getMessage(MessageId identity) throws IOException {
080            throw new RuntimeException("Use addMessageReference instead");
081        }
082    
083        public boolean addMessageReference(final ConnectionContext context, final MessageId messageId,
084                                        final ReferenceData data) {
085            boolean uniqueReferenceAdded = false;
086            lock.lock();
087            try {
088                final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
089                final int subscriberCount = subscriberMessages.size();
090                if (subscriberCount > 0 && !isDuplicate(messageId)) {
091                    final StoreEntry messageEntry = messageContainer.place(messageId, record);
092                    addInterest(record);
093                    uniqueReferenceAdded = true;
094                    final TopicSubAck tsa = new TopicSubAck();
095                    tsa.setCount(subscriberCount);
096                    tsa.setMessageEntry(messageEntry);
097                    final StoreEntry ackEntry = ackContainer.placeLast(tsa);
098                    for (final Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
099                        final TopicSubContainer container = i.next();
100                        final ConsumerMessageRef ref = new ConsumerMessageRef();
101                        ref.setAckEntry(ackEntry);
102                        ref.setMessageEntry(messageEntry);
103                        ref.setMessageId(messageId);
104                        container.add(ref);
105                    }
106                    if (LOG.isTraceEnabled()) {
107                        LOG.trace(destination.getPhysicalName() + " add reference: " + messageId);
108                    }
109                }
110            } finally {
111                lock.unlock();
112            }
113            return uniqueReferenceAdded;
114        }
115    
116        public ReferenceData getMessageReference(final MessageId identity) throws IOException {
117            final ReferenceRecord result = messageContainer.get(identity);
118            if (result == null) {
119                return null;
120            }
121            return result.getData();
122        }
123    
124        public void addReferenceFileIdsInUse() {
125            for (StoreEntry entry = ackContainer.getFirst(); entry != null; entry = ackContainer.getNext(entry)) {
126                TopicSubAck subAck = ackContainer.get(entry);
127                if (subAck.getCount() > 0) {
128                    ReferenceRecord rr = messageContainer.getValue(subAck.getMessageEntry());
129                    addInterest(rr);
130                }
131            }
132        }
133    
134        
135        protected MapContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
136            String containerName = getSubscriptionContainerName(getSubscriptionKey(clientId, subscriptionName));
137            MapContainer container = store.getMapContainer(containerName,containerName);
138            container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
139            Marshaller marshaller = new ConsumerMessageRefMarshaller();
140            container.setValueMarshaller(marshaller);
141            TopicSubContainer tsc = new TopicSubContainer(container);
142            subscriberMessages.put(getSubscriptionKey(clientId, subscriptionName), tsc);
143            return container;
144        }
145    
146        public boolean acknowledgeReference(ConnectionContext context,
147                String clientId, String subscriptionName, MessageId messageId)
148                throws IOException {
149            boolean removeMessage = false;
150            lock.lock();
151                try {
152                String key = getSubscriptionKey(clientId, subscriptionName);
153        
154                TopicSubContainer container = subscriberMessages.get(key);
155                if (container != null) {
156                    ConsumerMessageRef ref = null;
157                    if((ref = container.remove(messageId)) != null) {
158                        StoreEntry entry = ref.getAckEntry();
159                        //ensure we get up to-date pointers
160                        entry = ackContainer.refresh(entry);
161                        TopicSubAck tsa = ackContainer.get(entry);
162                        if (tsa != null) {
163                            if (tsa.decrementCount() <= 0) {
164                                ackContainer.remove(entry);
165                                ReferenceRecord rr = messageContainer.get(messageId);
166                                if (rr != null) {
167                                    entry = tsa.getMessageEntry();
168                                    entry = messageContainer.refresh(entry);
169                                    messageContainer.remove(entry);
170                                    removeInterest(rr);
171                                    removeMessage = true;
172                                    dispatchAudit.isDuplicate(messageId);
173                                }
174                            }else {
175                                ackContainer.update(entry,tsa);
176                            }
177                        }
178                        if (LOG.isTraceEnabled()) {
179                            LOG.trace(destination.getPhysicalName() + " remove: " + messageId);
180                        }
181                    }else{
182                        if (ackContainer.isEmpty() || subscriberMessages.size() == 1 || isUnreferencedBySubscribers(key, subscriberMessages, messageId)) {
183                            // no message reference held        
184                            removeMessage = true;
185                            if (LOG.isDebugEnabled()) {
186                                LOG.debug(destination.getPhysicalName() + " remove with no outstanding reference (dup ack): " + messageId);
187                            }
188                        }
189                    }
190                }
191            }finally {
192                lock.unlock();
193            }
194            return removeMessage;
195        }
196        
197        // verify that no subscriber has a reference to this message. In the case where the subscribers
198        // references are persisted but more than the persisted consumers get the message, the ack from the non
199        // persisted consumer would remove the message in error
200        //
201        // see: https://issues.apache.org/activemq/browse/AMQ-2123
202        private boolean isUnreferencedBySubscribers(
203                String key, Map<String, TopicSubContainer> subscriberContainers, MessageId messageId) {
204            boolean isUnreferenced = true;
205            for (Entry<String, TopicSubContainer> entry : subscriberContainers.entrySet()) {
206                if (!key.equals(entry.getKey()) && !entry.getValue().isEmpty()) {
207                    TopicSubContainer container = entry.getValue();
208                    for (Iterator i = container.iterator(); i.hasNext();) {
209                        ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
210                        if (messageId.equals(ref.getMessageId())) {
211                            isUnreferenced = false;
212                            break;
213                        }
214                    }
215                }
216            }
217            return isUnreferenced; 
218        }
219    
220        public void acknowledge(ConnectionContext context,
221                            String clientId, String subscriptionName, MessageId messageId) throws IOException {
222                acknowledgeReference(context, clientId, subscriptionName, messageId);
223            }
224    
225        public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
226            String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
227            lock.lock();
228            try {
229                // if already exists - won't add it again as it causes data files
230                // to hang around
231                if (!subscriberContainer.containsKey(key)) {
232                    subscriberContainer.put(key, info);
233                    adapter.addSubscriberState(info);
234                }
235                // add the subscriber
236                addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
237                if (retroactive) {
238                    /*
239                     * for(StoreEntry
240                     * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
241                     * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
242                     * ConsumerMessageRef ref=new ConsumerMessageRef();
243                     * ref.setAckEntry(entry);
244                     * ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); }
245                     */
246                }
247            }finally {
248                lock.unlock();
249            }
250        }
251    
252        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
253            lock.lock();
254            try {
255                SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
256                if (info != null) {
257                    adapter.removeSubscriberState(info);
258                }
259            removeSubscriberMessageContainer(clientId,subscriptionName);
260            }finally {
261                lock.unlock();
262            }
263        }
264    
265        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
266            SubscriptionInfo[] result = subscriberContainer.values()
267                .toArray(new SubscriptionInfo[subscriberContainer.size()]);
268            return result;
269        }
270    
271        public int getMessageCount(String clientId, String subscriberName) throws IOException {
272            String key = getSubscriptionKey(clientId, subscriberName);
273            TopicSubContainer container = subscriberMessages.get(key);
274            return container != null ? container.size() : 0;
275        }
276    
277        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
278            return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
279        }
280    
281        public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
282                                                     MessageRecoveryListener listener) throws Exception {
283            String key = getSubscriptionKey(clientId, subscriptionName);
284            lock.lock();
285            try {
286                TopicSubContainer container = subscriberMessages.get(key);
287                if (container != null) {
288                    int count = 0;
289                    StoreEntry entry = container.getBatchEntry();
290                    if (entry == null) {
291                        entry = container.getEntry();
292                    } else {
293                        entry = container.refreshEntry(entry);
294                        if (entry != null) {
295                            entry = container.getNextEntry(entry);
296                        }
297                    }
298                   
299                    if (entry != null) {
300                        do {
301                            ConsumerMessageRef consumerRef = container.get(entry);
302                            ReferenceRecord msg = messageContainer.getValue(consumerRef
303                                    .getMessageEntry());
304                            if (msg != null) {
305                                if (recoverReference(listener, msg)) {
306                                    count++;
307                                    container.setBatchEntry(msg.getMessageId(), entry);
308                                } else {
309                                    break;
310                                }
311                            } else {
312                                container.reset();
313                            }
314        
315                            entry = container.getNextEntry(entry);
316                        } while (entry != null && count < maxReturned && listener.hasSpace());
317                    }
318                }
319            }finally {
320                lock.unlock();
321            }
322        }
323    
324        public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
325            throws Exception {
326            String key = getSubscriptionKey(clientId, subscriptionName);
327            TopicSubContainer container = subscriberMessages.get(key);
328            if (container != null) {
329                for (Iterator i = container.iterator(); i.hasNext();) {
330                    ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
331                    ReferenceRecord msg = messageContainer.get(ref.getMessageEntry());
332                    if (msg != null) {
333                        if (!recoverReference(listener, msg)) {
334                            break;
335                        }
336                    }
337                }
338            }
339        }
340    
341        public void resetBatching(String clientId, String subscriptionName) {
342            lock.lock();
343            try {
344                String key = getSubscriptionKey(clientId, subscriptionName);
345                TopicSubContainer topicSubContainer = subscriberMessages.get(key);
346                if (topicSubContainer != null) {
347                    topicSubContainer.reset();
348                }
349            }finally {
350                lock.unlock();
351            }
352        }
353        
354        public void removeAllMessages(ConnectionContext context) throws IOException {
355            lock.lock();
356            try {
357                Set<String> tmpSet = new HashSet<String>(subscriberContainer.keySet());
358                for (String key:tmpSet) {
359                    TopicSubContainer container = subscriberMessages.get(key);
360                    if (container != null) {
361                        container.clear();
362                    }
363                }
364                ackContainer.clear();
365            }finally {
366                lock.unlock();
367            }
368            super.removeAllMessages(context);
369        }
370    
371        protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
372            String subscriberKey = getSubscriptionKey(clientId, subscriptionName);
373            String containerName = getSubscriptionContainerName(subscriberKey);
374            subscriberContainer.remove(subscriberKey);
375            TopicSubContainer container = subscriberMessages.remove(subscriberKey);
376            if (container != null) {
377                for (Iterator i = container.iterator(); i.hasNext();) {
378                    ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
379                    if (ref != null) {
380                        TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
381                        if (tsa != null) {
382                            if (tsa.decrementCount() <= 0) {
383                                ackContainer.remove(ref.getAckEntry());
384                                messageContainer.remove(tsa.getMessageEntry());
385                            } else {
386                                ackContainer.update(ref.getAckEntry(), tsa);
387                            }
388                        }
389                    }
390                }
391            }
392            store.deleteMapContainer(containerName,containerName);
393        }
394    
395        protected String getSubscriptionKey(String clientId, String subscriberName) {
396            StringBuffer buffer = new StringBuffer();
397            buffer.append(clientId).append(":");  
398            String name = subscriberName != null ? subscriberName : "NOT_SET";
399            return buffer.append(name).toString();
400        }
401        
402        private String getSubscriptionContainerName(String subscriptionKey) {
403            StringBuffer result = new StringBuffer(TOPIC_SUB_NAME);
404            result.append(destination.getQualifiedName());
405            result.append(subscriptionKey);
406            return result.toString();
407        }
408    }