001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.advisory;
018    
019    import org.apache.activemq.ActiveMQMessageTransformation;
020    import org.apache.activemq.command.ActiveMQDestination;
021    import org.apache.activemq.command.ActiveMQTopic;
022    import javax.jms.Destination;
023    import javax.jms.JMSException;
024    public final class AdvisorySupport {
025        public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
026        public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX
027                + "Connection");
028        public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
029        public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
030        public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
031        public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
032        public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
033        public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
034        public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
035        public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
036        public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
037        public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
038        public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
039        public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
040        public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
041        public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
042        public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
043        public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastPorducer.";
044        public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
045        public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
046        public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
047        public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
048        public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
049        public static final String AGENT_TOPIC = "ActiveMQ.Agent";
050        public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
051        public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId";
052        public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME = "originBrokerName";
053        public static final String MSG_PROPERTY_ORIGIN_BROKER_URL = "originBrokerURL";
054        public static final String MSG_PROPERTY_USAGE_NAME = "usageName";
055        public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId";
056        public static final String MSG_PROPERTY_PRODUCER_ID = "producerId";
057        public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
058        public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
059                TEMP_QUEUE_ADVISORY_TOPIC + "," + TEMP_TOPIC_ADVISORY_TOPIC);
060        private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
061    
062        private AdvisorySupport() {
063        }
064    
065        public static ActiveMQTopic getConnectionAdvisoryTopic() {
066            return CONNECTION_ADVISORY_TOPIC;
067        }
068    
069        public static ActiveMQTopic getConsumerAdvisoryTopic(Destination destination) throws JMSException {
070            return getConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
071        }
072    
073        public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) {
074            if (destination.isQueue()) {
075                return new ActiveMQTopic(QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
076            } else {
077                return new ActiveMQTopic(TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
078            }
079        }
080    
081        public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException {
082            return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
083        }
084    
085        public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
086            if (destination.isQueue()) {
087                return new ActiveMQTopic(QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
088            } else {
089                return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
090            }
091        }
092    
093        public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException {
094            return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination));
095        }
096    
097        public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
098            if (destination.isQueue()) {
099                return getExpiredQueueMessageAdvisoryTopic(destination);
100            }
101            return getExpiredTopicMessageAdvisoryTopic(destination);
102        }
103    
104        public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
105            String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
106            return new ActiveMQTopic(name);
107        }
108    
109        public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(Destination destination) throws JMSException {
110            return getExpiredQueueMessageAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
111        }
112    
113        public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
114            String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
115            return new ActiveMQTopic(name);
116        }
117    
118        public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(Destination destination) throws JMSException {
119            return getNoTopicConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
120        }
121    
122        public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
123            String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
124            return new ActiveMQTopic(name);
125        }
126    
127        public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(Destination destination) throws JMSException {
128            return getNoQueueConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
129        }
130    
131        public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
132            String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
133            return new ActiveMQTopic(name);
134        }
135    
136        public static ActiveMQTopic getSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
137            return getSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
138        }
139    
140        public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
141            String name = SLOW_CONSUMER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
142                    + destination.getPhysicalName();
143            return new ActiveMQTopic(name);
144        }
145    
146        public static ActiveMQTopic getFastProducerAdvisoryTopic(Destination destination) throws JMSException {
147            return getFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
148        }
149    
150        public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
151            String name = FAST_PRODUCER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
152                    + destination.getPhysicalName();
153            return new ActiveMQTopic(name);
154        }
155    
156        public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
157            return getMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
158        }
159    
160        public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
161            String name = MESSAGE_DISCAREDED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
162                    + destination.getPhysicalName();
163            return new ActiveMQTopic(name);
164        }
165    
166        public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
167            return getMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
168        }
169    
170        public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
171            String name = MESSAGE_DELIVERED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
172                    + destination.getPhysicalName();
173            return new ActiveMQTopic(name);
174        }
175    
176        public static ActiveMQTopic getMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
177            return getMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
178        }
179    
180        public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
181            String name = MESSAGE_CONSUMED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
182                    + destination.getPhysicalName();
183            return new ActiveMQTopic(name);
184        }
185    
186        public static ActiveMQTopic getMasterBrokerAdvisoryTopic(Destination destination) throws JMSException {
187            return getMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
188        }
189    
190        public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
191            return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
192        }
193    
194        public static ActiveMQTopic getFullAdvisoryTopic(Destination destination) throws JMSException {
195            return getFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
196        }
197    
198        public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
199            String name = FULL_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
200                    + destination.getPhysicalName();
201            return new ActiveMQTopic(name);
202        }
203    
204        public static ActiveMQTopic getDestinationAdvisoryTopic(Destination destination) throws JMSException {
205            return getDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
206        }
207    
208        public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
209            switch (destination.getDestinationType()) {
210            case ActiveMQDestination.QUEUE_TYPE:
211                return QUEUE_ADVISORY_TOPIC;
212            case ActiveMQDestination.TOPIC_TYPE:
213                return TOPIC_ADVISORY_TOPIC;
214            case ActiveMQDestination.TEMP_QUEUE_TYPE:
215                return TEMP_QUEUE_ADVISORY_TOPIC;
216            case ActiveMQDestination.TEMP_TOPIC_TYPE:
217                return TEMP_TOPIC_ADVISORY_TOPIC;
218            default:
219                throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
220            }
221        }
222    
223        public static boolean isDestinationAdvisoryTopic(Destination destination) throws JMSException {
224            return isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
225        }
226    
227        public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
228            if (destination.isComposite()) {
229                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
230                for (int i = 0; i < compositeDestinations.length; i++) {
231                    if (isDestinationAdvisoryTopic(compositeDestinations[i])) {
232                        return true;
233                    }
234                }
235                return false;
236            } else {
237                return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC)
238                        || destination.equals(QUEUE_ADVISORY_TOPIC) || destination.equals(TOPIC_ADVISORY_TOPIC);
239            }
240        }
241    
242        public static boolean isAdvisoryTopic(Destination destination) throws JMSException {
243            return isAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
244        }
245    
246        public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
247            if (destination.isComposite()) {
248                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
249                for (int i = 0; i < compositeDestinations.length; i++) {
250                    if (isAdvisoryTopic(compositeDestinations[i])) {
251                        return true;
252                    }
253                }
254                return false;
255            } else {
256                return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
257            }
258        }
259    
260        public static boolean isConnectionAdvisoryTopic(Destination destination) throws JMSException {
261            return isConnectionAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
262        }
263    
264        public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) {
265            if (destination.isComposite()) {
266                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
267                for (int i = 0; i < compositeDestinations.length; i++) {
268                    if (isConnectionAdvisoryTopic(compositeDestinations[i])) {
269                        return true;
270                    }
271                }
272                return false;
273            } else {
274                return destination.equals(CONNECTION_ADVISORY_TOPIC);
275            }
276        }
277    
278        public static boolean isProducerAdvisoryTopic(Destination destination) throws JMSException {
279            return isProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
280        }
281    
282        public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) {
283            if (destination.isComposite()) {
284                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
285                for (int i = 0; i < compositeDestinations.length; i++) {
286                    if (isProducerAdvisoryTopic(compositeDestinations[i])) {
287                        return true;
288                    }
289                }
290                return false;
291            } else {
292                return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX);
293            }
294        }
295    
296        public static boolean isConsumerAdvisoryTopic(Destination destination) throws JMSException {
297            return isConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
298        }
299    
300        public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) {
301            if (destination.isComposite()) {
302                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
303                for (int i = 0; i < compositeDestinations.length; i++) {
304                    if (isConsumerAdvisoryTopic(compositeDestinations[i])) {
305                        return true;
306                    }
307                }
308                return false;
309            } else {
310                return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX);
311            }
312        }
313    
314        public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
315            return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
316        }
317    
318        public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
319            if (destination.isComposite()) {
320                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
321                for (int i = 0; i < compositeDestinations.length; i++) {
322                    if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
323                        return true;
324                    }
325                }
326                return false;
327            } else {
328                return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
329            }
330        }
331    
332        public static boolean isFastProducerAdvisoryTopic(Destination destination) throws JMSException {
333            return isFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
334        }
335    
336        public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
337            if (destination.isComposite()) {
338                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
339                for (int i = 0; i < compositeDestinations.length; i++) {
340                    if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
341                        return true;
342                    }
343                }
344                return false;
345            } else {
346                return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
347            }
348        }
349    
350        public static boolean isMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
351            return isMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
352        }
353    
354        public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
355            if (destination.isComposite()) {
356                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
357                for (int i = 0; i < compositeDestinations.length; i++) {
358                    if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
359                        return true;
360                    }
361                }
362                return false;
363            } else {
364                return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
365            }
366        }
367    
368        public static boolean isMasterBrokerAdvisoryTopic(Destination destination) throws JMSException {
369            return isMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
370        }
371    
372        public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
373            if (destination.isComposite()) {
374                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
375                for (int i = 0; i < compositeDestinations.length; i++) {
376                    if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
377                        return true;
378                    }
379                }
380                return false;
381            } else {
382                return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
383            }
384        }
385    
386        public static boolean isMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
387            return isMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
388        }
389    
390        public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
391            if (destination.isComposite()) {
392                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
393                for (int i = 0; i < compositeDestinations.length; i++) {
394                    if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
395                        return true;
396                    }
397                }
398                return false;
399            } else {
400                return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
401            }
402        }
403    
404        public static boolean isMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
405            return isMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
406        }
407    
408        public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
409            if (destination.isComposite()) {
410                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
411                for (int i = 0; i < compositeDestinations.length; i++) {
412                    if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
413                        return true;
414                    }
415                }
416                return false;
417            } else {
418                return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
419            }
420        }
421    
422        public static boolean isFullAdvisoryTopic(Destination destination) throws JMSException {
423            return isFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
424        }
425    
426        public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
427            if (destination.isComposite()) {
428                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
429                for (int i = 0; i < compositeDestinations.length; i++) {
430                    if (isFullAdvisoryTopic(compositeDestinations[i])) {
431                        return true;
432                    }
433                }
434                return false;
435            } else {
436                return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
437            }
438        }
439    
440        /**
441         * Returns the agent topic which is used to send commands to the broker
442         */
443        public static Destination getAgentDestination() {
444            return AGENT_TOPIC_DESTINATION;
445        }
446    }