001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.util.ArrayList; 020 import java.util.LinkedList; 021 import java.util.List; 022 023 import javax.jms.JMSException; 024 025 import org.apache.activemq.command.MessageDispatch; 026 027 public class MessageDispatchChannel { 028 029 private final Object mutex = new Object(); 030 private final LinkedList<MessageDispatch> list; 031 private boolean closed; 032 private boolean running; 033 034 public MessageDispatchChannel() { 035 this.list = new LinkedList<MessageDispatch>(); 036 } 037 038 public void enqueue(MessageDispatch message) { 039 synchronized (mutex) { 040 list.addLast(message); 041 mutex.notify(); 042 } 043 } 044 045 public void enqueueFirst(MessageDispatch message) { 046 synchronized (mutex) { 047 list.addFirst(message); 048 mutex.notify(); 049 } 050 } 051 052 public boolean isEmpty() { 053 synchronized (mutex) { 054 return list.isEmpty(); 055 } 056 } 057 058 /** 059 * Used to get an enqueued message. The amount of time this method blocks is 060 * based on the timeout value. - if timeout==-1 then it blocks until a 061 * message is received. - if timeout==0 then it it tries to not block at 062 * all, it returns a message if it is available - if timeout>0 then it 063 * blocks up to timeout amount of time. Expired messages will consumed by 064 * this method. 065 * 066 * @throws JMSException 067 * @return null if we timeout or if the consumer is closed. 068 * @throws InterruptedException 069 */ 070 public MessageDispatch dequeue(long timeout) throws InterruptedException { 071 synchronized (mutex) { 072 // Wait until the consumer is ready to deliver messages. 073 while (timeout != 0 && !closed && (list.isEmpty() || !running)) { 074 if (timeout == -1) { 075 mutex.wait(); 076 } else { 077 mutex.wait(timeout); 078 break; 079 } 080 } 081 if (closed || !running || list.isEmpty()) { 082 return null; 083 } 084 return list.removeFirst(); 085 } 086 } 087 088 public MessageDispatch dequeueNoWait() { 089 synchronized (mutex) { 090 if (closed || !running || list.isEmpty()) { 091 return null; 092 } 093 return list.removeFirst(); 094 } 095 } 096 097 public MessageDispatch peek() { 098 synchronized (mutex) { 099 if (closed || !running || list.isEmpty()) { 100 return null; 101 } 102 return list.getFirst(); 103 } 104 } 105 106 public void start() { 107 synchronized (mutex) { 108 running = true; 109 mutex.notifyAll(); 110 } 111 } 112 113 public void stop() { 114 synchronized (mutex) { 115 running = false; 116 mutex.notifyAll(); 117 } 118 } 119 120 public void close() { 121 synchronized (mutex) { 122 if (!closed) { 123 running = false; 124 closed = true; 125 } 126 mutex.notifyAll(); 127 } 128 } 129 130 public void clear() { 131 synchronized (mutex) { 132 list.clear(); 133 } 134 } 135 136 public boolean isClosed() { 137 return closed; 138 } 139 140 public int size() { 141 synchronized (mutex) { 142 return list.size(); 143 } 144 } 145 146 public Object getMutex() { 147 return mutex; 148 } 149 150 public boolean isRunning() { 151 return running; 152 } 153 154 public List<MessageDispatch> removeAll() { 155 synchronized (mutex) { 156 ArrayList<MessageDispatch> rc = new ArrayList<MessageDispatch>(list); 157 list.clear(); 158 return rc; 159 } 160 } 161 162 public String toString() { 163 synchronized (mutex) { 164 return list.toString(); 165 } 166 } 167 }