001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker; 018 019 import java.util.ArrayList; 020 import java.util.Iterator; 021 import java.util.LinkedHashMap; 022 import java.util.List; 023 import java.util.Map; 024 import java.util.concurrent.ConcurrentHashMap; 025 026 import javax.jms.JMSException; 027 import javax.transaction.xa.XAException; 028 029 import org.apache.activemq.ActiveMQMessageAudit; 030 import org.apache.activemq.command.ConnectionInfo; 031 import org.apache.activemq.command.LocalTransactionId; 032 import org.apache.activemq.command.Message; 033 import org.apache.activemq.command.MessageAck; 034 import org.apache.activemq.command.ProducerInfo; 035 import org.apache.activemq.command.TransactionId; 036 import org.apache.activemq.command.XATransactionId; 037 import org.apache.activemq.state.ProducerState; 038 import org.apache.activemq.store.TransactionRecoveryListener; 039 import org.apache.activemq.store.TransactionStore; 040 import org.apache.activemq.transaction.LocalTransaction; 041 import org.apache.activemq.transaction.Synchronization; 042 import org.apache.activemq.transaction.Transaction; 043 import org.apache.activemq.transaction.XATransaction; 044 import org.apache.activemq.util.IOExceptionSupport; 045 import org.apache.activemq.util.WrappedException; 046 import org.apache.commons.logging.Log; 047 import org.apache.commons.logging.LogFactory; 048 049 /** 050 * This broker filter handles the transaction related operations in the Broker 051 * interface. 052 * 053 * @version $Revision: 1.10 $ 054 */ 055 public class TransactionBroker extends BrokerFilter { 056 057 private static final Log LOG = LogFactory.getLog(TransactionBroker.class); 058 059 // The prepared XA transactions. 060 private TransactionStore transactionStore; 061 private Map<TransactionId, Transaction> xaTransactions = new LinkedHashMap<TransactionId, Transaction>(); 062 private ActiveMQMessageAudit audit; 063 064 public TransactionBroker(Broker next, TransactionStore transactionStore) { 065 super(next); 066 this.transactionStore = transactionStore; 067 } 068 069 // //////////////////////////////////////////////////////////////////////////// 070 // 071 // Life cycle Methods 072 // 073 // //////////////////////////////////////////////////////////////////////////// 074 075 /** 076 * Recovers any prepared transactions. 077 */ 078 public void start() throws Exception { 079 transactionStore.start(); 080 try { 081 final ConnectionContext context = new ConnectionContext(); 082 context.setBroker(this); 083 context.setInRecoveryMode(true); 084 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>()); 085 context.setProducerFlowControl(false); 086 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 087 producerExchange.setMutable(true); 088 producerExchange.setConnectionContext(context); 089 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 090 final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange(); 091 consumerExchange.setConnectionContext(context); 092 transactionStore.recover(new TransactionRecoveryListener() { 093 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) { 094 try { 095 beginTransaction(context, xid); 096 for (int i = 0; i < addedMessages.length; i++) { 097 send(producerExchange, addedMessages[i]); 098 } 099 for (int i = 0; i < aks.length; i++) { 100 acknowledge(consumerExchange, aks[i]); 101 } 102 prepareTransaction(context, xid); 103 } catch (Throwable e) { 104 throw new WrappedException(e); 105 } 106 } 107 }); 108 } catch (WrappedException e) { 109 Throwable cause = e.getCause(); 110 throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause); 111 } 112 next.start(); 113 } 114 115 public void stop() throws Exception { 116 transactionStore.stop(); 117 next.stop(); 118 } 119 120 // //////////////////////////////////////////////////////////////////////////// 121 // 122 // BrokerFilter overrides 123 // 124 // //////////////////////////////////////////////////////////////////////////// 125 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 126 List<TransactionId> txs = new ArrayList<TransactionId>(); 127 synchronized (xaTransactions) { 128 for (Iterator<Transaction> iter = xaTransactions.values().iterator(); iter.hasNext();) { 129 Transaction tx = iter.next(); 130 if (tx.isPrepared()) { 131 txs.add(tx.getTransactionId()); 132 } 133 } 134 } 135 XATransactionId rc[] = new XATransactionId[txs.size()]; 136 txs.toArray(rc); 137 return rc; 138 } 139 140 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 141 // the transaction may have already been started. 142 if (xid.isXATransaction()) { 143 Transaction transaction = null; 144 synchronized (xaTransactions) { 145 transaction = xaTransactions.get(xid); 146 if (transaction != null) { 147 return; 148 } 149 transaction = new XATransaction(transactionStore, (XATransactionId)xid, this); 150 xaTransactions.put(xid, transaction); 151 } 152 } else { 153 Map<TransactionId, Transaction> transactionMap = context.getTransactions(); 154 Transaction transaction = transactionMap.get(xid); 155 if (transaction != null) { 156 throw new JMSException("Transaction '" + xid + "' has already been started."); 157 } 158 transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context); 159 transactionMap.put(xid, transaction); 160 } 161 } 162 163 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 164 Transaction transaction = getTransaction(context, xid, false); 165 return transaction.prepare(); 166 } 167 168 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 169 Transaction transaction = getTransaction(context, xid, true); 170 transaction.commit(onePhase); 171 } 172 173 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 174 Transaction transaction = getTransaction(context, xid, true); 175 transaction.rollback(); 176 } 177 178 public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception { 179 Transaction transaction = getTransaction(context, xid, true); 180 transaction.rollback(); 181 } 182 183 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 184 // This method may be invoked recursively. 185 // Track original tx so that it can be restored. 186 final ConnectionContext context = consumerExchange.getConnectionContext(); 187 Transaction originalTx = context.getTransaction(); 188 Transaction transaction = null; 189 if (ack.isInTransaction()) { 190 transaction = getTransaction(context, ack.getTransactionId(), false); 191 } 192 context.setTransaction(transaction); 193 try { 194 next.acknowledge(consumerExchange, ack); 195 } finally { 196 context.setTransaction(originalTx); 197 } 198 } 199 200 public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception { 201 // This method may be invoked recursively. 202 // Track original tx so that it can be restored. 203 final ConnectionContext context = producerExchange.getConnectionContext(); 204 Transaction originalTx = context.getTransaction(); 205 Transaction transaction = null; 206 Synchronization sync = null; 207 if (message.getTransactionId() != null) { 208 transaction = getTransaction(context, message.getTransactionId(), false); 209 if (transaction != null) { 210 sync = new Synchronization() { 211 212 public void afterRollback() { 213 if (audit != null) { 214 audit.rollback(message); 215 } 216 } 217 }; 218 transaction.addSynchronization(sync); 219 } 220 } 221 if (audit == null || !audit.isDuplicate(message)) { 222 context.setTransaction(transaction); 223 try { 224 next.send(producerExchange, message); 225 } finally { 226 context.setTransaction(originalTx); 227 } 228 } else { 229 if (sync != null && transaction != null) { 230 transaction.removeSynchronization(sync); 231 } 232 if (LOG.isDebugEnabled()) { 233 LOG.debug("IGNORING duplicate message " + message); 234 } 235 } 236 } 237 238 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 239 for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) { 240 try { 241 Transaction transaction = iter.next(); 242 transaction.rollback(); 243 } catch (Exception e) { 244 LOG.warn("ERROR Rolling back disconnected client's transactions: ", e); 245 } 246 iter.remove(); 247 } 248 next.removeConnection(context, info, error); 249 } 250 251 // //////////////////////////////////////////////////////////////////////////// 252 // 253 // Implementation help methods. 254 // 255 // //////////////////////////////////////////////////////////////////////////// 256 public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException { 257 Map transactionMap = null; 258 synchronized (xaTransactions) { 259 transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions(); 260 } 261 Transaction transaction = (Transaction)transactionMap.get(xid); 262 if (transaction != null) { 263 return transaction; 264 } 265 if (xid.isXATransaction()) { 266 XAException e = new XAException("Transaction '" + xid + "' has not been started."); 267 e.errorCode = XAException.XAER_NOTA; 268 throw e; 269 } else { 270 throw new JMSException("Transaction '" + xid + "' has not been started."); 271 } 272 } 273 274 public void removeTransaction(XATransactionId xid) { 275 synchronized (xaTransactions) { 276 xaTransactions.remove(xid); 277 } 278 } 279 280 public synchronized void brokerServiceStarted() { 281 super.brokerServiceStarted(); 282 if (getBrokerService().isSupportFailOver() && audit == null) { 283 audit = new ActiveMQMessageAudit(); 284 } 285 } 286 287 }