001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.memory.list;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.LinkedList;
023    import java.util.List;
024    import org.apache.activemq.broker.region.MessageReference;
025    import org.apache.activemq.command.ActiveMQDestination;
026    import org.apache.activemq.command.Message;
027    import org.apache.activemq.filter.DestinationFilter;
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    
031    /**
032     * A simple fixed size {@link MessageList} where there is a single, fixed size
033     * list that all messages are added to for simplicity. Though this will lead to
034     * possibly slow recovery times as many more messages than is necessary will
035     * have to be iterated through for each subscription.
036     * 
037     * @version $Revision: 1.1 $
038     */
039    public class SimpleMessageList implements MessageList {
040        private static final Log LOG = LogFactory.getLog(SimpleMessageList.class);
041        private LinkedList<MessageReference> list = new LinkedList<MessageReference>();
042        private int maximumSize = 100 * 64 * 1024;
043        private int size;
044        private Object lock = new Object();
045    
046        public SimpleMessageList() {
047        }
048    
049        public SimpleMessageList(int maximumSize) {
050            this.maximumSize = maximumSize;
051        }
052    
053        public void add(MessageReference node) {
054            int delta = node.getMessageHardRef().getSize();
055            synchronized (lock) {
056                list.add(node);
057                size += delta;
058                while (size > maximumSize) {
059                    MessageReference evicted = list.removeFirst();
060                    size -= evicted.getMessageHardRef().getSize();
061                }
062            }
063        }
064    
065        public List<MessageReference> getMessages(ActiveMQDestination destination) {
066            return getList();
067        }
068    
069        public Message[] browse(ActiveMQDestination destination) {
070            List<Message> result = new ArrayList<Message>();
071            DestinationFilter filter = DestinationFilter.parseFilter(destination);
072            synchronized (lock) {
073                for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
074                    MessageReference ref = i.next();
075                    Message msg;
076                    try {
077                        msg = ref.getMessage();
078                        if (filter.matches(msg.getDestination())) {
079                            result.add(msg);
080                        }
081                    } catch (IOException e) {
082                        LOG.error("Failed to get Message from MessageReference: " + ref, e);
083                    }
084    
085                }
086            }
087            return result.toArray(new Message[result.size()]);
088        }
089    
090        /**
091         * Returns a copy of the list
092         */
093        public List<MessageReference> getList() {
094            synchronized (lock) {
095                return new ArrayList<MessageReference>(list);
096            }
097        }
098    
099        public int getSize() {
100            synchronized (lock) {
101                return size;
102            }
103        }
104    
105        public void clear() {
106            synchronized (lock) {
107                list.clear();
108                size = 0;
109            }
110        }
111    
112    }