001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.pool;
018    
019    import java.util.HashMap;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.Map;
023    import java.util.concurrent.atomic.AtomicBoolean;
024    import javax.jms.Connection;
025    import javax.jms.ConnectionFactory;
026    import javax.jms.JMSException;
027    import org.apache.activemq.ActiveMQConnection;
028    import org.apache.activemq.ActiveMQConnectionFactory;
029    import org.apache.activemq.Service;
030    import org.apache.activemq.util.IOExceptionSupport;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    import org.apache.commons.pool.ObjectPoolFactory;
034    import org.apache.commons.pool.impl.GenericObjectPoolFactory;
035    
036    /**
037     * A JMS provider which pools Connection, Session and MessageProducer instances
038     * so it can be used with tools like Spring's <a
039     * href="http://activemq.org/Spring+Support">JmsTemplate</a>.
040     * 
041     * <b>NOTE</b> this implementation is only intended for use when sending
042     * messages. It does not deal with pooling of consumers; for that look at a
043     * library like <a href="http://jencks.org/">Jencks</a> such as in <a
044     * href="http://jencks.org/Message+Driven+POJOs">this example</a>
045     * 
046     * @org.apache.xbean.XBean element="pooledConnectionFactory"
047     * 
048     * @version $Revision: 1.1 $
049     */
050    public class PooledConnectionFactory implements ConnectionFactory, Service {
051        private static final transient Log LOG = LogFactory.getLog(PooledConnectionFactory.class);
052        private ConnectionFactory connectionFactory;
053        private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
054        private ObjectPoolFactory poolFactory;
055        private int maximumActive = 500;
056        private int maxConnections = 1;
057        private int idleTimeout = 30 * 1000;
058        private AtomicBoolean stopped = new AtomicBoolean(false);
059    
060        public PooledConnectionFactory() {
061            this(new ActiveMQConnectionFactory());
062        }
063    
064        public PooledConnectionFactory(String brokerURL) {
065            this(new ActiveMQConnectionFactory(brokerURL));
066        }
067    
068        public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
069            this.connectionFactory = connectionFactory;
070        }
071    
072        public ConnectionFactory getConnectionFactory() {
073            return connectionFactory;
074        }
075    
076        public void setConnectionFactory(ConnectionFactory connectionFactory) {
077            this.connectionFactory = connectionFactory;
078        }
079    
080        public Connection createConnection() throws JMSException {
081            return createConnection(null, null);
082        }
083    
084        public synchronized Connection createConnection(String userName, String password) throws JMSException {
085            if (stopped.get()) {
086                LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
087                return null;
088            }
089            
090            ConnectionKey key = new ConnectionKey(userName, password);
091            LinkedList<ConnectionPool> pools = cache.get(key);
092    
093            if (pools == null) {
094                pools = new LinkedList<ConnectionPool>();
095                cache.put(key, pools);
096            }
097    
098            ConnectionPool connection = null;
099            if (pools.size() == maxConnections) {
100                connection = pools.removeFirst();
101            }
102    
103            // Now.. we might get a connection, but it might be that we need to
104            // dump it..
105            if (connection != null && connection.expiredCheck()) {
106                connection = null;
107            }
108    
109            if (connection == null) {
110                ActiveMQConnection delegate = createConnection(key);
111                connection = createConnectionPool(delegate);
112            }
113            pools.add(connection);
114            return new PooledConnection(connection);
115        }
116    
117        protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
118            ConnectionPool result =  new ConnectionPool(connection, getPoolFactory());
119            result.setIdleTimeout(getIdleTimeout());
120            return result;
121        }
122    
123        protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
124            if (key.getUserName() == null && key.getPassword() == null) {
125                return (ActiveMQConnection)connectionFactory.createConnection();
126            } else {
127                return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword());
128            }
129        }
130    
131        /**
132         * @see org.apache.activemq.service.Service#start()
133         */
134        public void start() {
135            try {
136                stopped.set(false);
137                createConnection();
138            } catch (JMSException e) {
139                LOG.warn("Create pooled connection during start failed.", e);
140                IOExceptionSupport.create(e);
141            }
142        }
143    
144        public void stop() {
145            LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size());
146            stopped.set(true);
147            for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
148                LinkedList list = iter.next();
149                for (Iterator i = list.iterator(); i.hasNext();) {
150                    ConnectionPool connection = (ConnectionPool) i.next();
151                    try {
152                        connection.close();
153                    }catch(Exception e) {
154                        LOG.warn("Close connection failed",e);
155                    }
156                }
157            }
158            cache.clear();
159        }
160    
161        public ObjectPoolFactory getPoolFactory() {
162            if (poolFactory == null) {
163                poolFactory = createPoolFactory();
164            }
165            return poolFactory;
166        }
167    
168        /**
169         * Sets the object pool factory used to create individual session pools for
170         * each connection
171         */
172        public void setPoolFactory(ObjectPoolFactory poolFactory) {
173            this.poolFactory = poolFactory;
174        }
175    
176        public int getMaximumActive() {
177            return maximumActive;
178        }
179    
180        /**
181         * Sets the maximum number of active sessions per connection
182         */
183        public void setMaximumActive(int maximumActive) {
184            this.maximumActive = maximumActive;
185        }
186    
187        /**
188         * @return the maxConnections
189         */
190        public int getMaxConnections() {
191            return maxConnections;
192        }
193    
194        /**
195         * @param maxConnections the maxConnections to set
196         */
197        public void setMaxConnections(int maxConnections) {
198            this.maxConnections = maxConnections;
199        }
200    
201        protected ObjectPoolFactory createPoolFactory() {
202            return new GenericObjectPoolFactory(null, maximumActive);
203        }
204    
205        public int getIdleTimeout() {
206            return idleTimeout;
207        }
208    
209        public void setIdleTimeout(int idleTimeout) {
210            this.idleTimeout = idleTimeout;
211        }
212    }