001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.advisory;
018    
019    import java.util.Iterator;
020    import java.util.Map;
021    import java.util.Set;
022    import java.util.concurrent.ConcurrentHashMap;
023    
024    import org.apache.activemq.broker.Broker;
025    import org.apache.activemq.broker.BrokerFilter;
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.broker.ProducerBrokerExchange;
028    import org.apache.activemq.broker.region.Destination;
029    import org.apache.activemq.broker.region.MessageReference;
030    import org.apache.activemq.broker.region.Subscription;
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.ActiveMQMessage;
033    import org.apache.activemq.command.ActiveMQTopic;
034    import org.apache.activemq.command.Command;
035    import org.apache.activemq.command.ConnectionId;
036    import org.apache.activemq.command.ConnectionInfo;
037    import org.apache.activemq.command.ConsumerId;
038    import org.apache.activemq.command.ConsumerInfo;
039    import org.apache.activemq.command.DestinationInfo;
040    import org.apache.activemq.command.Message;
041    import org.apache.activemq.command.MessageId;
042    import org.apache.activemq.command.ProducerId;
043    import org.apache.activemq.command.ProducerInfo;
044    import org.apache.activemq.security.SecurityContext;
045    import org.apache.activemq.state.ProducerState;
046    import org.apache.activemq.usage.Usage;
047    import org.apache.activemq.util.IdGenerator;
048    import org.apache.activemq.util.LongSequenceGenerator;
049    import org.apache.commons.logging.Log;
050    import org.apache.commons.logging.LogFactory;
051    
052    /**
053     * This broker filter handles tracking the state of the broker for purposes of
054     * publishing advisory messages to advisory consumers.
055     * 
056     * @version $Revision$
057     */
058    public class AdvisoryBroker extends BrokerFilter {
059    
060        private static final Log LOG = LogFactory.getLog(AdvisoryBroker.class);
061        private static final IdGenerator ID_GENERATOR = new IdGenerator();
062    
063        protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
064        protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers = new ConcurrentHashMap<ConsumerId, ConsumerInfo>();
065        protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
066        protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
067        protected final ProducerId advisoryProducerId = new ProducerId();
068        
069        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
070        
071        public AdvisoryBroker(Broker next) {
072            super(next);
073            advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
074        }
075    
076        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
077            super.addConnection(context, info);
078    
079            ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
080            //do not distribute usernames or passwords in advisory
081            ConnectionInfo copy = info.copy();
082            copy.setUserName("");
083            copy.setPassword("");
084            fireAdvisory(context, topic, copy);
085            connections.put(copy.getConnectionId(), copy);
086        }
087    
088        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
089            Subscription answer = super.addConsumer(context, info);
090            
091            // Don't advise advisory topics.
092            if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
093                ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
094                consumers.put(info.getConsumerId(), info);
095                fireConsumerAdvisory(context, info.getDestination(), topic, info);
096            } else {
097                // We need to replay all the previously collected state objects
098                // for this newly added consumer.
099                if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) {
100                    // Replay the connections.
101                    for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext();) {
102                        ConnectionInfo value = iter.next();
103                        ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
104                        fireAdvisory(context, topic, value, info.getConsumerId());
105                    }
106                }
107    
108                // We need to replay all the previously collected destination
109                // objects
110                // for this newly added consumer.
111                if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
112                    // Replay the destinations.
113                    for (Iterator<DestinationInfo> iter = destinations.values().iterator(); iter.hasNext();) {
114                        DestinationInfo value = iter.next();
115                        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(value.getDestination());
116                        fireAdvisory(context, topic, value, info.getConsumerId());
117                    }
118                }
119    
120                // Replay the producers.
121                if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) {
122                    for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) {
123                        ProducerInfo value = iter.next();
124                        ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
125                        fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId());
126                    }
127                }
128    
129                // Replay the consumers.
130                if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
131                    for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext();) {
132                        ConsumerInfo value = iter.next();
133                        ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
134                        fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId());
135                    }
136                }
137            }
138            return answer;
139        }
140    
141        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
142            super.addProducer(context, info);
143    
144            // Don't advise advisory topics.
145            if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
146                ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
147                fireProducerAdvisory(context, info.getDestination(), topic, info);
148                producers.put(info.getProducerId(), info);
149            }
150        }
151    
152        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
153            Destination answer = super.addDestination(context, destination);
154            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
155                DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
156                DestinationInfo previous = destinations.putIfAbsent(destination, info);
157                if( previous==null ) {
158                    ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
159                    fireAdvisory(context, topic, info);
160                }
161            }
162            return answer;
163        }
164    
165        public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
166            ActiveMQDestination destination = info.getDestination();
167            next.addDestinationInfo(context, info);
168    
169            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
170                DestinationInfo previous = destinations.putIfAbsent(destination, info);
171                if( previous==null ) {
172                    ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
173                    fireAdvisory(context, topic, info);
174                }
175            }
176        }
177    
178        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
179            super.removeDestination(context, destination, timeout);
180            DestinationInfo info = destinations.remove(destination);
181            if (info != null) {
182                info.setDestination(destination);
183                info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
184                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
185                fireAdvisory(context, topic, info);
186                try {
187                    next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
188                } catch (Exception expectedIfDestinationDidNotExistYet) {                
189                }
190                try {
191                    next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
192                } catch (Exception expectedIfDestinationDidNotExistYet) {
193                }
194            }
195    
196        }
197    
198        public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
199            super.removeDestinationInfo(context, destInfo);   
200            DestinationInfo info = destinations.remove(destInfo.getDestination());
201            if (info != null) {
202                info.setDestination(destInfo.getDestination());
203                info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
204                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
205                fireAdvisory(context, topic, info);
206                try {
207                    next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
208                } catch (Exception expectedIfDestinationDidNotExistYet) {
209                }
210                try {
211                    next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
212                
213                } catch (Exception expectedIfDestinationDidNotExistYet) {
214                }
215            }
216    
217        }
218    
219        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
220            super.removeConnection(context, info, error);
221    
222            ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
223            fireAdvisory(context, topic, info.createRemoveCommand());
224            connections.remove(info.getConnectionId());
225        }
226    
227        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
228            super.removeConsumer(context, info);
229    
230            // Don't advise advisory topics.
231            ActiveMQDestination dest = info.getDestination();
232            if (!AdvisorySupport.isAdvisoryTopic(dest)) {
233                ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
234                consumers.remove(info.getConsumerId());
235                if (!dest.isTemporary() || destinations.contains(dest)) {
236                    fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
237                }
238            }
239        }
240    
241        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
242            super.removeProducer(context, info);
243    
244            // Don't advise advisory topics.
245            ActiveMQDestination dest = info.getDestination();
246            if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
247                ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
248                producers.remove(info.getProducerId());
249                if (!dest.isTemporary() || destinations.contains(dest)) {
250                    fireProducerAdvisory(context, dest,topic, info.createRemoveCommand());
251                }
252            }
253        }
254    
255        public void messageExpired(ConnectionContext context, MessageReference messageReference) {
256            super.messageExpired(context, messageReference);
257            try {
258                if(!messageReference.isAdvisory()) {
259                    ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
260                    Message payload = messageReference.getMessage().copy();
261                    payload.clearBody();
262                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
263                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
264                    fireAdvisory(context, topic, payload, null, advisoryMessage);
265                }
266            } catch (Exception e) {
267                LOG.warn("Failed to fire message expired advisory");
268            }
269        }
270        
271        public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
272            super.messageConsumed(context, messageReference);
273            try {
274                if(!messageReference.isAdvisory()) {
275                    ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
276                    Message payload = messageReference.getMessage().copy();
277                    payload.clearBody();
278                    fireAdvisory(context, topic,payload);
279                }
280            } catch (Exception e) {
281                LOG.warn("Failed to fire message consumed advisory");
282            }
283        }
284        
285        public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
286            super.messageDelivered(context, messageReference);
287            try {
288                if (!messageReference.isAdvisory()) {
289                    ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
290                    Message payload = messageReference.getMessage().copy();
291                    payload.clearBody();
292                    fireAdvisory(context, topic,payload);
293                }
294            } catch (Exception e) {
295                LOG.warn("Failed to fire message delivered advisory");
296            }
297        }
298        
299        public void messageDiscarded(ConnectionContext context, MessageReference messageReference) {
300            super.messageDiscarded(context, messageReference);
301            try {
302                if (!messageReference.isAdvisory()) {
303                    ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
304                    Message payload = messageReference.getMessage().copy();
305                    payload.clearBody();
306                    fireAdvisory(context, topic,payload);
307                }
308            } catch (Exception e) {
309                LOG.warn("Failed to fire message discarded advisory");
310            }
311        }
312        
313        public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
314            super.slowConsumer(context, destination,subs);
315            try {
316                ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
317                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
318                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
319                fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
320            } catch (Exception e) {
321                LOG.warn("Failed to fire message slow consumer advisory");
322            }
323        }
324        
325        public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
326            super.fastProducer(context, producerInfo);
327            try {
328                ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
329                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
330                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
331                fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
332            } catch (Exception e) {
333                LOG.warn("Failed to fire message fast producer advisory");
334            }
335        }
336        
337        public void isFull(ConnectionContext context, Destination destination, Usage usage) {
338            super.isFull(context, destination, usage);
339            if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()) == false) {
340                try {
341    
342                    ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination());
343                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
344                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_USAGE_NAME, usage.getName());
345                    fireAdvisory(context, topic, null, null, advisoryMessage);
346    
347                } catch (Exception e) {
348                    LOG.warn("Failed to fire message is full advisory");
349                }
350            }
351        }
352        
353        public void nowMasterBroker() {   
354            super.nowMasterBroker();
355            try {
356                ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
357                ActiveMQMessage advisoryMessage = new ActiveMQMessage();                       
358                ConnectionContext context = new ConnectionContext();
359                context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
360                context.setBroker(getBrokerService().getBroker());
361                fireAdvisory(context, topic,null,null,advisoryMessage);
362            } catch (Exception e) {
363                LOG.warn("Failed to fire message master broker advisory");
364            }
365        }
366    
367        protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
368            fireAdvisory(context, topic, command, null);
369        }
370    
371        protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
372            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
373            fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
374        }
375    
376        protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception {
377            fireConsumerAdvisory(context, consumerDestination,topic, command, null);
378        }
379    
380        protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
381            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
382            int count = 0;
383            Set<Destination>set = getDestinations(consumerDestination);
384            if (set != null) {
385                for (Destination dest:set) {
386                    count += dest.getDestinationStatistics().getConsumers().getCount();
387                }
388            }
389            advisoryMessage.setIntProperty("consumerCount", count);
390            
391            fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
392        }
393    
394        protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
395            fireProducerAdvisory(context,producerDestination, topic, command, null);
396        }
397    
398        protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
399            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
400            int count = 0;
401            if (producerDestination != null) {
402                Set<Destination> set = getDestinations(producerDestination);
403                if (set != null) {
404                    for (Destination dest : set) {
405                        count += dest.getDestinationStatistics().getProducers().getCount();
406                    }
407                }
408            }
409            advisoryMessage.setIntProperty("producerCount", count);
410            fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
411        }
412    
413        protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
414            if (getBrokerService().isStarted()) {
415                //set properties
416                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
417                String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
418                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
419                
420                String[] uris = getBrokerService().getTransportConnectorURIs();
421                String url = getBrokerService().getVmConnectorURI().toString();
422                if (uris != null && uris.length > 0) {
423                    url = uris[0];
424                } 
425                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
426                
427                //set the data structure
428                advisoryMessage.setDataStructure(command);
429                advisoryMessage.setPersistent(false);
430                advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
431                advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
432                advisoryMessage.setTargetConsumerId(targetConsumerId);
433                advisoryMessage.setDestination(topic);
434                advisoryMessage.setResponseRequired(false);
435                advisoryMessage.setProducerId(advisoryProducerId);
436                boolean originalFlowControl = context.isProducerFlowControl();
437                final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
438                producerExchange.setConnectionContext(context);
439                producerExchange.setMutable(true);
440                producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
441                try {
442                    context.setProducerFlowControl(false);
443                    next.send(producerExchange, advisoryMessage);
444                } finally {
445                    context.setProducerFlowControl(originalFlowControl);
446                }
447            }
448        }
449    
450        public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() {
451            return connections;
452        }
453    
454        public Map<ConsumerId, ConsumerInfo> getAdvisoryConsumers() {
455            return consumers;
456        }
457    
458        public Map<ProducerId, ProducerInfo> getAdvisoryProducers() {
459            return producers;
460        }
461    
462        public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() {
463            return destinations;
464        }
465    }