001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.util.Collections;
020    import java.util.LinkedList;
021    import java.util.List;
022    import org.apache.activemq.ActiveMQMessageAudit;
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.broker.region.BaseDestination;
025    import org.apache.activemq.broker.region.Destination;
026    import org.apache.activemq.broker.region.MessageReference;
027    import org.apache.activemq.command.MessageId;
028    import org.apache.activemq.usage.SystemUsage;
029    
030    /**
031     * Abstract method holder for pending message (messages awaiting disptach to a
032     * consumer) cursor
033     * 
034     * @version $Revision: 882100 $
035     */
036    public class AbstractPendingMessageCursor implements PendingMessageCursor {
037        protected int memoryUsageHighWaterMark = 70;
038        protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
039        protected SystemUsage systemUsage;
040        protected int maxProducersToAudit=1024;
041        protected int maxAuditDepth=1000;
042        protected boolean enableAudit=true;
043        protected ActiveMQMessageAudit audit;
044        protected boolean useCache=true;
045        private boolean started=false;
046        protected MessageReference last = null;
047      
048    
049        public synchronized void start() throws Exception  {
050            if (!started && enableAudit && audit==null) {
051                audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
052            }
053            started=true;
054        }
055    
056        public synchronized void stop() throws Exception  {
057            started=false;
058            audit=null;
059            gc();
060        }
061    
062        public void add(ConnectionContext context, Destination destination) throws Exception {
063        }
064    
065        @SuppressWarnings("unchecked")
066        public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
067            return Collections.EMPTY_LIST;
068        }
069    
070        public boolean isRecoveryRequired() {
071            return true;
072        }
073    
074        public void addMessageFirst(MessageReference node) throws Exception {
075        }
076    
077        public void addMessageLast(MessageReference node) throws Exception {
078        }
079    
080        public void addRecoveredMessage(MessageReference node) throws Exception {
081            addMessageLast(node);
082        }
083    
084        public void clear() {
085        }
086    
087        public boolean hasNext() {
088            return false;
089        }
090    
091        public boolean isEmpty() {
092            return false;
093        }
094    
095        public boolean isEmpty(Destination destination) {
096            return isEmpty();
097        }
098    
099        public MessageReference next() {
100            return null;
101        }
102    
103        public void remove() {
104        }
105    
106        public void reset() {
107        }
108    
109        public int size() {
110            return 0;
111        }
112    
113        public int getMaxBatchSize() {
114            return maxBatchSize;
115        }
116    
117        public void setMaxBatchSize(int maxBatchSize) {
118            this.maxBatchSize = maxBatchSize;
119        }
120    
121        protected void fillBatch() throws Exception {
122        }
123    
124        public void resetForGC() {
125            reset();
126        }
127    
128        public void remove(MessageReference node) {
129        }
130    
131        public void gc() {
132        }
133    
134        public void setSystemUsage(SystemUsage usageManager) {
135            this.systemUsage = usageManager;
136        }
137    
138        public boolean hasSpace() {
139            return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
140        }
141    
142        public boolean isFull() {
143            return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
144        }
145    
146        public void release() {
147        }
148    
149        public boolean hasMessagesBufferedToDeliver() {
150            return false;
151        }
152    
153        /**
154         * @return the memoryUsageHighWaterMark
155         */
156        public int getMemoryUsageHighWaterMark() {
157            return memoryUsageHighWaterMark;
158        }
159    
160        /**
161         * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
162         */
163        public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
164            this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
165        }
166    
167        /**
168         * @return the usageManager
169         */
170        public SystemUsage getSystemUsage() {
171            return this.systemUsage;
172        }
173    
174        /**
175         * destroy the cursor
176         * 
177         * @throws Exception
178         */
179        public void destroy() throws Exception {
180            stop();
181        }
182    
183        /**
184         * Page in a restricted number of messages
185         * 
186         * @param maxItems maximum number of messages to return
187         * @return a list of paged in messages
188         */
189        public LinkedList<MessageReference> pageInList(int maxItems) {
190            throw new RuntimeException("Not supported");
191        }
192    
193        /**
194         * @return the maxProducersToAudit
195         */
196        public int getMaxProducersToAudit() {
197            return maxProducersToAudit;
198        }
199    
200        /**
201         * @param maxProducersToAudit the maxProducersToAudit to set
202         */
203        public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
204            this.maxProducersToAudit = maxProducersToAudit;
205            if (audit != null) {
206                audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
207            }
208        }
209    
210        /**
211         * @return the maxAuditDepth
212         */
213        public int getMaxAuditDepth() {
214            return maxAuditDepth;
215        }
216        
217    
218        /**
219         * @param maxAuditDepth the maxAuditDepth to set
220         */
221        public synchronized void setMaxAuditDepth(int maxAuditDepth) {
222            this.maxAuditDepth = maxAuditDepth;
223            if (audit != null) {
224                audit.setAuditDepth(maxAuditDepth);
225            }
226        }
227        
228        
229        /**
230         * @return the enableAudit
231         */
232        public boolean isEnableAudit() {
233            return enableAudit;
234        }
235    
236        /**
237         * @param enableAudit the enableAudit to set
238         */
239        public synchronized void setEnableAudit(boolean enableAudit) {
240            this.enableAudit = enableAudit;
241            if (enableAudit && started && audit==null) {
242                audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
243            }
244        }
245        
246        public boolean isTransient() {
247            return false;
248        }
249        
250           
251        /**
252         * set the audit
253         * @param audit new audit component
254         */
255        public void setMessageAudit(ActiveMQMessageAudit audit) {
256            this.audit=audit;
257        }
258        
259        
260        /**
261         * @return the audit
262         */
263        public ActiveMQMessageAudit getMessageAudit() {
264            return audit;
265        }
266        
267        public boolean isUseCache() {
268            return useCache;
269        }
270    
271        public void setUseCache(boolean useCache) {
272            this.useCache = useCache;
273        }
274    
275        public synchronized boolean isDuplicate(MessageId messageId) {
276            boolean unique = recordUniqueId(messageId);
277            rollback(messageId);
278            return !unique;
279        }
280        
281        /**
282         * records a message id and checks if it is a duplicate
283         * @param messageId
284         * @return true if id is unique, false otherwise.
285         */
286        public synchronized boolean recordUniqueId(MessageId messageId) {
287            if (!enableAudit || audit==null) {
288                return true;
289            }
290            return !audit.isDuplicate(messageId);
291        }
292        
293        public synchronized void rollback(MessageId id) {
294            if (audit != null) {
295                audit.rollback(id);
296            }
297        }
298        
299        protected synchronized boolean isStarted() {
300            return started;
301        }
302    }