001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.store.amq;
019    
020    import java.io.IOException;
021    import java.util.Iterator;
022    import java.util.LinkedHashMap;
023    import java.util.Map;
024    
025    import javax.transaction.xa.XAException;
026    
027    import org.apache.activemq.command.JournalTopicAck;
028    import org.apache.activemq.command.JournalTransaction;
029    import org.apache.activemq.command.Message;
030    import org.apache.activemq.command.MessageAck;
031    import org.apache.activemq.command.TransactionId;
032    import org.apache.activemq.command.XATransactionId;
033    import org.apache.activemq.kaha.impl.async.Location;
034    import org.apache.activemq.store.TransactionRecoveryListener;
035    import org.apache.activemq.store.TransactionStore;
036    
037    /**
038     */
039    public class AMQTransactionStore implements TransactionStore {
040    
041        protected Map<TransactionId, AMQTx> inflightTransactions = new LinkedHashMap<TransactionId, AMQTx>();
042        Map<TransactionId, AMQTx> preparedTransactions = new LinkedHashMap<TransactionId, AMQTx>();
043    
044        private final AMQPersistenceAdapter peristenceAdapter;
045        private boolean doingRecover;
046    
047        public AMQTransactionStore(AMQPersistenceAdapter adapter) {
048            this.peristenceAdapter = adapter;
049        }
050    
051        /**
052         * @throws IOException
053         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
054         */
055        public void prepare(TransactionId txid) throws IOException {
056            AMQTx tx = null;
057            synchronized (inflightTransactions) {
058                tx = inflightTransactions.remove(txid);
059            }
060            if (tx == null) {
061                return;
062            }
063            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true);
064            synchronized (preparedTransactions) {
065                preparedTransactions.put(txid, tx);
066            }
067        }
068    
069        /**
070         * @throws IOException
071         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
072         */
073        public void replayPrepare(TransactionId txid) throws IOException {
074            AMQTx tx = null;
075            synchronized (inflightTransactions) {
076                tx = inflightTransactions.remove(txid);
077            }
078            if (tx == null) {
079                return;
080            }
081            synchronized (preparedTransactions) {
082                preparedTransactions.put(txid, tx);
083            }
084        }
085    
086        public AMQTx getTx(TransactionId txid, Location location) {
087            AMQTx tx = null;
088            synchronized (inflightTransactions) {
089                tx = inflightTransactions.get(txid);
090                if (tx == null) {
091                    tx = new AMQTx(location);
092                    inflightTransactions.put(txid, tx);
093                }
094            }
095            return tx;
096        }
097    
098        /**
099         * @throws XAException
100         * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
101         */
102        public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
103            AMQTx tx;
104            if (wasPrepared) {
105                synchronized (preparedTransactions) {
106                    tx = preparedTransactions.remove(txid);
107                }
108            } else {
109                synchronized (inflightTransactions) {
110                    tx = inflightTransactions.remove(txid);
111                }
112            }
113            if (tx == null) {
114                return;
115            }
116            if (txid.isXATransaction()) {
117                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared), true,true);
118            } else {
119                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true,true);
120            }
121        }
122    
123        /**
124         * @throws XAException
125         * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
126         */
127        public AMQTx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
128            if (wasPrepared) {
129                synchronized (preparedTransactions) {
130                    return preparedTransactions.remove(txid);
131                }
132            } else {
133                synchronized (inflightTransactions) {
134                    return inflightTransactions.remove(txid);
135                }
136            }
137        }
138    
139        /**
140         * @throws IOException
141         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
142         */
143        public void rollback(TransactionId txid) throws IOException {
144            AMQTx tx = null;
145            synchronized (inflightTransactions) {
146                tx = inflightTransactions.remove(txid);
147            }
148            if (tx != null) {
149                synchronized (preparedTransactions) {
150                    tx = preparedTransactions.remove(txid);
151                }
152            }
153            if (tx != null) {
154                if (txid.isXATransaction()) {
155                    peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false), true,true);
156                } else {
157                    peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false), true,true);
158                }
159            }
160        }
161    
162        /**
163         * @throws IOException
164         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
165         */
166        public void replayRollback(TransactionId txid) throws IOException {
167            boolean inflight = false;
168            synchronized (inflightTransactions) {
169                inflight = inflightTransactions.remove(txid) != null;
170            }
171            if (inflight) {
172                synchronized (preparedTransactions) {
173                    preparedTransactions.remove(txid);
174                }
175            }
176        }
177    
178        public void start() throws Exception {
179        }
180    
181        public void stop() throws Exception {
182        }
183    
184        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
185            // All the in-flight transactions get rolled back..
186            synchronized (inflightTransactions) {
187                inflightTransactions.clear();
188            }
189            this.doingRecover = true;
190            try {
191                Map<TransactionId, AMQTx> txs = null;
192                synchronized (preparedTransactions) {
193                    txs = new LinkedHashMap<TransactionId, AMQTx>(preparedTransactions);
194                }
195                for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) {
196                    Object txid = iter.next();
197                    AMQTx tx = txs.get(txid);
198                    listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
199                }
200            } finally {
201                this.doingRecover = false;
202            }
203        }
204    
205        /**
206         * @param message
207         * @throws IOException
208         */
209        void addMessage(AMQMessageStore store, Message message, Location location) throws IOException {
210            AMQTx tx = getTx(message.getTransactionId(), location);
211            tx.add(store, message, location);
212        }
213    
214        /**
215         * @param ack
216         * @throws IOException
217         */
218        public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException {
219            AMQTx tx = getTx(ack.getTransactionId(), location);
220            tx.add(store, ack);
221        }
222    
223        public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) {
224            AMQTx tx = getTx(ack.getTransactionId(), location);
225            tx.add(store, ack);
226        }
227    
228        public Location checkpoint() throws IOException {
229            // Nothing really to checkpoint.. since, we don't
230            // checkpoint tx operations in to long term store until they are
231            // committed.
232            // But we keep track of the first location of an operation
233            // that was associated with an active tx. The journal can not
234            // roll over active tx records.
235            Location minimumLocationInUse = null;
236            synchronized (inflightTransactions) {
237                for (Iterator<AMQTx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
238                    AMQTx tx = iter.next();
239                    Location location = tx.getLocation();
240                    if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) {
241                        minimumLocationInUse = location;
242                    }
243                }
244            }
245            synchronized (preparedTransactions) {
246                for (Iterator<AMQTx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
247                    AMQTx tx = iter.next();
248                    Location location = tx.getLocation();
249                    if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) {
250                        minimumLocationInUse = location;
251                    }
252                }
253                return minimumLocationInUse;
254            }
255        }
256    
257        public boolean isDoingRecover() {
258            return doingRecover;
259        }
260    
261        /**
262         * @return the preparedTransactions
263         */
264        public Map<TransactionId, AMQTx> getPreparedTransactions() {
265            return this.preparedTransactions;
266        }
267    
268        /**
269         * @param preparedTransactions the preparedTransactions to set
270         */
271        public void setPreparedTransactions(Map<TransactionId, AMQTx> preparedTransactions) {
272            if (preparedTransactions != null) {
273                this.preparedTransactions.clear();
274                this.preparedTransactions.putAll(preparedTransactions);
275            }
276        }
277    }