001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network; 018 019 import java.io.IOException; 020 import java.util.concurrent.atomic.AtomicLong; 021 022 import org.apache.activemq.Service; 023 import org.apache.activemq.command.ActiveMQQueue; 024 import org.apache.activemq.command.ActiveMQTopic; 025 import org.apache.activemq.command.BrokerId; 026 import org.apache.activemq.command.BrokerInfo; 027 import org.apache.activemq.command.Command; 028 import org.apache.activemq.command.ConnectionId; 029 import org.apache.activemq.command.ConnectionInfo; 030 import org.apache.activemq.command.ConsumerInfo; 031 import org.apache.activemq.command.ExceptionResponse; 032 import org.apache.activemq.command.Message; 033 import org.apache.activemq.command.MessageAck; 034 import org.apache.activemq.command.MessageDispatch; 035 import org.apache.activemq.command.ProducerInfo; 036 import org.apache.activemq.command.Response; 037 import org.apache.activemq.command.SessionInfo; 038 import org.apache.activemq.command.ShutdownInfo; 039 import org.apache.activemq.transport.DefaultTransportListener; 040 import org.apache.activemq.transport.FutureResponse; 041 import org.apache.activemq.transport.ResponseCallback; 042 import org.apache.activemq.transport.Transport; 043 import org.apache.activemq.util.IdGenerator; 044 import org.apache.activemq.util.ServiceStopper; 045 import org.apache.activemq.util.ServiceSupport; 046 import org.apache.commons.logging.Log; 047 import org.apache.commons.logging.LogFactory; 048 049 /** 050 * Forwards all messages from the local broker to the remote broker. 051 * 052 * @org.apache.xbean.XBean 053 * 054 * @version $Revision$ 055 */ 056 public class ForwardingBridge implements Service { 057 058 private static final IdGenerator ID_GENERATOR = new IdGenerator(); 059 private static final Log LOG = LogFactory.getLog(ForwardingBridge.class); 060 061 final AtomicLong enqueueCounter = new AtomicLong(); 062 final AtomicLong dequeueCounter = new AtomicLong(); 063 ConnectionInfo connectionInfo; 064 SessionInfo sessionInfo; 065 ProducerInfo producerInfo; 066 ConsumerInfo queueConsumerInfo; 067 ConsumerInfo topicConsumerInfo; 068 BrokerId localBrokerId; 069 BrokerId remoteBrokerId; 070 BrokerInfo localBrokerInfo; 071 BrokerInfo remoteBrokerInfo; 072 073 private final Transport localBroker; 074 private final Transport remoteBroker; 075 private String clientId; 076 private int prefetchSize = 1000; 077 private boolean dispatchAsync; 078 private String destinationFilter = ">"; 079 private NetworkBridgeListener bridgeFailedListener; 080 081 public ForwardingBridge(Transport localBroker, Transport remoteBroker) { 082 this.localBroker = localBroker; 083 this.remoteBroker = remoteBroker; 084 } 085 086 public void start() throws Exception { 087 LOG.info("Starting a network connection between " + localBroker + " and " + remoteBroker 088 + " has been established."); 089 090 localBroker.setTransportListener(new DefaultTransportListener() { 091 public void onCommand(Object o) { 092 Command command = (Command)o; 093 serviceLocalCommand(command); 094 } 095 096 public void onException(IOException error) { 097 serviceLocalException(error); 098 } 099 }); 100 101 remoteBroker.setTransportListener(new DefaultTransportListener() { 102 public void onCommand(Object o) { 103 Command command = (Command)o; 104 serviceRemoteCommand(command); 105 } 106 107 public void onException(IOException error) { 108 serviceRemoteException(error); 109 } 110 }); 111 112 localBroker.start(); 113 remoteBroker.start(); 114 } 115 116 protected void triggerStartBridge() throws IOException { 117 Thread thead = new Thread() { 118 public void run() { 119 try { 120 startBridge(); 121 } catch (IOException e) { 122 LOG.error("Failed to start network bridge: " + e, e); 123 } 124 } 125 }; 126 thead.start(); 127 } 128 129 /** 130 * @throws IOException 131 */ 132 final void startBridge() throws IOException { 133 connectionInfo = new ConnectionInfo(); 134 connectionInfo.setConnectionId(new ConnectionId(ID_GENERATOR.generateId())); 135 connectionInfo.setClientId(clientId); 136 localBroker.oneway(connectionInfo); 137 remoteBroker.oneway(connectionInfo); 138 139 sessionInfo = new SessionInfo(connectionInfo, 1); 140 localBroker.oneway(sessionInfo); 141 remoteBroker.oneway(sessionInfo); 142 143 queueConsumerInfo = new ConsumerInfo(sessionInfo, 1); 144 queueConsumerInfo.setDispatchAsync(dispatchAsync); 145 queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter)); 146 queueConsumerInfo.setPrefetchSize(prefetchSize); 147 queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); 148 localBroker.oneway(queueConsumerInfo); 149 150 producerInfo = new ProducerInfo(sessionInfo, 1); 151 producerInfo.setResponseRequired(false); 152 remoteBroker.oneway(producerInfo); 153 154 if (connectionInfo.getClientId() != null) { 155 topicConsumerInfo = new ConsumerInfo(sessionInfo, 2); 156 topicConsumerInfo.setDispatchAsync(dispatchAsync); 157 topicConsumerInfo.setSubscriptionName("topic-bridge"); 158 topicConsumerInfo.setRetroactive(true); 159 topicConsumerInfo.setDestination(new ActiveMQTopic(destinationFilter)); 160 topicConsumerInfo.setPrefetchSize(prefetchSize); 161 topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); 162 localBroker.oneway(topicConsumerInfo); 163 } 164 LOG.info("Network connection between " + localBroker + " and " + remoteBroker 165 + " has been established."); 166 } 167 168 public void stop() throws Exception { 169 try { 170 if (connectionInfo != null) { 171 localBroker.request(connectionInfo.createRemoveCommand()); 172 remoteBroker.request(connectionInfo.createRemoveCommand()); 173 } 174 localBroker.setTransportListener(null); 175 remoteBroker.setTransportListener(null); 176 localBroker.oneway(new ShutdownInfo()); 177 remoteBroker.oneway(new ShutdownInfo()); 178 } finally { 179 ServiceStopper ss = new ServiceStopper(); 180 ss.stop(localBroker); 181 ss.stop(remoteBroker); 182 ss.throwFirstException(); 183 } 184 } 185 186 public void serviceRemoteException(Throwable error) { 187 LOG.info("Unexpected remote exception: " + error); 188 LOG.debug("Exception trace: ", error); 189 } 190 191 protected void serviceRemoteCommand(Command command) { 192 try { 193 if (command.isBrokerInfo()) { 194 synchronized (this) { 195 remoteBrokerInfo = (BrokerInfo)command; 196 remoteBrokerId = remoteBrokerInfo.getBrokerId(); 197 if (localBrokerId != null) { 198 if (localBrokerId.equals(remoteBrokerId)) { 199 LOG.info("Disconnecting loop back connection."); 200 ServiceSupport.dispose(this); 201 } else { 202 triggerStartBridge(); 203 } 204 } 205 } 206 } else { 207 LOG.warn("Unexpected remote command: " + command); 208 } 209 } catch (IOException e) { 210 serviceLocalException(e); 211 } 212 } 213 214 public void serviceLocalException(Throwable error) { 215 LOG.info("Unexpected local exception: " + error); 216 LOG.debug("Exception trace: ", error); 217 fireBridgeFailed(); 218 } 219 220 protected void serviceLocalCommand(Command command) { 221 try { 222 if (command.isMessageDispatch()) { 223 224 enqueueCounter.incrementAndGet(); 225 226 final MessageDispatch md = (MessageDispatch)command; 227 Message message = md.getMessage(); 228 message.setProducerId(producerInfo.getProducerId()); 229 230 if (message.getOriginalTransactionId() == null) { 231 message.setOriginalTransactionId(message.getTransactionId()); 232 } 233 message.setTransactionId(null); 234 235 if (!message.isResponseRequired()) { 236 // If the message was originally sent using async send, we 237 // will preserve that QOS 238 // by bridging it using an async send (small chance of 239 // message loss). 240 remoteBroker.oneway(message); 241 dequeueCounter.incrementAndGet(); 242 localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1)); 243 244 } else { 245 246 // The message was not sent using async send, so we should 247 // only ack the local 248 // broker when we get confirmation that the remote broker 249 // has received the message. 250 ResponseCallback callback = new ResponseCallback() { 251 public void onCompletion(FutureResponse future) { 252 try { 253 Response response = future.getResult(); 254 if (response.isException()) { 255 ExceptionResponse er = (ExceptionResponse)response; 256 serviceLocalException(er.getException()); 257 } else { 258 dequeueCounter.incrementAndGet(); 259 localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1)); 260 } 261 } catch (IOException e) { 262 serviceLocalException(e); 263 } 264 } 265 }; 266 267 remoteBroker.asyncRequest(message, callback); 268 } 269 270 // Ack on every message since we don't know if the broker is 271 // blocked due to memory 272 // usage and is waiting for an Ack to un-block him. 273 274 // Acking a range is more efficient, but also more prone to 275 // locking up a server 276 // Perhaps doing something like the following should be policy 277 // based. 278 // if( 279 // md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) 280 // ) { 281 // queueDispatched++; 282 // if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) 283 // ) { 284 // localBroker.oneway(new MessageAck(md, 285 // MessageAck.STANDARD_ACK_TYPE, queueDispatched)); 286 // queueDispatched=0; 287 // } 288 // } else { 289 // topicDispatched++; 290 // if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) 291 // ) { 292 // localBroker.oneway(new MessageAck(md, 293 // MessageAck.STANDARD_ACK_TYPE, topicDispatched)); 294 // topicDispatched=0; 295 // } 296 // } 297 } else if (command.isBrokerInfo()) { 298 synchronized (this) { 299 localBrokerInfo = (BrokerInfo)command; 300 localBrokerId = localBrokerInfo.getBrokerId(); 301 if (remoteBrokerId != null) { 302 if (remoteBrokerId.equals(localBrokerId)) { 303 LOG.info("Disconnecting loop back connection."); 304 ServiceSupport.dispose(this); 305 } else { 306 triggerStartBridge(); 307 } 308 } 309 } 310 } else { 311 LOG.debug("Unexpected local command: " + command); 312 } 313 } catch (IOException e) { 314 serviceLocalException(e); 315 } 316 } 317 318 public String getClientId() { 319 return clientId; 320 } 321 322 public void setClientId(String clientId) { 323 this.clientId = clientId; 324 } 325 326 public int getPrefetchSize() { 327 return prefetchSize; 328 } 329 330 public void setPrefetchSize(int prefetchSize) { 331 this.prefetchSize = prefetchSize; 332 } 333 334 public boolean isDispatchAsync() { 335 return dispatchAsync; 336 } 337 338 public void setDispatchAsync(boolean dispatchAsync) { 339 this.dispatchAsync = dispatchAsync; 340 } 341 342 public String getDestinationFilter() { 343 return destinationFilter; 344 } 345 346 public void setDestinationFilter(String destinationFilter) { 347 this.destinationFilter = destinationFilter; 348 } 349 350 public void setNetworkBridgeFailedListener(NetworkBridgeListener listener) { 351 this.bridgeFailedListener = listener; 352 } 353 354 private void fireBridgeFailed() { 355 NetworkBridgeListener l = this.bridgeFailedListener; 356 if (l != null) { 357 l.bridgeFailed(); 358 } 359 } 360 361 public String getRemoteAddress() { 362 return remoteBroker.getRemoteAddress(); 363 } 364 365 public String getLocalAddress() { 366 return localBroker.getRemoteAddress(); 367 } 368 369 public String getLocalBrokerName() { 370 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); 371 } 372 373 public String getRemoteBrokerName() { 374 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); 375 } 376 377 public long getDequeueCounter() { 378 return dequeueCounter.get(); 379 } 380 381 public long getEnqueueCounter() { 382 return enqueueCounter.get(); 383 } 384 385 }