001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.util.ArrayList;
020    import java.util.LinkedList;
021    import java.util.List;
022    
023    import javax.jms.JMSException;
024    
025    import org.apache.activemq.command.MessageDispatch;
026    
027    public class MessageDispatchChannel {
028    
029        private final Object mutex = new Object();
030        private final LinkedList<MessageDispatch> list;
031        private boolean closed;
032        private boolean running;
033    
034        public MessageDispatchChannel() {
035            this.list = new LinkedList<MessageDispatch>();
036        }
037    
038        public void enqueue(MessageDispatch message) {
039            synchronized (mutex) {
040                list.addLast(message);
041                mutex.notify();
042            }
043        }
044    
045        public void enqueueFirst(MessageDispatch message) {
046            synchronized (mutex) {
047                list.addFirst(message);
048                mutex.notify();
049            }
050        }
051    
052        public boolean isEmpty() {
053            synchronized (mutex) {
054                return list.isEmpty();
055            }
056        }
057    
058        /**
059         * Used to get an enqueued message. The amount of time this method blocks is
060         * based on the timeout value. - if timeout==-1 then it blocks until a
061         * message is received. - if timeout==0 then it it tries to not block at
062         * all, it returns a message if it is available - if timeout>0 then it
063         * blocks up to timeout amount of time. Expired messages will consumed by
064         * this method.
065         * 
066         * @throws JMSException
067         * @return null if we timeout or if the consumer is closed.
068         * @throws InterruptedException
069         */
070        public MessageDispatch dequeue(long timeout) throws InterruptedException {
071            synchronized (mutex) {
072                // Wait until the consumer is ready to deliver messages.
073                while (timeout != 0 && !closed && (list.isEmpty() || !running)) {
074                    if (timeout == -1) {
075                        mutex.wait();
076                    } else {
077                        mutex.wait(timeout);
078                        break;
079                    }
080                }
081                if (closed || !running || list.isEmpty()) {
082                    return null;
083                }
084                return list.removeFirst();
085            }
086        }
087    
088        public MessageDispatch dequeueNoWait() {
089            synchronized (mutex) {
090                if (closed || !running || list.isEmpty()) {
091                    return null;
092                }
093                return list.removeFirst();
094            }
095        }
096    
097        public MessageDispatch peek() {
098            synchronized (mutex) {
099                if (closed || !running || list.isEmpty()) {
100                    return null;
101                }
102                return list.getFirst();
103            }
104        }
105    
106        public void start() {
107            synchronized (mutex) {
108                running = true;
109                mutex.notifyAll();
110            }
111        }
112    
113        public void stop() {
114            synchronized (mutex) {
115                running = false;
116                mutex.notifyAll();
117            }
118        }
119    
120        public void close() {
121            synchronized (mutex) {
122                if (!closed) {
123                    running = false;
124                    closed = true;
125                }
126                mutex.notifyAll();
127            }
128        }
129    
130        public void clear() {
131            synchronized (mutex) {
132                list.clear();
133            }
134        }
135    
136        public boolean isClosed() {
137            return closed;
138        }
139    
140        public int size() {
141            synchronized (mutex) {
142                return list.size();
143            }
144        }
145    
146        public Object getMutex() {
147            return mutex;
148        }
149    
150        public boolean isRunning() {
151            return running;
152        }
153    
154        public List<MessageDispatch> removeAll() {
155            synchronized (mutex) {
156                ArrayList<MessageDispatch> rc = new ArrayList<MessageDispatch>(list);
157                list.clear();
158                return rc;
159            }
160        }
161    
162        public String toString() {
163            synchronized (mutex) {
164                return list.toString();
165            }
166        }
167    }