001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.jdbc; 018 019 import java.io.IOException; 020 import java.sql.Connection; 021 import java.sql.PreparedStatement; 022 import java.sql.SQLException; 023 import java.sql.Statement; 024 025 import javax.sql.DataSource; 026 027 import org.apache.activemq.util.IOExceptionSupport; 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 031 /** 032 * Helps keep track of the current transaction/JDBC connection. 033 * 034 * @version $Revision: 1.2 $ 035 */ 036 public class TransactionContext { 037 038 private static final Log LOG = LogFactory.getLog(TransactionContext.class); 039 040 private final DataSource dataSource; 041 private Connection connection; 042 private boolean inTx; 043 private PreparedStatement addMessageStatement; 044 private PreparedStatement removedMessageStatement; 045 private PreparedStatement updateLastAckStatement; 046 // a cheap dirty level that we can live with 047 private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED; 048 049 public TransactionContext(DataSource dataSource) { 050 this.dataSource = dataSource; 051 } 052 053 public Connection getConnection() throws IOException { 054 if (connection == null) { 055 try { 056 connection = dataSource.getConnection(); 057 boolean autoCommit = !inTx; 058 if (connection.getAutoCommit() != autoCommit) { 059 connection.setAutoCommit(autoCommit); 060 } 061 } catch (SQLException e) { 062 JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e); 063 throw IOExceptionSupport.create(e); 064 } 065 066 try { 067 connection.setTransactionIsolation(transactionIsolation); 068 } catch (Throwable e) { 069 } 070 } 071 return connection; 072 } 073 074 public void executeBatch() throws SQLException { 075 try { 076 executeBatch(addMessageStatement, "Failed add a message"); 077 } finally { 078 addMessageStatement = null; 079 try { 080 executeBatch(removedMessageStatement, "Failed to remove a message"); 081 } finally { 082 removedMessageStatement = null; 083 try { 084 executeBatch(updateLastAckStatement, "Failed to ack a message"); 085 } finally { 086 updateLastAckStatement = null; 087 } 088 } 089 } 090 } 091 092 private void executeBatch(PreparedStatement p, String message) throws SQLException { 093 if (p == null) { 094 return; 095 } 096 097 try { 098 int[] rc = p.executeBatch(); 099 for (int i = 0; i < rc.length; i++) { 100 int code = rc[i]; 101 if (code < 0 && code != Statement.SUCCESS_NO_INFO) { 102 throw new SQLException(message + ". Response code: " + code); 103 } 104 } 105 } finally { 106 try { 107 p.close(); 108 } catch (Throwable e) { 109 } 110 } 111 } 112 113 public void close() throws IOException { 114 if (!inTx) { 115 try { 116 117 /** 118 * we are not in a transaction so should not be committing ?? 119 * This was previously commented out - but had adverse affects 120 * on testing - so it's back! 121 * 122 */ 123 try { 124 executeBatch(); 125 } finally { 126 if (connection != null && !connection.getAutoCommit()) { 127 connection.commit(); 128 } 129 } 130 131 } catch (SQLException e) { 132 JDBCPersistenceAdapter.log("Error while closing connection: ", e); 133 throw IOExceptionSupport.create(e); 134 } finally { 135 try { 136 if (connection != null) { 137 connection.close(); 138 } 139 } catch (Throwable e) { 140 LOG.warn("Close failed: " + e.getMessage(), e); 141 } finally { 142 connection = null; 143 } 144 } 145 } 146 } 147 148 public void begin() throws IOException { 149 if (inTx) { 150 throw new IOException("Already started."); 151 } 152 inTx = true; 153 connection = getConnection(); 154 } 155 156 public void commit() throws IOException { 157 if (!inTx) { 158 throw new IOException("Not started."); 159 } 160 try { 161 executeBatch(); 162 if (!connection.getAutoCommit()) { 163 connection.commit(); 164 } 165 } catch (SQLException e) { 166 JDBCPersistenceAdapter.log("Commit failed: ", e); 167 168 this.rollback(); 169 170 throw IOExceptionSupport.create(e); 171 } finally { 172 inTx = false; 173 close(); 174 } 175 } 176 177 public void rollback() throws IOException { 178 if (!inTx) { 179 throw new IOException("Not started."); 180 } 181 try { 182 if (addMessageStatement != null) { 183 addMessageStatement.close(); 184 addMessageStatement = null; 185 } 186 if (removedMessageStatement != null) { 187 removedMessageStatement.close(); 188 removedMessageStatement = null; 189 } 190 if (updateLastAckStatement != null) { 191 updateLastAckStatement.close(); 192 updateLastAckStatement = null; 193 } 194 connection.rollback(); 195 196 } catch (SQLException e) { 197 JDBCPersistenceAdapter.log("Rollback failed: ", e); 198 throw IOExceptionSupport.create(e); 199 } finally { 200 inTx = false; 201 close(); 202 } 203 } 204 205 public PreparedStatement getAddMessageStatement() { 206 return addMessageStatement; 207 } 208 209 public void setAddMessageStatement(PreparedStatement addMessageStatement) { 210 this.addMessageStatement = addMessageStatement; 211 } 212 213 public PreparedStatement getUpdateLastAckStatement() { 214 return updateLastAckStatement; 215 } 216 217 public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) { 218 this.updateLastAckStatement = ackMessageStatement; 219 } 220 221 public PreparedStatement getRemovedMessageStatement() { 222 return removedMessageStatement; 223 } 224 225 public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) { 226 this.removedMessageStatement = removedMessageStatement; 227 } 228 229 public void setTransactionIsolation(int transactionIsolation) { 230 this.transactionIsolation = transactionIsolation; 231 } 232 233 }