001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.web;
019    
020    import java.io.IOException;
021    import java.io.PrintWriter;
022    import java.io.StringWriter;
023    import java.util.HashMap;
024    import java.util.List;
025    import java.util.Map;
026    
027    import javax.jms.Destination;
028    import javax.jms.JMSException;
029    import javax.jms.Message;
030    import javax.jms.MessageConsumer;
031    import javax.jms.ObjectMessage;
032    import javax.jms.TextMessage;
033    import javax.servlet.ServletConfig;
034    import javax.servlet.ServletException;
035    import javax.servlet.http.HttpServletRequest;
036    import javax.servlet.http.HttpServletResponse;
037    import javax.servlet.http.HttpSession;
038    
039    import org.apache.activemq.MessageAvailableConsumer;
040    import org.apache.activemq.MessageAvailableListener;
041    import org.apache.commons.logging.Log;
042    import org.apache.commons.logging.LogFactory;
043    import org.mortbay.util.ajax.Continuation;
044    import org.mortbay.util.ajax.ContinuationSupport;
045    
046    /**
047     * A servlet for sending and receiving messages to/from JMS destinations using
048     * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
049     * destination and whether it is a topic or queue via configuration details on
050     * the servlet or as request parameters. <p/> For reading messages you can
051     * specify a readTimeout parameter to determine how long the servlet should
052     * block for. The servlet can be configured with the following init parameters:
053     * <dl>
054     * <dt>defaultReadTimeout</dt>
055     * <dd>The default time in ms to wait for messages. May be overridden by a
056     * request using the 'timeout' parameter</dd>
057     * <dt>maximumReadTimeout</dt>
058     * <dd>The maximum value a request may specify for the 'timeout' parameter</dd>
059     * <dt>maximumMessages</dt>
060     * <dd>maximum messages to send per response</dd>
061     * <dt></dt>
062     * <dd></dd>
063     * </dl>
064     * 
065     * @version $Revision: 1.1.1.1 $
066     */
067    public class MessageListenerServlet extends MessageServletSupport {
068        private static final Log LOG = LogFactory.getLog(MessageListenerServlet.class);
069    
070        private String readTimeoutParameter = "timeout";
071        private long defaultReadTimeout = -1;
072        private long maximumReadTimeout = 25000;
073        private int maximumMessages = 100;
074    
075        public void init() throws ServletException {
076            ServletConfig servletConfig = getServletConfig();
077            String name = servletConfig.getInitParameter("defaultReadTimeout");
078            if (name != null) {
079                defaultReadTimeout = asLong(name);
080            }
081            name = servletConfig.getInitParameter("maximumReadTimeout");
082            if (name != null) {
083                maximumReadTimeout = asLong(name);
084            }
085            name = servletConfig.getInitParameter("maximumMessages");
086            if (name != null) {
087                maximumMessages = (int)asLong(name);
088            }
089        }
090    
091        /**
092         * Sends a message to a destination or manage subscriptions. If the the
093         * content type of the POST is
094         * <code>application/x-www-form-urlencoded</code>, then the form
095         * parameters "destination", "message" and "type" are used to pass a message
096         * or a subscription. If multiple messages or subscriptions are passed in a
097         * single post, then additional parameters are shortened to "dN", "mN" and
098         * "tN" where N is an index starting from 1. The type is either "send",
099         * "listen" or "unlisten". For send types, the message is the text of the
100         * TextMessage, otherwise it is the ID to be used for the subscription. If
101         * the content type is not <code>application/x-www-form-urlencoded</code>,
102         * then the body of the post is sent as the message to a destination that is
103         * derived from a query parameter, the URL or the default destination.
104         * 
105         * @param request
106         * @param response
107         * @throws ServletException
108         * @throws IOException
109         */
110        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
111    
112            // lets turn the HTTP post into a JMS Message
113    
114            WebClient client = WebClient.getWebClient(request);
115            String messageIds = "";
116    
117            synchronized (client) {
118    
119                if (LOG.isDebugEnabled()) {
120                    LOG.debug("POST client=" + client + " session=" + request.getSession().getId() + " info=" + request.getPathInfo() + " contentType=" + request.getContentType());
121                    // dump(request.getParameterMap());
122                }
123    
124                int messages = 0;
125    
126                // loop until no more messages
127                while (true) {
128                    // Get the message parameters. Multiple messages are encoded
129                    // with more compact parameter names.
130                    String destinationName = request.getParameter(messages == 0 ? "destination" : ("d" + messages));
131    
132                    if (destinationName == null) {
133                        destinationName = request.getHeader("destination");
134                    }
135    
136                    String message = request.getParameter(messages == 0 ? "message" : ("m" + messages));
137                    String type = request.getParameter(messages == 0 ? "type" : ("t" + messages));
138    
139                    if (destinationName == null || message == null || type == null) {
140                        break;
141                    }
142    
143                    try {
144                        Destination destination = getDestination(client, request, destinationName);
145    
146                        if (LOG.isDebugEnabled()) {
147                            LOG.debug(messages + " destination=" + destinationName + " message=" + message + " type=" + type);
148                            LOG.debug(destination + " is a " + destination.getClass().getName());
149                        }
150    
151                        messages++;
152    
153                        if ("listen".equals(type)) {
154                            Listener listener = getListener(request);
155                            Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request);
156                            Map<MessageAvailableConsumer, String> consumerDestinationMap = getConsumerDestinationNameMap(request);
157                            client.closeConsumer(destination); // drop any existing
158                            // consumer.
159                            MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination);
160    
161                            consumer.setAvailableListener(listener);
162                            consumerIdMap.put(consumer, message);
163                            consumerDestinationMap.put(consumer, destinationName);
164                            if (LOG.isDebugEnabled()) {
165                                LOG.debug("Subscribed: " + consumer + " to " + destination + " id=" + message);
166                            }
167                        } else if ("unlisten".equals(type)) {
168                            Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request);
169                            Map consumerDestinationMap = getConsumerDestinationNameMap(request);
170                            MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination);
171    
172                            consumer.setAvailableListener(null);
173                            consumerIdMap.remove(consumer);
174                            consumerDestinationMap.remove(consumer);
175                            client.closeConsumer(destination);
176                            if (LOG.isDebugEnabled()) {
177                                LOG.debug("Unsubscribed: " + consumer);
178                            }
179                        } else if ("send".equals(type)) {
180                            TextMessage text = client.getSession().createTextMessage(message);
181                            appendParametersToMessage(request, text);
182    
183                            client.send(destination, text);
184                            messageIds += text.getJMSMessageID() + "\n";
185                            if (LOG.isDebugEnabled()) {
186                                LOG.debug("Sent " + message + " to " + destination);
187                            }
188                        } else {
189                            LOG.warn("unknown type " + type);
190                        }
191    
192                    } catch (JMSException e) {
193                        LOG.warn("jms", e);
194                    }
195                }
196            }
197    
198            if ("true".equals(request.getParameter("poll"))) {
199                try {
200                    // TODO return message IDs
201                    doMessages(client, request, response);
202                } catch (JMSException e) {
203                    throw new ServletException("JMS problem: " + e, e);
204                }
205            } else {
206                // handle simple POST of a message
207                if (request.getContentLength() != 0 && (request.getContentType() == null || !request.getContentType().toLowerCase().startsWith("application/x-www-form-urlencoded"))) {
208                    try {
209                        Destination destination = getDestination(client, request);
210                        String body = getPostedMessageBody(request);
211                        TextMessage message = client.getSession().createTextMessage(body);
212                        appendParametersToMessage(request, message);
213    
214                        client.send(destination, message);
215                        if (LOG.isDebugEnabled()) {
216                            LOG.debug("Sent to destination: " + destination + " body: " + body);
217                        }
218                        messageIds += message.getJMSMessageID() + "\n";
219                    } catch (JMSException e) {
220                        throw new ServletException(e);
221                    }
222                }
223    
224                response.setContentType("text/plain");
225                response.setHeader("Cache-Control", "no-cache");
226                response.getWriter().print(messageIds);
227            }
228        }
229    
230        /**
231         * Supports a HTTP DELETE to be equivlanent of consuming a singe message
232         * from a queue
233         */
234        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
235            try {
236                WebClient client = WebClient.getWebClient(request);
237                if (LOG.isDebugEnabled()) {
238                    LOG.debug("GET client=" + client + " session=" + request.getSession().getId() + " uri=" + request.getRequestURI() + " query=" + request.getQueryString());
239                }
240    
241                doMessages(client, request, response);
242            } catch (JMSException e) {
243                throw new ServletException("JMS problem: " + e, e);
244            }
245        }
246    
247        /**
248         * Reads a message from a destination up to some specific timeout period
249         * 
250         * @param client The webclient
251         * @param request
252         * @param response
253         * @throws ServletException
254         * @throws IOException
255         */
256        protected void doMessages(WebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException {
257    
258            int messages = 0;
259            // This is a poll for any messages
260    
261            long timeout = getReadTimeout(request);
262            if (LOG.isDebugEnabled()) {
263                LOG.debug("doMessage timeout=" + timeout);
264            }
265    
266            Continuation continuation = ContinuationSupport.getContinuation(request, client);
267            Listener listener = getListener(request);
268            if (listener != null && continuation != null && !continuation.isPending()) {
269                listener.access();
270            }
271    
272            Message message = null;
273            synchronized (client) {
274    
275                List consumers = client.getConsumers();
276                MessageAvailableConsumer consumer = null;
277    
278                // Look for a message that is ready to go
279                for (int i = 0; message == null && i < consumers.size(); i++) {
280                    consumer = (MessageAvailableConsumer)consumers.get(i);
281                    if (consumer.getAvailableListener() == null) {
282                        continue;
283                    }
284    
285                    // Look for any available messages
286                    message = consumer.receiveNoWait();
287                    if (LOG.isDebugEnabled()) {
288                        LOG.debug("received " + message + " from " + consumer);
289                    }
290                }
291    
292                // Get an existing Continuation or create a new one if there are no
293                // messages
294    
295                if (message == null) {
296                    // register this continuation with our listener.
297                    listener.setContinuation(continuation);
298    
299                    // Get the continuation object (may wait and/or retry
300                    // request here).
301                    continuation.suspend(timeout);
302                }
303                listener.setContinuation(null);
304    
305                // prepare the responds
306                response.setContentType("text/xml");
307                response.setHeader("Cache-Control", "no-cache");
308    
309                StringWriter swriter = new StringWriter();
310                PrintWriter writer = new PrintWriter(swriter);
311    
312                Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request);
313                Map<MessageAvailableConsumer, String> consumerDestinationNameMap = getConsumerDestinationNameMap(request);
314                response.setStatus(HttpServletResponse.SC_OK);
315                writer.println("<ajax-response>");
316    
317                // Send any message we already have
318                if (message != null) {
319                    String id = consumerIdMap.get(consumer);
320                    String destinationName = consumerDestinationNameMap.get(consumer);
321                    writer.print("<response id='");
322                    writer.print(id);
323                    writer.print("'");
324                    if (destinationName != null) {
325                        writer.print(" destination='" + destinationName + "' ");
326                    }
327                    writer.print(">");
328                    writeMessageResponse(writer, message);
329                    writer.println("</response>");
330                    messages++;
331                }
332    
333                // Send the rest of the messages
334                for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
335                    consumer = (MessageAvailableConsumer)consumers.get(i);
336                    if (consumer.getAvailableListener() == null) {
337                        continue;
338                    }
339    
340                    // Look for any available messages
341                    while (messages < maximumMessages) {
342                        message = consumer.receiveNoWait();
343                        if (message == null) {
344                            break;
345                        }
346                        messages++;
347                        String id = consumerIdMap.get(consumer);
348                        String destinationName = consumerDestinationNameMap.get(consumer);
349                        writer.print("<response id='");
350                        writer.print(id);
351                        writer.print("'");
352                        if (destinationName != null) {
353                            writer.print(" destination='" + destinationName + "' ");
354                        }
355                        writer.print(">");
356                        writeMessageResponse(writer, message);
357                        writer.println("</response>");
358                    }
359                }
360    
361                // Add poll message
362                // writer.println("<response type='object'
363                // id='amqPoll'><ok/></response>");
364    
365                writer.print("</ajax-response>");
366    
367                writer.flush();
368                String m = swriter.toString();
369                response.getWriter().println(m);
370            }
371    
372        }
373    
374        protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException {
375            if (message instanceof TextMessage) {
376                TextMessage textMsg = (TextMessage)message;
377                String txt = textMsg.getText();
378                if (txt.startsWith("<?")) {
379                    txt = txt.substring(txt.indexOf("?>") + 2);
380                }
381                writer.print(txt);
382            } else if (message instanceof ObjectMessage) {
383                ObjectMessage objectMsg = (ObjectMessage)message;
384                Object object = objectMsg.getObject();
385                writer.print(object.toString());
386            }
387        }
388    
389        protected Listener getListener(HttpServletRequest request) {
390            HttpSession session = request.getSession();
391            Listener listener = (Listener)session.getAttribute("mls.listener");
392            if (listener == null) {
393                listener = new Listener(WebClient.getWebClient(request));
394                session.setAttribute("mls.listener", listener);
395            }
396            return listener;
397        }
398    
399        protected Map<MessageAvailableConsumer, String> getConsumerIdMap(HttpServletRequest request) {
400            HttpSession session = request.getSession(true);
401            Map<MessageAvailableConsumer, String> map = (Map<MessageAvailableConsumer, String>)session.getAttribute("mls.consumerIdMap");
402            if (map == null) {
403                map = new HashMap<MessageAvailableConsumer, String>();
404                session.setAttribute("mls.consumerIdMap", map);
405            }
406            return map;
407        }
408    
409        protected Map<MessageAvailableConsumer, String> getConsumerDestinationNameMap(HttpServletRequest request) {
410            HttpSession session = request.getSession(true);
411            Map<MessageAvailableConsumer, String> map = (Map<MessageAvailableConsumer, String>)session.getAttribute("mls.consumerDestinationNameMap");
412            if (map == null) {
413                map = new HashMap<MessageAvailableConsumer, String>();
414                session.setAttribute("mls.consumerDestinationNameMap", map);
415            }
416            return map;
417        }
418    
419        protected boolean isRicoAjax(HttpServletRequest request) {
420            String rico = request.getParameter("rico");
421            return rico != null && rico.equals("true");
422        }
423    
424        /**
425         * @return the timeout value for read requests which is always >= 0 and <=
426         *         maximumReadTimeout to avoid DoS attacks
427         */
428        protected long getReadTimeout(HttpServletRequest request) {
429            long answer = defaultReadTimeout;
430    
431            String name = request.getParameter(readTimeoutParameter);
432            if (name != null) {
433                answer = asLong(name);
434            }
435            if (answer < 0 || answer > maximumReadTimeout) {
436                answer = maximumReadTimeout;
437            }
438            return answer;
439        }
440    
441        /*
442         * Listen for available messages and wakeup any continuations.
443         */
444        private class Listener implements MessageAvailableListener {
445            WebClient client;
446            long lastAccess;
447            Continuation continuation;
448    
449            Listener(WebClient client) {
450                this.client = client;
451            }
452    
453            public void access() {
454                lastAccess = System.currentTimeMillis();
455            }
456    
457            public synchronized void setContinuation(Continuation continuation) {
458                this.continuation = continuation;
459            }
460    
461            public synchronized void onMessageAvailable(MessageConsumer consumer) {
462                if (LOG.isDebugEnabled()) {
463                    LOG.debug("message for " + consumer + "continuation=" + continuation);
464                }
465                if (continuation != null) {
466                    continuation.resume();
467                } else if (System.currentTimeMillis() - lastAccess > 2 * maximumReadTimeout) {
468                    new Thread() {
469                        public void run() {
470                            client.closeConsumers();
471                        };
472                    }.start();
473                }
474                continuation = null;
475            }
476    
477        }
478    }