001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.pool; 019 020 import java.io.IOException; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.Map; 024 import java.util.concurrent.atomic.AtomicBoolean; 025 026 import javax.jms.JMSException; 027 import javax.jms.Session; 028 import javax.transaction.RollbackException; 029 import javax.transaction.Status; 030 import javax.transaction.SystemException; 031 import javax.transaction.TransactionManager; 032 import javax.transaction.xa.XAResource; 033 034 import org.apache.activemq.ActiveMQConnection; 035 import org.apache.activemq.transport.TransportListener; 036 import org.apache.commons.pool.ObjectPoolFactory; 037 038 /** 039 * Holds a real JMS connection along with the session pools associated with it. 040 * 041 * @version $Revision: 668559 $ 042 */ 043 public class ConnectionPool { 044 045 private ActiveMQConnection connection; 046 private Map<SessionKey, SessionPool> cache; 047 private AtomicBoolean started = new AtomicBoolean(false); 048 private int referenceCount; 049 private ObjectPoolFactory poolFactory; 050 private long lastUsed = System.currentTimeMillis(); 051 private boolean hasFailed; 052 private boolean hasExpired; 053 private int idleTimeout = 30 * 1000; 054 055 public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) { 056 this(connection, new HashMap<SessionKey, SessionPool>(), poolFactory); 057 // Add a transport Listener so that we can notice if this connection 058 // should be expired due to 059 // a connection failure. 060 connection.addTransportListener(new TransportListener() { 061 public void onCommand(Object command) { 062 } 063 064 public void onException(IOException error) { 065 synchronized (ConnectionPool.this) { 066 hasFailed = true; 067 } 068 } 069 070 public void transportInterupted() { 071 } 072 073 public void transportResumed() { 074 } 075 }); 076 // 077 // make sure that we set the hasFailed flag, in case the transport already failed 078 // prior to the addition of our new TransportListener 079 // 080 if(connection.isTransportFailed()) { 081 hasFailed = true; 082 } 083 } 084 085 public ConnectionPool(ActiveMQConnection connection, Map<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) { 086 this.connection = connection; 087 this.cache = cache; 088 this.poolFactory = poolFactory; 089 } 090 091 public void start() throws JMSException { 092 if (started.compareAndSet(false, true)) { 093 connection.start(); 094 } 095 } 096 097 public synchronized ActiveMQConnection getConnection() { 098 return connection; 099 } 100 101 public Session createSession(boolean transacted, int ackMode) throws JMSException { 102 SessionKey key = new SessionKey(transacted, ackMode); 103 SessionPool pool = cache.get(key); 104 if (pool == null) { 105 pool = createSessionPool(key); 106 cache.put(key, pool); 107 } 108 PooledSession session = pool.borrowSession(); 109 return session; 110 } 111 112 public synchronized void close() { 113 if (connection != null) { 114 try { 115 Iterator<SessionPool> i = cache.values().iterator(); 116 while (i.hasNext()) { 117 SessionPool pool = i.next(); 118 i.remove(); 119 try { 120 pool.close(); 121 } catch (Exception e) { 122 } 123 } 124 } finally { 125 try { 126 connection.close(); 127 } catch (Exception e) { 128 } finally { 129 connection = null; 130 } 131 } 132 } 133 } 134 135 public synchronized void incrementReferenceCount() { 136 referenceCount++; 137 lastUsed = System.currentTimeMillis(); 138 } 139 140 public synchronized void decrementReferenceCount() { 141 referenceCount--; 142 lastUsed = System.currentTimeMillis(); 143 if (referenceCount == 0) { 144 expiredCheck(); 145 } 146 } 147 148 /** 149 * @return true if this connection has expired. 150 */ 151 public synchronized boolean expiredCheck() { 152 if (connection == null) { 153 return true; 154 } 155 if (hasExpired) { 156 if (referenceCount == 0) { 157 close(); 158 } 159 return true; 160 } 161 if (hasFailed || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)) { 162 hasExpired = true; 163 if (referenceCount == 0) { 164 close(); 165 } 166 return true; 167 } 168 return false; 169 } 170 171 public int getIdleTimeout() { 172 return idleTimeout; 173 } 174 175 public void setIdleTimeout(int idleTimeout) { 176 this.idleTimeout = idleTimeout; 177 } 178 179 protected SessionPool createSessionPool(SessionKey key) { 180 return new SessionPool(this, key, poolFactory.createPool()); 181 } 182 183 }