001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker; 018 019 import org.apache.activemq.broker.jmx.ManagedTransportConnector; 020 import org.apache.activemq.broker.jmx.ManagementContext; 021 import org.apache.activemq.broker.region.ConnectorStatistics; 022 import org.apache.activemq.command.BrokerInfo; 023 import org.apache.activemq.security.MessageAuthorizationPolicy; 024 import org.apache.activemq.thread.DefaultThreadPools; 025 import org.apache.activemq.thread.TaskRunnerFactory; 026 import org.apache.activemq.transport.Transport; 027 import org.apache.activemq.transport.TransportAcceptListener; 028 import org.apache.activemq.transport.TransportFactory; 029 import org.apache.activemq.transport.TransportServer; 030 import org.apache.activemq.transport.discovery.DiscoveryAgent; 031 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 032 import org.apache.activemq.util.ServiceStopper; 033 import org.apache.activemq.util.ServiceSupport; 034 import org.apache.commons.logging.Log; 035 import org.apache.commons.logging.LogFactory; 036 037 import static org.apache.activemq.thread.DefaultThreadPools.*; 038 039 import java.io.IOException; 040 import java.net.URI; 041 import java.net.URISyntaxException; 042 import java.util.Iterator; 043 import java.util.concurrent.CopyOnWriteArrayList; 044 import javax.management.ObjectName; 045 046 /** 047 * @org.apache.xbean.XBean 048 * @version $Revision: 1.6 $ 049 */ 050 public class TransportConnector implements Connector, BrokerServiceAware { 051 052 private static final Log LOG = LogFactory.getLog(TransportConnector.class); 053 054 protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>(); 055 protected TransportStatusDetector statusDector; 056 057 private BrokerService brokerService; 058 private TransportServer server; 059 private URI uri; 060 private BrokerInfo brokerInfo = new BrokerInfo(); 061 private TaskRunnerFactory taskRunnerFactory; 062 private MessageAuthorizationPolicy messageAuthorizationPolicy; 063 private DiscoveryAgent discoveryAgent; 064 private ConnectorStatistics statistics = new ConnectorStatistics(); 065 private URI discoveryUri; 066 private URI connectUri; 067 private String name; 068 private boolean disableAsyncDispatch; 069 private boolean enableStatusMonitor = false; 070 private Broker broker; 071 072 public TransportConnector() { 073 } 074 075 public TransportConnector(TransportServer server) { 076 this(); 077 setServer(server); 078 if (server != null && server.getConnectURI() != null) { 079 URI uri = server.getConnectURI(); 080 if (uri != null && uri.getScheme().equals("vm")) { 081 setEnableStatusMonitor(false); 082 } 083 } 084 085 } 086 087 088 /** 089 * @return Returns the connections. 090 */ 091 public CopyOnWriteArrayList<TransportConnection> getConnections() { 092 return connections; 093 } 094 095 /** 096 * Factory method to create a JMX managed version of this transport 097 * connector 098 */ 099 public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException { 100 ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer()); 101 rc.setBrokerInfo(getBrokerInfo()); 102 rc.setConnectUri(getConnectUri()); 103 rc.setDisableAsyncDispatch(isDisableAsyncDispatch()); 104 rc.setDiscoveryAgent(getDiscoveryAgent()); 105 rc.setDiscoveryUri(getDiscoveryUri()); 106 rc.setEnableStatusMonitor(isEnableStatusMonitor()); 107 rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 108 rc.setName(getName()); 109 rc.setTaskRunnerFactory(getTaskRunnerFactory()); 110 rc.setUri(getUri()); 111 rc.setBrokerService(brokerService); 112 return rc; 113 } 114 115 public BrokerInfo getBrokerInfo() { 116 return brokerInfo; 117 } 118 119 public void setBrokerInfo(BrokerInfo brokerInfo) { 120 this.brokerInfo = brokerInfo; 121 } 122 123 /** 124 * 125 * @deprecated use the {@link #setBrokerService(BrokerService)} method instead. 126 */ 127 @Deprecated 128 public void setBrokerName(String name) { 129 if (this.brokerInfo==null) { 130 this.brokerInfo=new BrokerInfo(); 131 } 132 this.brokerInfo.setBrokerName(name); 133 } 134 135 public TransportServer getServer() throws IOException, URISyntaxException { 136 if (server == null) { 137 setServer(createTransportServer()); 138 } 139 return server; 140 } 141 142 public void setServer(TransportServer server) { 143 this.server = server; 144 } 145 146 public URI getUri() { 147 if (uri == null) { 148 try { 149 uri = getConnectUri(); 150 } catch (Throwable e) { 151 } 152 } 153 return uri; 154 } 155 156 /** 157 * Sets the server transport URI to use if there is not a 158 * {@link TransportServer} configured via the 159 * {@link #setServer(TransportServer)} method. This value is used to lazy 160 * create a {@link TransportServer} instance 161 * 162 * @param uri 163 */ 164 public void setUri(URI uri) { 165 this.uri = uri; 166 } 167 168 public TaskRunnerFactory getTaskRunnerFactory() { 169 return taskRunnerFactory; 170 } 171 172 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 173 this.taskRunnerFactory = taskRunnerFactory; 174 } 175 176 /** 177 * @return the statistics for this connector 178 */ 179 public ConnectorStatistics getStatistics() { 180 return statistics; 181 } 182 183 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 184 return messageAuthorizationPolicy; 185 } 186 187 /** 188 * Sets the policy used to decide if the current connection is authorized to 189 * consume a given message 190 */ 191 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 192 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 193 } 194 195 public void start() throws Exception { 196 197 TransportServer server = getServer(); 198 199 broker = brokerService.getBroker(); 200 brokerInfo.setBrokerName(broker.getBrokerName()); 201 brokerInfo.setBrokerId(broker.getBrokerId()); 202 brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos()); 203 brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration()); 204 brokerInfo.setBrokerURL(server.getConnectURI().toString()); 205 206 server.setAcceptListener(new TransportAcceptListener() { 207 public void onAccept(final Transport transport) { 208 try { 209 getDefaultTaskRunnerFactory().execute(new Runnable(){ 210 public void run() { 211 try { 212 Connection connection = createConnection(transport); 213 connection.start(); 214 } catch (Exception e) { 215 ServiceSupport.dispose(transport); 216 onAcceptError(e); 217 } 218 } 219 }); 220 } catch (Exception e) { 221 String remoteHost = transport.getRemoteAddress(); 222 ServiceSupport.dispose(transport); 223 onAcceptError(e, remoteHost); 224 } 225 } 226 227 public void onAcceptError(Exception error) { 228 onAcceptError(error, null); 229 } 230 231 private void onAcceptError(Exception error, String remoteHost) { 232 LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": " + error); 233 LOG.debug("Reason: " + error, error); 234 } 235 }); 236 237 server.setBrokerInfo(brokerInfo); 238 server.start(); 239 240 DiscoveryAgent da = getDiscoveryAgent(); 241 if (da != null) { 242 da.registerService(getPublishableConnectString()); 243 da.start(); 244 } 245 if (enableStatusMonitor) { 246 this.statusDector = new TransportStatusDetector(this); 247 this.statusDector.start(); 248 } 249 250 LOG.info("Connector " + getName() + " Started"); 251 } 252 253 private String getPublishableConnectString() throws Exception { 254 URI connectUri = getConnectUri(); 255 String publishableConnectString = connectUri.toString(); 256 // strip off server side query parameters which may not be compatible to clients 257 if (connectUri.getRawQuery() != null) { 258 publishableConnectString = 259 publishableConnectString.substring(0, publishableConnectString.indexOf(connectUri.getRawQuery()) -1); 260 } 261 if (LOG.isDebugEnabled()) { 262 LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + connectUri); 263 } 264 return publishableConnectString; 265 } 266 267 public void stop() throws Exception { 268 ServiceStopper ss = new ServiceStopper(); 269 if (discoveryAgent != null) { 270 ss.stop(discoveryAgent); 271 } 272 if (server != null) { 273 ss.stop(server); 274 server = null; 275 } 276 if (this.statusDector != null) { 277 this.statusDector.stop(); 278 } 279 280 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) { 281 TransportConnection c = iter.next(); 282 ss.stop(c); 283 } 284 ss.throwFirstException(); 285 LOG.info("Connector " + getName() + " Stopped"); 286 } 287 288 // Implementation methods 289 // ------------------------------------------------------------------------- 290 protected Connection createConnection(Transport transport) throws IOException { 291 TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null : taskRunnerFactory); 292 boolean statEnabled = this.getStatistics().isEnabled(); 293 answer.getStatistics().setEnabled(statEnabled); 294 answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy); 295 return answer; 296 } 297 298 protected TransportServer createTransportServer() throws IOException, URISyntaxException { 299 if (uri == null) { 300 throw new IllegalArgumentException("You must specify either a server or uri property"); 301 } 302 if (brokerService == null) { 303 throw new IllegalArgumentException("You must specify the brokerService property. Maybe this connector should be added to a broker?"); 304 } 305 return TransportFactory.bind(brokerService, uri); 306 } 307 308 public DiscoveryAgent getDiscoveryAgent() throws IOException { 309 if (discoveryAgent == null) { 310 discoveryAgent = createDiscoveryAgent(); 311 } 312 return discoveryAgent; 313 } 314 315 protected DiscoveryAgent createDiscoveryAgent() throws IOException { 316 if (discoveryUri != null) { 317 return DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri); 318 } 319 return null; 320 } 321 322 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 323 this.discoveryAgent = discoveryAgent; 324 } 325 326 public URI getDiscoveryUri() { 327 return discoveryUri; 328 } 329 330 public void setDiscoveryUri(URI discoveryUri) { 331 this.discoveryUri = discoveryUri; 332 } 333 334 public URI getConnectUri() throws IOException, URISyntaxException { 335 if (connectUri == null) { 336 if (server != null) { 337 connectUri = server.getConnectURI(); 338 } 339 } 340 return connectUri; 341 } 342 343 public void setConnectUri(URI transportUri) { 344 this.connectUri = transportUri; 345 } 346 347 public void onStarted(TransportConnection connection) { 348 connections.add(connection); 349 } 350 351 public void onStopped(TransportConnection connection) { 352 connections.remove(connection); 353 } 354 355 public String getName() { 356 if (name == null) { 357 uri = getUri(); 358 if (uri != null) { 359 name = uri.toString(); 360 } 361 } 362 return name; 363 } 364 365 public void setName(String name) { 366 this.name = name; 367 } 368 369 public String toString() { 370 String rc = getName(); 371 if (rc == null) { 372 rc = super.toString(); 373 } 374 return rc; 375 } 376 377 public boolean isDisableAsyncDispatch() { 378 return disableAsyncDispatch; 379 } 380 381 public void setDisableAsyncDispatch(boolean disableAsyncDispatch) { 382 this.disableAsyncDispatch = disableAsyncDispatch; 383 } 384 385 /** 386 * @return the enableStatusMonitor 387 */ 388 public boolean isEnableStatusMonitor() { 389 return enableStatusMonitor; 390 } 391 392 /** 393 * @param enableStatusMonitor the enableStatusMonitor to set 394 */ 395 public void setEnableStatusMonitor(boolean enableStatusMonitor) { 396 this.enableStatusMonitor = enableStatusMonitor; 397 } 398 399 /** 400 * This is called by the BrokerService right before it starts the transport. 401 */ 402 public void setBrokerService(BrokerService brokerService) { 403 this.brokerService = brokerService; 404 } 405 406 public Broker getBroker() { 407 return broker; 408 } 409 410 public BrokerService getBrokerService() { 411 return brokerService; 412 } 413 }