001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.advisory; 018 019 import org.apache.activemq.ActiveMQMessageTransformation; 020 import org.apache.activemq.command.ActiveMQDestination; 021 import org.apache.activemq.command.ActiveMQTopic; 022 import javax.jms.Destination; 023 import javax.jms.JMSException; 024 public final class AdvisorySupport { 025 public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory."; 026 public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX 027 + "Connection"); 028 public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue"); 029 public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic"); 030 public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue"); 031 public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic"); 032 public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer."; 033 public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue."; 034 public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic."; 035 public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer."; 036 public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue."; 037 public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic."; 038 public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic."; 039 public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue."; 040 public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic."; 041 public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue."; 042 public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer."; 043 public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastPorducer."; 044 public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded."; 045 public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL."; 046 public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered."; 047 public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed."; 048 public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker"; 049 public static final String AGENT_TOPIC = "ActiveMQ.Agent"; 050 public static final String ADIVSORY_MESSAGE_TYPE = "Advisory"; 051 public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId"; 052 public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME = "originBrokerName"; 053 public static final String MSG_PROPERTY_ORIGIN_BROKER_URL = "originBrokerURL"; 054 public static final String MSG_PROPERTY_USAGE_NAME = "usageName"; 055 public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId"; 056 public static final String MSG_PROPERTY_PRODUCER_ID = "producerId"; 057 public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId"; 058 public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic( 059 TEMP_QUEUE_ADVISORY_TOPIC + "," + TEMP_TOPIC_ADVISORY_TOPIC); 060 private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC); 061 062 private AdvisorySupport() { 063 } 064 065 public static ActiveMQTopic getConnectionAdvisoryTopic() { 066 return CONNECTION_ADVISORY_TOPIC; 067 } 068 069 public static ActiveMQTopic getConsumerAdvisoryTopic(Destination destination) throws JMSException { 070 return getConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 071 } 072 073 public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) { 074 if (destination.isQueue()) { 075 return new ActiveMQTopic(QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName()); 076 } else { 077 return new ActiveMQTopic(TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName()); 078 } 079 } 080 081 public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException { 082 return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 083 } 084 085 public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) { 086 if (destination.isQueue()) { 087 return new ActiveMQTopic(QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName()); 088 } else { 089 return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName()); 090 } 091 } 092 093 public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException { 094 return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination)); 095 } 096 097 public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) { 098 if (destination.isQueue()) { 099 return getExpiredQueueMessageAdvisoryTopic(destination); 100 } 101 return getExpiredTopicMessageAdvisoryTopic(destination); 102 } 103 104 public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) { 105 String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName(); 106 return new ActiveMQTopic(name); 107 } 108 109 public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(Destination destination) throws JMSException { 110 return getExpiredQueueMessageAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 111 } 112 113 public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) { 114 String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName(); 115 return new ActiveMQTopic(name); 116 } 117 118 public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(Destination destination) throws JMSException { 119 return getNoTopicConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 120 } 121 122 public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) { 123 String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName(); 124 return new ActiveMQTopic(name); 125 } 126 127 public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(Destination destination) throws JMSException { 128 return getNoQueueConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 129 } 130 131 public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) { 132 String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName(); 133 return new ActiveMQTopic(name); 134 } 135 136 public static ActiveMQTopic getSlowConsumerAdvisoryTopic(Destination destination) throws JMSException { 137 return getSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 138 } 139 140 public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) { 141 String name = SLOW_CONSUMER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." 142 + destination.getPhysicalName(); 143 return new ActiveMQTopic(name); 144 } 145 146 public static ActiveMQTopic getFastProducerAdvisoryTopic(Destination destination) throws JMSException { 147 return getFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 148 } 149 150 public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) { 151 String name = FAST_PRODUCER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." 152 + destination.getPhysicalName(); 153 return new ActiveMQTopic(name); 154 } 155 156 public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException { 157 return getMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 158 } 159 160 public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) { 161 String name = MESSAGE_DISCAREDED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." 162 + destination.getPhysicalName(); 163 return new ActiveMQTopic(name); 164 } 165 166 public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException { 167 return getMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 168 } 169 170 public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) { 171 String name = MESSAGE_DELIVERED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." 172 + destination.getPhysicalName(); 173 return new ActiveMQTopic(name); 174 } 175 176 public static ActiveMQTopic getMessageConsumedAdvisoryTopic(Destination destination) throws JMSException { 177 return getMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 178 } 179 180 public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) { 181 String name = MESSAGE_CONSUMED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." 182 + destination.getPhysicalName(); 183 return new ActiveMQTopic(name); 184 } 185 186 public static ActiveMQTopic getMasterBrokerAdvisoryTopic(Destination destination) throws JMSException { 187 return getMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 188 } 189 190 public static ActiveMQTopic getMasterBrokerAdvisoryTopic() { 191 return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX); 192 } 193 194 public static ActiveMQTopic getFullAdvisoryTopic(Destination destination) throws JMSException { 195 return getFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 196 } 197 198 public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) { 199 String name = FULL_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." 200 + destination.getPhysicalName(); 201 return new ActiveMQTopic(name); 202 } 203 204 public static ActiveMQTopic getDestinationAdvisoryTopic(Destination destination) throws JMSException { 205 return getDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 206 } 207 208 public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) { 209 switch (destination.getDestinationType()) { 210 case ActiveMQDestination.QUEUE_TYPE: 211 return QUEUE_ADVISORY_TOPIC; 212 case ActiveMQDestination.TOPIC_TYPE: 213 return TOPIC_ADVISORY_TOPIC; 214 case ActiveMQDestination.TEMP_QUEUE_TYPE: 215 return TEMP_QUEUE_ADVISORY_TOPIC; 216 case ActiveMQDestination.TEMP_TOPIC_TYPE: 217 return TEMP_TOPIC_ADVISORY_TOPIC; 218 default: 219 throw new RuntimeException("Unknown destination type: " + destination.getDestinationType()); 220 } 221 } 222 223 public static boolean isDestinationAdvisoryTopic(Destination destination) throws JMSException { 224 return isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 225 } 226 227 public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) { 228 if (destination.isComposite()) { 229 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 230 for (int i = 0; i < compositeDestinations.length; i++) { 231 if (isDestinationAdvisoryTopic(compositeDestinations[i])) { 232 return true; 233 } 234 } 235 return false; 236 } else { 237 return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC) 238 || destination.equals(QUEUE_ADVISORY_TOPIC) || destination.equals(TOPIC_ADVISORY_TOPIC); 239 } 240 } 241 242 public static boolean isAdvisoryTopic(Destination destination) throws JMSException { 243 return isAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 244 } 245 246 public static boolean isAdvisoryTopic(ActiveMQDestination destination) { 247 if (destination.isComposite()) { 248 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 249 for (int i = 0; i < compositeDestinations.length; i++) { 250 if (isAdvisoryTopic(compositeDestinations[i])) { 251 return true; 252 } 253 } 254 return false; 255 } else { 256 return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX); 257 } 258 } 259 260 public static boolean isConnectionAdvisoryTopic(Destination destination) throws JMSException { 261 return isConnectionAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 262 } 263 264 public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) { 265 if (destination.isComposite()) { 266 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 267 for (int i = 0; i < compositeDestinations.length; i++) { 268 if (isConnectionAdvisoryTopic(compositeDestinations[i])) { 269 return true; 270 } 271 } 272 return false; 273 } else { 274 return destination.equals(CONNECTION_ADVISORY_TOPIC); 275 } 276 } 277 278 public static boolean isProducerAdvisoryTopic(Destination destination) throws JMSException { 279 return isProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 280 } 281 282 public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) { 283 if (destination.isComposite()) { 284 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 285 for (int i = 0; i < compositeDestinations.length; i++) { 286 if (isProducerAdvisoryTopic(compositeDestinations[i])) { 287 return true; 288 } 289 } 290 return false; 291 } else { 292 return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX); 293 } 294 } 295 296 public static boolean isConsumerAdvisoryTopic(Destination destination) throws JMSException { 297 return isConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 298 } 299 300 public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) { 301 if (destination.isComposite()) { 302 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 303 for (int i = 0; i < compositeDestinations.length; i++) { 304 if (isConsumerAdvisoryTopic(compositeDestinations[i])) { 305 return true; 306 } 307 } 308 return false; 309 } else { 310 return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX); 311 } 312 } 313 314 public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException { 315 return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 316 } 317 318 public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) { 319 if (destination.isComposite()) { 320 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 321 for (int i = 0; i < compositeDestinations.length; i++) { 322 if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) { 323 return true; 324 } 325 } 326 return false; 327 } else { 328 return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX); 329 } 330 } 331 332 public static boolean isFastProducerAdvisoryTopic(Destination destination) throws JMSException { 333 return isFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 334 } 335 336 public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) { 337 if (destination.isComposite()) { 338 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 339 for (int i = 0; i < compositeDestinations.length; i++) { 340 if (isFastProducerAdvisoryTopic(compositeDestinations[i])) { 341 return true; 342 } 343 } 344 return false; 345 } else { 346 return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX); 347 } 348 } 349 350 public static boolean isMessageConsumedAdvisoryTopic(Destination destination) throws JMSException { 351 return isMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 352 } 353 354 public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) { 355 if (destination.isComposite()) { 356 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 357 for (int i = 0; i < compositeDestinations.length; i++) { 358 if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) { 359 return true; 360 } 361 } 362 return false; 363 } else { 364 return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX); 365 } 366 } 367 368 public static boolean isMasterBrokerAdvisoryTopic(Destination destination) throws JMSException { 369 return isMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 370 } 371 372 public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) { 373 if (destination.isComposite()) { 374 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 375 for (int i = 0; i < compositeDestinations.length; i++) { 376 if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) { 377 return true; 378 } 379 } 380 return false; 381 } else { 382 return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX); 383 } 384 } 385 386 public static boolean isMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException { 387 return isMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 388 } 389 390 public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) { 391 if (destination.isComposite()) { 392 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 393 for (int i = 0; i < compositeDestinations.length; i++) { 394 if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) { 395 return true; 396 } 397 } 398 return false; 399 } else { 400 return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX); 401 } 402 } 403 404 public static boolean isMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException { 405 return isMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 406 } 407 408 public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) { 409 if (destination.isComposite()) { 410 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 411 for (int i = 0; i < compositeDestinations.length; i++) { 412 if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) { 413 return true; 414 } 415 } 416 return false; 417 } else { 418 return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX); 419 } 420 } 421 422 public static boolean isFullAdvisoryTopic(Destination destination) throws JMSException { 423 return isFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 424 } 425 426 public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) { 427 if (destination.isComposite()) { 428 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 429 for (int i = 0; i < compositeDestinations.length; i++) { 430 if (isFullAdvisoryTopic(compositeDestinations[i])) { 431 return true; 432 } 433 } 434 return false; 435 } else { 436 return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX); 437 } 438 } 439 440 /** 441 * Returns the agent topic which is used to send commands to the broker 442 */ 443 public static Destination getAgentDestination() { 444 return AGENT_TOPIC_DESTINATION; 445 } 446 }