001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transaction;
018    
019    import java.io.IOException;
020    
021    import javax.transaction.xa.XAException;
022    import javax.transaction.xa.XAResource;
023    
024    import org.apache.activemq.broker.TransactionBroker;
025    import org.apache.activemq.command.TransactionId;
026    import org.apache.activemq.command.XATransactionId;
027    import org.apache.activemq.store.TransactionStore;
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    
031    /**
032     * @version $Revision: 1.4 $
033     */
034    public class XATransaction extends Transaction {
035    
036        private static final Log LOG = LogFactory.getLog(XATransaction.class);
037    
038        private final TransactionStore transactionStore;
039        private final XATransactionId xid;
040        private final TransactionBroker broker;
041    
042        public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker) {
043            this.transactionStore = transactionStore;
044            this.xid = xid;
045            this.broker = broker;
046            if (LOG.isDebugEnabled()) {
047                LOG.debug("XA Transaction new/begin : " + xid);
048            }
049        }
050    
051        public void commit(boolean onePhase) throws XAException, IOException {
052            if (LOG.isDebugEnabled()) {
053                LOG.debug("XA Transaction commit: " + xid);
054            }
055    
056            switch (getState()) {
057            case START_STATE:
058                // 1 phase commit, no work done.
059                checkForPreparedState(onePhase);
060                setStateFinished();
061                break;
062            case IN_USE_STATE:
063                // 1 phase commit, work done.
064                checkForPreparedState(onePhase);
065                doPrePrepare();
066                setStateFinished();
067                transactionStore.commit(getTransactionId(), false);
068                doPostCommit();
069                break;
070            case PREPARED_STATE:
071                // 2 phase commit, work done.
072                // We would record commit here.
073                setStateFinished();
074                transactionStore.commit(getTransactionId(), true);
075                doPostCommit();
076                break;
077            default:
078                illegalStateTransition("commit");
079            }
080        }
081    
082        private void illegalStateTransition(String callName) throws XAException {
083            XAException xae = new XAException("Cannot call " + callName + " now.");
084            xae.errorCode = XAException.XAER_PROTO;
085            throw xae;
086        }
087    
088        private void checkForPreparedState(boolean onePhase) throws XAException {
089            if (!onePhase) {
090                XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared.");
091                xae.errorCode = XAException.XAER_PROTO;
092                throw xae;
093            }
094        }
095    
096        private void doPrePrepare() throws XAException, IOException {
097            try {
098                prePrepare();
099            } catch (XAException e) {
100                throw e;
101            } catch (Throwable e) {
102                LOG.warn("PRE-PREPARE FAILED: ", e);
103                rollback();
104                XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back.");
105                xae.errorCode = XAException.XA_RBOTHER;
106                xae.initCause(e);
107                throw xae;
108            }
109        }
110    
111        private void doPostCommit() throws XAException {
112            try {
113                fireAfterCommit();
114            } catch (Throwable e) {
115                // I guess this could happen. Post commit task failed
116                // to execute properly.
117                LOG.warn("POST COMMIT FAILED: ", e);
118                XAException xae = new XAException("POST COMMIT FAILED");
119                xae.errorCode = XAException.XAER_RMERR;
120                xae.initCause(e);
121                throw xae;
122            }
123        }
124    
125        public void rollback() throws XAException, IOException {
126    
127            if (LOG.isDebugEnabled()) {
128                LOG.debug("XA Transaction rollback: " + xid);
129            }
130    
131            switch (getState()) {
132            case START_STATE:
133                // 1 phase rollback no work done.
134                setStateFinished();
135                break;
136            case IN_USE_STATE:
137                // 1 phase rollback work done.
138                setStateFinished();
139                transactionStore.rollback(getTransactionId());
140                doPostRollback();
141                break;
142            case PREPARED_STATE:
143                // 2 phase rollback work done.
144                setStateFinished();
145                transactionStore.rollback(getTransactionId());
146                doPostRollback();
147                break;
148            default:
149                throw new XAException("Invalid state");
150            }
151    
152        }
153    
154        private void doPostRollback() throws XAException {
155            try {
156                fireAfterRollback();
157            } catch (Throwable e) {
158                // I guess this could happen. Post commit task failed
159                // to execute properly.
160                LOG.warn("POST ROLLBACK FAILED: ", e);
161                XAException xae = new XAException("POST ROLLBACK FAILED");
162                xae.errorCode = XAException.XAER_RMERR;
163                xae.initCause(e);
164                throw xae;
165            }
166        }
167    
168        public int prepare() throws XAException, IOException {
169            if (LOG.isDebugEnabled()) {
170                LOG.debug("XA Transaction prepare: " + xid);
171            }
172    
173            switch (getState()) {
174            case START_STATE:
175                // No work done.. no commit/rollback needed.
176                setStateFinished();
177                return XAResource.XA_RDONLY;
178            case IN_USE_STATE:
179                // We would record prepare here.
180                doPrePrepare();
181                setState(Transaction.PREPARED_STATE);
182                transactionStore.prepare(getTransactionId());
183                return XAResource.XA_OK;
184            default:
185                illegalStateTransition("prepare");
186                return XAResource.XA_RDONLY;
187            }
188        }
189    
190        private void setStateFinished() {
191            setState(Transaction.FINISHED_STATE);
192            broker.removeTransaction(xid);
193        }
194    
195        public TransactionId getTransactionId() {
196            return xid;
197        }
198    }