001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.journal; 018 019 import java.io.IOException; 020 import java.util.HashMap; 021 import java.util.Iterator; 022 023 import org.apache.activeio.journal.RecordLocation; 024 import org.apache.activemq.broker.ConnectionContext; 025 import org.apache.activemq.command.ActiveMQTopic; 026 import org.apache.activemq.command.JournalTopicAck; 027 import org.apache.activemq.command.Message; 028 import org.apache.activemq.command.MessageId; 029 import org.apache.activemq.command.SubscriptionInfo; 030 import org.apache.activemq.store.MessageRecoveryListener; 031 import org.apache.activemq.store.TopicMessageStore; 032 import org.apache.activemq.transaction.Synchronization; 033 import org.apache.activemq.util.Callback; 034 import org.apache.activemq.util.SubscriptionKey; 035 import org.apache.commons.logging.Log; 036 import org.apache.commons.logging.LogFactory; 037 038 /** 039 * A MessageStore that uses a Journal to store it's messages. 040 * 041 * @version $Revision: 1.13 $ 042 */ 043 public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore { 044 045 private static final Log LOG = LogFactory.getLog(JournalTopicMessageStore.class); 046 047 private TopicMessageStore longTermStore; 048 private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>(); 049 050 public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore, 051 ActiveMQTopic destinationName) { 052 super(adapter, checkpointStore, destinationName); 053 this.longTermStore = checkpointStore; 054 } 055 056 public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) 057 throws Exception { 058 this.peristenceAdapter.checkpoint(true, true); 059 longTermStore.recoverSubscription(clientId, subscriptionName, listener); 060 } 061 062 public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, 063 MessageRecoveryListener listener) throws Exception { 064 this.peristenceAdapter.checkpoint(true, true); 065 longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned, listener); 066 067 } 068 069 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 070 return longTermStore.lookupSubscription(clientId, subscriptionName); 071 } 072 073 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 074 this.peristenceAdapter.checkpoint(true, true); 075 longTermStore.addSubsciption(subscriptionInfo, retroactive); 076 } 077 078 public void addMessage(ConnectionContext context, Message message) throws IOException { 079 super.addMessage(context, message); 080 } 081 082 /** 083 */ 084 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 085 final MessageId messageId) throws IOException { 086 final boolean debug = LOG.isDebugEnabled(); 087 088 JournalTopicAck ack = new JournalTopicAck(); 089 ack.setDestination(destination); 090 ack.setMessageId(messageId); 091 ack.setMessageSequenceId(messageId.getBrokerSequenceId()); 092 ack.setSubscritionName(subscriptionName); 093 ack.setClientId(clientId); 094 ack.setTransactionId(context.getTransaction() != null 095 ? context.getTransaction().getTransactionId() : null); 096 final RecordLocation location = peristenceAdapter.writeCommand(ack, false); 097 098 final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 099 if (!context.isInTransaction()) { 100 if (debug) { 101 LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location); 102 } 103 acknowledge(messageId, location, key); 104 } else { 105 if (debug) { 106 LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location); 107 } 108 synchronized (this) { 109 inFlightTxLocations.add(location); 110 } 111 transactionStore.acknowledge(this, ack, location); 112 context.getTransaction().addSynchronization(new Synchronization() { 113 public void afterCommit() throws Exception { 114 if (debug) { 115 LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location); 116 } 117 synchronized (JournalTopicMessageStore.this) { 118 inFlightTxLocations.remove(location); 119 acknowledge(messageId, location, key); 120 } 121 } 122 123 public void afterRollback() throws Exception { 124 if (debug) { 125 LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location); 126 } 127 synchronized (JournalTopicMessageStore.this) { 128 inFlightTxLocations.remove(location); 129 } 130 } 131 }); 132 } 133 134 } 135 136 public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, 137 MessageId messageId) { 138 try { 139 SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName); 140 if (sub != null) { 141 longTermStore.acknowledge(context, clientId, subscritionName, messageId); 142 } 143 } catch (Throwable e) { 144 LOG.debug("Could not replay acknowledge for message '" + messageId 145 + "'. Message may have already been acknowledged. reason: " + e); 146 } 147 } 148 149 /** 150 * @param messageId 151 * @param location 152 * @param key 153 */ 154 protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) { 155 synchronized (this) { 156 lastLocation = location; 157 ackedLastAckLocations.put(key, messageId); 158 } 159 } 160 161 public RecordLocation checkpoint() throws IOException { 162 163 final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations; 164 165 // swap out the hash maps.. 166 synchronized (this) { 167 cpAckedLastAckLocations = this.ackedLastAckLocations; 168 this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>(); 169 } 170 171 return super.checkpoint(new Callback() { 172 public void execute() throws Exception { 173 174 // Checkpoint the acknowledged messages. 175 Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator(); 176 while (iterator.hasNext()) { 177 SubscriptionKey subscriptionKey = iterator.next(); 178 MessageId identity = cpAckedLastAckLocations.get(subscriptionKey); 179 longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, 180 subscriptionKey.subscriptionName, identity); 181 } 182 183 } 184 }); 185 186 } 187 188 /** 189 * @return Returns the longTermStore. 190 */ 191 public TopicMessageStore getLongTermTopicMessageStore() { 192 return longTermStore; 193 } 194 195 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 196 longTermStore.deleteSubscription(clientId, subscriptionName); 197 } 198 199 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 200 return longTermStore.getAllSubscriptions(); 201 } 202 203 public int getMessageCount(String clientId, String subscriberName) throws IOException { 204 this.peristenceAdapter.checkpoint(true, true); 205 return longTermStore.getMessageCount(clientId, subscriberName); 206 } 207 208 public void resetBatching(String clientId, String subscriptionName) { 209 longTermStore.resetBatching(clientId, subscriptionName); 210 } 211 212 }