001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region.cursors; 018 019 import java.util.Collections; 020 import java.util.LinkedList; 021 import java.util.List; 022 import org.apache.activemq.ActiveMQMessageAudit; 023 import org.apache.activemq.broker.ConnectionContext; 024 import org.apache.activemq.broker.region.BaseDestination; 025 import org.apache.activemq.broker.region.Destination; 026 import org.apache.activemq.broker.region.MessageReference; 027 import org.apache.activemq.command.MessageId; 028 import org.apache.activemq.usage.SystemUsage; 029 030 /** 031 * Abstract method holder for pending message (messages awaiting disptach to a 032 * consumer) cursor 033 * 034 * @version $Revision: 882100 $ 035 */ 036 public class AbstractPendingMessageCursor implements PendingMessageCursor { 037 protected int memoryUsageHighWaterMark = 70; 038 protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE; 039 protected SystemUsage systemUsage; 040 protected int maxProducersToAudit=1024; 041 protected int maxAuditDepth=1000; 042 protected boolean enableAudit=true; 043 protected ActiveMQMessageAudit audit; 044 protected boolean useCache=true; 045 private boolean started=false; 046 protected MessageReference last = null; 047 048 049 public synchronized void start() throws Exception { 050 if (!started && enableAudit && audit==null) { 051 audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 052 } 053 started=true; 054 } 055 056 public synchronized void stop() throws Exception { 057 started=false; 058 audit=null; 059 gc(); 060 } 061 062 public void add(ConnectionContext context, Destination destination) throws Exception { 063 } 064 065 @SuppressWarnings("unchecked") 066 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 067 return Collections.EMPTY_LIST; 068 } 069 070 public boolean isRecoveryRequired() { 071 return true; 072 } 073 074 public void addMessageFirst(MessageReference node) throws Exception { 075 } 076 077 public void addMessageLast(MessageReference node) throws Exception { 078 } 079 080 public void addRecoveredMessage(MessageReference node) throws Exception { 081 addMessageLast(node); 082 } 083 084 public void clear() { 085 } 086 087 public boolean hasNext() { 088 return false; 089 } 090 091 public boolean isEmpty() { 092 return false; 093 } 094 095 public boolean isEmpty(Destination destination) { 096 return isEmpty(); 097 } 098 099 public MessageReference next() { 100 return null; 101 } 102 103 public void remove() { 104 } 105 106 public void reset() { 107 } 108 109 public int size() { 110 return 0; 111 } 112 113 public int getMaxBatchSize() { 114 return maxBatchSize; 115 } 116 117 public void setMaxBatchSize(int maxBatchSize) { 118 this.maxBatchSize = maxBatchSize; 119 } 120 121 protected void fillBatch() throws Exception { 122 } 123 124 public void resetForGC() { 125 reset(); 126 } 127 128 public void remove(MessageReference node) { 129 } 130 131 public void gc() { 132 } 133 134 public void setSystemUsage(SystemUsage usageManager) { 135 this.systemUsage = usageManager; 136 } 137 138 public boolean hasSpace() { 139 return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true; 140 } 141 142 public boolean isFull() { 143 return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false; 144 } 145 146 public void release() { 147 } 148 149 public boolean hasMessagesBufferedToDeliver() { 150 return false; 151 } 152 153 /** 154 * @return the memoryUsageHighWaterMark 155 */ 156 public int getMemoryUsageHighWaterMark() { 157 return memoryUsageHighWaterMark; 158 } 159 160 /** 161 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set 162 */ 163 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 164 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark; 165 } 166 167 /** 168 * @return the usageManager 169 */ 170 public SystemUsage getSystemUsage() { 171 return this.systemUsage; 172 } 173 174 /** 175 * destroy the cursor 176 * 177 * @throws Exception 178 */ 179 public void destroy() throws Exception { 180 stop(); 181 } 182 183 /** 184 * Page in a restricted number of messages 185 * 186 * @param maxItems maximum number of messages to return 187 * @return a list of paged in messages 188 */ 189 public LinkedList<MessageReference> pageInList(int maxItems) { 190 throw new RuntimeException("Not supported"); 191 } 192 193 /** 194 * @return the maxProducersToAudit 195 */ 196 public int getMaxProducersToAudit() { 197 return maxProducersToAudit; 198 } 199 200 /** 201 * @param maxProducersToAudit the maxProducersToAudit to set 202 */ 203 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) { 204 this.maxProducersToAudit = maxProducersToAudit; 205 if (audit != null) { 206 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); 207 } 208 } 209 210 /** 211 * @return the maxAuditDepth 212 */ 213 public int getMaxAuditDepth() { 214 return maxAuditDepth; 215 } 216 217 218 /** 219 * @param maxAuditDepth the maxAuditDepth to set 220 */ 221 public synchronized void setMaxAuditDepth(int maxAuditDepth) { 222 this.maxAuditDepth = maxAuditDepth; 223 if (audit != null) { 224 audit.setAuditDepth(maxAuditDepth); 225 } 226 } 227 228 229 /** 230 * @return the enableAudit 231 */ 232 public boolean isEnableAudit() { 233 return enableAudit; 234 } 235 236 /** 237 * @param enableAudit the enableAudit to set 238 */ 239 public synchronized void setEnableAudit(boolean enableAudit) { 240 this.enableAudit = enableAudit; 241 if (enableAudit && started && audit==null) { 242 audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 243 } 244 } 245 246 public boolean isTransient() { 247 return false; 248 } 249 250 251 /** 252 * set the audit 253 * @param audit new audit component 254 */ 255 public void setMessageAudit(ActiveMQMessageAudit audit) { 256 this.audit=audit; 257 } 258 259 260 /** 261 * @return the audit 262 */ 263 public ActiveMQMessageAudit getMessageAudit() { 264 return audit; 265 } 266 267 public boolean isUseCache() { 268 return useCache; 269 } 270 271 public void setUseCache(boolean useCache) { 272 this.useCache = useCache; 273 } 274 275 public synchronized boolean isDuplicate(MessageId messageId) { 276 boolean unique = recordUniqueId(messageId); 277 rollback(messageId); 278 return !unique; 279 } 280 281 /** 282 * records a message id and checks if it is a duplicate 283 * @param messageId 284 * @return true if id is unique, false otherwise. 285 */ 286 public synchronized boolean recordUniqueId(MessageId messageId) { 287 if (!enableAudit || audit==null) { 288 return true; 289 } 290 return !audit.isDuplicate(messageId); 291 } 292 293 public synchronized void rollback(MessageId id) { 294 if (audit != null) { 295 audit.rollback(id); 296 } 297 } 298 299 protected synchronized boolean isStarted() { 300 return started; 301 } 302 }