001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.tool;
018    
019    import java.io.IOException;
020    
021    import javax.jms.Connection;
022    import javax.jms.JMSException;
023    import javax.jms.Message;
024    import javax.jms.MessageConsumer;
025    import javax.jms.MessageListener;
026    import javax.jms.Session;
027    import javax.jms.TextMessage;
028    import javax.jms.Topic;
029    
030    /**
031     * A simple tool for consuming messages
032     * 
033     * @version $Revision$
034     */
035    public class ConsumerTool extends ToolSupport implements MessageListener {
036    
037        protected int count;
038        protected int dumpCount = 10;
039        protected boolean verbose = true;
040        protected int maxiumMessages;
041        private boolean pauseBeforeShutdown;
042    
043        public static void main(String[] args) {
044            ConsumerTool tool = new ConsumerTool();
045            if (args.length > 0) {
046                tool.url = args[0];
047            }
048            if (args.length > 1) {
049                tool.topic = args[1].equalsIgnoreCase("true");
050            }
051            if (args.length > 2) {
052                tool.subject = args[2];
053            }
054            if (args.length > 3) {
055                tool.durable = args[3].equalsIgnoreCase("true");
056            }
057            if (args.length > 4) {
058                tool.maxiumMessages = Integer.parseInt(args[4]);
059            }
060            tool.run();
061        }
062    
063        public void run() {
064            try {
065                System.out.println("Connecting to URL: " + url);
066                System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
067                System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
068    
069                Connection connection = createConnection();
070                Session session = createSession(connection);
071                MessageConsumer consumer = null;
072                if (durable && topic) {
073                    consumer = session.createDurableSubscriber((Topic)destination, consumerName);
074                } else {
075                    consumer = session.createConsumer(destination);
076                }
077                if (maxiumMessages <= 0) {
078                    consumer.setMessageListener(this);
079                }
080                connection.start();
081    
082                if (maxiumMessages > 0) {
083                    consumeMessagesAndClose(connection, session, consumer);
084                }
085            } catch (Exception e) {
086                System.out.println("Caught: " + e);
087                e.printStackTrace();
088            }
089        }
090    
091        public void onMessage(Message message) {
092            try {
093                if (message instanceof TextMessage) {
094                    TextMessage txtMsg = (TextMessage)message;
095                    if (verbose) {
096    
097                        String msg = txtMsg.getText();
098                        if (msg.length() > 50) {
099                            msg = msg.substring(0, 50) + "...";
100                        }
101    
102                        System.out.println("Received: " + msg);
103                    }
104                } else {
105                    if (verbose) {
106                        System.out.println("Received: " + message);
107                    }
108                }
109                /*
110                 * if (++count % dumpCount == 0) { dumpStats(connection); }
111                 */
112            } catch (JMSException e) {
113                System.out.println("Caught: " + e);
114                e.printStackTrace();
115            }
116        }
117    
118        protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
119            System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
120    
121            for (int i = 0; i < maxiumMessages; i++) {
122                Message message = consumer.receive();
123                onMessage(message);
124            }
125            System.out.println("Closing connection");
126            consumer.close();
127            session.close();
128            connection.close();
129            if (pauseBeforeShutdown) {
130                System.out.println("Press return to shut down");
131                System.in.read();
132            }
133        }
134    }