001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.amq; 018 019 import java.io.IOException; 020 021 import org.apache.activemq.broker.ConnectionContext; 022 import org.apache.activemq.command.ActiveMQTopic; 023 import org.apache.activemq.command.JournalTopicAck; 024 import org.apache.activemq.command.MessageAck; 025 import org.apache.activemq.command.MessageId; 026 import org.apache.activemq.command.SubscriptionInfo; 027 import org.apache.activemq.kaha.impl.async.Location; 028 import org.apache.activemq.store.MessageRecoveryListener; 029 import org.apache.activemq.store.TopicMessageStore; 030 import org.apache.activemq.store.TopicReferenceStore; 031 import org.apache.activemq.transaction.Synchronization; 032 import org.apache.activemq.util.SubscriptionKey; 033 import org.apache.commons.logging.Log; 034 import org.apache.commons.logging.LogFactory; 035 036 /** 037 * A MessageStore that uses a Journal to store it's messages. 038 * 039 * @version $Revision: 1.13 $ 040 */ 041 public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore { 042 043 private static final Log LOG = LogFactory.getLog(AMQTopicMessageStore.class); 044 private TopicReferenceStore topicReferenceStore; 045 public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) { 046 super(adapter, topicReferenceStore, destinationName); 047 this.topicReferenceStore = topicReferenceStore; 048 } 049 050 public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { 051 flush(); 052 topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener)); 053 } 054 055 public void recoverNextMessages(String clientId, String subscriptionName, 056 int maxReturned, final MessageRecoveryListener listener) 057 throws Exception { 058 RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener); 059 topicReferenceStore.recoverNextMessages(clientId, subscriptionName,maxReturned, recoveryListener); 060 if (recoveryListener.size() == 0) { 061 flush(); 062 topicReferenceStore.recoverNextMessages(clientId,subscriptionName, maxReturned, recoveryListener); 063 } 064 } 065 066 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 067 return topicReferenceStore.lookupSubscription(clientId, subscriptionName); 068 } 069 070 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 071 peristenceAdapter.writeCommand(subscriptionInfo, false); 072 topicReferenceStore.addSubsciption(subscriptionInfo, retroactive); 073 } 074 075 /** 076 */ 077 public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName, final MessageId messageId) throws IOException { 078 final boolean debug = LOG.isDebugEnabled(); 079 JournalTopicAck ack = new JournalTopicAck(); 080 ack.setDestination(destination); 081 ack.setMessageId(messageId); 082 ack.setMessageSequenceId(messageId.getBrokerSequenceId()); 083 ack.setSubscritionName(subscriptionName); 084 ack.setClientId(clientId); 085 ack.setTransactionId(context.getTransaction() != null ? context.getTransaction().getTransactionId() : null); 086 final Location location = peristenceAdapter.writeCommand(ack, false); 087 final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 088 if (!context.isInTransaction()) { 089 if (debug) { 090 LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location); 091 } 092 acknowledge(context,messageId, location, clientId,subscriptionName); 093 } else { 094 if (debug) { 095 LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location); 096 } 097 lock.lock(); 098 try { 099 inFlightTxLocations.add(location); 100 }finally { 101 lock.unlock(); 102 } 103 transactionStore.acknowledge(this, ack, location); 104 context.getTransaction().addSynchronization(new Synchronization() { 105 106 public void afterCommit() throws Exception { 107 if (debug) { 108 LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location); 109 } 110 lock.lock(); 111 try { 112 inFlightTxLocations.remove(location); 113 acknowledge(context,messageId, location, clientId,subscriptionName); 114 }finally { 115 lock.unlock(); 116 } 117 } 118 119 public void afterRollback() throws Exception { 120 if (debug) { 121 LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location); 122 } 123 lock.lock(); 124 try{ 125 inFlightTxLocations.remove(location); 126 }finally { 127 lock.unlock(); 128 } 129 } 130 }); 131 } 132 } 133 134 public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) { 135 try { 136 SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName); 137 if (sub != null) { 138 topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId); 139 return true; 140 } 141 } catch (Throwable e) { 142 LOG.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e); 143 } 144 return false; 145 } 146 147 /** 148 * @param messageId 149 * @param location 150 * @param key 151 * @throws IOException 152 */ 153 protected void acknowledge(final ConnectionContext context, MessageId messageId, 154 Location location, String clientId, String subscriptionName) 155 throws IOException { 156 MessageAck ack = null; 157 lock.lock(); 158 try { 159 lastLocation = location; 160 }finally { 161 lock.unlock(); 162 } 163 164 if (topicReferenceStore.acknowledgeReference(context, clientId, 165 subscriptionName, messageId)) { 166 ack = new MessageAck(); 167 ack.setLastMessageId(messageId); 168 169 } 170 171 if (ack != null) { 172 removeMessage(context, ack); 173 } 174 } 175 176 /** 177 * @return Returns the longTermStore. 178 */ 179 public TopicReferenceStore getTopicReferenceStore() { 180 return topicReferenceStore; 181 } 182 183 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 184 topicReferenceStore.deleteSubscription(clientId, subscriptionName); 185 } 186 187 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 188 return topicReferenceStore.getAllSubscriptions(); 189 } 190 191 public int getMessageCount(String clientId, String subscriberName) throws IOException { 192 flush(); 193 return topicReferenceStore.getMessageCount(clientId, subscriberName); 194 } 195 196 public void resetBatching(String clientId, String subscriptionName) { 197 topicReferenceStore.resetBatching(clientId, subscriptionName); 198 } 199 }