001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.web; 019 020 import java.io.IOException; 021 import java.io.PrintWriter; 022 import java.io.StringWriter; 023 import java.util.HashMap; 024 import java.util.List; 025 import java.util.Map; 026 027 import javax.jms.Destination; 028 import javax.jms.JMSException; 029 import javax.jms.Message; 030 import javax.jms.MessageConsumer; 031 import javax.jms.ObjectMessage; 032 import javax.jms.TextMessage; 033 import javax.servlet.ServletConfig; 034 import javax.servlet.ServletException; 035 import javax.servlet.http.HttpServletRequest; 036 import javax.servlet.http.HttpServletResponse; 037 import javax.servlet.http.HttpSession; 038 039 import org.apache.activemq.MessageAvailableConsumer; 040 import org.apache.activemq.MessageAvailableListener; 041 import org.apache.commons.logging.Log; 042 import org.apache.commons.logging.LogFactory; 043 import org.mortbay.util.ajax.Continuation; 044 import org.mortbay.util.ajax.ContinuationSupport; 045 046 /** 047 * A servlet for sending and receiving messages to/from JMS destinations using 048 * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the 049 * destination and whether it is a topic or queue via configuration details on 050 * the servlet or as request parameters. <p/> For reading messages you can 051 * specify a readTimeout parameter to determine how long the servlet should 052 * block for. The servlet can be configured with the following init parameters: 053 * <dl> 054 * <dt>defaultReadTimeout</dt> 055 * <dd>The default time in ms to wait for messages. May be overridden by a 056 * request using the 'timeout' parameter</dd> 057 * <dt>maximumReadTimeout</dt> 058 * <dd>The maximum value a request may specify for the 'timeout' parameter</dd> 059 * <dt>maximumMessages</dt> 060 * <dd>maximum messages to send per response</dd> 061 * <dt></dt> 062 * <dd></dd> 063 * </dl> 064 * 065 * @version $Revision: 1.1.1.1 $ 066 */ 067 public class MessageListenerServlet extends MessageServletSupport { 068 private static final Log LOG = LogFactory.getLog(MessageListenerServlet.class); 069 070 private String readTimeoutParameter = "timeout"; 071 private long defaultReadTimeout = -1; 072 private long maximumReadTimeout = 25000; 073 private int maximumMessages = 100; 074 075 public void init() throws ServletException { 076 ServletConfig servletConfig = getServletConfig(); 077 String name = servletConfig.getInitParameter("defaultReadTimeout"); 078 if (name != null) { 079 defaultReadTimeout = asLong(name); 080 } 081 name = servletConfig.getInitParameter("maximumReadTimeout"); 082 if (name != null) { 083 maximumReadTimeout = asLong(name); 084 } 085 name = servletConfig.getInitParameter("maximumMessages"); 086 if (name != null) { 087 maximumMessages = (int)asLong(name); 088 } 089 } 090 091 /** 092 * Sends a message to a destination or manage subscriptions. If the the 093 * content type of the POST is 094 * <code>application/x-www-form-urlencoded</code>, then the form 095 * parameters "destination", "message" and "type" are used to pass a message 096 * or a subscription. If multiple messages or subscriptions are passed in a 097 * single post, then additional parameters are shortened to "dN", "mN" and 098 * "tN" where N is an index starting from 1. The type is either "send", 099 * "listen" or "unlisten". For send types, the message is the text of the 100 * TextMessage, otherwise it is the ID to be used for the subscription. If 101 * the content type is not <code>application/x-www-form-urlencoded</code>, 102 * then the body of the post is sent as the message to a destination that is 103 * derived from a query parameter, the URL or the default destination. 104 * 105 * @param request 106 * @param response 107 * @throws ServletException 108 * @throws IOException 109 */ 110 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 111 112 // lets turn the HTTP post into a JMS Message 113 114 WebClient client = WebClient.getWebClient(request); 115 String messageIds = ""; 116 117 synchronized (client) { 118 119 if (LOG.isDebugEnabled()) { 120 LOG.debug("POST client=" + client + " session=" + request.getSession().getId() + " info=" + request.getPathInfo() + " contentType=" + request.getContentType()); 121 // dump(request.getParameterMap()); 122 } 123 124 int messages = 0; 125 126 // loop until no more messages 127 while (true) { 128 // Get the message parameters. Multiple messages are encoded 129 // with more compact parameter names. 130 String destinationName = request.getParameter(messages == 0 ? "destination" : ("d" + messages)); 131 132 if (destinationName == null) { 133 destinationName = request.getHeader("destination"); 134 } 135 136 String message = request.getParameter(messages == 0 ? "message" : ("m" + messages)); 137 String type = request.getParameter(messages == 0 ? "type" : ("t" + messages)); 138 139 if (destinationName == null || message == null || type == null) { 140 break; 141 } 142 143 try { 144 Destination destination = getDestination(client, request, destinationName); 145 146 if (LOG.isDebugEnabled()) { 147 LOG.debug(messages + " destination=" + destinationName + " message=" + message + " type=" + type); 148 LOG.debug(destination + " is a " + destination.getClass().getName()); 149 } 150 151 messages++; 152 153 if ("listen".equals(type)) { 154 Listener listener = getListener(request); 155 Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request); 156 Map<MessageAvailableConsumer, String> consumerDestinationMap = getConsumerDestinationNameMap(request); 157 client.closeConsumer(destination); // drop any existing 158 // consumer. 159 MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination); 160 161 consumer.setAvailableListener(listener); 162 consumerIdMap.put(consumer, message); 163 consumerDestinationMap.put(consumer, destinationName); 164 if (LOG.isDebugEnabled()) { 165 LOG.debug("Subscribed: " + consumer + " to " + destination + " id=" + message); 166 } 167 } else if ("unlisten".equals(type)) { 168 Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request); 169 Map consumerDestinationMap = getConsumerDestinationNameMap(request); 170 MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination); 171 172 consumer.setAvailableListener(null); 173 consumerIdMap.remove(consumer); 174 consumerDestinationMap.remove(consumer); 175 client.closeConsumer(destination); 176 if (LOG.isDebugEnabled()) { 177 LOG.debug("Unsubscribed: " + consumer); 178 } 179 } else if ("send".equals(type)) { 180 TextMessage text = client.getSession().createTextMessage(message); 181 appendParametersToMessage(request, text); 182 183 client.send(destination, text); 184 messageIds += text.getJMSMessageID() + "\n"; 185 if (LOG.isDebugEnabled()) { 186 LOG.debug("Sent " + message + " to " + destination); 187 } 188 } else { 189 LOG.warn("unknown type " + type); 190 } 191 192 } catch (JMSException e) { 193 LOG.warn("jms", e); 194 } 195 } 196 } 197 198 if ("true".equals(request.getParameter("poll"))) { 199 try { 200 // TODO return message IDs 201 doMessages(client, request, response); 202 } catch (JMSException e) { 203 throw new ServletException("JMS problem: " + e, e); 204 } 205 } else { 206 // handle simple POST of a message 207 if (request.getContentLength() != 0 && (request.getContentType() == null || !request.getContentType().toLowerCase().startsWith("application/x-www-form-urlencoded"))) { 208 try { 209 Destination destination = getDestination(client, request); 210 String body = getPostedMessageBody(request); 211 TextMessage message = client.getSession().createTextMessage(body); 212 appendParametersToMessage(request, message); 213 214 client.send(destination, message); 215 if (LOG.isDebugEnabled()) { 216 LOG.debug("Sent to destination: " + destination + " body: " + body); 217 } 218 messageIds += message.getJMSMessageID() + "\n"; 219 } catch (JMSException e) { 220 throw new ServletException(e); 221 } 222 } 223 224 response.setContentType("text/plain"); 225 response.setHeader("Cache-Control", "no-cache"); 226 response.getWriter().print(messageIds); 227 } 228 } 229 230 /** 231 * Supports a HTTP DELETE to be equivlanent of consuming a singe message 232 * from a queue 233 */ 234 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 235 try { 236 WebClient client = WebClient.getWebClient(request); 237 if (LOG.isDebugEnabled()) { 238 LOG.debug("GET client=" + client + " session=" + request.getSession().getId() + " uri=" + request.getRequestURI() + " query=" + request.getQueryString()); 239 } 240 241 doMessages(client, request, response); 242 } catch (JMSException e) { 243 throw new ServletException("JMS problem: " + e, e); 244 } 245 } 246 247 /** 248 * Reads a message from a destination up to some specific timeout period 249 * 250 * @param client The webclient 251 * @param request 252 * @param response 253 * @throws ServletException 254 * @throws IOException 255 */ 256 protected void doMessages(WebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException { 257 258 int messages = 0; 259 // This is a poll for any messages 260 261 long timeout = getReadTimeout(request); 262 if (LOG.isDebugEnabled()) { 263 LOG.debug("doMessage timeout=" + timeout); 264 } 265 266 Continuation continuation = ContinuationSupport.getContinuation(request, client); 267 Listener listener = getListener(request); 268 if (listener != null && continuation != null && !continuation.isPending()) { 269 listener.access(); 270 } 271 272 Message message = null; 273 synchronized (client) { 274 275 List consumers = client.getConsumers(); 276 MessageAvailableConsumer consumer = null; 277 278 // Look for a message that is ready to go 279 for (int i = 0; message == null && i < consumers.size(); i++) { 280 consumer = (MessageAvailableConsumer)consumers.get(i); 281 if (consumer.getAvailableListener() == null) { 282 continue; 283 } 284 285 // Look for any available messages 286 message = consumer.receiveNoWait(); 287 if (LOG.isDebugEnabled()) { 288 LOG.debug("received " + message + " from " + consumer); 289 } 290 } 291 292 // Get an existing Continuation or create a new one if there are no 293 // messages 294 295 if (message == null) { 296 // register this continuation with our listener. 297 listener.setContinuation(continuation); 298 299 // Get the continuation object (may wait and/or retry 300 // request here). 301 continuation.suspend(timeout); 302 } 303 listener.setContinuation(null); 304 305 // prepare the responds 306 response.setContentType("text/xml"); 307 response.setHeader("Cache-Control", "no-cache"); 308 309 StringWriter swriter = new StringWriter(); 310 PrintWriter writer = new PrintWriter(swriter); 311 312 Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request); 313 Map<MessageAvailableConsumer, String> consumerDestinationNameMap = getConsumerDestinationNameMap(request); 314 response.setStatus(HttpServletResponse.SC_OK); 315 writer.println("<ajax-response>"); 316 317 // Send any message we already have 318 if (message != null) { 319 String id = consumerIdMap.get(consumer); 320 String destinationName = consumerDestinationNameMap.get(consumer); 321 writer.print("<response id='"); 322 writer.print(id); 323 writer.print("'"); 324 if (destinationName != null) { 325 writer.print(" destination='" + destinationName + "' "); 326 } 327 writer.print(">"); 328 writeMessageResponse(writer, message); 329 writer.println("</response>"); 330 messages++; 331 } 332 333 // Send the rest of the messages 334 for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) { 335 consumer = (MessageAvailableConsumer)consumers.get(i); 336 if (consumer.getAvailableListener() == null) { 337 continue; 338 } 339 340 // Look for any available messages 341 while (messages < maximumMessages) { 342 message = consumer.receiveNoWait(); 343 if (message == null) { 344 break; 345 } 346 messages++; 347 String id = consumerIdMap.get(consumer); 348 String destinationName = consumerDestinationNameMap.get(consumer); 349 writer.print("<response id='"); 350 writer.print(id); 351 writer.print("'"); 352 if (destinationName != null) { 353 writer.print(" destination='" + destinationName + "' "); 354 } 355 writer.print(">"); 356 writeMessageResponse(writer, message); 357 writer.println("</response>"); 358 } 359 } 360 361 // Add poll message 362 // writer.println("<response type='object' 363 // id='amqPoll'><ok/></response>"); 364 365 writer.print("</ajax-response>"); 366 367 writer.flush(); 368 String m = swriter.toString(); 369 response.getWriter().println(m); 370 } 371 372 } 373 374 protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException { 375 if (message instanceof TextMessage) { 376 TextMessage textMsg = (TextMessage)message; 377 String txt = textMsg.getText(); 378 if (txt.startsWith("<?")) { 379 txt = txt.substring(txt.indexOf("?>") + 2); 380 } 381 writer.print(txt); 382 } else if (message instanceof ObjectMessage) { 383 ObjectMessage objectMsg = (ObjectMessage)message; 384 Object object = objectMsg.getObject(); 385 writer.print(object.toString()); 386 } 387 } 388 389 protected Listener getListener(HttpServletRequest request) { 390 HttpSession session = request.getSession(); 391 Listener listener = (Listener)session.getAttribute("mls.listener"); 392 if (listener == null) { 393 listener = new Listener(WebClient.getWebClient(request)); 394 session.setAttribute("mls.listener", listener); 395 } 396 return listener; 397 } 398 399 protected Map<MessageAvailableConsumer, String> getConsumerIdMap(HttpServletRequest request) { 400 HttpSession session = request.getSession(true); 401 Map<MessageAvailableConsumer, String> map = (Map<MessageAvailableConsumer, String>)session.getAttribute("mls.consumerIdMap"); 402 if (map == null) { 403 map = new HashMap<MessageAvailableConsumer, String>(); 404 session.setAttribute("mls.consumerIdMap", map); 405 } 406 return map; 407 } 408 409 protected Map<MessageAvailableConsumer, String> getConsumerDestinationNameMap(HttpServletRequest request) { 410 HttpSession session = request.getSession(true); 411 Map<MessageAvailableConsumer, String> map = (Map<MessageAvailableConsumer, String>)session.getAttribute("mls.consumerDestinationNameMap"); 412 if (map == null) { 413 map = new HashMap<MessageAvailableConsumer, String>(); 414 session.setAttribute("mls.consumerDestinationNameMap", map); 415 } 416 return map; 417 } 418 419 protected boolean isRicoAjax(HttpServletRequest request) { 420 String rico = request.getParameter("rico"); 421 return rico != null && rico.equals("true"); 422 } 423 424 /** 425 * @return the timeout value for read requests which is always >= 0 and <= 426 * maximumReadTimeout to avoid DoS attacks 427 */ 428 protected long getReadTimeout(HttpServletRequest request) { 429 long answer = defaultReadTimeout; 430 431 String name = request.getParameter(readTimeoutParameter); 432 if (name != null) { 433 answer = asLong(name); 434 } 435 if (answer < 0 || answer > maximumReadTimeout) { 436 answer = maximumReadTimeout; 437 } 438 return answer; 439 } 440 441 /* 442 * Listen for available messages and wakeup any continuations. 443 */ 444 private class Listener implements MessageAvailableListener { 445 WebClient client; 446 long lastAccess; 447 Continuation continuation; 448 449 Listener(WebClient client) { 450 this.client = client; 451 } 452 453 public void access() { 454 lastAccess = System.currentTimeMillis(); 455 } 456 457 public synchronized void setContinuation(Continuation continuation) { 458 this.continuation = continuation; 459 } 460 461 public synchronized void onMessageAvailable(MessageConsumer consumer) { 462 if (LOG.isDebugEnabled()) { 463 LOG.debug("message for " + consumer + "continuation=" + continuation); 464 } 465 if (continuation != null) { 466 continuation.resume(); 467 } else if (System.currentTimeMillis() - lastAccess > 2 * maximumReadTimeout) { 468 new Thread() { 469 public void run() { 470 client.closeConsumers(); 471 }; 472 }.start(); 473 } 474 continuation = null; 475 } 476 477 } 478 }