001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.amq;
018    
019    import java.io.IOException;
020    
021    import org.apache.activemq.broker.ConnectionContext;
022    import org.apache.activemq.command.ActiveMQTopic;
023    import org.apache.activemq.command.JournalTopicAck;
024    import org.apache.activemq.command.MessageAck;
025    import org.apache.activemq.command.MessageId;
026    import org.apache.activemq.command.SubscriptionInfo;
027    import org.apache.activemq.kaha.impl.async.Location;
028    import org.apache.activemq.store.MessageRecoveryListener;
029    import org.apache.activemq.store.TopicMessageStore;
030    import org.apache.activemq.store.TopicReferenceStore;
031    import org.apache.activemq.transaction.Synchronization;
032    import org.apache.activemq.util.SubscriptionKey;
033    import org.apache.commons.logging.Log;
034    import org.apache.commons.logging.LogFactory;
035    
036    /**
037     * A MessageStore that uses a Journal to store it's messages.
038     * 
039     * @version $Revision: 1.13 $
040     */
041    public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore {
042    
043        private static final Log LOG = LogFactory.getLog(AMQTopicMessageStore.class);
044        private TopicReferenceStore topicReferenceStore;
045        public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
046            super(adapter, topicReferenceStore, destinationName);
047            this.topicReferenceStore = topicReferenceStore;
048        }
049    
050        public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
051            flush();
052            topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
053        }
054    
055        public void recoverNextMessages(String clientId, String subscriptionName,
056                int maxReturned, final MessageRecoveryListener listener)
057                throws Exception {
058            RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
059                topicReferenceStore.recoverNextMessages(clientId, subscriptionName,maxReturned, recoveryListener);
060                if (recoveryListener.size() == 0) {
061                    flush();
062                    topicReferenceStore.recoverNextMessages(clientId,subscriptionName, maxReturned, recoveryListener);
063                }
064        }
065    
066        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
067            return topicReferenceStore.lookupSubscription(clientId, subscriptionName);
068        }
069    
070        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
071            peristenceAdapter.writeCommand(subscriptionInfo, false);
072            topicReferenceStore.addSubsciption(subscriptionInfo, retroactive);
073        }
074    
075        /**
076         */
077        public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName, final MessageId messageId) throws IOException {
078            final boolean debug = LOG.isDebugEnabled();
079            JournalTopicAck ack = new JournalTopicAck();
080            ack.setDestination(destination);
081            ack.setMessageId(messageId);
082            ack.setMessageSequenceId(messageId.getBrokerSequenceId());
083            ack.setSubscritionName(subscriptionName);
084            ack.setClientId(clientId);
085            ack.setTransactionId(context.getTransaction() != null ? context.getTransaction().getTransactionId() : null);
086            final Location location = peristenceAdapter.writeCommand(ack, false);
087            final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
088            if (!context.isInTransaction()) {
089                if (debug) {
090                    LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
091                }
092                acknowledge(context,messageId, location, clientId,subscriptionName);
093            } else {
094                if (debug) {
095                    LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
096                }
097                lock.lock();
098                try {
099                    inFlightTxLocations.add(location);
100                }finally {
101                    lock.unlock();
102                }
103                transactionStore.acknowledge(this, ack, location);
104                context.getTransaction().addSynchronization(new Synchronization() {
105    
106                    public void afterCommit() throws Exception {
107                        if (debug) {
108                            LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
109                        }
110                        lock.lock();
111                        try {
112                            inFlightTxLocations.remove(location);
113                            acknowledge(context,messageId, location, clientId,subscriptionName);
114                        }finally {
115                            lock.unlock();
116                        }
117                    }
118    
119                    public void afterRollback() throws Exception {
120                        if (debug) {
121                            LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
122                        }
123                        lock.lock();
124                        try{
125                            inFlightTxLocations.remove(location);
126                        }finally {
127                            lock.unlock();
128                        }
129                    }
130                });
131            }
132        }
133    
134        public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
135            try {
136                SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
137                if (sub != null) {
138                    topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId);
139                    return true;
140                }
141            } catch (Throwable e) {
142                LOG.debug("Could not replay acknowledge for message '" + messageId + "'.  Message may have already been acknowledged. reason: " + e);
143            }
144            return false;
145        }
146    
147        /**
148         * @param messageId
149         * @param location
150         * @param key
151         * @throws IOException 
152         */
153        protected void acknowledge(final ConnectionContext context, MessageId messageId,
154                Location location, String clientId, String subscriptionName)
155                throws IOException {
156            MessageAck ack = null;
157            lock.lock();
158            try {
159                lastLocation = location;
160            }finally {
161                lock.unlock();
162            }
163            
164                if (topicReferenceStore.acknowledgeReference(context, clientId,
165                        subscriptionName, messageId)) {
166                    ack = new MessageAck();
167                    ack.setLastMessageId(messageId);
168                   
169                }
170            
171            if (ack != null) {
172                removeMessage(context, ack);
173            }
174        }
175    
176        /**
177         * @return Returns the longTermStore.
178         */
179        public TopicReferenceStore getTopicReferenceStore() {
180            return topicReferenceStore;
181        }
182    
183        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
184            topicReferenceStore.deleteSubscription(clientId, subscriptionName);
185        }
186    
187        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
188            return topicReferenceStore.getAllSubscriptions();
189        }
190    
191        public int getMessageCount(String clientId, String subscriberName) throws IOException {
192            flush();
193            return topicReferenceStore.getMessageCount(clientId, subscriberName);
194        }
195    
196        public void resetBatching(String clientId, String subscriptionName) {
197            topicReferenceStore.resetBatching(clientId, subscriptionName);
198        }
199    }