001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.benchmark;
018    
019    import javax.jms.Destination;
020    import javax.jms.JMSException;
021    import javax.jms.Message;
022    import javax.jms.MessageConsumer;
023    import javax.jms.MessageListener;
024    import javax.jms.Session;
025    import javax.jms.TextMessage;
026    import javax.jms.Topic;
027    
028    /**
029     * @author James Strachan
030     * @version $Revision$
031     */
032    public class Consumer extends BenchmarkSupport implements MessageListener {
033    
034        public Consumer() {
035        }
036    
037        public static void main(String[] args) {
038            Consumer tool = new Consumer();
039            if (args.length > 0) {
040                tool.setUrl(args[0]);
041            }
042            if (args.length > 1) {
043                tool.setTopic(parseBoolean(args[1]));
044            }
045            if (args.length > 2) {
046                tool.setSubject(args[2]);
047            }
048            if (args.length > 3) {
049                tool.setDurable(parseBoolean(args[3]));
050            }
051            if (args.length > 4) {
052                tool.setConnectionCount(Integer.parseInt(args[4]));
053            }
054    
055            try {
056                tool.run();
057            } catch (Exception e) {
058                System.out.println("Caught: " + e);
059                e.printStackTrace();
060            }
061        }
062    
063        public void run() throws JMSException {
064            start();
065            subscribe();
066        }
067    
068        protected void subscribe() throws JMSException {
069            for (int i = 0; i < subjects.length; i++) {
070                subscribe(subjects[i]);
071            }
072        }
073    
074        protected void subscribe(String subject) throws JMSException {
075            Session session = createSession();
076    
077            Destination destination = createDestination(session, subject);
078    
079            System.out.println("Consuming on : " + destination + " of type: " + destination.getClass().getName());
080    
081            MessageConsumer consumer = null;
082            if (isDurable() && isTopic()) {
083                consumer = session.createDurableSubscriber((Topic)destination, getClass().getName());
084            } else {
085                consumer = session.createConsumer(destination);
086            }
087            consumer.setMessageListener(this);
088            addResource(consumer);
089        }
090    
091        public void onMessage(Message message) {
092            try {
093                TextMessage textMessage = (TextMessage)message;
094    
095                // lets force the content to be deserialized
096                textMessage.getText();
097                count(1);
098    
099                // lets count the messages
100    
101                // message.acknowledge();
102            } catch (JMSException e) {
103                // TODO Auto-generated catch block
104                e.printStackTrace();
105            }
106        }
107    
108    }