001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    import java.util.concurrent.atomic.AtomicLong;
024    
025    import org.apache.activemq.broker.Broker;
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.broker.region.Destination;
028    import org.apache.activemq.broker.region.MessageReference;
029    import org.apache.activemq.broker.region.QueueMessageReference;
030    import org.apache.activemq.command.Message;
031    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
032    import org.apache.activemq.kaha.CommandMarshaller;
033    import org.apache.activemq.kaha.ListContainer;
034    import org.apache.activemq.kaha.Store;
035    import org.apache.activemq.openwire.OpenWireFormat;
036    import org.apache.activemq.usage.SystemUsage;
037    import org.apache.activemq.usage.Usage;
038    import org.apache.activemq.usage.UsageListener;
039    import org.apache.commons.logging.Log;
040    import org.apache.commons.logging.LogFactory;
041    
042    /**
043     * persist pending messages pending message (messages awaiting dispatch to a
044     * consumer) cursor
045     * 
046     * @version $Revision: 911759 $
047     */
048    public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
049        private static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class);
050        private static final AtomicLong NAME_COUNT = new AtomicLong();
051        protected Broker broker;
052        private Store store;
053        private String name;
054        private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>();
055        private ListContainer<MessageReference> diskList;
056        private Iterator<MessageReference> iter;
057        private Destination regionDestination;
058        private boolean iterating;
059        private boolean flushRequired;
060        private AtomicBoolean started = new AtomicBoolean();
061        /**
062         * @param name
063         * @param store
064         */
065        public FilePendingMessageCursor(Broker broker,String name) {
066            this.useCache=false;
067            this.broker = broker;
068            //the store can be null if the BrokerService has persistence 
069            //turned off
070            this.store= broker.getTempDataStore();
071            this.name = NAME_COUNT.incrementAndGet() + "_" + name;
072        }
073    
074        public void start() throws Exception {
075            if (started.compareAndSet(false, true)) {
076                super.start();
077                if (systemUsage != null) {
078                    systemUsage.getMemoryUsage().addUsageListener(this);
079                }
080            }
081        }
082    
083        public void stop() throws Exception {
084            if (started.compareAndSet(true, false)) {
085                super.stop();
086                if (systemUsage != null) {
087                    systemUsage.getMemoryUsage().removeUsageListener(this);
088                }
089            }
090        }
091    
092        /**
093         * @return true if there are no pending messages
094         */
095        public synchronized boolean isEmpty() {
096            if(memoryList.isEmpty() && isDiskListEmpty()){
097                return true;
098            }
099            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
100                MessageReference node = iterator.next();
101                if (node== QueueMessageReference.NULL_MESSAGE){
102                    continue;
103                }
104                if (!node.isDropped()) {
105                    return false;
106                }
107                // We can remove dropped references.
108                iterator.remove();
109            }
110            return isDiskListEmpty();
111        }
112        
113        
114    
115        /**
116         * reset the cursor
117         */
118        public synchronized void reset() {
119            iterating = true;
120            last = null;
121            iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator();
122        }
123    
124        public synchronized void release() {
125            iterating = false;
126            if (flushRequired) {
127                flushRequired = false;
128                flushToDisk();
129            }
130        }
131    
132        public synchronized void destroy() throws Exception {
133            stop();
134            for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
135                Message node = (Message)i.next();
136                node.decrementReferenceCount();
137            }
138            memoryList.clear();
139            destroyDiskList();
140        }
141    
142        private void destroyDiskList() throws Exception {
143            if (!isDiskListEmpty()) {
144                Iterator<MessageReference> iterator = diskList.iterator();
145                while (iterator.hasNext()) {
146                    iterator.next();
147                    iterator.remove();
148                }
149                diskList.clear();
150            }   
151                store.deleteListContainer(name, "TopicSubscription");
152        }
153    
154        public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
155            LinkedList<MessageReference> result = new LinkedList<MessageReference>();
156            int count = 0;
157            for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
158                MessageReference ref = i.next();
159                ref.incrementReferenceCount();
160                result.add(ref);
161                count++;
162            }
163            if (count < maxItems && !isDiskListEmpty()) {
164                for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
165                    Message message = (Message)i.next();
166                    message.setRegionDestination(regionDestination);
167                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
168                    message.incrementReferenceCount();
169                    result.add(message);
170                    count++;
171                }
172            }
173            return result;
174        }
175    
176        /**
177         * add message to await dispatch
178         * 
179         * @param node
180         */
181        public synchronized void addMessageLast(MessageReference node) {
182            if (!node.isExpired()) {
183                try {
184                    regionDestination = node.getMessage().getRegionDestination();
185                    if (isDiskListEmpty()) {
186                        if (hasSpace() || this.store==null) {
187                            memoryList.add(node);
188                            node.incrementReferenceCount();
189                            return;
190                        }
191                    }
192                    if (!hasSpace()) {
193                        if (isDiskListEmpty()) {
194                            expireOldMessages();
195                            if (hasSpace()) {
196                                memoryList.add(node);
197                                node.incrementReferenceCount();
198                                return;
199                            } else {
200                                flushToDisk();
201                            }
202                        }
203                    }
204                    systemUsage.getTempUsage().waitForSpace();
205                    getDiskList().add(node);
206    
207                } catch (Exception e) {
208                    LOG.error("Caught an Exception adding a message: " + node
209                            + " first to FilePendingMessageCursor ", e);
210                    throw new RuntimeException(e);
211                }
212            } else {
213                discard(node);
214            }
215        }
216    
217        /**
218         * add message to await dispatch
219         * 
220         * @param node
221         */
222        public synchronized void addMessageFirst(MessageReference node) {
223            if (!node.isExpired()) {
224                try {
225                    regionDestination = node.getMessage().getRegionDestination();
226                    if (isDiskListEmpty()) {
227                        if (hasSpace()) {
228                            memoryList.addFirst(node);
229                            node.incrementReferenceCount();
230                            return;
231                        }
232                    }
233                    if (!hasSpace()) {
234                        if (isDiskListEmpty()) {
235                            expireOldMessages();
236                            if (hasSpace()) {
237                                memoryList.addFirst(node);
238                                node.incrementReferenceCount();
239                                return;
240                            } else {
241                                flushToDisk();
242                            }
243                        }
244                    }
245                    systemUsage.getTempUsage().waitForSpace();
246                    node.decrementReferenceCount();
247                    getDiskList().addFirst(node);
248    
249                } catch (Exception e) {
250                    LOG.error("Caught an Exception adding a message: " + node
251                            + " first to FilePendingMessageCursor ", e);
252                    throw new RuntimeException(e);
253                }
254            } else {
255                discard(node);
256            }
257        }
258    
259        /**
260         * @return true if there pending messages to dispatch
261         */
262        public synchronized boolean hasNext() {
263            return iter.hasNext();
264        }
265    
266        /**
267         * @return the next pending message
268         */
269        public synchronized MessageReference next() {
270            Message message = (Message)iter.next();
271            last = message;
272            if (!isDiskListEmpty()) {
273                // got from disk
274                message.setRegionDestination(regionDestination);
275                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
276            }
277            message.incrementReferenceCount();
278            return message;
279        }
280    
281        /**
282         * remove the message at the cursor position
283         */
284        public synchronized void remove() {
285            iter.remove();
286            if (last != null) {
287                    last.decrementReferenceCount();
288            }
289        }
290    
291        /**
292         * @param node
293         * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
294         */
295        public synchronized void remove(MessageReference node) {
296            if (memoryList.remove(node)) {
297                    node.decrementReferenceCount();
298            }
299            if (!isDiskListEmpty()) {
300                getDiskList().remove(node);
301            }
302        }
303    
304        /**
305         * @return the number of pending messages
306         */
307        public synchronized int size() {
308            return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
309        }
310    
311        /**
312         * clear all pending messages
313         */
314        public synchronized void clear() {
315            memoryList.clear();
316            if (!isDiskListEmpty()) {
317                getDiskList().clear();
318            }
319            last=null;
320        }
321    
322            public synchronized boolean isFull() {
323    
324                    return super.isFull()
325                                    || (systemUsage != null && systemUsage.getTempUsage().isFull());
326    
327            }
328    
329        public boolean hasMessagesBufferedToDeliver() {
330            return !isEmpty();
331        }
332    
333        public void setSystemUsage(SystemUsage usageManager) {
334            super.setSystemUsage(usageManager);
335        }
336    
337        public void onUsageChanged(Usage usage, int oldPercentUsage,
338                int newPercentUsage) {
339            if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
340                synchronized (this) {
341                    flushRequired = true;
342                    if (!iterating) {
343                        expireOldMessages();
344                        if (!hasSpace()) {
345                            flushToDisk();
346                            flushRequired = false;
347                        }
348                    }
349                }
350            }
351        }
352        
353        public boolean isTransient() {
354            return true;
355        }
356    
357        protected boolean isSpaceInMemoryList() {
358            return hasSpace() && isDiskListEmpty();
359        }
360        
361        protected synchronized void expireOldMessages() {
362            if (!memoryList.isEmpty()) {
363                LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
364                this.memoryList = new LinkedList<MessageReference>();
365                while (!tmpList.isEmpty()) {
366                    MessageReference node = tmpList.removeFirst();
367                    if (node.isExpired()) {
368                        discard(node);
369                    }else {
370                        memoryList.add(node);
371                    }               
372                }
373            }
374    
375        }
376    
377        protected synchronized void flushToDisk() {
378           
379            if (!memoryList.isEmpty()) {
380                while (!memoryList.isEmpty()) {
381                    MessageReference node = memoryList.removeFirst();
382                    node.decrementReferenceCount();
383                    getDiskList().addLast(node);
384                }
385                memoryList.clear();
386            }
387        }
388    
389        protected boolean isDiskListEmpty() {
390            return diskList == null || diskList.isEmpty();
391        }
392    
393        protected ListContainer<MessageReference> getDiskList() {
394            if (diskList == null) {
395                try {
396                    diskList = store.getListContainer(name, "TopicSubscription", true);
397                    diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
398                } catch (IOException e) {
399                    LOG.error("Caught an IO Exception getting the DiskList " + name, e);
400                    throw new RuntimeException(e);
401                }
402            }
403            return diskList;
404        }
405        
406        protected void discard(MessageReference message) {
407            message.decrementReferenceCount();
408            if (LOG.isDebugEnabled()) {
409                LOG.debug("Discarding message " + message);
410            }
411            broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()), message);
412        }
413    }