001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transaction; 018 019 import java.io.IOException; 020 021 import javax.transaction.xa.XAException; 022 import javax.transaction.xa.XAResource; 023 024 import org.apache.activemq.broker.TransactionBroker; 025 import org.apache.activemq.command.TransactionId; 026 import org.apache.activemq.command.XATransactionId; 027 import org.apache.activemq.store.TransactionStore; 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 031 /** 032 * @version $Revision: 1.4 $ 033 */ 034 public class XATransaction extends Transaction { 035 036 private static final Log LOG = LogFactory.getLog(XATransaction.class); 037 038 private final TransactionStore transactionStore; 039 private final XATransactionId xid; 040 private final TransactionBroker broker; 041 042 public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker) { 043 this.transactionStore = transactionStore; 044 this.xid = xid; 045 this.broker = broker; 046 if (LOG.isDebugEnabled()) { 047 LOG.debug("XA Transaction new/begin : " + xid); 048 } 049 } 050 051 public void commit(boolean onePhase) throws XAException, IOException { 052 if (LOG.isDebugEnabled()) { 053 LOG.debug("XA Transaction commit: " + xid); 054 } 055 056 switch (getState()) { 057 case START_STATE: 058 // 1 phase commit, no work done. 059 checkForPreparedState(onePhase); 060 setStateFinished(); 061 break; 062 case IN_USE_STATE: 063 // 1 phase commit, work done. 064 checkForPreparedState(onePhase); 065 doPrePrepare(); 066 setStateFinished(); 067 transactionStore.commit(getTransactionId(), false); 068 doPostCommit(); 069 break; 070 case PREPARED_STATE: 071 // 2 phase commit, work done. 072 // We would record commit here. 073 setStateFinished(); 074 transactionStore.commit(getTransactionId(), true); 075 doPostCommit(); 076 break; 077 default: 078 illegalStateTransition("commit"); 079 } 080 } 081 082 private void illegalStateTransition(String callName) throws XAException { 083 XAException xae = new XAException("Cannot call " + callName + " now."); 084 xae.errorCode = XAException.XAER_PROTO; 085 throw xae; 086 } 087 088 private void checkForPreparedState(boolean onePhase) throws XAException { 089 if (!onePhase) { 090 XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared."); 091 xae.errorCode = XAException.XAER_PROTO; 092 throw xae; 093 } 094 } 095 096 private void doPrePrepare() throws XAException, IOException { 097 try { 098 prePrepare(); 099 } catch (XAException e) { 100 throw e; 101 } catch (Throwable e) { 102 LOG.warn("PRE-PREPARE FAILED: ", e); 103 rollback(); 104 XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back."); 105 xae.errorCode = XAException.XA_RBOTHER; 106 xae.initCause(e); 107 throw xae; 108 } 109 } 110 111 private void doPostCommit() throws XAException { 112 try { 113 fireAfterCommit(); 114 } catch (Throwable e) { 115 // I guess this could happen. Post commit task failed 116 // to execute properly. 117 LOG.warn("POST COMMIT FAILED: ", e); 118 XAException xae = new XAException("POST COMMIT FAILED"); 119 xae.errorCode = XAException.XAER_RMERR; 120 xae.initCause(e); 121 throw xae; 122 } 123 } 124 125 public void rollback() throws XAException, IOException { 126 127 if (LOG.isDebugEnabled()) { 128 LOG.debug("XA Transaction rollback: " + xid); 129 } 130 131 switch (getState()) { 132 case START_STATE: 133 // 1 phase rollback no work done. 134 setStateFinished(); 135 break; 136 case IN_USE_STATE: 137 // 1 phase rollback work done. 138 setStateFinished(); 139 transactionStore.rollback(getTransactionId()); 140 doPostRollback(); 141 break; 142 case PREPARED_STATE: 143 // 2 phase rollback work done. 144 setStateFinished(); 145 transactionStore.rollback(getTransactionId()); 146 doPostRollback(); 147 break; 148 default: 149 throw new XAException("Invalid state"); 150 } 151 152 } 153 154 private void doPostRollback() throws XAException { 155 try { 156 fireAfterRollback(); 157 } catch (Throwable e) { 158 // I guess this could happen. Post commit task failed 159 // to execute properly. 160 LOG.warn("POST ROLLBACK FAILED: ", e); 161 XAException xae = new XAException("POST ROLLBACK FAILED"); 162 xae.errorCode = XAException.XAER_RMERR; 163 xae.initCause(e); 164 throw xae; 165 } 166 } 167 168 public int prepare() throws XAException, IOException { 169 if (LOG.isDebugEnabled()) { 170 LOG.debug("XA Transaction prepare: " + xid); 171 } 172 173 switch (getState()) { 174 case START_STATE: 175 // No work done.. no commit/rollback needed. 176 setStateFinished(); 177 return XAResource.XA_RDONLY; 178 case IN_USE_STATE: 179 // We would record prepare here. 180 doPrePrepare(); 181 setState(Transaction.PREPARED_STATE); 182 transactionStore.prepare(getTransactionId()); 183 return XAResource.XA_OK; 184 default: 185 illegalStateTransition("prepare"); 186 return XAResource.XA_RDONLY; 187 } 188 } 189 190 private void setStateFinished() { 191 setState(Transaction.FINISHED_STATE); 192 broker.removeTransaction(xid); 193 } 194 195 public TransactionId getTransactionId() { 196 return xid; 197 } 198 }