001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.tool; 018 019 import java.util.Arrays; 020 021 import javax.jms.ConnectionFactory; 022 import javax.jms.DeliveryMode; 023 import javax.jms.Destination; 024 import javax.jms.JMSException; 025 import javax.jms.MessageProducer; 026 import javax.jms.TextMessage; 027 028 import org.apache.activemq.tool.properties.JmsClientProperties; 029 import org.apache.activemq.tool.properties.JmsProducerProperties; 030 import org.apache.commons.logging.Log; 031 import org.apache.commons.logging.LogFactory; 032 033 public class JmsProducerClient extends AbstractJmsMeasurableClient { 034 private static final Log LOG = LogFactory.getLog(JmsProducerClient.class); 035 036 protected JmsProducerProperties client; 037 protected MessageProducer jmsProducer; 038 protected TextMessage jmsTextMessage; 039 040 public JmsProducerClient(ConnectionFactory factory) { 041 this(new JmsProducerProperties(), factory); 042 } 043 044 public JmsProducerClient(JmsProducerProperties clientProps, ConnectionFactory factory) { 045 super(factory); 046 this.client = clientProps; 047 } 048 049 public void sendMessages() throws JMSException { 050 // Send a specific number of messages 051 if (client.getSendType().equalsIgnoreCase(JmsProducerProperties.COUNT_BASED_SENDING)) { 052 sendCountBasedMessages(client.getSendCount()); 053 054 // Send messages for a specific duration 055 } else { 056 sendTimeBasedMessages(client.getSendDuration()); 057 } 058 } 059 060 public void sendMessages(int destCount) throws JMSException { 061 this.destCount = destCount; 062 sendMessages(); 063 } 064 065 public void sendMessages(int destIndex, int destCount) throws JMSException { 066 this.destIndex = destIndex; 067 sendMessages(destCount); 068 } 069 070 public void sendCountBasedMessages(long messageCount) throws JMSException { 071 // Parse through different ways to send messages 072 // Avoided putting the condition inside the loop to prevent effect on performance 073 Destination[] dest = createDestination(destIndex, destCount); 074 075 // Create a producer, if none is created. 076 if (getJmsProducer() == null) { 077 if (dest.length == 1) { 078 createJmsProducer(dest[0]); 079 } else { 080 createJmsProducer(); 081 } 082 } 083 try { 084 getConnection().start(); 085 LOG.info("Starting to publish " + client.getMessageSize() + " byte(s) of " + messageCount + " messages..."); 086 087 // Send one type of message only, avoiding the creation of different messages on sending 088 if (!client.isCreateNewMsg()) { 089 // Create only a single message 090 createJmsTextMessage(); 091 092 // Send to more than one actual destination 093 if (dest.length > 1) { 094 for (int i = 0; i < messageCount; i++) { 095 for (int j = 0; j < dest.length; j++) { 096 getJmsProducer().send(dest[j], getJmsTextMessage()); 097 incThroughput(); 098 } 099 } 100 // Send to only one actual destination 101 } else { 102 for (int i = 0; i < messageCount; i++) { 103 getJmsProducer().send(getJmsTextMessage()); 104 incThroughput(); 105 } 106 } 107 108 // Send different type of messages using indexing to identify each one. 109 // Message size will vary. Definitely slower, since messages properties 110 // will be set individually each send. 111 } else { 112 // Send to more than one actual destination 113 if (dest.length > 1) { 114 for (int i = 0; i < messageCount; i++) { 115 for (int j = 0; j < dest.length; j++) { 116 getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]")); 117 incThroughput(); 118 } 119 } 120 121 // Send to only one actual destination 122 } else { 123 for (int i = 0; i < messageCount; i++) { 124 getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]")); 125 incThroughput(); 126 } 127 } 128 } 129 } finally { 130 getConnection().close(); 131 } 132 } 133 134 public void sendTimeBasedMessages(long duration) throws JMSException { 135 long endTime = System.currentTimeMillis() + duration; 136 // Parse through different ways to send messages 137 // Avoided putting the condition inside the loop to prevent effect on performance 138 139 Destination[] dest = createDestination(destIndex, destCount); 140 141 // Create a producer, if none is created. 142 if (getJmsProducer() == null) { 143 if (dest.length == 1) { 144 createJmsProducer(dest[0]); 145 } else { 146 createJmsProducer(); 147 } 148 } 149 150 try { 151 getConnection().start(); 152 LOG.info("Starting to publish " + client.getMessageSize() + " byte(s) messages for " + duration + " ms"); 153 154 // Send one type of message only, avoiding the creation of different messages on sending 155 if (!client.isCreateNewMsg()) { 156 // Create only a single message 157 createJmsTextMessage(); 158 159 // Send to more than one actual destination 160 if (dest.length > 1) { 161 while (System.currentTimeMillis() < endTime) { 162 for (int j = 0; j < dest.length; j++) { 163 getJmsProducer().send(dest[j], getJmsTextMessage()); 164 incThroughput(); 165 } 166 } 167 // Send to only one actual destination 168 } else { 169 while (System.currentTimeMillis() < endTime) { 170 getJmsProducer().send(getJmsTextMessage()); 171 incThroughput(); 172 } 173 } 174 175 // Send different type of messages using indexing to identify each one. 176 // Message size will vary. Definitely slower, since messages properties 177 // will be set individually each send. 178 } else { 179 // Send to more than one actual destination 180 long count = 1; 181 if (dest.length > 1) { 182 while (System.currentTimeMillis() < endTime) { 183 for (int j = 0; j < dest.length; j++) { 184 getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]")); 185 incThroughput(); 186 } 187 } 188 189 // Send to only one actual destination 190 } else { 191 while (System.currentTimeMillis() < endTime) { 192 193 getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]")); 194 incThroughput(); 195 } 196 } 197 } 198 } finally { 199 getConnection().close(); 200 } 201 } 202 203 public MessageProducer createJmsProducer() throws JMSException { 204 jmsProducer = getSession().createProducer(null); 205 if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_PERSISTENT)) { 206 LOG.info("Creating producer to possible multiple destinations with persistent delivery."); 207 jmsProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 208 } else if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_NON_PERSISTENT)) { 209 LOG.info("Creating producer to possible multiple destinations with non-persistent delivery."); 210 jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 211 } else { 212 LOG.warn("Unknown deliveryMode value. Defaulting to non-persistent."); 213 jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 214 } 215 return jmsProducer; 216 } 217 218 public MessageProducer createJmsProducer(Destination dest) throws JMSException { 219 jmsProducer = getSession().createProducer(dest); 220 if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_PERSISTENT)) { 221 LOG.info("Creating producer to: " + dest.toString() + " with persistent delivery."); 222 jmsProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 223 } else if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_NON_PERSISTENT)) { 224 LOG.info("Creating producer to: " + dest.toString() + " with non-persistent delivery."); 225 jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 226 } else { 227 LOG.warn("Unknown deliveryMode value. Defaulting to non-persistent."); 228 jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 229 } 230 return jmsProducer; 231 } 232 233 public MessageProducer getJmsProducer() { 234 return jmsProducer; 235 } 236 237 public TextMessage createJmsTextMessage() throws JMSException { 238 return createJmsTextMessage(client.getMessageSize()); 239 } 240 241 public TextMessage createJmsTextMessage(int size) throws JMSException { 242 jmsTextMessage = getSession().createTextMessage(buildText("", size)); 243 return jmsTextMessage; 244 } 245 246 public TextMessage createJmsTextMessage(String text) throws JMSException { 247 jmsTextMessage = getSession().createTextMessage(buildText(text, client.getMessageSize())); 248 return jmsTextMessage; 249 } 250 251 public TextMessage getJmsTextMessage() { 252 return jmsTextMessage; 253 } 254 255 public JmsClientProperties getClient() { 256 return client; 257 } 258 259 public void setClient(JmsClientProperties clientProps) { 260 client = (JmsProducerProperties)clientProps; 261 } 262 263 protected String buildText(String text, int size) { 264 byte[] data = new byte[size - text.length()]; 265 Arrays.fill(data, (byte) 0); 266 return text + new String(data); 267 } 268 }