001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import java.util.concurrent.atomic.AtomicBoolean;
020    
021    import javax.jms.Connection;
022    import javax.jms.Destination;
023    import javax.jms.JMSException;
024    import javax.jms.Message;
025    import javax.jms.MessageConsumer;
026    import javax.jms.MessageListener;
027    import javax.jms.MessageProducer;
028    import javax.naming.NamingException;
029    
030    import org.apache.activemq.Service;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    
034    /**
035     * A Destination bridge is used to bridge between to different JMS systems
036     * 
037     * @version $Revision: 1.1.1.1 $
038     */
039    public abstract class DestinationBridge implements Service, MessageListener {
040        private static final Log LOG = LogFactory.getLog(DestinationBridge.class);
041        protected MessageConsumer consumer;
042        protected AtomicBoolean started = new AtomicBoolean(false);
043        protected JmsMesageConvertor jmsMessageConvertor;
044        protected boolean doHandleReplyTo = true;
045        protected JmsConnector jmsConnector;
046        private int maximumRetries = 10;
047    
048        /**
049         * @return Returns the consumer.
050         */
051        public MessageConsumer getConsumer() {
052            return consumer;
053        }
054    
055        /**
056         * @param consumer The consumer to set.
057         */
058        public void setConsumer(MessageConsumer consumer) {
059            this.consumer = consumer;
060        }
061    
062        /**
063         * @param connector
064         */
065        public void setJmsConnector(JmsConnector connector) {
066            this.jmsConnector = connector;
067        }
068    
069        /**
070         * @return Returns the inboundMessageConvertor.
071         */
072        public JmsMesageConvertor getJmsMessageConvertor() {
073            return jmsMessageConvertor;
074        }
075    
076        /**
077         * @param jmsMessageConvertor
078         */
079        public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
080            this.jmsMessageConvertor = jmsMessageConvertor;
081        }
082    
083        public int getMaximumRetries() {
084            return maximumRetries;
085        }
086    
087        /**
088         * Sets the maximum number of retries if a send fails before closing the
089         * bridge
090         */
091        public void setMaximumRetries(int maximumRetries) {
092            this.maximumRetries = maximumRetries;
093        }
094    
095        protected Destination processReplyToDestination(Destination destination) {
096            return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
097        }
098    
099        public void start() throws Exception {
100            if (started.compareAndSet(false, true)) {
101                MessageConsumer consumer = createConsumer();
102                consumer.setMessageListener(this);
103                createProducer();
104            }
105        }
106    
107        public void stop() throws Exception {
108            started.set(false);
109        }
110    
111        public void onMessage(Message message) {
112            if (started.get() && message != null) {
113                int attempt = 0;
114                try {
115                    if (attempt > 0) {
116                        restartProducer();
117                    }
118                    Message converted;
119                    if (doHandleReplyTo) {
120                        Destination replyTo = message.getJMSReplyTo();
121                        if (replyTo != null) {
122                            converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
123                        } else {
124                            converted = jmsMessageConvertor.convert(message);
125                        }
126                    } else {
127                        message.setJMSReplyTo(null);
128                        converted = jmsMessageConvertor.convert(message);
129                    }
130                    sendMessage(converted);
131                    message.acknowledge();
132                } catch (Exception e) {
133                    LOG.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e);
134                    if (maximumRetries > 0 && attempt >= maximumRetries) {
135                        try {
136                            stop();
137                        } catch (Exception e1) {
138                            LOG.warn("Failed to stop cleanly", e1);
139                        }
140                    }
141                }
142            }
143        }
144    
145        /**
146         * @return Returns the doHandleReplyTo.
147         */
148        protected boolean isDoHandleReplyTo() {
149            return doHandleReplyTo;
150        }
151    
152        /**
153         * @param doHandleReplyTo The doHandleReplyTo to set.
154         */
155        protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
156            this.doHandleReplyTo = doHandleReplyTo;
157        }
158    
159        protected abstract MessageConsumer createConsumer() throws JMSException;
160    
161        protected abstract MessageProducer createProducer() throws JMSException;
162    
163        protected abstract void sendMessage(Message message) throws JMSException;
164    
165        protected abstract Connection getConnnectionForConsumer();
166    
167        protected abstract Connection getConnectionForProducer();
168    
169        protected void restartProducer() throws JMSException, NamingException {
170            try {
171                getConnectionForProducer().close();
172            } catch (Exception e) {
173                LOG.debug("Ignoring failure to close producer connection: " + e, e);
174            }
175            jmsConnector.restartProducerConnection();
176            createProducer();
177        }
178    }