001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region.cursors; 018 019 import org.apache.activemq.broker.Broker; 020 import org.apache.activemq.broker.region.MessageReference; 021 import org.apache.activemq.broker.region.Queue; 022 import org.apache.activemq.command.Message; 023 import org.apache.activemq.usage.SystemUsage; 024 import org.apache.commons.logging.Log; 025 import org.apache.commons.logging.LogFactory; 026 027 /** 028 * Store based Cursor for Queues 029 * 030 * @version $Revision: 474985 $ 031 */ 032 public class StoreQueueCursor extends AbstractPendingMessageCursor { 033 034 private static final Log LOG = LogFactory.getLog(StoreQueueCursor.class); 035 private Broker broker; 036 private int pendingCount; 037 private Queue queue; 038 private PendingMessageCursor nonPersistent; 039 private QueueStorePrefetch persistent; 040 private boolean started; 041 private PendingMessageCursor currentCursor; 042 043 /** 044 * Construct 045 * 046 * @param queue 047 * @param tmpStore 048 */ 049 public StoreQueueCursor(Broker broker,Queue queue) { 050 this.broker=broker; 051 this.queue = queue; 052 this.persistent = new QueueStorePrefetch(queue); 053 currentCursor = persistent; 054 } 055 056 public synchronized void start() throws Exception { 057 started = true; 058 super.start(); 059 if (nonPersistent == null) { 060 if (broker.getBrokerService().isPersistent()) { 061 nonPersistent = new FilePendingMessageCursor(broker,queue.getName()); 062 }else { 063 nonPersistent = new VMPendingMessageCursor(); 064 } 065 nonPersistent.setMaxBatchSize(getMaxBatchSize()); 066 nonPersistent.setSystemUsage(systemUsage); 067 nonPersistent.setEnableAudit(isEnableAudit()); 068 nonPersistent.setMaxAuditDepth(getMaxAuditDepth()); 069 nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit()); 070 } 071 nonPersistent.setMessageAudit(getMessageAudit()); 072 nonPersistent.start(); 073 persistent.setMessageAudit(getMessageAudit()); 074 persistent.start(); 075 pendingCount = persistent.size() + nonPersistent.size(); 076 } 077 078 public synchronized void stop() throws Exception { 079 started = false; 080 if (nonPersistent != null) { 081 nonPersistent.stop(); 082 nonPersistent.gc(); 083 } 084 persistent.stop(); 085 persistent.gc(); 086 super.stop(); 087 pendingCount = 0; 088 } 089 090 public synchronized void addMessageLast(MessageReference node) throws Exception { 091 if (node != null) { 092 Message msg = node.getMessage(); 093 if (started) { 094 pendingCount++; 095 if (!msg.isPersistent()) { 096 nonPersistent.addMessageLast(node); 097 } 098 } 099 if (msg.isPersistent()) { 100 persistent.addMessageLast(node); 101 } 102 } 103 } 104 105 public synchronized void addMessageFirst(MessageReference node) throws Exception { 106 if (node != null) { 107 Message msg = node.getMessage(); 108 if (started) { 109 pendingCount++; 110 if (!msg.isPersistent()) { 111 nonPersistent.addMessageFirst(node); 112 } 113 } 114 if (msg.isPersistent()) { 115 persistent.addMessageFirst(node); 116 } 117 } 118 } 119 120 public synchronized void clear() { 121 pendingCount = 0; 122 } 123 124 public synchronized boolean hasNext() { 125 try { 126 getNextCursor(); 127 } catch (Exception e) { 128 LOG.error("Failed to get current cursor ", e); 129 throw new RuntimeException(e); 130 } 131 return currentCursor != null ? currentCursor.hasNext() : false; 132 } 133 134 public synchronized MessageReference next() { 135 MessageReference result = currentCursor != null ? currentCursor.next() : null; 136 return result; 137 } 138 139 public synchronized void remove() { 140 if (currentCursor != null) { 141 currentCursor.remove(); 142 } 143 pendingCount--; 144 } 145 146 public synchronized void remove(MessageReference node) { 147 if (!node.isPersistent()) { 148 nonPersistent.remove(node); 149 } else { 150 persistent.remove(node); 151 } 152 pendingCount--; 153 } 154 155 public synchronized void reset() { 156 nonPersistent.reset(); 157 persistent.reset(); 158 pendingCount = persistent.size() + nonPersistent.size(); 159 } 160 161 public void release() { 162 nonPersistent.release(); 163 persistent.release(); 164 } 165 166 167 public synchronized int size() { 168 if (pendingCount < 0) { 169 pendingCount = persistent.size() + nonPersistent.size(); 170 } 171 return pendingCount; 172 } 173 174 public synchronized boolean isEmpty() { 175 // if negative, more messages arrived in store since last reset so non empty 176 return pendingCount == 0; 177 } 178 179 /** 180 * Informs the Broker if the subscription needs to intervention to recover 181 * it's state e.g. DurableTopicSubscriber may do 182 * 183 * @see org.apache.activemq.region.cursors.PendingMessageCursor 184 * @return true if recovery required 185 */ 186 public boolean isRecoveryRequired() { 187 return false; 188 } 189 190 /** 191 * @return the nonPersistent Cursor 192 */ 193 public PendingMessageCursor getNonPersistent() { 194 return this.nonPersistent; 195 } 196 197 /** 198 * @param nonPersistent cursor to set 199 */ 200 public void setNonPersistent(PendingMessageCursor nonPersistent) { 201 this.nonPersistent = nonPersistent; 202 } 203 204 public void setMaxBatchSize(int maxBatchSize) { 205 persistent.setMaxBatchSize(maxBatchSize); 206 if (nonPersistent != null) { 207 nonPersistent.setMaxBatchSize(maxBatchSize); 208 } 209 super.setMaxBatchSize(maxBatchSize); 210 } 211 212 213 public void setMaxProducersToAudit(int maxProducersToAudit) { 214 super.setMaxProducersToAudit(maxProducersToAudit); 215 if (persistent != null) { 216 persistent.setMaxProducersToAudit(maxProducersToAudit); 217 } 218 if (nonPersistent != null) { 219 nonPersistent.setMaxProducersToAudit(maxProducersToAudit); 220 } 221 } 222 223 public void setMaxAuditDepth(int maxAuditDepth) { 224 super.setMaxAuditDepth(maxAuditDepth); 225 if (persistent != null) { 226 persistent.setMaxAuditDepth(maxAuditDepth); 227 } 228 if (nonPersistent != null) { 229 nonPersistent.setMaxAuditDepth(maxAuditDepth); 230 } 231 } 232 233 public void setEnableAudit(boolean enableAudit) { 234 super.setEnableAudit(enableAudit); 235 if (persistent != null) { 236 persistent.setEnableAudit(enableAudit); 237 } 238 if (nonPersistent != null) { 239 nonPersistent.setEnableAudit(enableAudit); 240 } 241 } 242 243 public void setUseCache(boolean useCache) { 244 super.setUseCache(useCache); 245 if (persistent != null) { 246 persistent.setUseCache(useCache); 247 } 248 if (nonPersistent != null) { 249 nonPersistent.setUseCache(useCache); 250 } 251 } 252 253 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 254 super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 255 if (persistent != null) { 256 persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 257 } 258 if (nonPersistent != null) { 259 nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 260 } 261 } 262 263 264 265 public synchronized void gc() { 266 if (persistent != null) { 267 persistent.gc(); 268 } 269 if (nonPersistent != null) { 270 nonPersistent.gc(); 271 } 272 pendingCount = persistent.size() + nonPersistent.size(); 273 } 274 275 public void setSystemUsage(SystemUsage usageManager) { 276 super.setSystemUsage(usageManager); 277 if (persistent != null) { 278 persistent.setSystemUsage(usageManager); 279 } 280 if (nonPersistent != null) { 281 nonPersistent.setSystemUsage(usageManager); 282 } 283 } 284 285 protected synchronized PendingMessageCursor getNextCursor() throws Exception { 286 if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) { 287 currentCursor = currentCursor == persistent ? nonPersistent : persistent; 288 // sanity check 289 if (currentCursor.isEmpty()) { 290 currentCursor = currentCursor == persistent ? nonPersistent : persistent; 291 } 292 } 293 return currentCursor; 294 } 295 }