001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.store.journal; 019 020 import java.io.IOException; 021 import java.util.ArrayList; 022 import java.util.Iterator; 023 import java.util.LinkedHashMap; 024 import java.util.Map; 025 026 import javax.transaction.xa.XAException; 027 028 import org.apache.activeio.journal.RecordLocation; 029 import org.apache.activemq.command.JournalTopicAck; 030 import org.apache.activemq.command.JournalTransaction; 031 import org.apache.activemq.command.Message; 032 import org.apache.activemq.command.MessageAck; 033 import org.apache.activemq.command.TransactionId; 034 import org.apache.activemq.command.XATransactionId; 035 import org.apache.activemq.store.TransactionRecoveryListener; 036 import org.apache.activemq.store.TransactionStore; 037 038 /** 039 */ 040 public class JournalTransactionStore implements TransactionStore { 041 042 private final JournalPersistenceAdapter peristenceAdapter; 043 private Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>(); 044 private Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>(); 045 private boolean doingRecover; 046 047 public static class TxOperation { 048 049 static final byte ADD_OPERATION_TYPE = 0; 050 static final byte REMOVE_OPERATION_TYPE = 1; 051 static final byte ACK_OPERATION_TYPE = 3; 052 053 public byte operationType; 054 public JournalMessageStore store; 055 public Object data; 056 057 public TxOperation(byte operationType, JournalMessageStore store, Object data) { 058 this.operationType = operationType; 059 this.store = store; 060 this.data = data; 061 } 062 063 } 064 065 /** 066 * Operations 067 * 068 * @version $Revision: 1.6 $ 069 */ 070 public static class Tx { 071 072 private final RecordLocation location; 073 private ArrayList<TxOperation> operations = new ArrayList<TxOperation>(); 074 075 public Tx(RecordLocation location) { 076 this.location = location; 077 } 078 079 public void add(JournalMessageStore store, Message msg) { 080 operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg)); 081 } 082 083 public void add(JournalMessageStore store, MessageAck ack) { 084 operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack)); 085 } 086 087 public void add(JournalTopicMessageStore store, JournalTopicAck ack) { 088 operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack)); 089 } 090 091 public Message[] getMessages() { 092 ArrayList<Object> list = new ArrayList<Object>(); 093 for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) { 094 TxOperation op = iter.next(); 095 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { 096 list.add(op.data); 097 } 098 } 099 Message rc[] = new Message[list.size()]; 100 list.toArray(rc); 101 return rc; 102 } 103 104 public MessageAck[] getAcks() { 105 ArrayList<Object> list = new ArrayList<Object>(); 106 for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) { 107 TxOperation op = iter.next(); 108 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { 109 list.add(op.data); 110 } 111 } 112 MessageAck rc[] = new MessageAck[list.size()]; 113 list.toArray(rc); 114 return rc; 115 } 116 117 public ArrayList<TxOperation> getOperations() { 118 return operations; 119 } 120 121 } 122 123 public JournalTransactionStore(JournalPersistenceAdapter adapter) { 124 this.peristenceAdapter = adapter; 125 } 126 127 /** 128 * @throws IOException 129 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 130 */ 131 public void prepare(TransactionId txid) throws IOException { 132 Tx tx = null; 133 synchronized (inflightTransactions) { 134 tx = inflightTransactions.remove(txid); 135 } 136 if (tx == null) { 137 return; 138 } 139 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), 140 true); 141 synchronized (preparedTransactions) { 142 preparedTransactions.put(txid, tx); 143 } 144 } 145 146 /** 147 * @throws IOException 148 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 149 */ 150 public void replayPrepare(TransactionId txid) throws IOException { 151 Tx tx = null; 152 synchronized (inflightTransactions) { 153 tx = inflightTransactions.remove(txid); 154 } 155 if (tx == null) { 156 return; 157 } 158 synchronized (preparedTransactions) { 159 preparedTransactions.put(txid, tx); 160 } 161 } 162 163 public Tx getTx(Object txid, RecordLocation location) { 164 Tx tx = null; 165 synchronized (inflightTransactions) { 166 tx = inflightTransactions.get(txid); 167 } 168 if (tx == null) { 169 tx = new Tx(location); 170 inflightTransactions.put(txid, tx); 171 } 172 return tx; 173 } 174 175 /** 176 * @throws XAException 177 * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) 178 */ 179 public void commit(TransactionId txid, boolean wasPrepared) throws IOException { 180 Tx tx; 181 if (wasPrepared) { 182 synchronized (preparedTransactions) { 183 tx = preparedTransactions.remove(txid); 184 } 185 } else { 186 synchronized (inflightTransactions) { 187 tx = inflightTransactions.remove(txid); 188 } 189 } 190 if (tx == null) { 191 return; 192 } 193 if (txid.isXATransaction()) { 194 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, 195 wasPrepared), true); 196 } else { 197 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, 198 wasPrepared), true); 199 } 200 } 201 202 /** 203 * @throws XAException 204 * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) 205 */ 206 public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException { 207 if (wasPrepared) { 208 synchronized (preparedTransactions) { 209 return preparedTransactions.remove(txid); 210 } 211 } else { 212 synchronized (inflightTransactions) { 213 return inflightTransactions.remove(txid); 214 } 215 } 216 } 217 218 /** 219 * @throws IOException 220 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 221 */ 222 public void rollback(TransactionId txid) throws IOException { 223 Tx tx = null; 224 synchronized (inflightTransactions) { 225 tx = inflightTransactions.remove(txid); 226 } 227 if (tx != null) { 228 synchronized (preparedTransactions) { 229 tx = preparedTransactions.remove(txid); 230 } 231 } 232 if (tx != null) { 233 if (txid.isXATransaction()) { 234 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, 235 false), true); 236 } else { 237 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, 238 txid, false), true); 239 } 240 } 241 } 242 243 /** 244 * @throws IOException 245 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 246 */ 247 public void replayRollback(TransactionId txid) throws IOException { 248 boolean inflight = false; 249 synchronized (inflightTransactions) { 250 inflight = inflightTransactions.remove(txid) != null; 251 } 252 if (inflight) { 253 synchronized (preparedTransactions) { 254 preparedTransactions.remove(txid); 255 } 256 } 257 } 258 259 public void start() throws Exception { 260 } 261 262 public void stop() throws Exception { 263 } 264 265 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 266 // All the in-flight transactions get rolled back.. 267 synchronized (inflightTransactions) { 268 inflightTransactions.clear(); 269 } 270 this.doingRecover = true; 271 try { 272 Map<TransactionId, Tx> txs = null; 273 synchronized (preparedTransactions) { 274 txs = new LinkedHashMap<TransactionId, Tx>(preparedTransactions); 275 } 276 for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) { 277 Object txid = iter.next(); 278 Tx tx = txs.get(txid); 279 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks()); 280 } 281 } finally { 282 this.doingRecover = false; 283 } 284 } 285 286 /** 287 * @param message 288 * @throws IOException 289 */ 290 void addMessage(JournalMessageStore store, Message message, RecordLocation location) throws IOException { 291 Tx tx = getTx(message.getTransactionId(), location); 292 tx.add(store, message); 293 } 294 295 /** 296 * @param ack 297 * @throws IOException 298 */ 299 public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) 300 throws IOException { 301 Tx tx = getTx(ack.getTransactionId(), location); 302 tx.add(store, ack); 303 } 304 305 public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) { 306 Tx tx = getTx(ack.getTransactionId(), location); 307 tx.add(store, ack); 308 } 309 310 public RecordLocation checkpoint() throws IOException { 311 // Nothing really to checkpoint.. since, we don't 312 // checkpoint tx operations in to long term store until they are 313 // committed. 314 // But we keep track of the first location of an operation 315 // that was associated with an active tx. The journal can not 316 // roll over active tx records. 317 RecordLocation rc = null; 318 synchronized (inflightTransactions) { 319 for (Iterator<Tx> iter = inflightTransactions.values().iterator(); iter.hasNext();) { 320 Tx tx = iter.next(); 321 RecordLocation location = tx.location; 322 if (rc == null || rc.compareTo(location) < 0) { 323 rc = location; 324 } 325 } 326 } 327 synchronized (preparedTransactions) { 328 for (Iterator<Tx> iter = preparedTransactions.values().iterator(); iter.hasNext();) { 329 Tx tx = iter.next(); 330 RecordLocation location = tx.location; 331 if (rc == null || rc.compareTo(location) < 0) { 332 rc = location; 333 } 334 } 335 return rc; 336 } 337 } 338 339 public boolean isDoingRecover() { 340 return doingRecover; 341 } 342 343 }