001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.io.InterruptedIOException; 020 import java.util.ArrayList; 021 import java.util.Arrays; 022 import java.util.List; 023 import java.util.concurrent.ConcurrentHashMap; 024 025 import javax.jms.JMSException; 026 import javax.jms.TransactionInProgressException; 027 import javax.jms.TransactionRolledBackException; 028 import javax.transaction.xa.XAException; 029 import javax.transaction.xa.XAResource; 030 import javax.transaction.xa.Xid; 031 032 import org.apache.activemq.command.Command; 033 import org.apache.activemq.command.ConnectionId; 034 import org.apache.activemq.command.DataArrayResponse; 035 import org.apache.activemq.command.DataStructure; 036 import org.apache.activemq.command.IntegerResponse; 037 import org.apache.activemq.command.LocalTransactionId; 038 import org.apache.activemq.command.Response; 039 import org.apache.activemq.command.TransactionId; 040 import org.apache.activemq.command.TransactionInfo; 041 import org.apache.activemq.command.XATransactionId; 042 import org.apache.activemq.transaction.Synchronization; 043 import org.apache.activemq.util.JMSExceptionSupport; 044 import org.apache.activemq.util.LongSequenceGenerator; 045 import org.apache.commons.logging.Log; 046 import org.apache.commons.logging.LogFactory; 047 048 /** 049 * A TransactionContext provides the means to control a JMS transaction. It 050 * provides a local transaction interface and also an XAResource interface. <p/> 051 * An application server controls the transactional assignment of an XASession 052 * by obtaining its XAResource. It uses the XAResource to assign the session to 053 * a transaction, prepare and commit work on the transaction, and so on. <p/> An 054 * XAResource provides some fairly sophisticated facilities for interleaving 055 * work on multiple transactions, recovering a list of transactions in progress, 056 * and so on. A JTA aware JMS provider must fully implement this functionality. 057 * This could be done by using the services of a database that supports XA, or a 058 * JMS provider may choose to implement this functionality from scratch. <p/> 059 * 060 * @version $Revision: 1.10 $ 061 * @see javax.jms.Session 062 * @see javax.jms.QueueSession 063 * @see javax.jms.TopicSession 064 * @see javax.jms.XASession 065 */ 066 public class TransactionContext implements XAResource { 067 068 private static final Log LOG = LogFactory.getLog(TransactionContext.class); 069 070 // XATransactionId -> ArrayList of TransactionContext objects 071 private final static ConcurrentHashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = new ConcurrentHashMap<TransactionId, List<TransactionContext>>(); 072 073 private final ActiveMQConnection connection; 074 private final LongSequenceGenerator localTransactionIdGenerator; 075 private final ConnectionId connectionId; 076 private List<Synchronization> synchronizations; 077 078 // To track XA transactions. 079 private Xid associatedXid; 080 private TransactionId transactionId; 081 private LocalTransactionEventListener localTransactionEventListener; 082 private int beforeEndIndex; 083 084 public TransactionContext(ActiveMQConnection connection) { 085 this.connection = connection; 086 this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator(); 087 this.connectionId = connection.getConnectionInfo().getConnectionId(); 088 } 089 090 public boolean isInXATransaction() { 091 return transactionId != null && transactionId.isXATransaction(); 092 } 093 094 public boolean isInLocalTransaction() { 095 return transactionId != null && transactionId.isLocalTransaction(); 096 } 097 098 public boolean isInTransaction() { 099 return transactionId != null; 100 } 101 102 /** 103 * @return Returns the localTransactionEventListener. 104 */ 105 public LocalTransactionEventListener getLocalTransactionEventListener() { 106 return localTransactionEventListener; 107 } 108 109 /** 110 * Used by the resource adapter to listen to transaction events. 111 * 112 * @param localTransactionEventListener The localTransactionEventListener to 113 * set. 114 */ 115 public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) { 116 this.localTransactionEventListener = localTransactionEventListener; 117 } 118 119 // /////////////////////////////////////////////////////////// 120 // 121 // Methods that work with the Synchronization objects registered with 122 // the transaction. 123 // 124 // /////////////////////////////////////////////////////////// 125 126 public void addSynchronization(Synchronization s) { 127 if (synchronizations == null) { 128 synchronizations = new ArrayList<Synchronization>(10); 129 } 130 synchronizations.add(s); 131 } 132 133 private void afterRollback() throws JMSException { 134 if (synchronizations == null) { 135 return; 136 } 137 138 int size = synchronizations.size(); 139 try { 140 for (int i = 0; i < size; i++) { 141 synchronizations.get(i).afterRollback(); 142 } 143 } catch (JMSException e) { 144 throw e; 145 } catch (Throwable e) { 146 throw JMSExceptionSupport.create(e); 147 } finally { 148 synchronizations = null; 149 } 150 } 151 152 private void afterCommit() throws JMSException { 153 if (synchronizations == null) { 154 return; 155 } 156 157 int size = synchronizations.size(); 158 try { 159 for (int i = 0; i < size; i++) { 160 synchronizations.get(i).afterCommit(); 161 } 162 } catch (JMSException e) { 163 throw e; 164 } catch (Throwable e) { 165 throw JMSExceptionSupport.create(e); 166 } finally { 167 synchronizations = null; 168 } 169 } 170 171 private void beforeEnd() throws JMSException { 172 if (synchronizations == null) { 173 return; 174 } 175 176 int size = synchronizations.size(); 177 try { 178 for (;beforeEndIndex < size;) { 179 synchronizations.get(beforeEndIndex++).beforeEnd(); 180 } 181 } catch (JMSException e) { 182 throw e; 183 } catch (Throwable e) { 184 throw JMSExceptionSupport.create(e); 185 } 186 } 187 188 public TransactionId getTransactionId() { 189 return transactionId; 190 } 191 192 // /////////////////////////////////////////////////////////// 193 // 194 // Local transaction interface. 195 // 196 // /////////////////////////////////////////////////////////// 197 198 /** 199 * Start a local transaction. 200 * @throws javax.jms.JMSException on internal error 201 */ 202 public void begin() throws JMSException { 203 204 if (isInXATransaction()) { 205 throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress."); 206 } 207 208 if (transactionId == null) { 209 synchronizations = null; 210 beforeEndIndex = 0; 211 this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId()); 212 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); 213 this.connection.ensureConnectionInfoSent(); 214 this.connection.asyncSendPacket(info); 215 216 // Notify the listener that the tx was started. 217 if (localTransactionEventListener != null) { 218 localTransactionEventListener.beginEvent(); 219 } 220 if (LOG.isDebugEnabled()) { 221 LOG.debug("Begin:" + transactionId); 222 } 223 } 224 225 } 226 227 /** 228 * Rolls back any work done in this transaction and releases any locks 229 * currently held. 230 * 231 * @throws JMSException if the JMS provider fails to roll back the 232 * transaction due to some internal error. 233 * @throws javax.jms.IllegalStateException if the method is not called by a 234 * transacted session. 235 */ 236 public void rollback() throws JMSException { 237 if (isInXATransaction()) { 238 throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); 239 } 240 241 try { 242 beforeEnd(); 243 } catch (TransactionRolledBackException canOcurrOnFailover) { 244 LOG.warn("rollback processing error", canOcurrOnFailover); 245 } 246 if (transactionId != null) { 247 if (LOG.isDebugEnabled()) { 248 LOG.debug("Rollback: " + transactionId 249 + " syncCount: " 250 + (synchronizations != null ? synchronizations.size() : 0)); 251 } 252 253 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); 254 this.transactionId = null; 255 //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364 256 this.connection.syncSendPacket(info); 257 // Notify the listener that the tx was rolled back 258 if (localTransactionEventListener != null) { 259 localTransactionEventListener.rollbackEvent(); 260 } 261 } 262 263 afterRollback(); 264 } 265 266 /** 267 * Commits all work done in this transaction and releases any locks 268 * currently held. 269 * 270 * @throws JMSException if the JMS provider fails to commit the transaction 271 * due to some internal error. 272 * @throws javax.jms.IllegalStateException if the method is not called by a 273 * transacted session. 274 */ 275 public void commit() throws JMSException { 276 if (isInXATransaction()) { 277 throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress "); 278 } 279 280 try { 281 beforeEnd(); 282 } catch (JMSException e) { 283 rollback(); 284 throw e; 285 } 286 287 // Only send commit if the transaction was started. 288 if (transactionId != null) { 289 if (LOG.isDebugEnabled()) { 290 LOG.debug("Commit: " + transactionId 291 + " syncCount: " 292 + (synchronizations != null ? synchronizations.size() : 0)); 293 } 294 295 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE); 296 this.transactionId = null; 297 // Notify the listener that the tx was committed back 298 try { 299 syncSendPacketWithInterruptionHandling(info); 300 if (localTransactionEventListener != null) { 301 localTransactionEventListener.commitEvent(); 302 } 303 afterCommit(); 304 } catch (JMSException cause) { 305 LOG.info("commit failed for transaction " + info.getTransactionId(), cause); 306 if (localTransactionEventListener != null) { 307 localTransactionEventListener.rollbackEvent(); 308 } 309 afterRollback(); 310 throw cause; 311 } 312 313 } 314 } 315 316 // /////////////////////////////////////////////////////////// 317 // 318 // XAResource Implementation 319 // 320 // /////////////////////////////////////////////////////////// 321 /** 322 * Associates a transaction with the resource. 323 */ 324 public void start(Xid xid, int flags) throws XAException { 325 326 if (LOG.isDebugEnabled()) { 327 LOG.debug("Start: " + xid); 328 } 329 if (isInLocalTransaction()) { 330 throw new XAException(XAException.XAER_PROTO); 331 } 332 // Are we already associated? 333 if (associatedXid != null) { 334 throw new XAException(XAException.XAER_PROTO); 335 } 336 337 // if ((flags & TMJOIN) == TMJOIN) { 338 // // TODO: verify that the server has seen the xid 339 // } 340 // if ((flags & TMJOIN) == TMRESUME) { 341 // // TODO: verify that the xid was suspended. 342 // } 343 344 // associate 345 synchronizations = null; 346 beforeEndIndex = 0; 347 setXid(xid); 348 } 349 350 /** 351 * @return connectionId for connection 352 */ 353 private ConnectionId getConnectionId() { 354 return connection.getConnectionInfo().getConnectionId(); 355 } 356 357 public void end(Xid xid, int flags) throws XAException { 358 359 if (LOG.isDebugEnabled()) { 360 LOG.debug("End: " + xid); 361 } 362 363 if (isInLocalTransaction()) { 364 throw new XAException(XAException.XAER_PROTO); 365 } 366 367 if ((flags & (TMSUSPEND | TMFAIL)) != 0) { 368 // You can only suspend the associated xid. 369 if (!equals(associatedXid, xid)) { 370 throw new XAException(XAException.XAER_PROTO); 371 } 372 373 // TODO: we may want to put the xid in a suspended list. 374 try { 375 beforeEnd(); 376 } catch (JMSException e) { 377 throw toXAException(e); 378 } 379 setXid(null); 380 } else if ((flags & TMSUCCESS) == TMSUCCESS) { 381 // set to null if this is the current xid. 382 // otherwise this could be an asynchronous success call 383 if (equals(associatedXid, xid)) { 384 try { 385 beforeEnd(); 386 } catch (JMSException e) { 387 throw toXAException(e); 388 } 389 setXid(null); 390 } 391 } else { 392 throw new XAException(XAException.XAER_INVAL); 393 } 394 } 395 396 private boolean equals(Xid xid1, Xid xid2) { 397 if (xid1 == xid2) { 398 return true; 399 } 400 if (xid1 == null ^ xid2 == null) { 401 return false; 402 } 403 return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier()) 404 && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId()); 405 } 406 407 public int prepare(Xid xid) throws XAException { 408 if (LOG.isDebugEnabled()) { 409 LOG.debug("Prepare: " + xid); 410 } 411 412 // We allow interleaving multiple transactions, so 413 // we don't limit prepare to the associated xid. 414 XATransactionId x; 415 // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been 416 // called first 417 if (xid == null || (equals(associatedXid, xid))) { 418 throw new XAException(XAException.XAER_PROTO); 419 } else { 420 // TODO: cache the known xids so we don't keep recreating this one?? 421 x = new XATransactionId(xid); 422 } 423 424 try { 425 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE); 426 427 // Find out if the server wants to commit or rollback. 428 IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info); 429 return response.getResult(); 430 431 } catch (JMSException e) { 432 throw toXAException(e); 433 } 434 } 435 436 public void rollback(Xid xid) throws XAException { 437 438 if (LOG.isDebugEnabled()) { 439 LOG.debug("Rollback: " + xid); 440 } 441 442 // We allow interleaving multiple transactions, so 443 // we don't limit rollback to the associated xid. 444 XATransactionId x; 445 if (xid == null) { 446 throw new XAException(XAException.XAER_PROTO); 447 } 448 if (equals(associatedXid, xid)) { 449 // I think this can happen even without an end(xid) call. Need to 450 // check spec. 451 x = (XATransactionId)transactionId; 452 } else { 453 x = new XATransactionId(xid); 454 } 455 456 try { 457 this.connection.checkClosedOrFailed(); 458 this.connection.ensureConnectionInfoSent(); 459 460 // Let the server know that the tx is rollback. 461 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); 462 syncSendPacketWithInterruptionHandling(info); 463 464 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 465 if (l != null && !l.isEmpty()) { 466 for (TransactionContext ctx : l) { 467 ctx.afterRollback(); 468 } 469 } 470 471 } catch (JMSException e) { 472 throw toXAException(e); 473 } 474 } 475 476 // XAResource interface 477 public void commit(Xid xid, boolean onePhase) throws XAException { 478 479 if (LOG.isDebugEnabled()) { 480 LOG.debug("Commit: " + xid); 481 } 482 483 // We allow interleaving multiple transactions, so 484 // we don't limit commit to the associated xid. 485 XATransactionId x; 486 if (xid == null || (equals(associatedXid, xid))) { 487 // should never happen, end(xid,TMSUCCESS) must have been previously 488 // called 489 throw new XAException(XAException.XAER_PROTO); 490 } else { 491 x = new XATransactionId(xid); 492 } 493 494 try { 495 this.connection.checkClosedOrFailed(); 496 this.connection.ensureConnectionInfoSent(); 497 498 // Notify the server that the tx was committed back 499 TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE); 500 501 syncSendPacketWithInterruptionHandling(info); 502 503 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 504 if (l != null && !l.isEmpty()) { 505 for (TransactionContext ctx : l) { 506 ctx.afterCommit(); 507 } 508 } 509 510 } catch (JMSException e) { 511 throw toXAException(e); 512 } 513 514 } 515 516 public void forget(Xid xid) throws XAException { 517 if (LOG.isDebugEnabled()) { 518 LOG.debug("Forget: " + xid); 519 } 520 521 // We allow interleaving multiple transactions, so 522 // we don't limit forget to the associated xid. 523 XATransactionId x; 524 if (xid == null) { 525 throw new XAException(XAException.XAER_PROTO); 526 } 527 if (equals(associatedXid, xid)) { 528 // TODO determine if this can happen... I think not. 529 x = (XATransactionId)transactionId; 530 } else { 531 x = new XATransactionId(xid); 532 } 533 534 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET); 535 536 try { 537 // Tell the server to forget the transaction. 538 syncSendPacketWithInterruptionHandling(info); 539 } catch (JMSException e) { 540 throw toXAException(e); 541 } 542 } 543 544 public boolean isSameRM(XAResource xaResource) throws XAException { 545 if (xaResource == null) { 546 return false; 547 } 548 if (!(xaResource instanceof TransactionContext)) { 549 return false; 550 } 551 TransactionContext xar = (TransactionContext)xaResource; 552 try { 553 return getResourceManagerId().equals(xar.getResourceManagerId()); 554 } catch (Throwable e) { 555 throw (XAException)new XAException("Could not get resource manager id.").initCause(e); 556 } 557 } 558 559 public Xid[] recover(int flag) throws XAException { 560 if (LOG.isDebugEnabled()) { 561 LOG.debug("Recover: " + flag); 562 } 563 564 TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER); 565 try { 566 this.connection.checkClosedOrFailed(); 567 this.connection.ensureConnectionInfoSent(); 568 569 DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info); 570 DataStructure[] data = receipt.getData(); 571 XATransactionId[] answer; 572 if (data instanceof XATransactionId[]) { 573 answer = (XATransactionId[])data; 574 } else { 575 answer = new XATransactionId[data.length]; 576 System.arraycopy(data, 0, answer, 0, data.length); 577 } 578 return answer; 579 } catch (JMSException e) { 580 throw toXAException(e); 581 } 582 } 583 584 public int getTransactionTimeout() throws XAException { 585 return 0; 586 } 587 588 public boolean setTransactionTimeout(int seconds) throws XAException { 589 return false; 590 } 591 592 // /////////////////////////////////////////////////////////// 593 // 594 // Helper methods. 595 // 596 // /////////////////////////////////////////////////////////// 597 private String getResourceManagerId() throws JMSException { 598 return this.connection.getResourceManagerId(); 599 } 600 601 private void setXid(Xid xid) throws XAException { 602 603 try { 604 this.connection.checkClosedOrFailed(); 605 this.connection.ensureConnectionInfoSent(); 606 } catch (JMSException e) { 607 throw toXAException(e); 608 } 609 610 if (xid != null) { 611 // associate 612 associatedXid = xid; 613 transactionId = new XATransactionId(xid); 614 615 TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN); 616 try { 617 this.connection.asyncSendPacket(info); 618 if (LOG.isDebugEnabled()) { 619 LOG.debug("Started XA transaction: " + transactionId); 620 } 621 } catch (JMSException e) { 622 throw toXAException(e); 623 } 624 625 } else { 626 627 if (transactionId != null) { 628 TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END); 629 try { 630 syncSendPacketWithInterruptionHandling(info); 631 if (LOG.isDebugEnabled()) { 632 LOG.debug("Ended XA transaction: " + transactionId); 633 } 634 } catch (JMSException e) { 635 throw toXAException(e); 636 } 637 638 // Add our self to the list of contexts that are interested in 639 // post commit/rollback events. 640 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId); 641 if (l == null) { 642 l = new ArrayList<TransactionContext>(3); 643 ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l); 644 l.add(this); 645 } else if (!l.contains(this)) { 646 l.add(this); 647 } 648 } 649 650 // dis-associate 651 associatedXid = null; 652 transactionId = null; 653 } 654 } 655 656 /** 657 * Sends the given command. Also sends the command in case of interruption, 658 * so that important commands like rollback and commit are never interrupted. 659 * If interruption occurred, set the interruption state of the current 660 * after performing the action again. 661 * 662 * @return the response 663 */ 664 private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException { 665 try { 666 return this.connection.syncSendPacket(command); 667 } catch (JMSException e) { 668 if (e.getLinkedException() instanceof InterruptedIOException) { 669 try { 670 Thread.interrupted(); 671 return this.connection.syncSendPacket(command); 672 } finally { 673 Thread.currentThread().interrupt(); 674 } 675 } 676 677 throw e; 678 } 679 } 680 681 /** 682 * Converts a JMSException from the server to an XAException. if the 683 * JMSException contained a linked XAException that is returned instead. 684 * 685 * @param e JMSException to convert 686 * @return XAException wrapping original exception or its message 687 */ 688 private XAException toXAException(JMSException e) { 689 if (e.getCause() != null && e.getCause() instanceof XAException) { 690 XAException original = (XAException)e.getCause(); 691 XAException xae = new XAException(original.getMessage()); 692 xae.errorCode = original.errorCode; 693 xae.initCause(original); 694 return xae; 695 } 696 697 XAException xae = new XAException(e.getMessage()); 698 xae.errorCode = XAException.XAER_RMFAIL; 699 xae.initCause(e); 700 return xae; 701 } 702 703 public ActiveMQConnection getConnection() { 704 return connection; 705 } 706 707 public void cleanup() { 708 associatedXid = null; 709 transactionId = null; 710 } 711 }