001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.io.IOException;
020    
021    import org.apache.activemq.command.BrokerId;
022    import org.apache.activemq.command.BrokerInfo;
023    import org.apache.activemq.command.Command;
024    import org.apache.activemq.command.ConsumerInfo;
025    import org.apache.activemq.command.Endpoint;
026    import org.apache.activemq.command.NetworkBridgeFilter;
027    import org.apache.activemq.transport.Transport;
028    import org.apache.activemq.util.ServiceSupport;
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    
032    /**
033     * A demand forwarding bridge which works with multicast style transports where
034     * a single Transport could be communicating with multiple remote brokers
035     * 
036     * @org.apache.xbean.XBean
037     * 
038     * @version $Revision: 808890 $
039     */
040    public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSupport {
041        private static final Log LOG = LogFactory.getLog(CompositeDemandForwardingBridge.class);
042    
043        protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
044        protected Object brokerInfoMutex = new Object();
045    
046        public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
047                                               Transport remoteBroker) {
048            super(configuration, localBroker, remoteBroker);
049            remoteBrokerName = remoteBroker.toString();
050            remoteBrokerNameKnownLatch.countDown();
051        }
052    
053        protected void serviceRemoteBrokerInfo(Command command) throws IOException {
054            synchronized (brokerInfoMutex) {
055                BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
056                BrokerId remoteBrokerId = remoteBrokerInfo.getBrokerId();
057    
058                // lets associate the incoming endpoint with a broker ID so we can
059                // refer to it later
060                Endpoint from = command.getFrom();
061                if (from == null) {
062                    LOG.warn("Incoming command does not have a from endpoint: " + command);
063                } else {
064                    from.setBrokerInfo(remoteBrokerInfo);
065                }
066                if (localBrokerId != null) {
067                    if (localBrokerId.equals(remoteBrokerId)) {
068                        LOG.info("Disconnecting loop back connection.");
069                        // waitStarted();
070                        ServiceSupport.dispose(this);
071                    }
072                }
073                if (!disposed.get()) {
074                    triggerLocalStartBridge();
075                }
076            }
077        }
078    
079        protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
080            info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getFromBrokerId(info)));
081        }
082    
083        /**
084         * Returns the broker ID that the command came from
085         */
086        protected BrokerId getFromBrokerId(Command command) throws IOException {
087            BrokerId answer = null;
088            Endpoint from = command.getFrom();
089            if (from == null) {
090                LOG.warn("Incoming command does not have a from endpoint: " + command);
091            } else {
092                answer = from.getBrokerId();
093            }
094            if (answer != null) {
095                return answer;
096            } else {
097                throw new IOException("No broker ID is available for endpoint: " + from + " from command: "
098                                      + command);
099            }
100        }
101    
102        protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
103            // TODO is there much we can do here?
104        }
105    
106        protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
107            return new NetworkBridgeFilter(getFromBrokerId(info), configuration.getNetworkTTL());
108        }
109    
110        protected BrokerId[] getRemoteBrokerPath() {
111            return remoteBrokerPath;
112        }
113    
114    }