001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.proxy;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.Iterator;
023    import java.util.concurrent.CopyOnWriteArrayList;
024    
025    import org.apache.activemq.Service;
026    import org.apache.activemq.transport.CompositeTransport;
027    import org.apache.activemq.transport.Transport;
028    import org.apache.activemq.transport.TransportAcceptListener;
029    import org.apache.activemq.transport.TransportFactory;
030    import org.apache.activemq.transport.TransportFilter;
031    import org.apache.activemq.transport.TransportServer;
032    import org.apache.activemq.util.ServiceStopper;
033    import org.apache.commons.logging.Log;
034    import org.apache.commons.logging.LogFactory;
035    
036    /**
037     * @org.apache.xbean.XBean
038     * 
039     * @version $Revision$
040     */
041    public class ProxyConnector implements Service {
042    
043        private static final Log LOG = LogFactory.getLog(ProxyConnector.class);
044        private TransportServer server;
045        private URI bind;
046        private URI remote;
047        private URI localUri;
048        private String name;
049        private CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
050    
051        public void start() throws Exception {
052    
053            this.getServer().setAcceptListener(new TransportAcceptListener() {
054                public void onAccept(Transport localTransport) {
055                    try {
056                        Transport remoteTransport = createRemoteTransport();
057                        ProxyConnection connection = new ProxyConnection(localTransport, remoteTransport);
058                        connections.add(connection);
059                        connection.start();
060                    } catch (Exception e) {
061                        onAcceptError(e);
062                    }
063                }
064    
065                public void onAcceptError(Exception error) {
066                    LOG.error("Could not accept connection: " + error, error);
067                }
068            });
069            getServer().start();
070            LOG.info("Proxy Connector " + getName() + " Started");
071    
072        }
073    
074        public void stop() throws Exception {
075            ServiceStopper ss = new ServiceStopper();
076            if (this.server != null) {
077                ss.stop(this.server);
078            }
079            for (Iterator<ProxyConnection> iter = connections.iterator(); iter.hasNext();) {
080                LOG.info("Connector stopped: Stopping proxy.");
081                ss.stop(iter.next());
082            }
083            ss.throwFirstException();
084            LOG.info("Proxy Connector " + getName() + " Stopped");
085        }
086    
087        // Properties
088        // -------------------------------------------------------------------------
089    
090        public URI getLocalUri() {
091            return localUri;
092        }
093    
094        public void setLocalUri(URI localURI) {
095            this.localUri = localURI;
096        }
097    
098        public URI getBind() {
099            return bind;
100        }
101    
102        public void setBind(URI bind) {
103            this.bind = bind;
104        }
105    
106        public URI getRemote() {
107            return remote;
108        }
109    
110        public void setRemote(URI remote) {
111            this.remote = remote;
112        }
113    
114        public TransportServer getServer() throws IOException, URISyntaxException {
115            if (server == null) {
116                server = createServer();
117            }
118            return server;
119        }
120    
121        public void setServer(TransportServer server) {
122            this.server = server;
123        }
124    
125        protected TransportServer createServer() throws IOException, URISyntaxException {
126            if (bind == null) {
127                throw new IllegalArgumentException("You must specify either a server or the bind property");
128            }
129            return TransportFactory.bind(bind);
130        }
131    
132        private Transport createRemoteTransport() throws Exception {
133            Transport transport = TransportFactory.compositeConnect(remote);
134            CompositeTransport ct = (CompositeTransport)transport.narrow(CompositeTransport.class);
135            if (ct != null && localUri != null) {
136                ct.add(new URI[] {localUri});
137            }
138    
139            // Add a transport filter so that can track the transport life cycle
140            transport = new TransportFilter(transport) {
141                public void stop() throws Exception {
142                    LOG.info("Stopping proxy.");
143                    super.stop();
144                    connections.remove(this);
145                }
146            };
147            return transport;
148        }
149    
150        public String getName() {
151            if (name == null) {
152                if (server != null) {
153                    name = server.getConnectURI().toString();
154                } else {
155                    name = "proxy";
156                }
157            }
158            return name;
159        }
160    
161        public void setName(String name) {
162            this.name = name;
163        }
164    
165    }