001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network; 018 019 import java.io.IOException; 020 import java.security.GeneralSecurityException; 021 import java.security.cert.X509Certificate; 022 import java.util.Collection; 023 import java.util.List; 024 import java.util.Properties; 025 import java.util.concurrent.ConcurrentHashMap; 026 import java.util.concurrent.CountDownLatch; 027 import java.util.concurrent.SynchronousQueue; 028 import java.util.concurrent.ThreadFactory; 029 import java.util.concurrent.ThreadPoolExecutor; 030 import java.util.concurrent.TimeUnit; 031 import java.util.concurrent.atomic.AtomicBoolean; 032 import java.util.concurrent.atomic.AtomicLong; 033 034 import org.apache.activemq.Service; 035 import org.apache.activemq.advisory.AdvisorySupport; 036 import org.apache.activemq.broker.BrokerService; 037 import org.apache.activemq.broker.BrokerServiceAware; 038 import org.apache.activemq.broker.TransportConnection; 039 import org.apache.activemq.broker.region.AbstractRegion; 040 import org.apache.activemq.broker.region.RegionBroker; 041 import org.apache.activemq.broker.region.Subscription; 042 import org.apache.activemq.command.ActiveMQDestination; 043 import org.apache.activemq.command.ActiveMQMessage; 044 import org.apache.activemq.command.ActiveMQTempDestination; 045 import org.apache.activemq.command.ActiveMQTopic; 046 import org.apache.activemq.command.BrokerId; 047 import org.apache.activemq.command.BrokerInfo; 048 import org.apache.activemq.command.Command; 049 import org.apache.activemq.command.ConnectionError; 050 import org.apache.activemq.command.ConnectionId; 051 import org.apache.activemq.command.ConnectionInfo; 052 import org.apache.activemq.command.ConsumerId; 053 import org.apache.activemq.command.ConsumerInfo; 054 import org.apache.activemq.command.DataStructure; 055 import org.apache.activemq.command.DestinationInfo; 056 import org.apache.activemq.command.ExceptionResponse; 057 import org.apache.activemq.command.KeepAliveInfo; 058 import org.apache.activemq.command.Message; 059 import org.apache.activemq.command.MessageAck; 060 import org.apache.activemq.command.MessageDispatch; 061 import org.apache.activemq.command.NetworkBridgeFilter; 062 import org.apache.activemq.command.ProducerInfo; 063 import org.apache.activemq.command.RemoveInfo; 064 import org.apache.activemq.command.Response; 065 import org.apache.activemq.command.SessionInfo; 066 import org.apache.activemq.command.ShutdownInfo; 067 import org.apache.activemq.command.WireFormatInfo; 068 import org.apache.activemq.filter.DestinationFilter; 069 import org.apache.activemq.transport.DefaultTransportListener; 070 import org.apache.activemq.transport.FutureResponse; 071 import org.apache.activemq.transport.ResponseCallback; 072 import org.apache.activemq.transport.Transport; 073 import org.apache.activemq.transport.TransportDisposedIOException; 074 import org.apache.activemq.transport.TransportFilter; 075 import org.apache.activemq.transport.TransportListener; 076 import org.apache.activemq.transport.tcp.SslTransport; 077 import org.apache.activemq.util.IdGenerator; 078 import org.apache.activemq.util.IntrospectionSupport; 079 import org.apache.activemq.util.LongSequenceGenerator; 080 import org.apache.activemq.util.MarshallingSupport; 081 import org.apache.activemq.util.ServiceStopper; 082 import org.apache.activemq.util.ServiceSupport; 083 import org.apache.commons.logging.Log; 084 import org.apache.commons.logging.LogFactory; 085 086 /** 087 * A useful base class for implementing demand forwarding bridges. 088 * 089 * @version $Revision: 835920 $ 090 */ 091 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { 092 093 private static final Log LOG = LogFactory.getLog(DemandForwardingBridge.class); 094 private static final ThreadPoolExecutor ASYNC_TASKS; 095 protected final Transport localBroker; 096 protected final Transport remoteBroker; 097 protected final IdGenerator idGenerator = new IdGenerator(); 098 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 099 protected ConnectionInfo localConnectionInfo; 100 protected ConnectionInfo remoteConnectionInfo; 101 protected SessionInfo localSessionInfo; 102 protected ProducerInfo producerInfo; 103 protected String remoteBrokerName = "Unknown"; 104 protected String localClientId; 105 protected ConsumerInfo demandConsumerInfo; 106 protected int demandConsumerDispatched; 107 protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false); 108 protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false); 109 protected AtomicBoolean disposed = new AtomicBoolean(); 110 protected BrokerId localBrokerId; 111 protected ActiveMQDestination[] excludedDestinations; 112 protected ActiveMQDestination[] dynamicallyIncludedDestinations; 113 protected ActiveMQDestination[] staticallyIncludedDestinations; 114 protected ActiveMQDestination[] durableDestinations; 115 protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); 116 protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); 117 protected final BrokerId localBrokerPath[] = new BrokerId[] { null }; 118 protected CountDownLatch startedLatch = new CountDownLatch(2); 119 protected CountDownLatch localStartedLatch = new CountDownLatch(1); 120 protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1); 121 protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1); 122 protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false); 123 protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); 124 protected NetworkBridgeConfiguration configuration; 125 126 final AtomicLong enqueueCounter = new AtomicLong(); 127 final AtomicLong dequeueCounter = new AtomicLong(); 128 129 private NetworkBridgeListener networkBridgeListener; 130 private boolean createdByDuplex; 131 private BrokerInfo localBrokerInfo; 132 private BrokerInfo remoteBrokerInfo; 133 134 private AtomicBoolean started = new AtomicBoolean(); 135 private TransportConnection duplexInitiatingConnection; 136 private BrokerService brokerService = null; 137 138 public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { 139 this.configuration = configuration; 140 this.localBroker = localBroker; 141 this.remoteBroker = remoteBroker; 142 } 143 144 public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception { 145 this.localBrokerInfo = localBrokerInfo; 146 this.remoteBrokerInfo = remoteBrokerInfo; 147 this.duplexInitiatingConnection = connection; 148 start(); 149 serviceRemoteCommand(remoteBrokerInfo); 150 } 151 152 public void start() throws Exception { 153 if (started.compareAndSet(false, true)) { 154 localBroker.setTransportListener(new DefaultTransportListener() { 155 156 public void onCommand(Object o) { 157 Command command = (Command) o; 158 serviceLocalCommand(command); 159 } 160 161 public void onException(IOException error) { 162 serviceLocalException(error); 163 } 164 }); 165 remoteBroker.setTransportListener(new TransportListener() { 166 167 public void onCommand(Object o) { 168 Command command = (Command) o; 169 serviceRemoteCommand(command); 170 } 171 172 public void onException(IOException error) { 173 serviceRemoteException(error); 174 } 175 176 public void transportInterupted() { 177 // clear any subscriptions - to try and prevent the bridge 178 // from stalling the broker 179 if (remoteInterupted.compareAndSet(false, true)) { 180 LOG.info("Outbound transport to " + remoteBrokerName + " interrupted."); 181 if (localBridgeStarted.get()) { 182 clearDownSubscriptions(); 183 synchronized (DemandForwardingBridgeSupport.this) { 184 try { 185 localBroker.oneway(localConnectionInfo.createRemoveCommand()); 186 } catch (TransportDisposedIOException td) { 187 LOG.debug("local broker is now disposed", td); 188 } catch (IOException e) { 189 LOG.warn("Caught exception from local start", e); 190 } 191 } 192 } 193 localBridgeStarted.set(false); 194 remoteBridgeStarted.set(false); 195 startedLatch = new CountDownLatch(2); 196 localStartedLatch = new CountDownLatch(1); 197 } 198 } 199 200 public void transportResumed() { 201 if (remoteInterupted.compareAndSet(true, false)) { 202 // We want to slow down false connects so that we don't 203 // get in a busy loop. 204 // False connects can occurr if you using SSH tunnels. 205 if (!lastConnectSucceeded.get()) { 206 try { 207 LOG.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop."); 208 Thread.sleep(1000); 209 } catch (InterruptedException e) { 210 Thread.currentThread().interrupt(); 211 } 212 } 213 lastConnectSucceeded.set(false); 214 try { 215 startLocalBridge(); 216 remoteBridgeStarted.set(true); 217 startedLatch.countDown(); 218 LOG.info("Outbound transport to " + remoteBrokerName + " resumed"); 219 } catch (Exception e) { 220 LOG.error("Caught exception from local start in resume transport", e); 221 } 222 } 223 } 224 }); 225 226 localBroker.start(); 227 remoteBroker.start(); 228 if (configuration.isDuplex() && duplexInitiatingConnection == null) { 229 // initiator side of duplex network 230 remoteBrokerNameKnownLatch.await(); 231 } 232 try { 233 triggerRemoteStartBridge(); 234 } catch (IOException e) { 235 LOG.warn("Caught exception from remote start", e); 236 } 237 NetworkBridgeListener l = this.networkBridgeListener; 238 if (l != null) { 239 l.onStart(this); 240 } 241 } 242 } 243 244 protected void triggerLocalStartBridge() throws IOException { 245 ASYNC_TASKS.execute(new Runnable() { 246 public void run() { 247 final String originalName = Thread.currentThread().getName(); 248 Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker); 249 try { 250 startLocalBridge(); 251 } catch (Exception e) { 252 serviceLocalException(e); 253 } finally { 254 Thread.currentThread().setName(originalName); 255 } 256 } 257 }); 258 } 259 260 protected void triggerRemoteStartBridge() throws IOException { 261 ASYNC_TASKS.execute(new Runnable() { 262 public void run() { 263 final String originalName = Thread.currentThread().getName(); 264 Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker); 265 try { 266 startRemoteBridge(); 267 } catch (Exception e) { 268 serviceRemoteException(e); 269 } finally { 270 Thread.currentThread().setName(originalName); 271 } 272 } 273 }); 274 } 275 276 protected void startLocalBridge() throws Exception { 277 if (localBridgeStarted.compareAndSet(false, true)) { 278 synchronized (this) { 279 if (LOG.isTraceEnabled()) { 280 LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker); 281 } 282 remoteBrokerNameKnownLatch.await(); 283 284 localConnectionInfo = new ConnectionInfo(); 285 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 286 localClientId = "NC_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); 287 localConnectionInfo.setClientId(localClientId); 288 localConnectionInfo.setUserName(configuration.getUserName()); 289 localConnectionInfo.setPassword(configuration.getPassword()); 290 Transport originalTransport = remoteBroker; 291 while (originalTransport instanceof TransportFilter) { 292 originalTransport = ((TransportFilter) originalTransport).getNext(); 293 } 294 if (originalTransport instanceof SslTransport) { 295 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); 296 localConnectionInfo.setTransportContext(peerCerts); 297 } 298 localBroker.oneway(localConnectionInfo); 299 300 localSessionInfo = new SessionInfo(localConnectionInfo, 1); 301 localBroker.oneway(localSessionInfo); 302 303 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); 304 305 startedLatch.countDown(); 306 localStartedLatch.countDown(); 307 setupStaticDestinations(); 308 } 309 } 310 } 311 312 protected void startRemoteBridge() throws Exception { 313 if (remoteBridgeStarted.compareAndSet(false, true)) { 314 if (LOG.isTraceEnabled()) { 315 LOG.trace(configuration.getBrokerName() + " starting remote Bridge, localBroker=" + localBroker); 316 } 317 synchronized (this) { 318 if (!isCreatedByDuplex()) { 319 BrokerInfo brokerInfo = new BrokerInfo(); 320 brokerInfo.setBrokerName(configuration.getBrokerName()); 321 brokerInfo.setNetworkConnection(true); 322 brokerInfo.setDuplexConnection(configuration.isDuplex()); 323 // set our properties 324 Properties props = new Properties(); 325 IntrospectionSupport.getProperties(configuration, props, null); 326 String str = MarshallingSupport.propertiesToString(props); 327 brokerInfo.setNetworkProperties(str); 328 brokerInfo.setBrokerId(this.localBrokerId); 329 remoteBroker.oneway(brokerInfo); 330 } 331 if (remoteConnectionInfo != null) { 332 remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); 333 } 334 remoteConnectionInfo = new ConnectionInfo(); 335 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 336 remoteConnectionInfo.setClientId("NC_" + configuration.getBrokerName() + "_outbound"); 337 remoteConnectionInfo.setUserName(configuration.getUserName()); 338 remoteConnectionInfo.setPassword(configuration.getPassword()); 339 remoteBroker.oneway(remoteConnectionInfo); 340 341 SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1); 342 remoteBroker.oneway(remoteSessionInfo); 343 producerInfo = new ProducerInfo(remoteSessionInfo, 1); 344 producerInfo.setResponseRequired(false); 345 remoteBroker.oneway(producerInfo); 346 // Listen to consumer advisory messages on the remote broker to 347 // determine demand. 348 demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); 349 demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync()); 350 String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + configuration.getDestinationFilter(); 351 if (configuration.isBridgeTempDestinations()) { 352 advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; 353 } 354 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic)); 355 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize()); 356 remoteBroker.oneway(demandConsumerInfo); 357 startedLatch.countDown(); 358 if (!disposed.get()) { 359 triggerLocalStartBridge(); 360 } 361 } 362 } 363 } 364 365 public void stop() throws Exception { 366 if (started.compareAndSet(true, false)) { 367 if (disposed.compareAndSet(false, true)) { 368 LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName); 369 NetworkBridgeListener l = this.networkBridgeListener; 370 if (l != null) { 371 l.onStop(this); 372 } 373 try { 374 remoteBridgeStarted.set(false); 375 final CountDownLatch sendShutdown = new CountDownLatch(1); 376 ASYNC_TASKS.execute(new Runnable() { 377 public void run() { 378 try { 379 localBroker.oneway(new ShutdownInfo()); 380 sendShutdown.countDown(); 381 remoteBroker.oneway(new ShutdownInfo()); 382 } catch (Throwable e) { 383 LOG.debug("Caught exception sending shutdown", e); 384 } finally { 385 sendShutdown.countDown(); 386 } 387 388 } 389 }); 390 if (!sendShutdown.await(10, TimeUnit.SECONDS)) { 391 LOG.info("Network Could not shutdown in a timely manner"); 392 } 393 } finally { 394 ServiceStopper ss = new ServiceStopper(); 395 ss.stop(remoteBroker); 396 ss.stop(localBroker); 397 // Release the started Latch since another thread could be 398 // stuck waiting for it to start up. 399 startedLatch.countDown(); 400 startedLatch.countDown(); 401 localStartedLatch.countDown(); 402 ss.throwFirstException(); 403 } 404 } 405 LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); 406 } 407 } 408 409 public void serviceRemoteException(Throwable error) { 410 if (!disposed.get()) { 411 if (error instanceof SecurityException || error instanceof GeneralSecurityException) { 412 LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error); 413 } else { 414 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error); 415 } 416 LOG.debug("The remote Exception was: " + error, error); 417 ASYNC_TASKS.execute(new Runnable() { 418 public void run() { 419 ServiceSupport.dispose(getControllingService()); 420 } 421 }); 422 fireBridgeFailed(); 423 } 424 } 425 426 protected void serviceRemoteCommand(Command command) { 427 if (!disposed.get()) { 428 try { 429 if (command.isMessageDispatch()) { 430 waitStarted(); 431 MessageDispatch md = (MessageDispatch) command; 432 serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); 433 demandConsumerDispatched++; 434 if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) { 435 remoteBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched)); 436 demandConsumerDispatched = 0; 437 } 438 } else if (command.isBrokerInfo()) { 439 lastConnectSucceeded.set(true); 440 remoteBrokerInfo = (BrokerInfo) command; 441 Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); 442 try { 443 IntrospectionSupport.getProperties(configuration, props, null); 444 if (configuration.getExcludedDestinations() != null) { 445 excludedDestinations = configuration.getExcludedDestinations().toArray( 446 new ActiveMQDestination[configuration.getExcludedDestinations().size()]); 447 } 448 if (configuration.getStaticallyIncludedDestinations() != null) { 449 staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( 450 new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); 451 } 452 if (configuration.getDynamicallyIncludedDestinations() != null) { 453 dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations() 454 .toArray( 455 new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations() 456 .size()]); 457 } 458 } catch (Throwable t) { 459 LOG.error("Error mapping remote destinations", t); 460 } 461 serviceRemoteBrokerInfo(command); 462 // Let the local broker know the remote broker's ID. 463 localBroker.oneway(command); 464 } else if (command.getClass() == ConnectionError.class) { 465 ConnectionError ce = (ConnectionError) command; 466 serviceRemoteException(ce.getException()); 467 } else { 468 if (isDuplex()) { 469 if (command.isMessage()) { 470 ActiveMQMessage message = (ActiveMQMessage) command; 471 if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())) { 472 serviceRemoteConsumerAdvisory(message.getDataStructure()); 473 } else { 474 if (!isPermissableDestination(message.getDestination(), true)) { 475 return; 476 } 477 if (message.isResponseRequired()) { 478 Response reply = new Response(); 479 reply.setCorrelationId(message.getCommandId()); 480 localBroker.oneway(message); 481 remoteBroker.oneway(reply); 482 } else { 483 localBroker.oneway(message); 484 } 485 } 486 } else { 487 switch (command.getDataStructureType()) { 488 case ConnectionInfo.DATA_STRUCTURE_TYPE: 489 case SessionInfo.DATA_STRUCTURE_TYPE: 490 case ProducerInfo.DATA_STRUCTURE_TYPE: 491 localBroker.oneway(command); 492 break; 493 case ConsumerInfo.DATA_STRUCTURE_TYPE: 494 localStartedLatch.await(); 495 if (started.get()) { 496 if (!addConsumerInfo((ConsumerInfo) command)) { 497 if (LOG.isDebugEnabled()) { 498 LOG.debug("Ignoring ConsumerInfo: " + command); 499 } 500 } else { 501 if (LOG.isTraceEnabled()) { 502 LOG.trace("Adding ConsumerInfo: " + command); 503 } 504 } 505 } else { 506 // received a subscription whilst stopping 507 LOG.warn("Stopping - ignoring ConsumerInfo: " + command); 508 } 509 break; 510 default: 511 if (LOG.isDebugEnabled()) { 512 LOG.debug("Ignoring remote command: " + command); 513 } 514 } 515 } 516 } else { 517 switch (command.getDataStructureType()) { 518 case KeepAliveInfo.DATA_STRUCTURE_TYPE: 519 case WireFormatInfo.DATA_STRUCTURE_TYPE: 520 case ShutdownInfo.DATA_STRUCTURE_TYPE: 521 break; 522 default: 523 LOG.warn("Unexpected remote command: " + command); 524 } 525 } 526 } 527 } catch (Throwable e) { 528 if (LOG.isDebugEnabled()) { 529 LOG.debug("Exception processing remote command: " + command, e); 530 } 531 serviceRemoteException(e); 532 } 533 } 534 } 535 536 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException { 537 final int networkTTL = configuration.getNetworkTTL(); 538 if (data.getClass() == ConsumerInfo.class) { 539 // Create a new local subscription 540 ConsumerInfo info = (ConsumerInfo) data; 541 BrokerId[] path = info.getBrokerPath(); 542 543 if (path != null && path.length >= networkTTL) { 544 if (LOG.isDebugEnabled()) { 545 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info); 546 } 547 return; 548 } 549 if (contains(path, localBrokerPath[0])) { 550 // Ignore this consumer as it's a consumer we locally sent to the broker. 551 if (LOG.isDebugEnabled()) { 552 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info); 553 } 554 return; 555 } 556 if (!isPermissableDestination(info.getDestination())) { 557 // ignore if not in the permitted or in the excluded list 558 if (LOG.isDebugEnabled()) { 559 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info); 560 } 561 return; 562 } 563 564 // in a cyclic network there can be multiple bridges per broker that can propagate 565 // a network subscription so there is a need to synchronise on a shared entity 566 synchronized (brokerService.getVmConnectorURI()) { 567 if (addConsumerInfo(info)) { 568 if (LOG.isDebugEnabled()) { 569 LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info); 570 } 571 } else { 572 if (LOG.isDebugEnabled()) { 573 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info); 574 } 575 } 576 } 577 } else if (data.getClass() == DestinationInfo.class) { 578 // It's a destination info - we want to pass up 579 // information about temporary destinations 580 DestinationInfo destInfo = (DestinationInfo) data; 581 BrokerId[] path = destInfo.getBrokerPath(); 582 if (path != null && path.length >= networkTTL) { 583 if (LOG.isDebugEnabled()) { 584 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only"); 585 } 586 return; 587 } 588 if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) { 589 // Ignore this consumer as it's a consumer we locally sent to 590 // the broker. 591 if (LOG.isDebugEnabled()) { 592 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once"); 593 } 594 return; 595 } 596 destInfo.setConnectionId(localConnectionInfo.getConnectionId()); 597 if (destInfo.getDestination() instanceof ActiveMQTempDestination) { 598 // re-set connection id so comes from here 599 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); 600 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); 601 } 602 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath())); 603 if (LOG.isTraceEnabled()) { 604 LOG.trace("bridging destination control command: " + destInfo); 605 } 606 localBroker.oneway(destInfo); 607 } else if (data.getClass() == RemoveInfo.class) { 608 ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); 609 removeDemandSubscription(id); 610 } 611 } 612 613 public void serviceLocalException(Throwable error) { 614 if (!disposed.get()) { 615 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error); 616 LOG.debug("The local Exception was:" + error, error); 617 ASYNC_TASKS.execute(new Runnable() { 618 public void run() { 619 ServiceSupport.dispose(getControllingService()); 620 } 621 }); 622 fireBridgeFailed(); 623 } 624 } 625 626 protected Service getControllingService() { 627 return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this; 628 } 629 630 protected void addSubscription(DemandSubscription sub) throws IOException { 631 if (sub != null) { 632 localBroker.oneway(sub.getLocalInfo()); 633 } 634 } 635 636 protected void removeSubscription(final DemandSubscription sub) throws IOException { 637 if (sub != null) { 638 if (LOG.isDebugEnabled()) { 639 LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId()); 640 } 641 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 642 643 // continue removal in separate thread to free up this thread for outstanding responses 644 ASYNC_TASKS.execute(new Runnable() { 645 public void run() { 646 sub.waitForCompletion(); 647 try { 648 localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); 649 } catch (IOException e) { 650 LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e); 651 } 652 } 653 }); 654 } 655 } 656 657 protected Message configureMessage(MessageDispatch md) { 658 Message message = md.getMessage().copy(); 659 // Update the packet to show where it came from. 660 message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath)); 661 message.setProducerId(producerInfo.getProducerId()); 662 message.setDestination(md.getDestination()); 663 if (message.getOriginalTransactionId() == null) { 664 message.setOriginalTransactionId(message.getTransactionId()); 665 } 666 message.setTransactionId(null); 667 return message; 668 } 669 670 protected void serviceLocalCommand(Command command) { 671 if (!disposed.get()) { 672 try { 673 if (command.isMessageDispatch()) { 674 enqueueCounter.incrementAndGet(); 675 final MessageDispatch md = (MessageDispatch) command; 676 final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); 677 if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) { 678 // See if this consumer's brokerPath tells us it came from the broker at the other end 679 // of the bridge. I think we should be making this decision based on the message's 680 // broker bread crumbs and not the consumer's? However, the message's broker bread 681 // crumbs are null, which is another matter. 682 boolean cameFromRemote = false; 683 Object consumerInfo = md.getMessage().getDataStructure(); 684 if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) 685 cameFromRemote = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId()); 686 687 Message message = configureMessage(md); 688 if (LOG.isDebugEnabled()) { 689 LOG.debug("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message); 690 } 691 692 if (!message.isResponseRequired()) { 693 694 // If the message was originally sent using async 695 // send, we will preserve that QOS 696 // by bridging it using an async send (small chance 697 // of message loss). 698 699 try { 700 // Don't send it off to the remote if it originally came from the remote. 701 if (!cameFromRemote) { 702 remoteBroker.oneway(message); 703 } else { 704 if (LOG.isDebugEnabled()) { 705 LOG.debug("Message not forwarded on to remote, because message came from remote"); 706 } 707 } 708 709 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 710 dequeueCounter.incrementAndGet(); 711 } finally { 712 sub.decrementOutstandingResponses(); 713 } 714 715 } else { 716 717 // The message was not sent using async send, so we 718 // should only ack the local 719 // broker when we get confirmation that the remote 720 // broker has received the message. 721 ResponseCallback callback = new ResponseCallback() { 722 public void onCompletion(FutureResponse future) { 723 try { 724 Response response = future.getResult(); 725 if (response.isException()) { 726 ExceptionResponse er = (ExceptionResponse) response; 727 serviceLocalException(er.getException()); 728 } else { 729 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 730 dequeueCounter.incrementAndGet(); 731 732 } 733 } catch (IOException e) { 734 serviceLocalException(e); 735 } finally { 736 sub.decrementOutstandingResponses(); 737 } 738 } 739 }; 740 741 remoteBroker.asyncRequest(message, callback); 742 } 743 744 } else { 745 if (LOG.isDebugEnabled()) { 746 LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage()); 747 } 748 } 749 } else if (command.isBrokerInfo()) { 750 localBrokerInfo = (BrokerInfo) command; 751 serviceLocalBrokerInfo(command); 752 } else if (command.isShutdownInfo()) { 753 LOG.info(configuration.getBrokerName() + " Shutting down"); 754 // Don't shut down the whole connector if the remote side 755 // was interrupted. 756 // the local transport is just shutting down temporarily 757 // until the remote side 758 // is restored. 759 if (!remoteInterupted.get()) { 760 stop(); 761 } 762 } else if (command.getClass() == ConnectionError.class) { 763 ConnectionError ce = (ConnectionError) command; 764 serviceLocalException(ce.getException()); 765 } else { 766 switch (command.getDataStructureType()) { 767 case WireFormatInfo.DATA_STRUCTURE_TYPE: 768 break; 769 default: 770 LOG.warn("Unexpected local command: " + command); 771 } 772 } 773 } catch (Throwable e) { 774 LOG.warn("Caught an exception processing local command", e); 775 serviceLocalException(e); 776 } 777 } 778 } 779 780 /** 781 * @return Returns the dynamicallyIncludedDestinations. 782 */ 783 public ActiveMQDestination[] getDynamicallyIncludedDestinations() { 784 return dynamicallyIncludedDestinations; 785 } 786 787 /** 788 * @param dynamicallyIncludedDestinations The 789 * dynamicallyIncludedDestinations to set. 790 */ 791 public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { 792 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; 793 } 794 795 /** 796 * @return Returns the excludedDestinations. 797 */ 798 public ActiveMQDestination[] getExcludedDestinations() { 799 return excludedDestinations; 800 } 801 802 /** 803 * @param excludedDestinations The excludedDestinations to set. 804 */ 805 public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) { 806 this.excludedDestinations = excludedDestinations; 807 } 808 809 /** 810 * @return Returns the staticallyIncludedDestinations. 811 */ 812 public ActiveMQDestination[] getStaticallyIncludedDestinations() { 813 return staticallyIncludedDestinations; 814 } 815 816 /** 817 * @param staticallyIncludedDestinations The staticallyIncludedDestinations 818 * to set. 819 */ 820 public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { 821 this.staticallyIncludedDestinations = staticallyIncludedDestinations; 822 } 823 824 /** 825 * @return Returns the durableDestinations. 826 */ 827 public ActiveMQDestination[] getDurableDestinations() { 828 return durableDestinations; 829 } 830 831 /** 832 * @param durableDestinations The durableDestinations to set. 833 */ 834 public void setDurableDestinations(ActiveMQDestination[] durableDestinations) { 835 this.durableDestinations = durableDestinations; 836 } 837 838 /** 839 * @return Returns the localBroker. 840 */ 841 public Transport getLocalBroker() { 842 return localBroker; 843 } 844 845 /** 846 * @return Returns the remoteBroker. 847 */ 848 public Transport getRemoteBroker() { 849 return remoteBroker; 850 } 851 852 /** 853 * @return the createdByDuplex 854 */ 855 public boolean isCreatedByDuplex() { 856 return this.createdByDuplex; 857 } 858 859 /** 860 * @param createdByDuplex the createdByDuplex to set 861 */ 862 public void setCreatedByDuplex(boolean createdByDuplex) { 863 this.createdByDuplex = createdByDuplex; 864 } 865 866 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { 867 if (brokerPath != null) { 868 for (int i = 0; i < brokerPath.length; i++) { 869 if (brokerId.equals(brokerPath[i])) { 870 return true; 871 } 872 } 873 } 874 return false; 875 } 876 877 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) { 878 if (brokerPath == null || brokerPath.length == 0) { 879 return pathsToAppend; 880 } 881 BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length]; 882 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 883 System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length); 884 return rc; 885 } 886 887 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) { 888 if (brokerPath == null || brokerPath.length == 0) { 889 return new BrokerId[] { idToAppend }; 890 } 891 BrokerId rc[] = new BrokerId[brokerPath.length + 1]; 892 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 893 rc[brokerPath.length] = idToAppend; 894 return rc; 895 } 896 897 protected boolean isPermissableDestination(ActiveMQDestination destination) { 898 return isPermissableDestination(destination, false); 899 } 900 901 protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) { 902 // Are we not bridging temp destinations? 903 if (destination.isTemporary()) { 904 if (allowTemporary) { 905 return true; 906 } else { 907 return configuration.isBridgeTempDestinations(); 908 } 909 } 910 911 final DestinationFilter filter = DestinationFilter.parseFilter(destination); 912 913 ActiveMQDestination[] dests = excludedDestinations; 914 if (dests != null && dests.length > 0) { 915 for (int i = 0; i < dests.length; i++) { 916 DestinationFilter exclusionFilter = filter; 917 ActiveMQDestination match = dests[i]; 918 if (exclusionFilter instanceof org.apache.activemq.filter.SimpleDestinationFilter) { 919 DestinationFilter newFilter = DestinationFilter.parseFilter(match); 920 if (!(newFilter instanceof org.apache.activemq.filter.SimpleDestinationFilter)) { 921 exclusionFilter = newFilter; 922 match = destination; 923 } 924 } 925 if (match != null && exclusionFilter.matches(match) && dests[i].getDestinationType() == destination.getDestinationType()) { 926 return false; 927 } 928 } 929 } 930 dests = dynamicallyIncludedDestinations; 931 if (dests != null && dests.length > 0) { 932 for (int i = 0; i < dests.length; i++) { 933 DestinationFilter inclusionFilter = filter; 934 ActiveMQDestination match = dests[i]; 935 if (inclusionFilter instanceof org.apache.activemq.filter.SimpleDestinationFilter) { 936 DestinationFilter newFilter = DestinationFilter.parseFilter(match); 937 if (!(newFilter instanceof org.apache.activemq.filter.SimpleDestinationFilter)) { 938 inclusionFilter = newFilter; 939 match = destination; 940 } 941 } 942 if (match != null && inclusionFilter.matches(match) && dests[i].getDestinationType() == destination.getDestinationType()) { 943 return true; 944 } 945 } 946 return false; 947 } 948 return true; 949 } 950 951 /** 952 * Subscriptions for these destinations are always created 953 */ 954 protected void setupStaticDestinations() { 955 ActiveMQDestination[] dests = staticallyIncludedDestinations; 956 if (dests != null) { 957 for (int i = 0; i < dests.length; i++) { 958 ActiveMQDestination dest = dests[i]; 959 DemandSubscription sub = createDemandSubscription(dest); 960 try { 961 addSubscription(sub); 962 } catch (IOException e) { 963 LOG.error("Failed to add static destination " + dest, e); 964 } 965 if (LOG.isTraceEnabled()) { 966 LOG.trace("bridging messages for static destination: " + dest); 967 } 968 } 969 } 970 } 971 972 protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException { 973 boolean consumerAdded = false; 974 ConsumerInfo info = consumerInfo.copy(); 975 addRemoteBrokerToBrokerPath(info); 976 DemandSubscription sub = createDemandSubscription(info); 977 if (sub != null) { 978 if (duplicateSuppressionIsRequired(sub)) { 979 undoMapRegistration(sub); 980 } else { 981 addSubscription(sub); 982 consumerAdded = true; 983 } 984 } 985 return consumerAdded; 986 } 987 988 private void undoMapRegistration(DemandSubscription sub) { 989 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 990 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 991 } 992 993 /* 994 * check our existing subs networkConsumerIds against the list of network ids in this subscription 995 * A match means a duplicate which we suppress for topics and maybe for queues 996 */ 997 private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) { 998 final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); 999 boolean suppress = false; 1000 1001 if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()) { 1002 return suppress; 1003 } 1004 1005 List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds(); 1006 Collection<Subscription> currentSubs = 1007 getRegionSubscriptions(consumerInfo.getDestination().isTopic()); 1008 for (Subscription sub : currentSubs) { 1009 List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); 1010 if (!networkConsumers.isEmpty()) { 1011 if (matchFound(candidateConsumers, networkConsumers)) { 1012 suppress = hasLowerPriority(sub, candidate.getLocalInfo()); 1013 break; 1014 } 1015 } 1016 } 1017 return suppress; 1018 } 1019 1020 private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { 1021 boolean suppress = false; 1022 1023 if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { 1024 if (LOG.isDebugEnabled()) { 1025 LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName 1026 + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: " 1027 + existingSub.getConsumerInfo() + ", networkComsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds()); 1028 } 1029 suppress = true; 1030 } else { 1031 // remove the existing lower priority duplicate and allow this candidate 1032 try { 1033 removeDuplicateSubscription(existingSub); 1034 1035 if (LOG.isDebugEnabled()) { 1036 LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo() 1037 + " with sub from " + remoteBrokerName 1038 + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: " 1039 + candidateInfo.getNetworkConsumerIds()); 1040 } 1041 } catch (IOException e) { 1042 LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e); 1043 } 1044 } 1045 return suppress; 1046 } 1047 1048 private void removeDuplicateSubscription(Subscription existingSub) throws IOException { 1049 for (NetworkConnector connector : brokerService.getNetworkConnectors()) { 1050 if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) { 1051 break; 1052 } 1053 } 1054 } 1055 1056 private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) { 1057 boolean found = false; 1058 for (ConsumerId aliasConsumer : networkConsumers) { 1059 if (candidateConsumers.contains(aliasConsumer)) { 1060 found = true; 1061 break; 1062 } 1063 } 1064 return found; 1065 } 1066 1067 private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) { 1068 RegionBroker region = (RegionBroker) brokerService.getRegionBroker(); 1069 AbstractRegion abstractRegion = (AbstractRegion) 1070 (isTopic ? region.getTopicRegion() : region.getQueueRegion()); 1071 return abstractRegion.getSubscriptions().values(); 1072 } 1073 1074 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 1075 //add our original id to ourselves 1076 info.addNetworkConsumerId(info.getConsumerId()); 1077 return doCreateDemandSubscription(info); 1078 } 1079 1080 protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException { 1081 DemandSubscription result = new DemandSubscription(info); 1082 result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1083 if (info.getDestination().isTemporary()) { 1084 // reset the local connection Id 1085 1086 ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination(); 1087 dest.setConnectionId(localConnectionInfo.getConnectionId().toString()); 1088 } 1089 1090 if (configuration.isDecreaseNetworkConsumerPriority()) { 1091 byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY; 1092 if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) { 1093 // The longer the path to the consumer, the less it's consumer priority. 1094 priority -= info.getBrokerPath().length + 1; 1095 } 1096 result.getLocalInfo().setPriority(priority); 1097 if (LOG.isDebugEnabled()) { 1098 LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info); 1099 } 1100 } 1101 configureDemandSubscription(info, result); 1102 return result; 1103 } 1104 1105 final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) { 1106 ConsumerInfo info = new ConsumerInfo(); 1107 info.setDestination(destination); 1108 // the remote info held by the DemandSubscription holds the original 1109 // consumerId, 1110 // the local info get's overwritten 1111 1112 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1113 DemandSubscription result = null; 1114 try { 1115 result = createDemandSubscription(info); 1116 } catch (IOException e) { 1117 LOG.error("Failed to create DemandSubscription ", e); 1118 } 1119 if (result != null) { 1120 result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); 1121 } 1122 return result; 1123 } 1124 1125 protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { 1126 sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); 1127 sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize()); 1128 subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub); 1129 subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub); 1130 1131 // This works for now since we use a VM connection to the local broker. 1132 // may need to change if we ever subscribe to a remote broker. 1133 sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info)); 1134 } 1135 1136 protected void removeDemandSubscription(ConsumerId id) throws IOException { 1137 DemandSubscription sub = subscriptionMapByRemoteId.remove(id); 1138 if (LOG.isDebugEnabled()) { 1139 LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub); 1140 } 1141 if (sub != null) { 1142 removeSubscription(sub); 1143 if (LOG.isDebugEnabled()) { 1144 LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " : " + sub.getRemoteInfo()); 1145 } 1146 } 1147 } 1148 1149 protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) { 1150 boolean removeDone = false; 1151 DemandSubscription sub = subscriptionMapByLocalId.get(consumerId); 1152 if (sub != null) { 1153 try { 1154 removeDemandSubscription(sub.getRemoteInfo().getConsumerId()); 1155 removeDone = true; 1156 } catch (IOException e) { 1157 LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e); 1158 } 1159 } 1160 return removeDone; 1161 } 1162 1163 protected void waitStarted() throws InterruptedException { 1164 startedLatch.await(); 1165 localBrokerIdKnownLatch.await(); 1166 } 1167 1168 protected void clearDownSubscriptions() { 1169 subscriptionMapByLocalId.clear(); 1170 subscriptionMapByRemoteId.clear(); 1171 } 1172 1173 protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException; 1174 1175 protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException; 1176 1177 protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException; 1178 1179 protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException; 1180 1181 protected abstract BrokerId[] getRemoteBrokerPath(); 1182 1183 public void setNetworkBridgeListener(NetworkBridgeListener listener) { 1184 this.networkBridgeListener = listener; 1185 } 1186 1187 private void fireBridgeFailed() { 1188 NetworkBridgeListener l = this.networkBridgeListener; 1189 if (l != null) { 1190 l.bridgeFailed(); 1191 } 1192 } 1193 1194 public String getRemoteAddress() { 1195 return remoteBroker.getRemoteAddress(); 1196 } 1197 1198 public String getLocalAddress() { 1199 return localBroker.getRemoteAddress(); 1200 } 1201 1202 public String getRemoteBrokerName() { 1203 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); 1204 } 1205 1206 public String getLocalBrokerName() { 1207 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); 1208 } 1209 1210 public long getDequeueCounter() { 1211 return dequeueCounter.get(); 1212 } 1213 1214 public long getEnqueueCounter() { 1215 return enqueueCounter.get(); 1216 } 1217 1218 protected boolean isDuplex() { 1219 return configuration.isDuplex() || createdByDuplex; 1220 } 1221 1222 public void setBrokerService(BrokerService brokerService) { 1223 this.brokerService = brokerService; 1224 } 1225 1226 static { 1227 ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { 1228 public Thread newThread(Runnable runnable) { 1229 Thread thread = new Thread(runnable, "NetworkBridge"); 1230 thread.setDaemon(true); 1231 return thread; 1232 } 1233 }); 1234 } 1235 1236 }