001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.console.filter;
018    
019    import java.net.URI;
020    import java.util.Collections;
021    import java.util.Iterator;
022    import java.util.List;
023    
024    import javax.jms.Connection;
025    import javax.jms.Destination;
026    import javax.jms.JMSException;
027    import javax.jms.QueueBrowser;
028    import javax.jms.Session;
029    
030    import org.apache.activemq.ActiveMQConnectionFactory;
031    import org.apache.activemq.command.ActiveMQQueue;
032    import org.apache.activemq.command.ActiveMQTopic;
033    
034    public class AmqMessagesQueryFilter extends AbstractQueryFilter {
035    
036        private URI brokerUrl;
037        private Destination destination;
038    
039        /**
040         * Create a JMS message query filter
041         * 
042         * @param brokerUrl - broker url to connect to
043         * @param destination - JMS destination to query
044         */
045        public AmqMessagesQueryFilter(URI brokerUrl, Destination destination) {
046            super(null);
047            this.brokerUrl = brokerUrl;
048            this.destination = destination;
049        }
050    
051        /**
052         * Queries the specified destination using the message selector format query
053         * 
054         * @param queries - message selector queries
055         * @return list messages that matches the selector
056         * @throws Exception
057         */
058        public List query(List queries) throws Exception {
059            String selector = "";
060    
061            // Convert to message selector
062            for (Iterator i = queries.iterator(); i.hasNext();) {
063                selector = selector + "(" + i.next().toString() + ") AND ";
064            }
065    
066            // Remove last AND
067            if (!selector.equals("")) {
068                selector = selector.substring(0, selector.length() - 5);
069            }
070    
071            if (destination instanceof ActiveMQQueue) {
072                return queryMessages((ActiveMQQueue)destination, selector);
073            } else {
074                return queryMessages((ActiveMQTopic)destination, selector);
075            }
076        }
077    
078        /**
079         * Query the messages of a queue destination using a queue browser
080         * 
081         * @param queue - queue destination
082         * @param selector - message selector
083         * @return list of messages that matches the selector
084         * @throws Exception
085         */
086        protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception {
087            Connection conn = createConnection(getBrokerUrl());
088    
089            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
090            QueueBrowser browser = sess.createBrowser(queue, selector);
091    
092            List messages = Collections.list(browser.getEnumeration());
093    
094            conn.close();
095    
096            return messages;
097        }
098    
099        /**
100         * Query the messages of a topic destination using a message consumer
101         * 
102         * @param topic - topic destination
103         * @param selector - message selector
104         * @return list of messages that matches the selector
105         * @throws Exception
106         */
107        protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception {
108            // TODO: should we use a durable subscriber or a retroactive non-durable
109            // subscriber?
110            // TODO: if a durable subscriber is used, how do we manage it?
111            // subscribe/unsubscribe tasks?
112            return null;
113        }
114    
115        /**
116         * Create and start a JMS connection
117         * 
118         * @param brokerUrl - broker url to connect to.
119         * @return JMS connection
120         * @throws JMSException
121         */
122        protected Connection createConnection(URI brokerUrl) throws JMSException {
123            Connection conn = (new ActiveMQConnectionFactory(brokerUrl)).createConnection();
124            conn.start();
125            return conn;
126        }
127    
128        /**
129         * Get the broker url being used.
130         * 
131         * @return broker url
132         */
133        public URI getBrokerUrl() {
134            return brokerUrl;
135        }
136    
137        /**
138         * Set the broker url to use.
139         * 
140         * @param brokerUrl - broker url
141         */
142        public void setBrokerUrl(URI brokerUrl) {
143            this.brokerUrl = brokerUrl;
144        }
145    
146        /**
147         * Get the destination being used.
148         * 
149         * @return - JMS destination
150         */
151        public Destination getDestination() {
152            return destination;
153        }
154    
155        /**
156         * Set the destination to use.
157         * 
158         * @param destination - JMS destination
159         */
160        public void setDestination(Destination destination) {
161            this.destination = destination;
162        }
163    
164    }