001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.vm; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.net.URISyntaxException; 022 import java.util.HashMap; 023 import java.util.Map; 024 import java.util.concurrent.ConcurrentHashMap; 025 026 import org.apache.activemq.broker.BrokerFactory; 027 import org.apache.activemq.broker.BrokerFactoryHandler; 028 import org.apache.activemq.broker.BrokerRegistry; 029 import org.apache.activemq.broker.BrokerService; 030 import org.apache.activemq.broker.TransportConnector; 031 import org.apache.activemq.transport.MarshallingTransportFilter; 032 import org.apache.activemq.transport.Transport; 033 import org.apache.activemq.transport.TransportFactory; 034 import org.apache.activemq.transport.TransportServer; 035 import org.apache.activemq.util.IOExceptionSupport; 036 import org.apache.activemq.util.IntrospectionSupport; 037 import org.apache.activemq.util.ServiceSupport; 038 import org.apache.activemq.util.URISupport; 039 import org.apache.activemq.util.URISupport.CompositeData; 040 import org.apache.commons.logging.Log; 041 import org.apache.commons.logging.LogFactory; 042 043 public class VMTransportFactory extends TransportFactory { 044 045 public static final ConcurrentHashMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>(); 046 public static final ConcurrentHashMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>(); 047 public static final ConcurrentHashMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>(); 048 private static final Log LOG = LogFactory.getLog(VMTransportFactory.class); 049 050 BrokerFactoryHandler brokerFactoryHandler; 051 052 public Transport doConnect(URI location) throws Exception { 053 return VMTransportServer.configure(doCompositeConnect(location)); 054 } 055 056 public Transport doCompositeConnect(URI location) throws Exception { 057 URI brokerURI; 058 String host; 059 Map<String, String> options; 060 boolean create = true; 061 int waitForStart = -1; 062 CompositeData data = URISupport.parseComposite(location); 063 if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) { 064 brokerURI = data.getComponents()[0]; 065 CompositeData brokerData = URISupport.parseComposite(brokerURI); 066 host = (String)brokerData.getParameters().get("brokerName"); 067 if (host == null) { 068 host = "localhost"; 069 } 070 if (brokerData.getPath() != null) { 071 host = brokerData.getPath(); 072 } 073 options = data.getParameters(); 074 location = new URI("vm://" + host); 075 } else { 076 // If using the less complex vm://localhost?broker.persistent=true 077 // form 078 try { 079 host = location.getHost(); 080 options = URISupport.parseParamters(location); 081 String config = (String)options.remove("brokerConfig"); 082 if (config != null) { 083 brokerURI = new URI(config); 084 } else { 085 Map brokerOptions = IntrospectionSupport.extractProperties(options, "broker."); 086 brokerURI = new URI("broker://()/" + host + "?" 087 + URISupport.createQueryString(brokerOptions)); 088 } 089 if ("false".equals(options.remove("create"))) { 090 create = false; 091 } 092 String waitForStartString = options.remove("waitForStart"); 093 if (waitForStartString != null) { 094 waitForStart = Integer.parseInt(waitForStartString); 095 } 096 } catch (URISyntaxException e1) { 097 throw IOExceptionSupport.create(e1); 098 } 099 location = new URI("vm://" + host); 100 } 101 if (host == null) { 102 host = "localhost"; 103 } 104 VMTransportServer server = SERVERS.get(host); 105 // validate the broker is still active 106 if (!validateBroker(host) || server == null) { 107 BrokerService broker = null; 108 // Synchronize on the registry so that multiple concurrent threads 109 // doing this do not think that the broker has not been created and 110 // cause multiple brokers to be started. 111 synchronized (BrokerRegistry.getInstance().getRegistryMutext()) { 112 broker = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart); 113 if (broker == null) { 114 if (!create) { 115 throw new IOException("Broker named '" + host + "' does not exist."); 116 } 117 try { 118 if (brokerFactoryHandler != null) { 119 broker = brokerFactoryHandler.createBroker(brokerURI); 120 } else { 121 broker = BrokerFactory.createBroker(brokerURI); 122 } 123 broker.start(); 124 } catch (URISyntaxException e) { 125 throw IOExceptionSupport.create(e); 126 } 127 BROKERS.put(host, broker); 128 BrokerRegistry.getInstance().getRegistryMutext().notifyAll(); 129 } 130 131 server = SERVERS.get(host); 132 if (server == null) { 133 server = (VMTransportServer)bind(location, true); 134 TransportConnector connector = new TransportConnector(server); 135 connector.setBrokerService(broker); 136 connector.setUri(location); 137 connector.setTaskRunnerFactory(broker.getTaskRunnerFactory()); 138 connector.start(); 139 CONNECTORS.put(host, connector); 140 } 141 142 } 143 } 144 145 VMTransport vmtransport = server.connect(); 146 IntrospectionSupport.setProperties(vmtransport.peer, new HashMap<String,String>(options)); 147 IntrospectionSupport.setProperties(vmtransport, options); 148 Transport transport = vmtransport; 149 if (vmtransport.isMarshal()) { 150 Map<String, String> optionsCopy = new HashMap<String, String>(options); 151 transport = new MarshallingTransportFilter(transport, createWireFormat(options), 152 createWireFormat(optionsCopy)); 153 } 154 if (!options.isEmpty()) { 155 throw new IllegalArgumentException("Invalid connect parameters: " + options); 156 } 157 return transport; 158 } 159 160 /** 161 * @param registry 162 * @param brokerName 163 * @param waitForStart - time in milliseconds to wait for a broker to appear 164 * @return 165 */ 166 private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) { 167 BrokerService broker = null; 168 synchronized(registry.getRegistryMutext()) { 169 broker = registry.lookup(brokerName); 170 if (broker == null && waitForStart > 0) { 171 final long expiry = System.currentTimeMillis() + waitForStart; 172 while (broker == null && expiry > System.currentTimeMillis()) { 173 long timeout = Math.max(0, expiry - System.currentTimeMillis()); 174 try { 175 LOG.debug("waiting for broker named: " + brokerName + " to start"); 176 registry.getRegistryMutext().wait(timeout); 177 } catch (InterruptedException ignored) { 178 } 179 broker = registry.lookup(brokerName); 180 } 181 } 182 } 183 return broker; 184 } 185 186 public TransportServer doBind(URI location) throws IOException { 187 return bind(location, false); 188 } 189 190 /** 191 * @param location 192 * @return the TransportServer 193 * @throws IOException 194 */ 195 private TransportServer bind(URI location, boolean dispose) throws IOException { 196 String host = location.getHost(); 197 LOG.debug("binding to broker: " + host); 198 VMTransportServer server = new VMTransportServer(location, dispose); 199 Object currentBoundValue = SERVERS.get(host); 200 if (currentBoundValue != null) { 201 throw new IOException("VMTransportServer already bound at: " + location); 202 } 203 SERVERS.put(host, server); 204 return server; 205 } 206 207 public static void stopped(VMTransportServer server) { 208 String host = server.getBindURI().getHost(); 209 SERVERS.remove(host); 210 TransportConnector connector = CONNECTORS.remove(host); 211 if (connector != null) { 212 LOG.debug("Shutting down VM connectors for broker: " + host); 213 ServiceSupport.dispose(connector); 214 BrokerService broker = BROKERS.remove(host); 215 if (broker != null) { 216 ServiceSupport.dispose(broker); 217 } 218 } 219 } 220 221 public static void stopped(String host) { 222 SERVERS.remove(host); 223 TransportConnector connector = CONNECTORS.remove(host); 224 if (connector != null) { 225 LOG.debug("Shutting down VM connectors for broker: " + host); 226 ServiceSupport.dispose(connector); 227 BrokerService broker = BROKERS.remove(host); 228 if (broker != null) { 229 ServiceSupport.dispose(broker); 230 } 231 } 232 } 233 234 public BrokerFactoryHandler getBrokerFactoryHandler() { 235 return brokerFactoryHandler; 236 } 237 238 public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler) { 239 this.brokerFactoryHandler = brokerFactoryHandler; 240 } 241 242 private boolean validateBroker(String host) { 243 boolean result = true; 244 if (BROKERS.containsKey(host) || SERVERS.containsKey(host) || CONNECTORS.containsKey(host)) { 245 // check the broker is still in the BrokerRegistry 246 TransportConnector connector = CONNECTORS.get(host); 247 if (BrokerRegistry.getInstance().lookup(host) == null 248 || (connector != null && connector.getBroker().isStopped())) { 249 result = false; 250 // clean-up 251 BROKERS.remove(host); 252 SERVERS.remove(host); 253 if (connector != null) { 254 CONNECTORS.remove(host); 255 if (connector != null) { 256 ServiceSupport.dispose(connector); 257 } 258 } 259 } 260 } 261 return result; 262 } 263 }