001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.net.URISyntaxException; 022 import java.util.Iterator; 023 import java.util.Map; 024 import java.util.concurrent.ConcurrentHashMap; 025 026 import org.apache.activemq.broker.SslContext; 027 import org.apache.activemq.command.DiscoveryEvent; 028 import org.apache.activemq.transport.Transport; 029 import org.apache.activemq.transport.TransportFactory; 030 import org.apache.activemq.transport.discovery.DiscoveryAgent; 031 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 032 import org.apache.activemq.transport.discovery.DiscoveryListener; 033 import org.apache.activemq.util.IntrospectionSupport; 034 import org.apache.activemq.util.ServiceStopper; 035 import org.apache.activemq.util.ServiceSupport; 036 import org.apache.activemq.util.URISupport; 037 import org.apache.commons.logging.Log; 038 import org.apache.commons.logging.LogFactory; 039 040 /** 041 * A network connector which uses a discovery agent to detect the remote brokers 042 * available and setup a connection to each available remote broker 043 * 044 * @org.apache.xbean.XBean element="networkConnector" 045 * @version $Revision: 824823 $ 046 */ 047 public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener { 048 private static final Log LOG = LogFactory.getLog(DiscoveryNetworkConnector.class); 049 050 private DiscoveryAgent discoveryAgent; 051 052 private Map<String, String> parameters; 053 054 public DiscoveryNetworkConnector() { 055 } 056 057 public DiscoveryNetworkConnector(URI discoveryURI) throws IOException { 058 setUri(discoveryURI); 059 } 060 061 public void setUri(URI discoveryURI) throws IOException { 062 setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI)); 063 try { 064 parameters = URISupport.parseParamters(discoveryURI); 065 // allow discovery agent to grab it's parameters 066 IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters); 067 } catch (URISyntaxException e) { 068 LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI, e); 069 } 070 071 } 072 073 public void onServiceAdd(DiscoveryEvent event) { 074 String localURIName = localURI.getScheme() + "://" + localURI.getHost(); 075 // Ignore events once we start stopping. 076 if (serviceSupport.isStopped() || serviceSupport.isStopping()) { 077 return; 078 } 079 String url = event.getServiceName(); 080 if (url != null) { 081 URI uri; 082 try { 083 uri = new URI(url); 084 } catch (URISyntaxException e) { 085 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); 086 return; 087 } 088 // Should we try to connect to that URI? 089 if( bridges.containsKey(uri) ) { 090 LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri ); 091 return; 092 } 093 if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri))) { 094 LOG.debug("not connecting loopback: " + uri); 095 return; 096 } 097 URI connectUri = uri; 098 try { 099 connectUri = URISupport.applyParameters(connectUri, parameters); 100 } catch (URISyntaxException e) { 101 LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e); 102 } 103 LOG.info("Establishing network connection from " + localURIName + " to " + connectUri); 104 105 Transport remoteTransport; 106 Transport localTransport; 107 try { 108 // Allows the transport to access the broker's ssl configuration. 109 SslContext.setCurrentSslContext(getBrokerService().getSslContext()); 110 try { 111 remoteTransport = TransportFactory.connect(connectUri); 112 } catch (Exception e) { 113 LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage()); 114 LOG.debug("Connection failure exception: " + e, e); 115 return; 116 } 117 try { 118 localTransport = createLocalTransport(); 119 } catch (Exception e) { 120 ServiceSupport.dispose(remoteTransport); 121 LOG.warn("Could not connect to local URI: " + localURIName + ": " + e.getMessage()); 122 LOG.debug("Connection failure exception: " + e, e); 123 return; 124 } 125 } finally { 126 SslContext.setCurrentSslContext(null); 127 } 128 NetworkBridge bridge = createBridge(localTransport, remoteTransport, event); 129 try { 130 bridge.start(); 131 bridges.put(uri, bridge); 132 } catch (Exception e) { 133 ServiceSupport.dispose(localTransport); 134 ServiceSupport.dispose(remoteTransport); 135 LOG.warn("Could not start network bridge between: " + localURIName + " and: " + uri + " due to: " + e); 136 LOG.debug("Start failure exception: " + e, e); 137 try { 138 discoveryAgent.serviceFailed(event); 139 } catch (IOException e1) { 140 LOG.debug("Discovery agent failure while handling failure event: " + e1.getMessage(), e1); 141 } 142 return; 143 } 144 } 145 } 146 147 public void onServiceRemove(DiscoveryEvent event) { 148 String url = event.getServiceName(); 149 if (url != null) { 150 URI uri; 151 try { 152 uri = new URI(url); 153 } catch (URISyntaxException e) { 154 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); 155 return; 156 } 157 158 NetworkBridge bridge = bridges.remove(uri); 159 if (bridge == null) { 160 return; 161 } 162 163 ServiceSupport.dispose(bridge); 164 } 165 } 166 167 public DiscoveryAgent getDiscoveryAgent() { 168 return discoveryAgent; 169 } 170 171 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 172 this.discoveryAgent = discoveryAgent; 173 if (discoveryAgent != null) { 174 this.discoveryAgent.setDiscoveryListener(this); 175 } 176 } 177 178 protected void handleStart() throws Exception { 179 if (discoveryAgent == null) { 180 throw new IllegalStateException("You must configure the 'discoveryAgent' property"); 181 } 182 this.discoveryAgent.start(); 183 super.handleStart(); 184 } 185 186 protected void handleStop(ServiceStopper stopper) throws Exception { 187 for (Iterator<NetworkBridge> i = bridges.values().iterator(); i.hasNext();) { 188 NetworkBridge bridge = i.next(); 189 try { 190 bridge.stop(); 191 } catch (Exception e) { 192 stopper.onException(this, e); 193 } 194 } 195 try { 196 this.discoveryAgent.stop(); 197 } catch (Exception e) { 198 stopper.onException(this, e); 199 } 200 201 super.handleStop(stopper); 202 } 203 204 protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) { 205 NetworkBridgeListener listener = new NetworkBridgeListener() { 206 207 public void bridgeFailed() { 208 if (!serviceSupport.isStopped()) { 209 try { 210 discoveryAgent.serviceFailed(event); 211 } catch (IOException e) { 212 } 213 } 214 215 } 216 217 public void onStart(NetworkBridge bridge) { 218 registerNetworkBridgeMBean(bridge); 219 } 220 221 public void onStop(NetworkBridge bridge) { 222 unregisterNetworkBridgeMBean(bridge); 223 } 224 225 }; 226 DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener); 227 result.setBrokerService(getBrokerService()); 228 return configureBridge(result); 229 } 230 231 public String getName() { 232 String name = super.getName(); 233 if (name == null) { 234 name = discoveryAgent.toString(); 235 super.setName(name); 236 } 237 return name; 238 } 239 240 @Override 241 public String toString() { 242 return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService(); 243 } 244 }