001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq; 019 020 import java.util.List; 021 022 import javax.jms.JMSException; 023 024 import org.apache.activemq.command.ConsumerId; 025 import org.apache.activemq.command.MessageDispatch; 026 import org.apache.activemq.thread.Task; 027 import org.apache.activemq.thread.TaskRunner; 028 import org.apache.activemq.util.JMSExceptionSupport; 029 import org.apache.commons.logging.Log; 030 import org.apache.commons.logging.LogFactory; 031 032 /** 033 * A utility class used by the Session for dispatching messages asynchronously 034 * to consumers 035 * 036 * @version $Revision$ 037 * @see javax.jms.Session 038 */ 039 public class ActiveMQSessionExecutor implements Task { 040 private static final Log LOG = LogFactory.getLog(ActiveMQSessionExecutor.class); 041 042 private ActiveMQSession session; 043 private MessageDispatchChannel messageQueue = new MessageDispatchChannel(); 044 private boolean dispatchedBySessionPool; 045 private volatile TaskRunner taskRunner; 046 private boolean startedOrWarnedThatNotStarted; 047 048 ActiveMQSessionExecutor(ActiveMQSession session) { 049 this.session = session; 050 } 051 052 void setDispatchedBySessionPool(boolean value) { 053 dispatchedBySessionPool = value; 054 wakeup(); 055 } 056 057 void execute(MessageDispatch message) throws InterruptedException { 058 059 if (!startedOrWarnedThatNotStarted) { 060 061 ActiveMQConnection connection = session.connection; 062 long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout(); 063 if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) { 064 startedOrWarnedThatNotStarted = true; 065 } else { 066 long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated(); 067 068 // lets only warn when a significant amount of time has passed 069 // just in case its normal operation 070 if (elapsedTime > aboutUnstartedConnectionTimeout) { 071 LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection 072 + " Received: " + message); 073 startedOrWarnedThatNotStarted = true; 074 } 075 } 076 } 077 078 if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) { 079 dispatch(message); 080 } else { 081 messageQueue.enqueue(message); 082 wakeup(); 083 } 084 } 085 086 public void wakeup() { 087 if (!dispatchedBySessionPool) { 088 if (session.isSessionAsyncDispatch()) { 089 try { 090 TaskRunner taskRunner = this.taskRunner; 091 if (taskRunner == null) { 092 synchronized (this) { 093 if (this.taskRunner == null) { 094 if (!isRunning()) { 095 // stop has been called 096 return; 097 } 098 this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, 099 "ActiveMQ Session: " + session.getSessionId()); 100 } 101 taskRunner = this.taskRunner; 102 } 103 } 104 taskRunner.wakeup(); 105 } catch (InterruptedException e) { 106 Thread.currentThread().interrupt(); 107 } 108 } else { 109 while (iterate()) { 110 } 111 } 112 } 113 } 114 115 void executeFirst(MessageDispatch message) { 116 messageQueue.enqueueFirst(message); 117 wakeup(); 118 } 119 120 public boolean hasUncomsumedMessages() { 121 return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty(); 122 } 123 124 void dispatch(MessageDispatch message) { 125 126 // TODO - we should use a Map for this indexed by consumerId 127 128 for (ActiveMQMessageConsumer consumer : this.session.consumers) { 129 ConsumerId consumerId = message.getConsumerId(); 130 if (consumerId.equals(consumer.getConsumerId())) { 131 consumer.dispatch(message); 132 break; 133 } 134 } 135 } 136 137 synchronized void start() { 138 if (!messageQueue.isRunning()) { 139 messageQueue.start(); 140 if (hasUncomsumedMessages()) { 141 wakeup(); 142 } 143 } 144 } 145 146 void stop() throws JMSException { 147 try { 148 if (messageQueue.isRunning()) { 149 synchronized(this) { 150 messageQueue.stop(); 151 if (this.taskRunner != null) { 152 this.taskRunner.shutdown(); 153 this.taskRunner = null; 154 } 155 } 156 } 157 } catch (InterruptedException e) { 158 Thread.currentThread().interrupt(); 159 throw JMSExceptionSupport.create(e); 160 } 161 } 162 163 boolean isRunning() { 164 return messageQueue.isRunning(); 165 } 166 167 void close() { 168 messageQueue.close(); 169 } 170 171 void clear() { 172 messageQueue.clear(); 173 } 174 175 MessageDispatch dequeueNoWait() { 176 return messageQueue.dequeueNoWait(); 177 } 178 179 protected void clearMessagesInProgress() { 180 messageQueue.clear(); 181 } 182 183 public boolean isEmpty() { 184 return messageQueue.isEmpty(); 185 } 186 187 public boolean iterate() { 188 189 // Deliver any messages queued on the consumer to their listeners. 190 for (ActiveMQMessageConsumer consumer : this.session.consumers) { 191 if (consumer.iterate()) { 192 return true; 193 } 194 } 195 196 // No messages left queued on the listeners.. so now dispatch messages 197 // queued on the session 198 MessageDispatch message = messageQueue.dequeueNoWait(); 199 if (message == null) { 200 return false; 201 } else { 202 dispatch(message); 203 return !messageQueue.isEmpty(); 204 } 205 } 206 207 List getUnconsumedMessages() { 208 return messageQueue.removeAll(); 209 } 210 211 }