001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.util.Collections;
020    import java.util.HashMap;
021    import java.util.List;
022    import java.util.Map;
023    import java.util.concurrent.CopyOnWriteArrayList;
024    
025    import org.apache.activemq.advisory.AdvisorySupport;
026    import org.apache.activemq.broker.Broker;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.broker.region.Destination;
029    import org.apache.activemq.broker.region.MessageReference;
030    import org.apache.activemq.broker.region.Subscription;
031    import org.apache.activemq.broker.region.Topic;
032    import org.apache.activemq.command.Message;
033    import org.apache.activemq.usage.SystemUsage;
034    import org.apache.commons.logging.Log;
035    import org.apache.commons.logging.LogFactory;
036    
037    /**
038     * persist pending messages pending message (messages awaiting dispatch to a
039     * consumer) cursor
040     * 
041     * @version $Revision: 813962 $
042     */
043    public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
044    
045        private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class);
046        private final String clientId;
047        private final String subscriberName;
048        private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
049        private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
050        private final PendingMessageCursor nonPersistent;
051        private PendingMessageCursor currentCursor;
052        private final Subscription subscription;
053        /**
054         * @param broker Broker for this cursor
055         * @param clientId clientId for this cursor
056         * @param subscriberName subscriber name for this cursor
057         * @param maxBatchSize currently ignored
058         * @param subscription  subscription for this cursor
059         */
060        public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) {
061            this.subscription=subscription;
062            this.clientId = clientId;
063            this.subscriberName = subscriberName;
064            if (broker.getBrokerService().isPersistent()) {
065                this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName);
066            }else {
067                this.nonPersistent = new VMPendingMessageCursor();
068            }
069            
070            this.nonPersistent.setMaxBatchSize(maxBatchSize);
071            this.nonPersistent.setSystemUsage(systemUsage);
072            this.storePrefetches.add(this.nonPersistent);
073        }
074    
075        public synchronized void start() throws Exception {
076            if (!isStarted()) {
077                super.start();
078                for (PendingMessageCursor tsp : storePrefetches) {
079                    tsp.setMessageAudit(getMessageAudit());
080                    tsp.start();
081                }
082            }
083        }
084    
085        public synchronized void stop() throws Exception {
086            if (isStarted()) {
087                super.stop();
088                for (PendingMessageCursor tsp : storePrefetches) {
089                    tsp.stop();
090                }
091            }
092        }
093    
094        /**
095         * Add a destination
096         * 
097         * @param context
098         * @param destination
099         * @throws Exception
100         */
101        public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
102            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
103                TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName);
104                tsp.setMaxBatchSize(getMaxBatchSize());
105                tsp.setSystemUsage(systemUsage);
106                tsp.setEnableAudit(isEnableAudit());
107                tsp.setMaxAuditDepth(getMaxAuditDepth());
108                tsp.setMaxProducersToAudit(getMaxProducersToAudit());
109                tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
110                topics.put(destination, tsp);
111                storePrefetches.add(tsp);
112                if (isStarted()) {
113                    tsp.start();
114                }
115            }
116        }
117    
118        /**
119         * remove a destination
120         * 
121         * @param context
122         * @param destination
123         * @throws Exception
124         */
125        public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
126            PendingMessageCursor tsp = topics.remove(destination);
127            if (tsp != null) {
128                storePrefetches.remove(tsp);
129            }
130            return Collections.EMPTY_LIST;
131        }
132    
133        /**
134         * @return true if there are no pending messages
135         */
136        public synchronized boolean isEmpty() {
137            for (PendingMessageCursor tsp : storePrefetches) {
138                if( !tsp.isEmpty() )
139                    return false;
140            }
141            return true;
142        }
143    
144        public synchronized boolean isEmpty(Destination destination) {
145            boolean result = true;
146            TopicStorePrefetch tsp = topics.get(destination);
147            if (tsp != null) {
148                result = tsp.isEmpty();
149            }
150            return result;
151        }
152    
153        /**
154         * Informs the Broker if the subscription needs to intervention to recover
155         * it's state e.g. DurableTopicSubscriber may do
156         * 
157         * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
158         * @return true if recovery required
159         */
160        public boolean isRecoveryRequired() {
161            return false;
162        }
163    
164        public synchronized void addMessageLast(MessageReference node) throws Exception {
165            if (node != null) {
166                Message msg = node.getMessage();
167                if (isStarted()) {
168                    if (!msg.isPersistent()) {
169                        nonPersistent.addMessageLast(node);
170                    }
171                }
172                if (msg.isPersistent()) {
173                    Destination dest = msg.getRegionDestination();
174                    TopicStorePrefetch tsp = topics.get(dest);
175                    if (tsp != null) {
176                        tsp.addMessageLast(node);
177                    }
178                }
179            }
180        }
181    
182        public synchronized void addRecoveredMessage(MessageReference node) throws Exception {
183            nonPersistent.addMessageLast(node);
184        }
185    
186        public synchronized void clear() {
187            for (PendingMessageCursor tsp : storePrefetches) {
188                tsp.clear();
189            }
190        }
191    
192        public synchronized boolean hasNext() {
193            boolean result = true;
194            if (result) {
195                try {
196                    currentCursor = getNextCursor();
197                } catch (Exception e) {
198                    LOG.error("Failed to get current cursor ", e);
199                    throw new RuntimeException(e);
200                }
201                result = currentCursor != null ? currentCursor.hasNext() : false;
202            }
203            return result;
204        }
205    
206        public synchronized MessageReference next() {
207            MessageReference result = currentCursor != null ? currentCursor.next() : null;
208            return result;
209        }
210    
211        public synchronized void remove() {
212            if (currentCursor != null) {
213                currentCursor.remove();
214            }
215        }
216    
217        public synchronized void remove(MessageReference node) {
218            if (currentCursor != null) {
219                currentCursor.remove(node);
220            }
221        }
222    
223        public synchronized void reset() {
224            for (PendingMessageCursor storePrefetch : storePrefetches) {
225                storePrefetch.reset();
226            }
227        }
228    
229        public synchronized void release() {
230            for (PendingMessageCursor storePrefetch : storePrefetches) {
231                storePrefetch.release();
232            }
233        }
234    
235        public synchronized int size() {
236            int pendingCount=0;
237            for (PendingMessageCursor tsp : storePrefetches) {
238                pendingCount += tsp.size();
239            }
240            return pendingCount;
241        }
242    
243        public void setMaxBatchSize(int maxBatchSize) {
244            for (PendingMessageCursor storePrefetch : storePrefetches) {
245                storePrefetch.setMaxBatchSize(maxBatchSize);
246            }
247            super.setMaxBatchSize(maxBatchSize);
248        }
249    
250        public synchronized void gc() {
251            for (PendingMessageCursor tsp : storePrefetches) {
252                tsp.gc();
253            }
254        }
255    
256        public void setSystemUsage(SystemUsage usageManager) {
257            super.setSystemUsage(usageManager);
258            for (PendingMessageCursor tsp : storePrefetches) {
259                tsp.setSystemUsage(usageManager);
260            }
261        }
262        
263        public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
264            super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
265            for (PendingMessageCursor cursor : storePrefetches) {
266                cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
267            }
268        }
269        
270        public void setMaxProducersToAudit(int maxProducersToAudit) {
271            super.setMaxProducersToAudit(maxProducersToAudit);
272            for (PendingMessageCursor cursor : storePrefetches) {
273                cursor.setMaxAuditDepth(maxAuditDepth);
274            }
275        }
276    
277        public void setMaxAuditDepth(int maxAuditDepth) {
278            super.setMaxAuditDepth(maxAuditDepth);
279            for (PendingMessageCursor cursor : storePrefetches) {
280                cursor.setMaxAuditDepth(maxAuditDepth);
281            }
282        }
283        
284        public void setEnableAudit(boolean enableAudit) {
285            super.setEnableAudit(enableAudit);
286            for (PendingMessageCursor cursor : storePrefetches) {
287                cursor.setEnableAudit(enableAudit);
288            }
289        }
290        
291        public  void setUseCache(boolean useCache) {
292            super.setUseCache(useCache);
293            for (PendingMessageCursor cursor : storePrefetches) {
294                cursor.setUseCache(useCache);
295            }
296        }
297        
298        protected synchronized PendingMessageCursor getNextCursor() throws Exception {
299            if (currentCursor == null || currentCursor.isEmpty()) {
300                currentCursor = null;
301                for (PendingMessageCursor tsp : storePrefetches) {
302                    if (tsp.hasNext()) {
303                        currentCursor = tsp;
304                        break;
305                    }
306                }
307                // round-robin
308                if (storePrefetches.size()>1) {
309                    PendingMessageCursor first = storePrefetches.remove(0);
310                    storePrefetches.add(first);
311                }
312            }
313            return currentCursor;
314        }
315        
316        public String toString() {
317            return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";
318        }
319    }