001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.web; 019 020 import java.io.Externalizable; 021 import java.io.IOException; 022 import java.io.ObjectInput; 023 import java.io.ObjectOutput; 024 import java.util.ArrayList; 025 import java.util.HashMap; 026 import java.util.Iterator; 027 import java.util.List; 028 import java.util.Map; 029 import java.util.concurrent.Semaphore; 030 031 import javax.jms.Connection; 032 import javax.jms.ConnectionFactory; 033 import javax.jms.DeliveryMode; 034 import javax.jms.Destination; 035 import javax.jms.JMSException; 036 import javax.jms.Message; 037 import javax.jms.MessageConsumer; 038 import javax.jms.MessageProducer; 039 import javax.jms.Session; 040 import javax.servlet.ServletContext; 041 import javax.servlet.http.HttpServletRequest; 042 import javax.servlet.http.HttpSession; 043 import javax.servlet.http.HttpSessionActivationListener; 044 import javax.servlet.http.HttpSessionBindingEvent; 045 import javax.servlet.http.HttpSessionBindingListener; 046 import javax.servlet.http.HttpSessionEvent; 047 048 import org.apache.activemq.ActiveMQConnectionFactory; 049 import org.apache.activemq.MessageAvailableConsumer; 050 import org.apache.activemq.camel.component.ActiveMQComponent; 051 import org.apache.activemq.camel.component.ActiveMQConfiguration; 052 import org.apache.activemq.pool.PooledConnectionFactory; 053 import org.apache.camel.CamelContext; 054 import org.apache.camel.ProducerTemplate; 055 import org.apache.camel.impl.DefaultCamelContext; 056 import org.apache.commons.logging.Log; 057 import org.apache.commons.logging.LogFactory; 058 059 /** 060 * Represents a messaging client used from inside a web container typically 061 * stored inside a HttpSession TODO controls to prevent DOS attacks with users 062 * requesting many consumers TODO configure consumers with small prefetch. 063 * 064 * @version $Revision: 1.1.1.1 $ 065 */ 066 public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable { 067 068 public static final String WEB_CLIENT_ATTRIBUTE = "org.apache.activemq.webclient"; 069 public static final String CONNECTION_FACTORY_ATTRIBUTE = "org.apache.activemq.connectionFactory"; 070 public static final String CONNECTION_FACTORY_PREFETCH_PARAM = "org.apache.activemq.connectionFactory.prefetch"; 071 public static final String CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM = "org.apache.activemq.connectionFactory.optimizeAck"; 072 public static final String BROKER_URL_INIT_PARAM = "org.apache.activemq.brokerURL"; 073 074 private static final Log LOG = LogFactory.getLog(WebClient.class); 075 076 private static transient ConnectionFactory factory; 077 078 private transient Map<Destination, MessageConsumer> consumers = new HashMap<Destination, MessageConsumer>(); 079 private transient Connection connection; 080 private transient Session session; 081 private transient MessageProducer producer; 082 private int deliveryMode = DeliveryMode.NON_PERSISTENT; 083 084 private final Semaphore semaphore = new Semaphore(1); 085 086 private CamelContext camelContext; 087 private ProducerTemplate producerTemplate; 088 089 public WebClient() { 090 if (factory == null) { 091 throw new IllegalStateException("initContext(ServletContext) not called"); 092 } 093 } 094 095 /** 096 * Helper method to get the client for the current session, lazily creating 097 * a client if there is none currently 098 * 099 * @param request is the current HTTP request 100 * @return the current client or a newly creates 101 */ 102 public static WebClient getWebClient(HttpServletRequest request) { 103 HttpSession session = request.getSession(true); 104 WebClient client = getWebClient(session); 105 if (client == null || client.isClosed()) { 106 client = WebClient.createWebClient(request); 107 session.setAttribute(WEB_CLIENT_ATTRIBUTE, client); 108 } 109 110 return client; 111 } 112 113 /** 114 * @return the web client for the current HTTP session or null if there is 115 * not a web client created yet 116 */ 117 public static WebClient getWebClient(HttpSession session) { 118 return (WebClient)session.getAttribute(WEB_CLIENT_ATTRIBUTE); 119 } 120 121 public static void initContext(ServletContext context) { 122 initConnectionFactory(context); 123 context.setAttribute("webClients", new HashMap<String, WebClient>()); 124 } 125 126 public int getDeliveryMode() { 127 return deliveryMode; 128 } 129 130 public void setDeliveryMode(int deliveryMode) { 131 this.deliveryMode = deliveryMode; 132 } 133 134 public synchronized void closeConsumers() { 135 for (Iterator<MessageConsumer> it = consumers.values().iterator(); it.hasNext();) { 136 MessageConsumer consumer = it.next(); 137 it.remove(); 138 try { 139 consumer.setMessageListener(null); 140 if (consumer instanceof MessageAvailableConsumer) { 141 ((MessageAvailableConsumer)consumer).setAvailableListener(null); 142 } 143 consumer.close(); 144 } catch (JMSException e) { 145 LOG.debug("caught exception closing consumer", e); 146 } 147 } 148 } 149 150 public synchronized void close() { 151 try { 152 closeConsumers(); 153 if (connection != null) { 154 connection.close(); 155 } 156 if (producerTemplate != null) { 157 producerTemplate.stop(); 158 } 159 } catch (Exception e) { 160 LOG.debug("caught exception closing consumer", e); 161 } finally { 162 producer = null; 163 session = null; 164 connection = null; 165 producerTemplate = null; 166 if (consumers != null) { 167 consumers.clear(); 168 } 169 consumers = null; 170 } 171 } 172 173 public boolean isClosed() { 174 return consumers == null; 175 } 176 177 public void writeExternal(ObjectOutput out) throws IOException { 178 if (consumers != null) { 179 out.write(consumers.size()); 180 Iterator<Destination> i = consumers.keySet().iterator(); 181 while (i.hasNext()) { 182 out.writeObject(i.next().toString()); 183 } 184 } else { 185 out.write(-1); 186 } 187 188 } 189 190 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 191 int size = in.readInt(); 192 if (size >= 0) { 193 consumers = new HashMap<Destination, MessageConsumer>(); 194 for (int i = 0; i < size; i++) { 195 String destinationName = in.readObject().toString(); 196 197 try { 198 Destination destination = destinationName.startsWith("topic://") ? (Destination)getSession().createTopic(destinationName) : (Destination)getSession().createQueue(destinationName); 199 consumers.put(destination, getConsumer(destination, true)); 200 } catch (JMSException e) { 201 LOG.debug("Caought Exception ", e); 202 IOException ex = new IOException(e.getMessage()); 203 ex.initCause(e.getCause() != null ? e.getCause() : e); 204 throw ex; 205 206 } 207 } 208 } 209 } 210 211 public void send(Destination destination, Message message) throws JMSException { 212 getProducer().send(destination, message); 213 if (LOG.isDebugEnabled()) { 214 LOG.debug("Sent! to destination: " + destination + " message: " + message); 215 } 216 } 217 218 public void send(Destination destination, Message message, boolean persistent, int priority, long timeToLive) throws JMSException { 219 int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; 220 getProducer().send(destination, message, deliveryMode, priority, timeToLive); 221 if (LOG.isDebugEnabled()) { 222 LOG.debug("Sent! to destination: " + destination + " message: " + message); 223 } 224 } 225 226 public Session getSession() throws JMSException { 227 if (session == null) { 228 session = createSession(); 229 } 230 return session; 231 } 232 233 public Connection getConnection() throws JMSException { 234 if (connection == null) { 235 connection = factory.createConnection(); 236 connection.start(); 237 } 238 return connection; 239 } 240 241 protected static synchronized void initConnectionFactory(ServletContext servletContext) { 242 if (factory == null) { 243 factory = (ConnectionFactory)servletContext.getAttribute(CONNECTION_FACTORY_ATTRIBUTE); 244 } 245 if (factory == null) { 246 String brokerURL = servletContext.getInitParameter(BROKER_URL_INIT_PARAM); 247 248 LOG.debug("Value of: " + BROKER_URL_INIT_PARAM + " is: " + brokerURL); 249 250 if (brokerURL == null) { 251 throw new IllegalStateException("missing brokerURL (specified via " + BROKER_URL_INIT_PARAM + " init-Param"); 252 } 253 254 ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL); 255 256 // Set prefetch policy for factory 257 if (servletContext.getInitParameter(CONNECTION_FACTORY_PREFETCH_PARAM) != null) { 258 int prefetch = Integer.valueOf(servletContext.getInitParameter(CONNECTION_FACTORY_PREFETCH_PARAM)).intValue(); 259 amqfactory.getPrefetchPolicy().setAll(prefetch); 260 } 261 262 // Set optimize acknowledge setting 263 if (servletContext.getInitParameter(CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM) != null) { 264 boolean optimizeAck = Boolean.valueOf(servletContext.getInitParameter(CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM)).booleanValue(); 265 amqfactory.setOptimizeAcknowledge(optimizeAck); 266 } 267 268 factory = amqfactory; 269 270 servletContext.setAttribute(CONNECTION_FACTORY_ATTRIBUTE, factory); 271 } 272 } 273 274 public synchronized CamelContext getCamelContext() { 275 if (camelContext == null) { 276 LOG.debug("Creating camel context"); 277 camelContext = new DefaultCamelContext(); 278 ActiveMQConfiguration conf = new ActiveMQConfiguration(); 279 conf.setConnectionFactory(new PooledConnectionFactory((ActiveMQConnectionFactory)factory)); 280 ActiveMQComponent component = new ActiveMQComponent(conf); 281 camelContext.addComponent("activemq", component); 282 } 283 return camelContext; 284 } 285 286 public synchronized ProducerTemplate getProducerTemplate() throws Exception { 287 if (producerTemplate == null) { 288 LOG.debug("Creating producer template"); 289 producerTemplate = getCamelContext().createProducerTemplate(); 290 producerTemplate.start(); 291 } 292 return producerTemplate; 293 } 294 295 public synchronized MessageProducer getProducer() throws JMSException { 296 if (producer == null) { 297 producer = getSession().createProducer(null); 298 producer.setDeliveryMode(deliveryMode); 299 } 300 return producer; 301 } 302 303 public void setProducer(MessageProducer producer) { 304 this.producer = producer; 305 } 306 307 public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException { 308 return getConsumer(destination, true); 309 } 310 311 public synchronized MessageConsumer getConsumer(Destination destination, boolean create) throws JMSException { 312 MessageConsumer consumer = consumers.get(destination); 313 if (create && consumer == null) { 314 consumer = getSession().createConsumer(destination); 315 consumers.put(destination, consumer); 316 } 317 return consumer; 318 } 319 320 public synchronized void closeConsumer(Destination destination) throws JMSException { 321 MessageConsumer consumer = consumers.get(destination); 322 if (consumer != null) { 323 consumers.remove(destination); 324 consumer.setMessageListener(null); 325 if (consumer instanceof MessageAvailableConsumer) { 326 ((MessageAvailableConsumer)consumer).setAvailableListener(null); 327 } 328 consumer.close(); 329 } 330 } 331 332 public synchronized List<MessageConsumer> getConsumers() { 333 return new ArrayList<MessageConsumer>(consumers.values()); 334 } 335 336 protected Session createSession() throws JMSException { 337 return getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); 338 } 339 340 public Semaphore getSemaphore() { 341 return semaphore; 342 } 343 344 public void sessionWillPassivate(HttpSessionEvent event) { 345 close(); 346 } 347 348 public void sessionDidActivate(HttpSessionEvent event) { 349 } 350 351 public void valueBound(HttpSessionBindingEvent event) { 352 } 353 354 public void valueUnbound(HttpSessionBindingEvent event) { 355 close(); 356 } 357 358 protected static WebClient createWebClient(HttpServletRequest request) { 359 return new WebClient(); 360 } 361 362 }