001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.benchmark; 018 019 import javax.jms.Destination; 020 import javax.jms.JMSException; 021 import javax.jms.Message; 022 import javax.jms.MessageConsumer; 023 import javax.jms.MessageListener; 024 import javax.jms.Session; 025 import javax.jms.TextMessage; 026 import javax.jms.Topic; 027 028 /** 029 * @author James Strachan 030 * @version $Revision$ 031 */ 032 public class Consumer extends BenchmarkSupport implements MessageListener { 033 034 public Consumer() { 035 } 036 037 public static void main(String[] args) { 038 Consumer tool = new Consumer(); 039 if (args.length > 0) { 040 tool.setUrl(args[0]); 041 } 042 if (args.length > 1) { 043 tool.setTopic(parseBoolean(args[1])); 044 } 045 if (args.length > 2) { 046 tool.setSubject(args[2]); 047 } 048 if (args.length > 3) { 049 tool.setDurable(parseBoolean(args[3])); 050 } 051 if (args.length > 4) { 052 tool.setConnectionCount(Integer.parseInt(args[4])); 053 } 054 055 try { 056 tool.run(); 057 } catch (Exception e) { 058 System.out.println("Caught: " + e); 059 e.printStackTrace(); 060 } 061 } 062 063 public void run() throws JMSException { 064 start(); 065 subscribe(); 066 } 067 068 protected void subscribe() throws JMSException { 069 for (int i = 0; i < subjects.length; i++) { 070 subscribe(subjects[i]); 071 } 072 } 073 074 protected void subscribe(String subject) throws JMSException { 075 Session session = createSession(); 076 077 Destination destination = createDestination(session, subject); 078 079 System.out.println("Consuming on : " + destination + " of type: " + destination.getClass().getName()); 080 081 MessageConsumer consumer = null; 082 if (isDurable() && isTopic()) { 083 consumer = session.createDurableSubscriber((Topic)destination, getClass().getName()); 084 } else { 085 consumer = session.createConsumer(destination); 086 } 087 consumer.setMessageListener(this); 088 addResource(consumer); 089 } 090 091 public void onMessage(Message message) { 092 try { 093 TextMessage textMessage = (TextMessage)message; 094 095 // lets force the content to be deserialized 096 textMessage.getText(); 097 count(1); 098 099 // lets count the messages 100 101 // message.acknowledge(); 102 } catch (JMSException e) { 103 // TODO Auto-generated catch block 104 e.printStackTrace(); 105 } 106 } 107 108 }