001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import org.apache.activemq.ActiveMQConnectionFactory;
020    import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
021    import org.apache.activemq.broker.region.Destination;
022    import org.apache.activemq.broker.region.Subscription;
023    import org.apache.activemq.command.ActiveMQDestination;
024    import org.apache.activemq.command.ActiveMQMessage;
025    import org.apache.activemq.command.ActiveMQTextMessage;
026    import org.apache.activemq.command.Message;
027    import org.apache.activemq.filter.BooleanExpression;
028    import org.apache.activemq.filter.MessageEvaluationContext;
029    import org.apache.activemq.selector.SelectorParser;
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    import java.io.IOException;
033    import java.util.ArrayList;
034    import java.util.Collections;
035    import java.util.Iterator;
036    import java.util.List;
037    import java.util.Map;
038    import javax.jms.Connection;
039    import javax.jms.InvalidSelectorException;
040    import javax.jms.MessageProducer;
041    import javax.jms.Session;
042    import javax.management.MalformedObjectNameException;
043    import javax.management.ObjectName;
044    import javax.management.openmbean.CompositeData;
045    import javax.management.openmbean.CompositeDataSupport;
046    import javax.management.openmbean.CompositeType;
047    import javax.management.openmbean.OpenDataException;
048    import javax.management.openmbean.TabularData;
049    import javax.management.openmbean.TabularDataSupport;
050    import javax.management.openmbean.TabularType;
051    
052    public class DestinationView implements DestinationViewMBean {
053        private static final Log LOG = LogFactory.getLog(DestinationViewMBean.class);
054        protected final Destination destination;
055        protected final ManagedRegionBroker broker;
056    
057        public DestinationView(ManagedRegionBroker broker, Destination destination) {
058            this.broker = broker;
059            this.destination = destination;
060        }
061    
062        public void gc() {
063            destination.gc();
064        }
065    
066        public String getName() {
067            return destination.getName();
068        }
069    
070        public void resetStatistics() {
071            destination.getDestinationStatistics().reset();
072        }
073    
074        public long getEnqueueCount() {
075            return destination.getDestinationStatistics().getEnqueues().getCount();
076        }
077    
078        public long getDequeueCount() {
079            return destination.getDestinationStatistics().getDequeues().getCount();
080        }
081    
082        public long getDispatchCount() {
083            return destination.getDestinationStatistics().getDispatched().getCount();
084        }
085    
086        public long getInFlightCount() {
087            return destination.getDestinationStatistics().getInflight().getCount();
088        }
089    
090        public long getExpiredCount() {
091            return destination.getDestinationStatistics().getExpired().getCount();
092        }
093    
094        public long getConsumerCount() {
095            return destination.getDestinationStatistics().getConsumers().getCount();
096        }
097    
098        public long getQueueSize() {
099            return destination.getDestinationStatistics().getMessages().getCount();
100        }
101    
102        public long getMessagesCached() {
103            return destination.getDestinationStatistics().getMessagesCached().getCount();
104        }
105    
106        public int getMemoryPercentUsage() {
107            return destination.getMemoryUsage().getPercentUsage();
108        }
109    
110        public long getMemoryLimit() {
111            return destination.getMemoryUsage().getLimit();
112        }
113    
114        public void setMemoryLimit(long limit) {
115            destination.getMemoryUsage().setLimit(limit);
116        }
117    
118        public double getAverageEnqueueTime() {
119            return destination.getDestinationStatistics().getProcessTime().getAverageTime();
120        }
121    
122        public long getMaxEnqueueTime() {
123            return destination.getDestinationStatistics().getProcessTime().getMaxTime();
124        }
125    
126        public long getMinEnqueueTime() {
127            return destination.getDestinationStatistics().getProcessTime().getMinTime();
128        }
129    
130        public CompositeData[] browse() throws OpenDataException {
131            try {
132                return browse(null);
133            } catch (InvalidSelectorException e) {
134                // should not happen.
135                throw new RuntimeException(e);
136            }
137        }
138    
139        public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException {
140            Message[] messages = destination.browse();
141            ArrayList<CompositeData> c = new ArrayList<CompositeData>();
142    
143            MessageEvaluationContext ctx = new MessageEvaluationContext();
144            ctx.setDestination(destination.getActiveMQDestination());
145            BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
146    
147            for (int i = 0; i < messages.length; i++) {
148                try {
149    
150                    if (selectorExpression == null) {
151                        c.add(OpenTypeSupport.convert(messages[i]));
152                    } else {
153                        ctx.setMessageReference(messages[i]);
154                        if (selectorExpression.matches(ctx)) {
155                            c.add(OpenTypeSupport.convert(messages[i]));
156                        }
157                    }
158    
159                } catch (Throwable e) {
160                    // TODO DELETE ME
161                    System.out.println(e);
162                    e.printStackTrace();
163                    // TODO DELETE ME
164                    LOG.warn("exception browsing destination", e);
165                }
166            }
167    
168            CompositeData rc[] = new CompositeData[c.size()];
169            c.toArray(rc);
170            return rc;
171        }
172    
173        /**
174         * Browses the current destination returning a list of messages
175         */
176        public List<Object> browseMessages() throws InvalidSelectorException {
177            return browseMessages(null);
178        }
179    
180        /**
181         * Browses the current destination with the given selector returning a list
182         * of messages
183         */
184        public List<Object> browseMessages(String selector) throws InvalidSelectorException {
185            Message[] messages = destination.browse();
186            ArrayList<Object> answer = new ArrayList<Object>();
187    
188            MessageEvaluationContext ctx = new MessageEvaluationContext();
189            ctx.setDestination(destination.getActiveMQDestination());
190            BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
191    
192            for (int i = 0; i < messages.length; i++) {
193                try {
194                    Message message = messages[i];
195                    if (selectorExpression == null) {
196                        answer.add(OpenTypeSupport.convert(message));
197                    } else {
198                        ctx.setMessageReference(message);
199                        if (selectorExpression.matches(ctx)) {
200                            answer.add(message);
201                        }
202                    }
203    
204                } catch (Throwable e) {
205                    LOG.warn("exception browsing destination", e);
206                }
207            }
208            return answer;
209        }
210    
211        public TabularData browseAsTable() throws OpenDataException {
212            try {
213                return browseAsTable(null);
214            } catch (InvalidSelectorException e) {
215                throw new RuntimeException(e);
216            }
217        }
218    
219        public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException {
220            OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
221            Message[] messages = destination.browse();
222            CompositeType ct = factory.getCompositeType();
223            TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
224            TabularDataSupport rc = new TabularDataSupport(tt);
225    
226            MessageEvaluationContext ctx = new MessageEvaluationContext();
227            ctx.setDestination(destination.getActiveMQDestination());
228            BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
229    
230            for (int i = 0; i < messages.length; i++) {
231                try {
232                    if (selectorExpression == null) {
233                        rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
234                    } else {
235                        ctx.setMessageReference(messages[i]);
236                        if (selectorExpression.matches(ctx)) {
237                            rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
238                        }
239                    }
240                } catch (Throwable e) {
241                    LOG.warn("exception browsing destination", e);
242                }
243            }
244    
245            return rc;
246        }
247    
248        public String sendTextMessage(String body) throws Exception {
249            return sendTextMessage(Collections.EMPTY_MAP, body);
250        }
251    
252        public String sendTextMessage(Map headers, String body) throws Exception {
253            return sendTextMessage(headers, body, null, null);
254        }
255    
256        public String sendTextMessage(String body, String user, String password) throws Exception {
257            return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
258        }
259    
260        public String sendTextMessage(Map headers, String body, String userName, String password) throws Exception {
261    
262            String brokerUrl = "vm://" + broker.getBrokerName();
263            ActiveMQDestination dest = destination.getActiveMQDestination();
264    
265            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
266            Connection connection = null;
267            try {
268    
269                connection = cf.createConnection(userName, password);
270                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
271                MessageProducer producer = session.createProducer(dest);
272                ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
273    
274                for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
275                    Map.Entry entry = (Map.Entry) iter.next();
276                    msg.setObjectProperty((String) entry.getKey(), entry.getValue());
277                }
278    
279                producer.setDeliveryMode(msg.getJMSDeliveryMode());
280                producer.setPriority(msg.getPriority());
281                long ttl = msg.getExpiration() - System.currentTimeMillis();
282                producer.setTimeToLive(ttl > 0 ? ttl : 0);
283                producer.send(msg);
284    
285                return msg.getJMSMessageID();
286    
287            } finally {
288                connection.close();
289            }
290    
291        }
292    
293        public int getMaxAuditDepth() {
294            return destination.getMaxAuditDepth();
295        }
296    
297        public int getMaxProducersToAudit() {
298            return destination.getMaxProducersToAudit();
299        }
300    
301        public boolean isEnableAudit() {
302            return destination.isEnableAudit();
303        }
304    
305        public void setEnableAudit(boolean enableAudit) {
306            destination.setEnableAudit(enableAudit);
307        }
308    
309        public void setMaxAuditDepth(int maxAuditDepth) {
310            destination.setMaxAuditDepth(maxAuditDepth);
311        }
312    
313        public void setMaxProducersToAudit(int maxProducersToAudit) {
314            destination.setMaxProducersToAudit(maxProducersToAudit);
315        }
316    
317        public float getMemoryUsagePortion() {
318            return destination.getMemoryUsage().getUsagePortion();
319        }
320    
321        public long getProducerCount() {
322            return destination.getDestinationStatistics().getProducers().getCount();
323        }
324    
325        public boolean isProducerFlowControl() {
326            return destination.isProducerFlowControl();
327        }
328    
329        public void setMemoryUsagePortion(float value) {
330            destination.getMemoryUsage().setUsagePortion(value);
331        }
332    
333        public void setProducerFlowControl(boolean producerFlowControl) {
334            destination.setProducerFlowControl(producerFlowControl);
335        }
336    
337        /**
338         * Set's the interval at which warnings about producers being blocked by
339         * resource usage will be triggered. Values of 0 or less will disable
340         * warnings
341         * 
342         * @param blockedProducerWarningInterval the interval at which warning about
343         *            blocked producers will be triggered.
344         */
345        public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
346            destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
347        }
348    
349        /**
350         * 
351         * @return the interval at which warning about blocked producers will be
352         *         triggered.
353         */
354        public long getBlockedProducerWarningInterval() {
355            return destination.getBlockedProducerWarningInterval();
356        }
357    
358        public int getMaxPageSize() {
359            return destination.getMaxPageSize();
360        }
361    
362        public void setMaxPageSize(int pageSize) {
363            destination.setMaxPageSize(pageSize);
364        }
365    
366        public boolean isUseCache() {
367            return destination.isUseCache();
368        }
369    
370        public void setUseCache(boolean value) {
371            destination.setUseCache(value);
372        }
373    
374        public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {
375            List<Subscription> subscriptions = destination.getConsumers();
376            ObjectName[] answer = new ObjectName[subscriptions.size()];
377            ObjectName objectName = broker.getBrokerService().getBrokerObjectName();
378            int index = 0;
379            for (Subscription subscription : subscriptions) {
380                String connectionClientId = subscription.getContext().getClientId();
381                String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription, connectionClientId, objectName);
382                answer[index++] = new ObjectName(objectNameStr);
383            }
384            return answer;
385        }
386    
387    }