001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import java.util.Iterator;
020    import java.util.List;
021    import java.util.Map;
022    import java.util.concurrent.CopyOnWriteArrayList;
023    import java.util.concurrent.atomic.AtomicBoolean;
024    
025    import javax.jms.Connection;
026    import javax.jms.Destination;
027    import javax.jms.JMSException;
028    import javax.naming.NamingException;
029    
030    import org.apache.activemq.ActiveMQConnectionFactory;
031    import org.apache.activemq.Service;
032    import org.apache.activemq.broker.BrokerService;
033    import org.apache.activemq.util.LRUCache;
034    import org.apache.commons.logging.Log;
035    import org.apache.commons.logging.LogFactory;
036    import org.springframework.jndi.JndiTemplate;
037    
038    /**
039     * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
040     * JMS providers are still only 1.0.1 compliant, this bridge itself aimed to be
041     * JMS 1.0.2 compliant.
042     * 
043     * @version $Revision: 1.1.1.1 $
044     */
045    public abstract class JmsConnector implements Service {
046    
047        private static int nextId;
048        private static final Log LOG = LogFactory.getLog(JmsConnector.class);
049        
050        protected JndiTemplate jndiLocalTemplate;
051        protected JndiTemplate jndiOutboundTemplate;
052        protected JmsMesageConvertor inboundMessageConvertor;
053        protected JmsMesageConvertor outboundMessageConvertor;
054        protected AtomicBoolean initialized = new AtomicBoolean(false);
055        protected AtomicBoolean started = new AtomicBoolean(false);
056        protected ActiveMQConnectionFactory embeddedConnectionFactory;
057        protected int replyToDestinationCacheSize = 10000;
058        protected String outboundUsername;
059        protected String outboundPassword;
060        protected String localUsername;
061        protected String localPassword;
062        protected String outboundClientId;
063        protected String localClientId;
064        protected LRUCache replyToBridges = createLRUCache();
065    
066        private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
067        private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
068        private String name;
069    
070    
071        private static LRUCache createLRUCache() {
072            return new LRUCache() {
073                private static final long serialVersionUID = -7446792754185879286L;
074    
075                protected boolean removeEldestEntry(Map.Entry enty) {
076                    if (size() > maxCacheSize) {
077                        Iterator iter = entrySet().iterator();
078                        Map.Entry lru = (Map.Entry)iter.next();
079                        remove(lru.getKey());
080                        DestinationBridge bridge = (DestinationBridge)lru.getValue();
081                        try {
082                            bridge.stop();
083                            LOG.info("Expired bridge: " + bridge);
084                        } catch (Exception e) {
085                            LOG.warn("stopping expired bridge" + bridge + " caused an exception", e);
086                        }
087                    }
088                    return false;
089                }
090            };
091        }
092    
093        /**
094         */
095        public boolean init() {
096            boolean result = initialized.compareAndSet(false, true);
097            if (result) {
098                if (jndiLocalTemplate == null) {
099                    jndiLocalTemplate = new JndiTemplate();
100                }
101                if (jndiOutboundTemplate == null) {
102                    jndiOutboundTemplate = new JndiTemplate();
103                }
104                if (inboundMessageConvertor == null) {
105                    inboundMessageConvertor = new SimpleJmsMessageConvertor();
106                }
107                if (outboundMessageConvertor == null) {
108                    outboundMessageConvertor = new SimpleJmsMessageConvertor();
109                }
110                replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
111            }
112            return result;
113        }
114    
115        public void start() throws Exception {
116            init();
117            if (started.compareAndSet(false, true)) {
118                for (int i = 0; i < inboundBridges.size(); i++) {
119                    DestinationBridge bridge = inboundBridges.get(i);
120                    bridge.start();
121                }
122                for (int i = 0; i < outboundBridges.size(); i++) {
123                    DestinationBridge bridge = outboundBridges.get(i);
124                    bridge.start();
125                }
126                LOG.info("JMS Connector " + getName() + " Started");
127            }
128        }
129    
130        public void stop() throws Exception {
131            if (started.compareAndSet(true, false)) {
132                for (int i = 0; i < inboundBridges.size(); i++) {
133                    DestinationBridge bridge = inboundBridges.get(i);
134                    bridge.stop();
135                }
136                for (int i = 0; i < outboundBridges.size(); i++) {
137                    DestinationBridge bridge = outboundBridges.get(i);
138                    bridge.stop();
139                }
140                LOG.info("JMS Connector " + getName() + " Stopped");
141            }
142        }
143        
144        public void clearBridges() {
145            inboundBridges.clear();
146            outboundBridges.clear();
147        }
148    
149        protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
150    
151        /**
152         * One way to configure the local connection - this is called by The
153         * BrokerService when the Connector is embedded
154         * 
155         * @param service
156         */
157        public void setBrokerService(BrokerService service) {
158            embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
159        }
160    
161        /**
162         * @return Returns the jndiTemplate.
163         */
164        public JndiTemplate getJndiLocalTemplate() {
165            return jndiLocalTemplate;
166        }
167    
168        /**
169         * @param jndiTemplate The jndiTemplate to set.
170         */
171        public void setJndiLocalTemplate(JndiTemplate jndiTemplate) {
172            this.jndiLocalTemplate = jndiTemplate;
173        }
174    
175        /**
176         * @return Returns the jndiOutboundTemplate.
177         */
178        public JndiTemplate getJndiOutboundTemplate() {
179            return jndiOutboundTemplate;
180        }
181    
182        /**
183         * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
184         */
185        public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) {
186            this.jndiOutboundTemplate = jndiOutboundTemplate;
187        }
188    
189        /**
190         * @return Returns the inboundMessageConvertor.
191         */
192        public JmsMesageConvertor getInboundMessageConvertor() {
193            return inboundMessageConvertor;
194        }
195    
196        /**
197         * @param inboundMessageConvertor The inboundMessageConvertor to set.
198         */
199        public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
200            this.inboundMessageConvertor = jmsMessageConvertor;
201        }
202    
203        /**
204         * @return Returns the outboundMessageConvertor.
205         */
206        public JmsMesageConvertor getOutboundMessageConvertor() {
207            return outboundMessageConvertor;
208        }
209    
210        /**
211         * @param outboundMessageConvertor The outboundMessageConvertor to set.
212         */
213        public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
214            this.outboundMessageConvertor = outboundMessageConvertor;
215        }
216    
217        /**
218         * @return Returns the replyToDestinationCacheSize.
219         */
220        public int getReplyToDestinationCacheSize() {
221            return replyToDestinationCacheSize;
222        }
223    
224        /**
225         * @param replyToDestinationCacheSize The replyToDestinationCacheSize to
226         *                set.
227         */
228        public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
229            this.replyToDestinationCacheSize = replyToDestinationCacheSize;
230        }
231    
232        /**
233         * @return Returns the localPassword.
234         */
235        public String getLocalPassword() {
236            return localPassword;
237        }
238    
239        /**
240         * @param localPassword The localPassword to set.
241         */
242        public void setLocalPassword(String localPassword) {
243            this.localPassword = localPassword;
244        }
245    
246        /**
247         * @return Returns the localUsername.
248         */
249        public String getLocalUsername() {
250            return localUsername;
251        }
252    
253        /**
254         * @param localUsername The localUsername to set.
255         */
256        public void setLocalUsername(String localUsername) {
257            this.localUsername = localUsername;
258        }
259    
260        /**
261         * @return Returns the outboundPassword.
262         */
263        public String getOutboundPassword() {
264            return outboundPassword;
265        }
266    
267        /**
268         * @param outboundPassword The outboundPassword to set.
269         */
270        public void setOutboundPassword(String outboundPassword) {
271            this.outboundPassword = outboundPassword;
272        }
273    
274        /**
275         * @return Returns the outboundUsername.
276         */
277        public String getOutboundUsername() {
278            return outboundUsername;
279        }
280    
281        /**
282         * @param outboundUsername The outboundUsername to set.
283         */
284        public void setOutboundUsername(String outboundUsername) {
285            this.outboundUsername = outboundUsername;
286        }
287        
288        /**
289         * @return the outboundClientId
290         */
291        public String getOutboundClientId() {
292            return outboundClientId;
293        }
294    
295        /**
296         * @param outboundClientId the outboundClientId to set
297         */
298        public void setOutboundClientId(String outboundClientId) {
299            this.outboundClientId = outboundClientId;
300        }
301    
302        /**
303         * @return the localClientId
304         */
305        public String getLocalClientId() {
306            return localClientId;
307        }
308    
309        /**
310         * @param localClientId the localClientId to set
311         */
312        public void setLocalClientId(String localClientId) {
313            this.localClientId = localClientId;
314        }
315        
316        
317        protected void addInboundBridge(DestinationBridge bridge) {
318            inboundBridges.add(bridge);
319        }
320    
321        protected void addOutboundBridge(DestinationBridge bridge) {
322            outboundBridges.add(bridge);
323        }
324    
325        protected void removeInboundBridge(DestinationBridge bridge) {
326            inboundBridges.remove(bridge);
327        }
328    
329        protected void removeOutboundBridge(DestinationBridge bridge) {
330            outboundBridges.remove(bridge);
331        }
332    
333        public String getName() {
334            if (name == null) {
335                name = "Connector:" + getNextId();
336            }
337            return name;
338        }
339    
340        private static synchronized int getNextId() {
341            return nextId++;
342        }
343    
344        public void setName(String name) {
345            this.name = name;
346        }
347    
348        public abstract void restartProducerConnection() throws NamingException, JMSException;
349    }