001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.util;
018    
019    import java.io.IOException;
020    import java.util.Set;
021    import javax.annotation.PostConstruct;
022    import org.apache.activemq.broker.BrokerPluginSupport;
023    import org.apache.activemq.broker.Connection;
024    import org.apache.activemq.broker.ConnectionContext;
025    import org.apache.activemq.broker.ConsumerBrokerExchange;
026    import org.apache.activemq.broker.ProducerBrokerExchange;
027    import org.apache.activemq.broker.region.Destination;
028    import org.apache.activemq.broker.region.MessageReference;
029    import org.apache.activemq.broker.region.Subscription;
030    import org.apache.activemq.command.ActiveMQDestination;
031    import org.apache.activemq.command.BrokerInfo;
032    import org.apache.activemq.command.ConnectionInfo;
033    import org.apache.activemq.command.ConsumerInfo;
034    import org.apache.activemq.command.DestinationInfo;
035    import org.apache.activemq.command.Message;
036    import org.apache.activemq.command.MessageAck;
037    import org.apache.activemq.command.MessageDispatch;
038    import org.apache.activemq.command.MessageDispatchNotification;
039    import org.apache.activemq.command.MessagePull;
040    import org.apache.activemq.command.ProducerInfo;
041    import org.apache.activemq.command.RemoveSubscriptionInfo;
042    import org.apache.activemq.command.Response;
043    import org.apache.activemq.command.SessionInfo;
044    import org.apache.activemq.command.TransactionId;
045    import org.apache.activemq.usage.Usage;
046    import org.apache.commons.logging.Log;
047    import org.apache.commons.logging.LogFactory;
048    
049    /**
050     * A simple Broker intercepter which allows you to enable/disable logging.
051     * 
052     * @org.apache.xbean.XBean
053     */
054    
055    public class LoggingBrokerPlugin extends BrokerPluginSupport {
056    
057        private static final Log LOG = LogFactory.getLog(LoggingBrokerPlugin.class);
058    
059        private boolean logAll = false;
060        private boolean logMessageEvents = false;
061        private boolean logConnectionEvents = true;
062        private boolean logTransactionEvents = false;
063        private boolean logConsumerEvents = false;
064        private boolean logProducerEvents = false;
065        private boolean logInternalEvents = false;
066    
067        /**
068         *
069         * @throws Exception
070         * @org.apache.xbean.InitMethod
071         */
072        @PostConstruct
073        public void afterPropertiesSet() throws Exception {
074            LOG.info("Created LoggingBrokerPlugin: " + this.toString());
075        }
076    
077        public boolean isLogAll() {
078            return logAll;
079        }
080        
081        /**
082         * Log all Events that go through the Plugin
083         */
084        public void setLogAll(boolean logAll) {
085            this.logAll = logAll;
086        }
087    
088        public boolean isLogMessageEvents() {
089            return logMessageEvents;
090        }
091    
092        /**
093         * Log Events that are related to message processing
094         */
095        public void setLogMessageEvents(boolean logMessageEvents) {
096            this.logMessageEvents = logMessageEvents;
097        }
098    
099        public boolean isLogConnectionEvents() {
100            return logConnectionEvents;
101        }
102    
103        /**
104         * Log Events that are related to connections and sessions
105         */
106        public void setLogConnectionEvents(boolean logConnectionEvents) {
107            this.logConnectionEvents = logConnectionEvents;
108        }
109    
110        public boolean isLogTransactionEvents() {
111            return logTransactionEvents;
112        }
113    
114        /**
115         * Log Events that are related to transaction processing
116         */
117        public void setLogTransactionEvents(boolean logTransactionEvents) {
118            this.logTransactionEvents = logTransactionEvents;
119        }
120    
121        public boolean isLogConsumerEvents() {
122            return logConsumerEvents;
123        }
124    
125        /**
126         * Log Events that are related to Consumers
127         */
128        public void setLogConsumerEvents(boolean logConsumerEvents) {
129            this.logConsumerEvents = logConsumerEvents;
130        }
131    
132        public boolean isLogProducerEvents() {
133            return logProducerEvents;
134        }
135    
136        /**
137         * Log Events that are related to Producers
138         */
139        public void setLogProducerEvents(boolean logProducerEvents) {
140            this.logProducerEvents = logProducerEvents;
141        }
142    
143        public boolean isLogInternalEvents() {
144            return logInternalEvents;
145        }
146    
147        /**
148         * Log Events that are normally internal to the broker
149         */
150        public void setLogInternalEvents(boolean logInternalEvents) {
151            this.logInternalEvents = logInternalEvents;
152        }
153    
154        public void acknowledge(ConsumerBrokerExchange consumerExchange,
155                MessageAck ack) throws Exception {
156            if (isLogAll() || isLogConsumerEvents()) {
157                LOG.info("Acknowledging message for client ID : "
158                        + consumerExchange.getConnectionContext().getClientId() 
159                        + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
160                if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) {
161                    LOG.trace("Message count: " + ack.getMessageCount()
162                            + ", First Message Id: " + ack.getFirstMessageId()
163                            + ", Last Message Id: " + ack.getLastMessageId());
164                }
165            }
166            super.acknowledge(consumerExchange, ack);
167        }
168    
169        public Response messagePull(ConnectionContext context, MessagePull pull)
170                throws Exception {
171            if (isLogAll() || isLogConsumerEvents()) {
172                LOG.info("Message Pull from : " + context.getClientId() + " on "
173                        + pull.getDestination().getPhysicalName());
174            }
175            return super.messagePull(context, pull);
176        }
177    
178        public void addConnection(ConnectionContext context, ConnectionInfo info)
179                throws Exception {
180            if (isLogAll() || isLogConnectionEvents()) {
181                LOG.info("Adding Connection : " + context);
182            }
183            super.addConnection(context, info);
184        }
185    
186        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info)
187                throws Exception {
188            if (isLogAll() || isLogConsumerEvents()) {
189                LOG.info("Adding Consumer : " + info);
190            }
191            return super.addConsumer(context, info);
192        }
193    
194        public void addProducer(ConnectionContext context, ProducerInfo info)
195                throws Exception {
196            if (isLogAll() || isLogProducerEvents()) {
197                LOG.info("Adding Producer :" + info);
198            }
199            super.addProducer(context, info);
200        }
201    
202        public void commitTransaction(ConnectionContext context, TransactionId xid,
203                boolean onePhase) throws Exception {
204            if (isLogAll() || isLogTransactionEvents()) {
205                LOG.info("Commiting transaction : " + xid.getTransactionKey());
206            }
207            super.commitTransaction(context, xid, onePhase);
208        }
209    
210        public void removeSubscription(ConnectionContext context,
211                RemoveSubscriptionInfo info) throws Exception {
212            if (isLogAll() || isLogConsumerEvents()) {
213                LOG.info("Removing subscription : " + info);
214            }
215            super.removeSubscription(context, info);
216        }
217    
218        public TransactionId[] getPreparedTransactions(ConnectionContext context)
219                throws Exception {
220    
221            TransactionId[] result = super.getPreparedTransactions(context);
222            if ((isLogAll() || isLogTransactionEvents()) && result != null) {
223                StringBuffer tids = new StringBuffer();
224                for (TransactionId tid : result) {
225                    if (tids.length() > 0) {
226                        tids.append(", ");
227                    }
228                    tids.append(tid.getTransactionKey());
229                }
230                LOG.info("Prepared transactions : " + tids);
231            }
232            return result;
233        }
234    
235        public int prepareTransaction(ConnectionContext context, TransactionId xid)
236                throws Exception {
237            if (isLogAll() || isLogTransactionEvents()) {
238                LOG.info("Preparing transaction : " + xid.getTransactionKey());
239            }
240            return super.prepareTransaction(context, xid);
241        }
242    
243        public void removeConnection(ConnectionContext context,
244                ConnectionInfo info, Throwable error) throws Exception {
245            if (isLogAll() || isLogConnectionEvents()) {
246                LOG.info("Removing Connection : " + info);
247            }
248            super.removeConnection(context, info, error);
249        }
250    
251        public void removeConsumer(ConnectionContext context, ConsumerInfo info)
252                throws Exception {
253            if (isLogAll() || isLogConsumerEvents()) {
254                LOG.info("Removing Consumer : " + info);
255            }
256            super.removeConsumer(context, info);
257        }
258    
259        public void removeProducer(ConnectionContext context, ProducerInfo info)
260                throws Exception {
261            if (isLogAll() || isLogProducerEvents()) {
262                LOG.info("Removing Producer : " + info);
263            }
264            super.removeProducer(context, info);
265        }
266    
267        public void rollbackTransaction(ConnectionContext context, TransactionId xid)
268                throws Exception {
269            if (isLogAll() || isLogTransactionEvents()) {
270                LOG.info("Rolling back Transaction : " + xid.getTransactionKey());
271            }
272            super.rollbackTransaction(context, xid);
273        }
274    
275        public void send(ProducerBrokerExchange producerExchange,
276                Message messageSend) throws Exception {
277            if (isLogAll() || isLogProducerEvents()) {
278                LOG.info("Sending message : " + messageSend);
279            }
280            super.send(producerExchange, messageSend);
281        }
282    
283        public void beginTransaction(ConnectionContext context, TransactionId xid)
284                throws Exception {
285            if (isLogAll() || isLogTransactionEvents()) {
286                LOG.info("Beginning transaction : " + xid.getTransactionKey());
287            }
288            super.beginTransaction(context, xid);
289        }
290    
291        public void forgetTransaction(ConnectionContext context,
292                TransactionId transactionId) throws Exception {
293            if (isLogAll() || isLogTransactionEvents()) {
294                LOG.info("Forgetting transaction : "
295                        + transactionId.getTransactionKey());
296            }
297            super.forgetTransaction(context, transactionId);
298        }
299    
300        public Connection[] getClients() throws Exception {
301            Connection[] result = super.getClients();
302    
303            if (isLogAll() || isLogInternalEvents()) {
304                if (result == null) {
305                    LOG.info("Get Clients returned empty list.");
306                } else {
307                    StringBuffer cids = new StringBuffer();
308                    for (Connection c : result) {
309                        cids.append(cids.length() > 0 ? ", " : "");
310                        cids.append(c.getConnectionId());
311                    }
312                    LOG.info("Connected clients : " + cids);
313                }
314            }
315            return super.getClients();
316        }
317    
318        public org.apache.activemq.broker.region.Destination addDestination(
319                ConnectionContext context, ActiveMQDestination destination)
320                throws Exception {
321            if (isLogAll() || isLogInternalEvents()) {
322                LOG.info("Adding destination : "
323                        + destination.getDestinationTypeAsString() + ":"
324                        + destination.getPhysicalName());
325            }
326            return super.addDestination(context, destination);
327        }
328    
329        public void removeDestination(ConnectionContext context,
330                ActiveMQDestination destination, long timeout) throws Exception {
331            if (isLogAll() || isLogInternalEvents()) {
332                LOG.info("Removing destination : "
333                        + destination.getDestinationTypeAsString() + ":"
334                        + destination.getPhysicalName());
335            }
336            super.removeDestination(context, destination, timeout);
337        }
338    
339        public ActiveMQDestination[] getDestinations() throws Exception {
340            ActiveMQDestination[] result = super.getDestinations();
341            if (isLogAll() || isLogInternalEvents()) {
342                if (result == null) {
343                    LOG.info("Get Destinations returned empty list.");
344                } else {
345                    StringBuffer destinations = new StringBuffer();
346                    for (ActiveMQDestination dest : result) {
347                        destinations.append(destinations.length() > 0 ? ", " : "");
348                        destinations.append(dest.getPhysicalName());
349                    }
350                    LOG.info("Get Destinations : " + destinations);
351                }
352            }
353            return result;
354        }
355    
356        public void start() throws Exception {
357            if (isLogAll() || isLogInternalEvents()) {
358                LOG.info("Starting " + getBrokerName());
359            }
360            super.start();
361        }
362    
363        public void stop() throws Exception {
364            if (isLogAll() || isLogInternalEvents()) {
365                LOG.info("Stopping " + getBrokerName());
366            }
367            super.stop();
368        }
369    
370        public void addSession(ConnectionContext context, SessionInfo info)
371                throws Exception {
372            if (isLogAll() || isLogConnectionEvents()) {
373                LOG.info("Adding Session : " + info);
374            }
375            super.addSession(context, info);
376        }
377    
378        public void removeSession(ConnectionContext context, SessionInfo info)
379                throws Exception {
380            if (isLogAll() || isLogConnectionEvents()) {
381                LOG.info("Removing Session : " + info);
382            }
383            super.removeSession(context, info);
384        }
385    
386        public void addBroker(Connection connection, BrokerInfo info) {
387            if (isLogAll() || isLogInternalEvents()) {
388                LOG.info("Adding Broker " + info.getBrokerName());
389            }
390            super.addBroker(connection, info);
391        }
392    
393        public void removeBroker(Connection connection, BrokerInfo info) {
394            if (isLogAll() || isLogInternalEvents()) {
395                LOG.info("Removing Broker " + info.getBrokerName());
396            }
397            super.removeBroker(connection, info);
398        }
399    
400        public BrokerInfo[] getPeerBrokerInfos() {
401            BrokerInfo[] result = super.getPeerBrokerInfos();
402            if (isLogAll() || isLogInternalEvents()) {
403                if (result == null) {
404                    LOG.info("Get Peer Broker Infos returned empty list.");
405                } else {
406                    StringBuffer peers = new StringBuffer();
407                    for (BrokerInfo bi : result) {
408                        peers.append(peers.length() > 0 ? ", " : "");
409                        peers.append(bi.getBrokerName());
410                    }
411                    LOG.info("Get Peer Broker Infos : " + peers);
412                }
413            }
414            return result;
415        }
416    
417        public void preProcessDispatch(MessageDispatch messageDispatch) {
418            if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
419                LOG.info("preProcessDispatch :" + messageDispatch);
420            }
421            super.preProcessDispatch(messageDispatch);
422        }
423    
424        public void postProcessDispatch(MessageDispatch messageDispatch) {
425            if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
426                LOG.info("postProcessDispatch :" + messageDispatch);
427            }
428            super.postProcessDispatch(messageDispatch);
429        }
430    
431        public void processDispatchNotification(
432                MessageDispatchNotification messageDispatchNotification)
433                throws Exception {
434            if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
435                LOG.info("ProcessDispatchNotification :"
436                        + messageDispatchNotification);
437            }
438            super.processDispatchNotification(messageDispatchNotification);
439        }
440    
441        public Set<ActiveMQDestination> getDurableDestinations() {
442            Set<ActiveMQDestination> result = super.getDurableDestinations();
443            if (isLogAll() || isLogInternalEvents()) {
444                if (result == null) {
445                    LOG.info("Get Durable Destinations returned empty list.");
446                } else {
447                    StringBuffer destinations = new StringBuffer();
448                    for (ActiveMQDestination dest : result) {
449                        destinations.append(destinations.length() > 0 ? ", " : "");
450                        destinations.append(dest.getPhysicalName());
451                    }
452                    LOG.info("Get Durable Destinations : " + destinations);
453                }
454            }
455            return result;
456        }
457    
458        public void addDestinationInfo(ConnectionContext context,
459                DestinationInfo info) throws Exception {
460            if (isLogAll() || isLogInternalEvents()) {
461                LOG.info("Adding destination info : " + info);
462            }
463            super.addDestinationInfo(context, info);
464        }
465    
466        public void removeDestinationInfo(ConnectionContext context,
467                DestinationInfo info) throws Exception {
468            if (isLogAll() || isLogInternalEvents()) {
469                LOG.info("Removing destination info : " + info);
470            }
471            super.removeDestinationInfo(context, info);
472        }
473    
474        public void messageExpired(ConnectionContext context,
475                MessageReference message) {
476            if (isLogAll() || isLogInternalEvents()) {
477                String msg = "Unable to display message.";
478                try {
479                    msg = message.getMessage().toString();
480                } catch (IOException ioe) {
481                }
482                LOG.info("Message has expired : " + msg);
483            }
484            super.messageExpired(context, message);
485        }
486    
487        public void sendToDeadLetterQueue(ConnectionContext context,
488                MessageReference messageReference) {
489            if (isLogAll() || isLogInternalEvents()) {
490                String msg = "Unable to display message.";
491                try {
492                    msg = messageReference.getMessage().toString();
493                } catch (IOException ioe) {
494                }
495                LOG.info("Sending to DLQ : " + msg);
496            }
497        }
498    
499        public void fastProducer(ConnectionContext context,
500                ProducerInfo producerInfo) {
501            if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
502                LOG.info("Fast Producer : " + producerInfo);
503            }
504            super.fastProducer(context, producerInfo);
505        }
506    
507        public void isFull(ConnectionContext context, Destination destination,
508                Usage usage) {
509            if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
510                LOG.info("Destination is full : " + destination.getName());
511            }
512            super.isFull(context, destination, usage);
513        }
514    
515        public void messageConsumed(ConnectionContext context,
516                MessageReference messageReference) {
517            if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
518                String msg = "Unable to display message.";
519                try {
520                    msg = messageReference.getMessage().toString();
521                } catch (IOException ioe) {
522                }
523                LOG.info("Message consumed : " + msg);
524            }
525            super.messageConsumed(context, messageReference);
526        }
527    
528        public void messageDelivered(ConnectionContext context,
529                MessageReference messageReference) {
530            if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
531                String msg = "Unable to display message.";
532                try {
533                    msg = messageReference.getMessage().toString();
534                } catch (IOException ioe) {
535                }
536                LOG.info("Message delivered : " + msg);
537            }
538            super.messageDelivered(context, messageReference);
539        }
540    
541        public void messageDiscarded(ConnectionContext context,
542                MessageReference messageReference) {
543            if (isLogAll() || isLogInternalEvents()) {
544                String msg = "Unable to display message.";
545                try {
546                    msg = messageReference.getMessage().toString();
547                } catch (IOException ioe) {
548                }
549                LOG.info("Message discarded : " + msg);
550            }
551            super.messageDiscarded(context, messageReference);
552        }
553    
554        public void slowConsumer(ConnectionContext context,
555                Destination destination, Subscription subs) {
556            if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
557                LOG.info("Detected slow consumer on " + destination.getName());
558                StringBuffer buf = new StringBuffer("Connection(");
559                buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId());
560                buf.append(") Session(");
561                buf.append(subs.getConsumerInfo().getConsumerId().getSessionId());
562                buf.append(")");
563                LOG.info(buf);
564            }
565            super.slowConsumer(context, destination, subs);
566        }
567    
568        public void nowMasterBroker() {
569            if (isLogAll() || isLogInternalEvents()) {
570                LOG.info("Is now the master broker : " + getBrokerName());
571            }
572            super.nowMasterBroker();
573        }
574    
575        public String toString() {
576            StringBuffer buf = new StringBuffer();
577            buf.append("LoggingBrokerPlugin(");
578            buf.append("logAll=");
579            buf.append(isLogAll());
580            buf.append(", logConnectionEvents=");
581            buf.append(isLogConnectionEvents());
582            buf.append(", logConsumerEvents=");
583            buf.append(isLogConsumerEvents());
584            buf.append(", logProducerEvents=");
585            buf.append(isLogProducerEvents());
586            buf.append(", logMessageEvents=");
587            buf.append(isLogMessageEvents());
588            buf.append(", logTransactionEvents=");
589            buf.append(isLogTransactionEvents());
590            buf.append(", logInternalEvents=");
591            buf.append(isLogInternalEvents());
592            buf.append(")");
593            return buf.toString();
594        }
595    }