001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.journal;
018    
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
037    
038    /**
039     * A MessageStore that uses a Journal to store its messages.
040     * 
041     * @version $Revision: 1.13 $
042     */
043    public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
044    
045        private static final Log LOG = LogFactory.getLog(JournalTopicMessageStore.class);
046    
047        private TopicMessageStore longTermStore;
048        private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
049    
050        public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore,
051                                        ActiveMQTopic destinationName) {
052            super(adapter, checkpointStore, destinationName);
053            this.longTermStore = checkpointStore;
054        }
055    
056        public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
057            throws Exception {
058            this.peristenceAdapter.checkpoint(true, true);
059            longTermStore.recoverSubscription(clientId, subscriptionName, listener);
060        }
061    
062        public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
063                                        MessageRecoveryListener listener) throws Exception {
064            this.peristenceAdapter.checkpoint(true, true);
065            longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned, listener);
066    
067        }
068    
069        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
070            return longTermStore.lookupSubscription(clientId, subscriptionName);
071        }
072    
073        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
074            this.peristenceAdapter.checkpoint(true, true);
075            longTermStore.addSubsciption(subscriptionInfo, retroactive);
076        }
077    
078        public void addMessage(ConnectionContext context, Message message) throws IOException {
079            super.addMessage(context, message);
080        }
081    
082        /**
083         */
084        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
085                                final MessageId messageId) throws IOException {
086            final boolean debug = LOG.isDebugEnabled();
087    
088            JournalTopicAck ack = new JournalTopicAck();
089            ack.setDestination(destination);
090            ack.setMessageId(messageId);
091            ack.setMessageSequenceId(messageId.getBrokerSequenceId());
092            ack.setSubscritionName(subscriptionName);
093            ack.setClientId(clientId);
094            ack.setTransactionId(context.getTransaction() != null
095                ? context.getTransaction().getTransactionId() : null);
096            final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
097    
098            final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
099            if (!context.isInTransaction()) {
100                if (debug) {
101                    LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
102                }
103                acknowledge(messageId, location, key);
104            } else {
105                if (debug) {
106                    LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
107                }
108                synchronized (this) {
109                    inFlightTxLocations.add(location);
110                }
111                transactionStore.acknowledge(this, ack, location);
112                context.getTransaction().addSynchronization(new Synchronization() {
113                    public void afterCommit() throws Exception {
114                        if (debug) {
115                            LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
116                        }
117                        synchronized (JournalTopicMessageStore.this) {
118                            inFlightTxLocations.remove(location);
119                            acknowledge(messageId, location, key);
120                        }
121                    }
122    
123                    public void afterRollback() throws Exception {
124                        if (debug) {
125                            LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
126                        }
127                        synchronized (JournalTopicMessageStore.this) {
128                            inFlightTxLocations.remove(location);
129                        }
130                    }
131                });
132            }
133    
134        }
135    
136        public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName,
137                                      MessageId messageId) {
138            try {
139                SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName);
140                if (sub != null) {
141                    longTermStore.acknowledge(context, clientId, subscritionName, messageId);
142                }
143            } catch (Throwable e) {
144                LOG.debug("Could not replay acknowledge for message '" + messageId
145                          + "'.  Message may have already been acknowledged. reason: " + e);
146            }
147        }
148    
149        /**
150         * @param messageId
151         * @param location
152         * @param key
153         */
154        protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
155            synchronized (this) {
156                lastLocation = location;
157                ackedLastAckLocations.put(key, messageId);
158            }
159        }
160    
161        public RecordLocation checkpoint() throws IOException {
162    
163            final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
164    
165            // swap out the hash maps..
166            synchronized (this) {
167                cpAckedLastAckLocations = this.ackedLastAckLocations;
168                this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
169            }
170    
171            return super.checkpoint(new Callback() {
172                public void execute() throws Exception {
173    
174                    // Checkpoint the acknowledged messages.
175                    Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
176                    while (iterator.hasNext()) {
177                        SubscriptionKey subscriptionKey = iterator.next();
178                        MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
179                        longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
180                                                  subscriptionKey.subscriptionName, identity);
181                    }
182    
183                }
184            });
185    
186        }
187    
188        /**
189         * @return Returns the longTermStore.
190         */
191        public TopicMessageStore getLongTermTopicMessageStore() {
192            return longTermStore;
193        }
194    
195        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
196            longTermStore.deleteSubscription(clientId, subscriptionName);
197        }
198    
199        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
200            return longTermStore.getAllSubscriptions();
201        }
202    
203        public int getMessageCount(String clientId, String subscriberName) throws IOException {
204            this.peristenceAdapter.checkpoint(true, true);
205            return longTermStore.getMessageCount(clientId, subscriberName);
206        }
207    
208        public void resetBatching(String clientId, String subscriptionName) {
209            longTermStore.resetBatching(clientId, subscriptionName);
210        }
211    
212    }