001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import org.apache.activemq.broker.jmx.ManagedTransportConnector;
020    import org.apache.activemq.broker.jmx.ManagementContext;
021    import org.apache.activemq.broker.region.ConnectorStatistics;
022    import org.apache.activemq.command.BrokerInfo;
023    import org.apache.activemq.security.MessageAuthorizationPolicy;
024    import org.apache.activemq.thread.DefaultThreadPools;
025    import org.apache.activemq.thread.TaskRunnerFactory;
026    import org.apache.activemq.transport.Transport;
027    import org.apache.activemq.transport.TransportAcceptListener;
028    import org.apache.activemq.transport.TransportFactory;
029    import org.apache.activemq.transport.TransportServer;
030    import org.apache.activemq.transport.discovery.DiscoveryAgent;
031    import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
032    import org.apache.activemq.util.ServiceStopper;
033    import org.apache.activemq.util.ServiceSupport;
034    import org.apache.commons.logging.Log;
035    import org.apache.commons.logging.LogFactory;
036    
037    import static org.apache.activemq.thread.DefaultThreadPools.*;
038    
039    import java.io.IOException;
040    import java.net.URI;
041    import java.net.URISyntaxException;
042    import java.util.Iterator;
043    import java.util.concurrent.CopyOnWriteArrayList;
044    import javax.management.ObjectName;
045    
046    /**
047     * @org.apache.xbean.XBean
048     * @version $Revision: 1.6 $
049     */
050    public class TransportConnector implements Connector, BrokerServiceAware {
051    
052        private static final Log LOG = LogFactory.getLog(TransportConnector.class);
053    
054        protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
055        protected TransportStatusDetector statusDector;
056    
057        private BrokerService brokerService;
058        private TransportServer server;
059        private URI uri;
060        private BrokerInfo brokerInfo = new BrokerInfo();
061        private TaskRunnerFactory taskRunnerFactory;
062        private MessageAuthorizationPolicy messageAuthorizationPolicy;
063        private DiscoveryAgent discoveryAgent;
064        private ConnectorStatistics statistics = new ConnectorStatistics();
065        private URI discoveryUri;
066        private URI connectUri;
067        private String name;
068        private boolean disableAsyncDispatch;
069        private boolean enableStatusMonitor = false;
070        private Broker broker;
071    
072        public TransportConnector() {
073        }
074    
075        public TransportConnector(TransportServer server) {
076            this();
077            setServer(server);
078            if (server != null && server.getConnectURI() != null) {
079                URI uri = server.getConnectURI();
080                if (uri != null && uri.getScheme().equals("vm")) {
081                    setEnableStatusMonitor(false);
082                }
083            }
084    
085        }
086    
087    
088        /**
089         * @return Returns the connections.
090         */
091        public CopyOnWriteArrayList<TransportConnection> getConnections() {
092            return connections;
093        }
094    
095        /**
096         * Factory method to create a JMX managed version of this transport
097         * connector
098         */
099        public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException {
100            ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
101            rc.setBrokerInfo(getBrokerInfo());
102            rc.setConnectUri(getConnectUri());
103            rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
104            rc.setDiscoveryAgent(getDiscoveryAgent());
105            rc.setDiscoveryUri(getDiscoveryUri());
106            rc.setEnableStatusMonitor(isEnableStatusMonitor());
107            rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
108            rc.setName(getName());
109            rc.setTaskRunnerFactory(getTaskRunnerFactory());
110            rc.setUri(getUri());
111            rc.setBrokerService(brokerService);
112            return rc;
113        }
114    
115        public BrokerInfo getBrokerInfo() {
116            return brokerInfo;
117        }
118    
119        public void setBrokerInfo(BrokerInfo brokerInfo) {
120            this.brokerInfo = brokerInfo;
121        }
122        
123        /**
124         * 
125         * @deprecated use the {@link #setBrokerService(BrokerService)} method instead.
126         */
127        @Deprecated
128        public void setBrokerName(String name) {
129            if (this.brokerInfo==null) {
130                this.brokerInfo=new BrokerInfo();
131            }
132            this.brokerInfo.setBrokerName(name);
133        }
134    
135        public TransportServer getServer() throws IOException, URISyntaxException {
136            if (server == null) {
137                setServer(createTransportServer());
138            }
139            return server;
140        }
141    
142        public void setServer(TransportServer server) {
143            this.server = server;
144        }
145    
146        public URI getUri() {
147            if (uri == null) {
148                try {
149                    uri = getConnectUri();
150                } catch (Throwable e) {
151                }
152            }
153            return uri;
154        }
155    
156        /**
157         * Sets the server transport URI to use if there is not a
158         * {@link TransportServer} configured via the
159         * {@link #setServer(TransportServer)} method. This value is used to lazy
160         * create a {@link TransportServer} instance
161         * 
162         * @param uri
163         */
164        public void setUri(URI uri) {
165            this.uri = uri;
166        }
167    
168        public TaskRunnerFactory getTaskRunnerFactory() {
169            return taskRunnerFactory;
170        }
171    
172        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
173            this.taskRunnerFactory = taskRunnerFactory;
174        }
175    
176        /**
177         * @return the statistics for this connector
178         */
179        public ConnectorStatistics getStatistics() {
180            return statistics;
181        }
182    
183        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
184            return messageAuthorizationPolicy;
185        }
186    
187        /**
188         * Sets the policy used to decide if the current connection is authorized to
189         * consume a given message
190         */
191        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
192            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
193        }
194    
195        public void start() throws Exception {
196            
197            TransportServer server = getServer();
198            
199            broker = brokerService.getBroker();
200            brokerInfo.setBrokerName(broker.getBrokerName());
201            brokerInfo.setBrokerId(broker.getBrokerId());
202            brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
203            brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
204            brokerInfo.setBrokerURL(server.getConnectURI().toString());
205            
206            server.setAcceptListener(new TransportAcceptListener() {
207                public void onAccept(final Transport transport) {
208                    try {
209                        getDefaultTaskRunnerFactory().execute(new Runnable(){
210                            public void run() {
211                                try {
212                                    Connection connection = createConnection(transport);
213                                    connection.start();
214                                } catch (Exception e) {
215                                    ServiceSupport.dispose(transport);
216                                    onAcceptError(e);
217                                }
218                            }
219                        });
220                    } catch (Exception e) {
221                        String remoteHost = transport.getRemoteAddress();
222                        ServiceSupport.dispose(transport);
223                        onAcceptError(e, remoteHost);
224                    }
225                }
226    
227                public void onAcceptError(Exception error) {
228                    onAcceptError(error, null);
229                }
230    
231                private void onAcceptError(Exception error, String remoteHost) {
232                    LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": " + error);
233                    LOG.debug("Reason: " + error, error);
234                }
235            });
236            
237            server.setBrokerInfo(brokerInfo);
238            server.start();
239            
240            DiscoveryAgent da = getDiscoveryAgent();
241            if (da != null) {
242                da.registerService(getPublishableConnectString());
243                da.start();
244            }
245            if (enableStatusMonitor) {
246                this.statusDector = new TransportStatusDetector(this);
247                this.statusDector.start();
248            }
249    
250            LOG.info("Connector " + getName() + " Started");
251        }
252    
253        private String getPublishableConnectString() throws Exception {
254            URI connectUri = getConnectUri();
255            String publishableConnectString = connectUri.toString();
256            // strip off server side query parameters which may not be compatible to clients
257            if (connectUri.getRawQuery() != null) {
258                publishableConnectString = 
259                    publishableConnectString.substring(0, publishableConnectString.indexOf(connectUri.getRawQuery()) -1); 
260            }
261            if (LOG.isDebugEnabled()) {
262                LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + connectUri);
263            }
264            return publishableConnectString;
265        }
266    
267        public void stop() throws Exception {
268            ServiceStopper ss = new ServiceStopper();
269            if (discoveryAgent != null) {
270                ss.stop(discoveryAgent);
271            }
272            if (server != null) {
273                ss.stop(server);
274                server = null;
275            }
276            if (this.statusDector != null) {
277                this.statusDector.stop();
278            }
279    
280            for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
281                TransportConnection c = iter.next();
282                ss.stop(c);
283            }
284            ss.throwFirstException();
285            LOG.info("Connector " + getName() + " Stopped");
286        }
287    
288        // Implementation methods
289        // -------------------------------------------------------------------------
290        protected Connection createConnection(Transport transport) throws IOException {
291            TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null : taskRunnerFactory);
292            boolean statEnabled = this.getStatistics().isEnabled();
293            answer.getStatistics().setEnabled(statEnabled);
294            answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
295            return answer;
296        }
297    
298        protected TransportServer createTransportServer() throws IOException, URISyntaxException {
299            if (uri == null) {
300                throw new IllegalArgumentException("You must specify either a server or uri property");
301            }
302            if (brokerService == null) {
303                throw new IllegalArgumentException("You must specify the brokerService property. Maybe this connector should be added to a broker?");
304            }
305            return TransportFactory.bind(brokerService, uri);
306        }
307    
308        public DiscoveryAgent getDiscoveryAgent() throws IOException {
309            if (discoveryAgent == null) {
310                discoveryAgent = createDiscoveryAgent();
311            }
312            return discoveryAgent;
313        }
314    
315        protected DiscoveryAgent createDiscoveryAgent() throws IOException {
316            if (discoveryUri != null) {
317                return DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
318            }
319            return null;
320        }
321    
322        public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
323            this.discoveryAgent = discoveryAgent;
324        }
325    
326        public URI getDiscoveryUri() {
327            return discoveryUri;
328        }
329    
330        public void setDiscoveryUri(URI discoveryUri) {
331            this.discoveryUri = discoveryUri;
332        }
333    
334        public URI getConnectUri() throws IOException, URISyntaxException {
335            if (connectUri == null) {
336                if (server != null) {
337                    connectUri = server.getConnectURI();
338                }
339            }
340            return connectUri;
341        }
342    
343        public void setConnectUri(URI transportUri) {
344            this.connectUri = transportUri;
345        }
346    
347        public void onStarted(TransportConnection connection) {
348            connections.add(connection);
349        }
350    
351        public void onStopped(TransportConnection connection) {
352            connections.remove(connection);
353        }
354    
355        public String getName() {
356            if (name == null) {
357                uri = getUri();
358                if (uri != null) {
359                    name = uri.toString();
360                }
361            }
362            return name;
363        }
364    
365        public void setName(String name) {
366            this.name = name;
367        }
368    
369        public String toString() {
370            String rc = getName();
371            if (rc == null) {
372                rc = super.toString();
373            }
374            return rc;
375        }
376    
377        public boolean isDisableAsyncDispatch() {
378            return disableAsyncDispatch;
379        }
380    
381        public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
382            this.disableAsyncDispatch = disableAsyncDispatch;
383        }
384    
385        /**
386         * @return the enableStatusMonitor
387         */
388        public boolean isEnableStatusMonitor() {
389            return enableStatusMonitor;
390        }
391    
392        /**
393         * @param enableStatusMonitor the enableStatusMonitor to set
394         */
395        public void setEnableStatusMonitor(boolean enableStatusMonitor) {
396            this.enableStatusMonitor = enableStatusMonitor;
397        }
398    
399        /**
400         * This is called by the BrokerService right before it starts the transport.
401         */
402        public void setBrokerService(BrokerService brokerService) {
403            this.brokerService = brokerService;
404        }
405    
406        public Broker getBroker() {
407            return broker;
408        }
409    
410            public BrokerService getBrokerService() {
411                    return brokerService;
412            }
413    }