001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.util.Iterator;
020    import java.util.LinkedHashMap;
021    import java.util.Map.Entry;
022    import org.apache.activemq.broker.region.Destination;
023    import org.apache.activemq.broker.region.MessageReference;
024    import org.apache.activemq.command.Message;
025    import org.apache.activemq.command.MessageId;
026    import org.apache.activemq.store.MessageRecoveryListener;
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    
030    /**
031     *  Store based cursor
032     *
033     */
034    public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
035        private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
036        protected final Destination regionDestination;
037        private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
038        private Iterator<Entry<MessageId, Message>> iterator = null;
039        private boolean cacheEnabled=false;
040        protected boolean batchResetNeeded = true;
041        protected boolean storeHasMessages = false;
042        protected int size;
043        private MessageId lastCachedId;
044        
045        protected AbstractStoreCursor(Destination destination) {
046            this.regionDestination=destination;
047        }
048        
049        @Override
050        public final synchronized void start() throws Exception{
051            if (!isStarted()) {
052                super.start();
053                clear();
054                resetBatch();
055                this.size = getStoreSize();
056                this.storeHasMessages=this.size > 0;
057                if (!this.storeHasMessages&&useCache) {
058                    cacheEnabled=true;
059                }
060            } 
061        }
062        
063        @Override
064        public final synchronized void stop() throws Exception {
065            resetBatch();
066            super.stop();
067            gc();
068        }
069    
070        
071        public final boolean recoverMessage(Message message) throws Exception {
072            return recoverMessage(message,false);
073        }
074        
075        public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
076            boolean recovered = false;
077            if (recordUniqueId(message.getMessageId())) {
078                if (!cached) {
079                    message.setRegionDestination(regionDestination);
080                    if( message.getMemoryUsage()==null ) {
081                        message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
082                    }
083                }
084                message.incrementReferenceCount();
085                batchList.put(message.getMessageId(), message);
086                clearIterator(true);
087                recovered = true;
088            } else {
089                /*
090                 * we should expect to get these - as the message is recorded as it before it goes into
091                 * the cache. If subsequently, we pull out that message from the store (before its deleted)
092                 * it will be a duplicate - but should be ignored
093                 */
094                //LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
095                storeHasMessages = true;
096            }
097            return recovered;
098        }
099        
100        @Override
101        public final void reset() {
102            if (batchList.isEmpty()) {
103                try {
104                    fillBatch();
105                } catch (Exception e) {
106                    LOG.error("Failed to fill batch", e);
107                    throw new RuntimeException(e);
108                }
109            }
110            clearIterator(true);
111            size();
112        }
113        
114        @Override
115        public synchronized void release() {
116            clearIterator(false);
117        }
118        
119        private synchronized void clearIterator(boolean ensureIterator) {
120            boolean haveIterator = this.iterator != null;
121            this.iterator=null;
122            last = null;
123            if(haveIterator&&ensureIterator) {
124                ensureIterator();
125            }
126        }
127        
128        private synchronized void ensureIterator() {
129            if(this.iterator==null) {
130                this.iterator=this.batchList.entrySet().iterator();
131            }
132        }
133    
134    
135        public final void finished() {
136        }
137            
138        @Override
139        public final synchronized boolean hasNext() {
140            if (batchList.isEmpty()) {
141                try {
142                    fillBatch();
143                } catch (Exception e) {
144                    LOG.error("Failed to fill batch", e);
145                    throw new RuntimeException(e);
146                }
147            }
148            ensureIterator();
149            return this.iterator.hasNext();
150        }
151        
152        @Override
153        public final synchronized MessageReference next() {
154            MessageReference result = null;
155            if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
156                result = this.iterator.next().getValue();
157            }
158            last = result;
159            if (result != null) {
160                result.incrementReferenceCount();
161            }
162            return result;
163        }
164        
165        @Override
166        public final synchronized void addMessageLast(MessageReference node) throws Exception {
167            if (cacheEnabled && hasSpace()) {
168                recoverMessage(node.getMessage(),true);
169                lastCachedId = node.getMessageId();
170            } else {
171                if (cacheEnabled) {
172                    cacheEnabled=false;
173                    if (LOG.isDebugEnabled()) {
174                        LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size
175                                + ", lastCachedIdSeq: " + (lastCachedId == null ? -1 : lastCachedId.getBrokerSequenceId())
176                                + " current node seqId: " + node.getMessageId().getBrokerSequenceId());
177                    }
178                    // sync with store on disabling the cache
179                    if (lastCachedId != null) {
180                        setBatch(lastCachedId);
181                    }
182                }
183            }
184            size++;
185        }
186    
187        protected void setBatch(MessageId messageId) throws Exception {
188        }
189    
190        @Override
191        public final synchronized void addMessageFirst(MessageReference node) throws Exception {
192            cacheEnabled=false;
193            size++;
194        }
195    
196        @Override
197        public final synchronized void remove() {
198            size--;
199            if (iterator!=null) {
200                iterator.remove();
201            }
202            if (last != null) {
203                last.decrementReferenceCount();
204            }
205            if (size==0 && isStarted() && useCache && hasSpace() && isStoreEmpty()) {
206                if (LOG.isDebugEnabled()) {
207                    LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on last remove");
208                }
209                cacheEnabled=true;
210            }
211        }
212    
213        @Override
214        public final synchronized void remove(MessageReference node) {
215            size--;
216            cacheEnabled=false;
217            batchList.remove(node.getMessageId());
218        }
219        
220        @Override
221        public final synchronized void clear() {
222            gc();
223        }
224        
225        @Override
226        public final synchronized void gc() {
227            for (Message msg : batchList.values()) {
228                rollback(msg.getMessageId());
229                msg.decrementReferenceCount();
230            }
231            batchList.clear();
232            clearIterator(false);
233            batchResetNeeded = true;
234            this.cacheEnabled=false;
235            if (isStarted()) { 
236                size = getStoreSize();
237            } else {
238                size = 0;
239            }
240        }
241        
242        @Override
243        protected final synchronized void fillBatch() {
244            if (batchResetNeeded) {
245                resetBatch();
246                this.batchResetNeeded = false;
247            }
248            if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
249                this.storeHasMessages = false;
250                try {
251                    doFillBatch();
252                } catch (Exception e) {
253                    LOG.error("Failed to fill batch", e);
254                    throw new RuntimeException(e);
255                }
256                if (!this.batchList.isEmpty()) {
257                    this.storeHasMessages=true;
258                }
259            }
260        }
261        
262        @Override
263        public final synchronized boolean isEmpty() {
264            // negative means more messages added to store through queue.send since last reset
265            return size == 0;
266        }
267    
268        @Override
269        public final synchronized boolean hasMessagesBufferedToDeliver() {
270            return !batchList.isEmpty();
271        }
272    
273        @Override
274        public final synchronized int size() {
275            if (size < 0) {
276                this.size = getStoreSize();
277            }
278            return size;
279        }
280        
281        
282        protected abstract void doFillBatch() throws Exception;
283        
284        protected abstract void resetBatch();
285        
286        protected abstract int getStoreSize();
287        
288        protected abstract boolean isStoreEmpty();
289    }