001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.web;
019    
020    import java.io.IOException;
021    import java.io.PrintWriter;
022    import java.util.HashMap;
023    import java.util.LinkedList;
024    import java.util.List;
025    
026    import javax.jms.Destination;
027    import javax.jms.JMSException;
028    import javax.jms.Message;
029    import javax.jms.MessageConsumer;
030    import javax.jms.ObjectMessage;
031    import javax.jms.TextMessage;
032    import javax.servlet.ServletConfig;
033    import javax.servlet.ServletContext;
034    import javax.servlet.ServletException;
035    import javax.servlet.http.HttpServletRequest;
036    import javax.servlet.http.HttpServletResponse;
037    
038    import org.apache.activemq.MessageAvailableConsumer;
039    import org.apache.activemq.MessageAvailableListener;
040    import org.apache.activemq.camel.converter.ActiveMQMessageConverter;
041    import org.apache.activemq.command.ActiveMQDestination;
042    import org.apache.activemq.command.ActiveMQTextMessage;
043    import org.apache.camel.Endpoint;
044    import org.apache.camel.Exchange;
045    import org.apache.camel.ExchangePattern;
046    import org.apache.camel.Producer;
047    import org.apache.commons.logging.Log;
048    import org.apache.commons.logging.LogFactory;
049    import org.mortbay.util.ajax.Continuation;
050    import org.mortbay.util.ajax.ContinuationSupport;
051    
052    /**
053     * A servlet for sending and receiving messages to/from JMS destinations using
054     * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
055     * destination and whether it is a topic or queue via configuration details on
056     * the servlet or as request parameters. <p/> For reading messages you can
057     * specify a readTimeout parameter to determine how long the servlet should
058     * block for.
059     * 
060     * @version $Revision: 1.1.1.1 $
061     */
062    public class MessageServlet extends MessageServletSupport {
063        private static final Log LOG = LogFactory.getLog(MessageServlet.class);
064    
065        private String readTimeoutParameter = "readTimeout";
066        private long defaultReadTimeout = -1;
067        private long maximumReadTimeout = 20000;
068        private long requestTimeout = 1000;
069        
070        private HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
071    
072        public void init() throws ServletException {
073            ServletConfig servletConfig = getServletConfig();
074            String name = servletConfig.getInitParameter("defaultReadTimeout");
075            if (name != null) {
076                defaultReadTimeout = asLong(name);
077            }
078            name = servletConfig.getInitParameter("maximumReadTimeout");
079            if (name != null) {
080                maximumReadTimeout = asLong(name);
081            }
082            name = servletConfig.getInitParameter("replyTimeout");
083            if (name != null) {
084                    requestTimeout = asLong(name);
085            }        
086        }
087    
088        /**
089         * Sends a message to a destination
090         * 
091         * @param request
092         * @param response
093         * @throws ServletException
094         * @throws IOException
095         */
096        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
097            // lets turn the HTTP post into a JMS Message
098            try {
099                WebClient client = getWebClient(request);
100    
101                String text = getPostedMessageBody(request);
102    
103                // lets create the destination from the URI?
104                Destination destination = getDestination(client, request);
105                if (destination == null) {
106                    throw new NoDestinationSuppliedException();
107                }
108    
109                if (LOG.isDebugEnabled()) {
110                    LOG.debug("Sending message to: " + destination + " with text: " + text);
111                }
112    
113                boolean sync = isSync(request);
114                TextMessage message = client.getSession().createTextMessage(text);
115    
116                if (sync) {
117                   String point = "activemq:" 
118                       + ((ActiveMQDestination)destination).getPhysicalName().replace("//", "")
119                       + "?requestTimeout=" + requestTimeout;
120                   try {
121                       String body = (String)client.getProducerTemplate().requestBody(point, text);
122                       ActiveMQTextMessage answer = new ActiveMQTextMessage();
123                       answer.setText(body);
124                       writeMessageResponse(response.getWriter(), answer);
125                   } catch (Exception e) {
126                       IOException ex = new IOException();
127                       ex.initCause(e);
128                       throw ex;
129                   }
130                } else {
131                    appendParametersToMessage(request, message);
132                    boolean persistent = isSendPersistent(request);
133                    int priority = getSendPriority(request);
134                    long timeToLive = getSendTimeToLive(request);                   
135                    client.send(destination, message, persistent, priority, timeToLive);
136                }
137    
138                // lets return a unique URI for reliable messaging
139                response.setHeader("messageID", message.getJMSMessageID());
140                response.setStatus(HttpServletResponse.SC_OK);
141            } catch (JMSException e) {
142                throw new ServletException("Could not post JMS message: " + e, e);
143            }
144        }
145    
146        /**
147         * Supports a HTTP DELETE to be equivlanent of consuming a singe message
148         * from a queue
149         */
150        protected void doDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
151            doMessages(request, response, 1);
152        }
153    
154        /**
155         * Supports a HTTP DELETE to be equivlanent of consuming a singe message
156         * from a queue
157         */
158        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
159            doMessages(request, response, -1);
160        }
161    
162        /**
163         * Reads a message from a destination up to some specific timeout period
164         * 
165         * @param request
166         * @param response
167         * @throws ServletException
168         * @throws IOException
169         */
170        protected void doMessages(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException, IOException {
171    
172            int messages = 0;
173            try {
174                WebClient client = getWebClient(request);
175                Destination destination = getDestination(client, request);
176                if (destination == null) {
177                    throw new NoDestinationSuppliedException();
178                }
179                long timeout = getReadTimeout(request);
180                boolean ajax = isRicoAjax(request);
181                if (!ajax) {
182                    maxMessages = 1;
183                }
184    
185                if (LOG.isDebugEnabled()) {
186                    LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
187                }
188    
189                MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination);
190                Continuation continuation = null;
191                Listener listener = null;
192                Message message = null;
193    
194                synchronized (consumer) {
195                    // Fetch the listeners
196                    listener = (Listener)consumer.getAvailableListener();
197                    if (listener == null) {
198                        listener = new Listener(consumer);
199                        consumer.setAvailableListener(listener);
200                    }
201                    // Look for any available messages
202                    message = consumer.receiveNoWait();
203    
204                    // Get an existing Continuation or create a new one if there are
205                    // no events.
206                    if (message == null) {
207                        continuation = ContinuationSupport.getContinuation(request, consumer);
208    
209                        // register this continuation with our listener.
210                        listener.setContinuation(continuation);
211    
212                        // Get the continuation object (may wait and/or retry
213                        // request here).
214                        continuation.suspend(timeout);
215                    }
216    
217                    // Try again now
218                    if (message == null) {
219                        message = consumer.receiveNoWait();
220                    }
221    
222                    // write a responds
223                    response.setContentType("text/xml");
224                    PrintWriter writer = response.getWriter();
225    
226                    if (ajax) {
227                        writer.println("<ajax-response>");
228                    }
229    
230                    // handle any message(s)
231                    if (message == null) {
232                        // No messages so OK response of for ajax else no content.
233                        response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
234                    } else {
235                        // We have at least one message so set up the response
236                        response.setStatus(HttpServletResponse.SC_OK);
237                        String type = getContentType(request);
238                        if (type != null) {
239                            response.setContentType(type);
240                        }
241    
242                        // send a response for each available message (up to max
243                        // messages)
244                        while ((maxMessages < 0 || messages < maxMessages) && message != null) {
245                            if (ajax) {
246                                writer.print("<response type='object' id='");
247                                writer.print(request.getParameter("id"));
248                                writer.println("'>");
249                            } else {
250                                // only ever 1 message for non ajax!
251                                setResponseHeaders(response, message);
252                            }
253    
254                            writeMessageResponse(writer, message);
255    
256                            if (ajax) {
257                                writer.println("</response>");
258                            }
259    
260                            // look for next message
261                            messages++;
262                            if(maxMessages < 0 || messages < maxMessages) {
263                                    message = consumer.receiveNoWait();
264                            }
265                        }
266                    }
267    
268                    if (ajax) {
269                        writer.println("<response type='object' id='poll'><ok/></response>");
270                        writer.println("</ajax-response>");
271                    }
272                }
273            } catch (JMSException e) {
274                throw new ServletException("Could not post JMS message: " + e, e);
275            } finally {
276                if (LOG.isDebugEnabled()) {
277                    LOG.debug("Received " + messages + " message(s)");
278                }
279            }
280        }
281    
282        /**
283         * Reads a message from a destination up to some specific timeout period
284         * 
285         * @param request
286         * @param response
287         * @throws ServletException
288         * @throws IOException
289         */
290        protected void doMessagesWithoutContinuation(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException, IOException {
291    
292            int messages = 0;
293            try {
294                WebClient client = getWebClient(request);
295                Destination destination = getDestination(client, request);
296                long timeout = getReadTimeout(request);
297                boolean ajax = isRicoAjax(request);
298                if (!ajax) {
299                    maxMessages = 1;
300                }
301                if (LOG.isDebugEnabled()) {
302                    LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
303                }
304    
305                MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination);
306                Message message = null;
307    
308                // write a responds
309                response.setContentType("text/xml");
310                PrintWriter writer = response.getWriter();
311    
312                if (ajax) {
313                    writer.println("<ajax-response>");
314                }
315    
316                // Only one client thread at a time should poll for messages.
317                if (client.getSemaphore().tryAcquire()) {
318                    try {
319                        // Look for any available messages
320                        message = consumer.receive(timeout);
321    
322                        // handle any message(s)
323                        if (message == null) {
324                            // No messages so OK response of for ajax else no
325                            // content.
326                            response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
327                        } else {
328                            // We have at least one message so set up the
329                            // response
330                            response.setStatus(HttpServletResponse.SC_OK);
331                            String type = getContentType(request);
332                            if (type != null) {
333                                response.setContentType(type);
334                            }
335    
336                            // send a response for each available message (up to
337                            // max
338                            // messages)
339                            while ((maxMessages < 0 || messages < maxMessages) && message != null) {
340                                if (ajax) {
341                                    writer.print("<response type='object' id='");
342                                    writer.print(request.getParameter("id"));
343                                    writer.println("'>");
344                                } else {
345                                    // only ever 1 message for non ajax!
346                                    setResponseHeaders(response, message);
347                                }
348    
349                                writeMessageResponse(writer, message);
350    
351                                if (ajax) {
352                                    writer.println("</response>");
353                                }
354    
355                                // look for next message
356                                messages++;
357                                if(maxMessages < 0 || messages < maxMessages) {
358                                    message = consumer.receiveNoWait();
359                                }
360    
361                            }
362                        }
363                    } finally {
364                        client.getSemaphore().release();
365                    }
366                } else {
367                    // Client is using us in another thread.
368                    response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
369                }
370    
371                if (ajax) {
372                    writer.println("<response type='object' id='poll'><ok/></response>");
373                    writer.println("</ajax-response>");
374                }
375    
376            } catch (JMSException e) {
377                throw new ServletException("Could not post JMS message: " + e, e);
378            } finally {
379                if (LOG.isDebugEnabled()) {
380                    LOG.debug("Received " + messages + " message(s)");
381                }
382            }
383        }
384    
385        protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException {
386            if (message instanceof TextMessage) {
387                TextMessage textMsg = (TextMessage)message;
388                String txt = textMsg.getText();
389                if (txt.startsWith("<?")) {
390                    txt = txt.substring(txt.indexOf("?>") + 2);
391                }
392                writer.print(txt);
393            } else if (message instanceof ObjectMessage) {
394                ObjectMessage objectMsg = (ObjectMessage)message;
395                Object object = objectMsg.getObject();
396                writer.print(object.toString());
397            }
398        }
399    
400        protected boolean isRicoAjax(HttpServletRequest request) {
401            String rico = request.getParameter("rico");
402            return rico != null && rico.equals("true");
403        }
404        
405        public WebClient getWebClient(HttpServletRequest request) {
406            String clientId = request.getParameter("clientId");
407            if (clientId != null) {
408                    synchronized(this) {
409                            LOG.debug("Getting local client [" + clientId + "]");
410                            WebClient client = clients.get(clientId);
411                            if (client == null) {
412                                    LOG.debug("Creating new client [" + clientId + "]");
413                                    client = new WebClient();
414                                    clients.put(clientId, client);
415                            }
416                            return client;
417                    }
418                    
419            } else {
420                    return WebClient.getWebClient(request);
421            }
422        }    
423    
424        protected String getContentType(HttpServletRequest request) {
425            /*
426             * log("Params: " + request.getParameterMap()); Enumeration iter =
427             * request.getHeaderNames(); while (iter.hasMoreElements()) { String
428             * name = (String) iter.nextElement(); log("Header: " + name + " = " +
429             * request.getHeader(name)); }
430             */
431            String value = request.getParameter("xml");
432            if (value != null && "true".equalsIgnoreCase(value)) {
433                return "text/xml";
434            }
435            return null;
436        }
437    
438        protected void setResponseHeaders(HttpServletResponse response, Message message) throws JMSException {
439            response.setHeader("destination", message.getJMSDestination().toString());
440            response.setHeader("id", message.getJMSMessageID());
441        }
442    
443        /**
444         * @return the timeout value for read requests which is always >= 0 and <=
445         *         maximumReadTimeout to avoid DoS attacks
446         */
447        protected long getReadTimeout(HttpServletRequest request) {
448            long answer = defaultReadTimeout;
449    
450            String name = request.getParameter(readTimeoutParameter);
451            if (name != null) {
452                answer = asLong(name);
453            }
454            if (answer < 0 || answer > maximumReadTimeout) {
455                answer = maximumReadTimeout;
456            }
457            return answer;
458        }
459    
460        /*
461         * Listen for available messages and wakeup any continuations.
462         */
463        private static class Listener implements MessageAvailableListener {
464            MessageConsumer consumer;
465            Continuation continuation;
466            List queue = new LinkedList();
467    
468            Listener(MessageConsumer consumer) {
469                this.consumer = consumer;
470            }
471    
472            public void setContinuation(Continuation continuation) {
473                synchronized (consumer) {
474                    this.continuation = continuation;
475                }
476            }
477    
478            public void onMessageAvailable(MessageConsumer consumer) {
479                assert this.consumer == consumer;
480    
481                synchronized (this.consumer) {
482                    if (continuation != null) {
483                        continuation.resume();
484                    }
485                    continuation = null;
486                }
487            }
488        }
489    
490    }