001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.util.ArrayList;
020    import java.util.HashMap;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.Set;
025    import java.util.concurrent.ConcurrentHashMap;
026    import javax.jms.JMSException;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.broker.ConsumerBrokerExchange;
029    import org.apache.activemq.broker.DestinationAlreadyExistsException;
030    import org.apache.activemq.broker.ProducerBrokerExchange;
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.ConsumerId;
033    import org.apache.activemq.command.ConsumerInfo;
034    import org.apache.activemq.command.Message;
035    import org.apache.activemq.command.MessageAck;
036    import org.apache.activemq.command.MessageDispatchNotification;
037    import org.apache.activemq.command.MessagePull;
038    import org.apache.activemq.command.ProducerInfo;
039    import org.apache.activemq.command.RemoveSubscriptionInfo;
040    import org.apache.activemq.command.Response;
041    import org.apache.activemq.filter.DestinationFilter;
042    import org.apache.activemq.filter.DestinationMap;
043    import org.apache.activemq.security.SecurityContext;
044    import org.apache.activemq.thread.TaskRunnerFactory;
045    import org.apache.activemq.usage.SystemUsage;
046    import org.apache.commons.logging.Log;
047    import org.apache.commons.logging.LogFactory;
048    
049    /**
050     * @version $Revision: 1.14 $
051     */
052    public abstract class AbstractRegion implements Region {
053    
054        private static final Log LOG = LogFactory.getLog(AbstractRegion.class);
055    
056        protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
057        protected final DestinationMap destinationMap = new DestinationMap();
058        protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
059        protected final SystemUsage usageManager;
060        protected final DestinationFactory destinationFactory;
061        protected final DestinationStatistics destinationStatistics;
062        protected final RegionBroker broker;
063        protected boolean autoCreateDestinations = true;
064        protected final TaskRunnerFactory taskRunnerFactory;
065        protected final Object destinationsMutex = new Object();
066        protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
067        protected boolean started;
068    
069        public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
070                              DestinationFactory destinationFactory) {
071            if (broker == null) {
072                throw new IllegalArgumentException("null broker");
073            }
074            this.broker = broker;
075            this.destinationStatistics = destinationStatistics;
076            this.usageManager = memoryManager;
077            this.taskRunnerFactory = taskRunnerFactory;
078            if (broker == null) {
079                throw new IllegalArgumentException("null destinationFactory");
080            }
081            this.destinationFactory = destinationFactory;
082        }
083    
084        public final  void start() throws Exception {
085            started = true;
086    
087            Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
088            for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) {
089                ActiveMQDestination dest = iter.next();
090    
091                ConnectionContext context = new ConnectionContext();
092                context.setBroker(broker.getBrokerService().getBroker());
093                context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
094                context.getBroker().addDestination(context, dest);
095            }
096            synchronized (destinationsMutex) {
097                for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
098                    Destination dest = i.next();
099                    dest.start();
100                }
101            }
102        }
103    
104        public void stop() throws Exception {
105            started = false;
106            synchronized (destinationsMutex) {
107                for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
108                    Destination dest = i.next();
109                    dest.stop();
110                }
111            }
112            destinations.clear();
113        }
114    
115        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
116            LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
117            synchronized (destinationsMutex) {
118                Destination dest = destinations.get(destination);
119                if (dest == null) {
120                    dest = createDestination(context, destination);
121                    // intercept if there is a valid interceptor defined
122                    DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
123                    if (destinationInterceptor != null) {
124                        dest = destinationInterceptor.intercept(dest);
125                    }
126                    dest.start();
127                    destinations.put(destination, dest);
128                    destinationMap.put(destination, dest);
129                    addSubscriptionsForDestination(context, dest);
130                }
131                return dest;
132            }
133        }
134    
135        public Map<ConsumerId, Subscription> getSubscriptions() {
136            return subscriptions;
137        }
138        
139        protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
140    
141            List<Subscription> rc = new ArrayList<Subscription>();
142            // Add all consumers that are interested in the destination.
143            for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
144                Subscription sub = iter.next();
145                if (sub.matches(dest.getActiveMQDestination())) {
146                    dest.addSubscription(context, sub);
147                    rc.add(sub);
148                }
149            }
150            return rc;
151    
152        }
153    
154        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
155    
156            // No timeout.. then try to shut down right way, fails if there are
157            // current subscribers.
158            if (timeout == 0) {
159                for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
160                    Subscription sub = iter.next();
161                    if (sub.matches(destination)) {
162                        throw new JMSException("Destination still has an active subscription: " + destination);
163                    }
164                }
165            }
166    
167            if (timeout > 0) {
168                // TODO: implement a way to notify the subscribers that we want to
169                // take the down
170                // the destination and that they should un-subscribe.. Then wait up
171                // to timeout time before
172                // dropping the subscription.
173            }
174    
175            LOG.debug("Removing destination: " + destination);
176            
177            synchronized (destinationsMutex) {
178                Destination dest = destinations.remove(destination);
179                if (dest != null) {
180                    // timeout<0 or we timed out, we now force any remaining
181                    // subscriptions to un-subscribe.
182                    for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
183                        Subscription sub = iter.next();
184                        if (sub.matches(destination)) {
185                            dest.removeSubscription(context, sub, 0l);
186                        }
187                    }
188                    destinationMap.removeAll(destination);
189                    dispose(context,dest);
190                    DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
191                    if (destinationInterceptor != null) {
192                        destinationInterceptor.remove(dest);
193                    }
194    
195                } else {   
196                    LOG.debug("Destination doesn't exist: " + dest);
197                }
198            }
199        }
200    
201        /**
202         * Provide an exact or wildcard lookup of destinations in the region
203         * 
204         * @return a set of matching destination objects.
205         */
206        public Set<Destination> getDestinations(ActiveMQDestination destination) {
207            synchronized (destinationsMutex) {
208                return destinationMap.get(destination);
209            }
210        }
211    
212        public Map<ActiveMQDestination, Destination> getDestinationMap() {
213            synchronized (destinationsMutex) {
214                return new HashMap<ActiveMQDestination, Destination>(destinations);
215            }
216        }
217    
218        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
219            LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: " + info.getDestination());
220            ActiveMQDestination destination = info.getDestination();
221            if (destination != null && !destination.isPattern() && !destination.isComposite()) {
222                // lets auto-create the destination
223                lookup(context, destination);
224            }
225    
226            Object addGuard;
227            synchronized (consumerChangeMutexMap) {
228                addGuard = consumerChangeMutexMap.get(info.getConsumerId());
229                if (addGuard == null) {
230                    addGuard = new Object();
231                    consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
232                }
233            }
234            synchronized (addGuard) {
235                Subscription o = subscriptions.get(info.getConsumerId());
236                if (o != null) {
237                    LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
238                    return o;
239                }
240    
241                // We may need to add some destinations that are in persistent store
242                // but not active
243                // in the broker.
244                //
245                // TODO: think about this a little more. This is good cause
246                // destinations are not loaded into
247                // memory until a client needs to use the queue, but a management
248                // agent viewing the
249                // broker will not see a destination that exists in persistent
250                // store. We may want to
251                // eagerly load all destinations into the broker but have an
252                // inactive state for the
253                // destination which has reduced memory usage.
254                //
255                DestinationFilter.parseFilter(info.getDestination());
256    
257                Subscription sub = createSubscription(context, info);
258    
259                subscriptions.put(info.getConsumerId(), sub);
260    
261                // At this point we're done directly manipulating subscriptions,
262                // but we need to retain the synchronized block here. Consider
263                // otherwise what would happen if at this point a second
264                // thread added, then removed, as would be allowed with
265                // no mutex held. Remove is only essentially run once
266                // so everything after this point would be leaked.
267    
268                // Add the subscription to all the matching queues.
269                // But copy the matches first - to prevent deadlocks
270                List<Destination>addList = new ArrayList<Destination>();
271                synchronized(destinationsMutex) {
272                    for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
273                        Destination dest = (Destination)iter.next();
274                        addList.add(dest);
275                    }
276                }
277                
278                for (Destination dest:addList) {
279                    dest.addSubscription(context, sub);
280                }
281    
282                if (info.isBrowser()) {
283                    ((QueueBrowserSubscription)sub).destinationsAdded();
284                }
285    
286                return sub;
287            }
288        }
289    
290        /**
291         * Get all the Destinations that are in storage
292         * 
293         * @return Set of all stored destinations
294         */
295        public Set getDurableDestinations() {
296            return destinationFactory.getDestinations();
297        }
298    
299        /**
300         * @return all Destinations that don't have active consumers
301         */
302        protected Set<ActiveMQDestination> getInactiveDestinations() {
303            Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
304            synchronized (destinationsMutex) {
305                inactiveDests.removeAll(destinations.keySet());
306            }
307            return inactiveDests;
308        }
309    
310        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
311            LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: " + info.getDestination());
312    
313            Subscription sub = subscriptions.remove(info.getConsumerId());
314            //The sub could be removed elsewhere - see ConnectionSplitBroker
315            if (sub != null) {
316    
317                // remove the subscription from all the matching queues.
318                List<Destination> removeList = new ArrayList<Destination>();
319                synchronized (destinationsMutex) {
320                    for (Iterator iter = destinationMap.get(info.getDestination())
321                            .iterator(); iter.hasNext();) {
322                        Destination dest = (Destination) iter.next();
323                        removeList.add(dest);
324                        
325                    }
326                }
327                for(Destination dest:removeList) {
328                  dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
329                }
330    
331                destroySubscription(sub);
332            }
333            synchronized (consumerChangeMutexMap) {
334                consumerChangeMutexMap.remove(info.getConsumerId());
335            }
336        }
337    
338        protected void destroySubscription(Subscription sub) {
339            sub.destroy();
340        }
341    
342        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
343            throw new JMSException("Invalid operation.");
344        }
345    
346        public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
347            final ConnectionContext context = producerExchange.getConnectionContext();
348    
349            if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
350                final Destination regionDestination = lookup(context, messageSend.getDestination());
351                producerExchange.setRegionDestination(regionDestination);
352            }
353    
354            producerExchange.getRegionDestination().send(producerExchange, messageSend);
355        }
356    
357        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
358            Subscription sub = consumerExchange.getSubscription();
359            if (sub == null) {
360                sub = subscriptions.get(ack.getConsumerId());        
361                if (sub == null) {
362                    if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
363                        LOG.warn("Ack for non existent subscription, ack:" + ack); 
364                        throw new IllegalArgumentException(
365                            "The subscription does not exist: "
366                            + ack.getConsumerId());
367                    } else {
368                        return;
369                    }
370                }
371                consumerExchange.setSubscription(sub);
372            }
373            sub.acknowledge(consumerExchange.getConnectionContext(), ack);
374        }
375    
376        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
377            Subscription sub = subscriptions.get(pull.getConsumerId());
378            if (sub == null) {
379                throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
380            }
381            return sub.pullMessage(context, pull);
382        }
383    
384        protected Destination lookup(ConnectionContext context, ActiveMQDestination destination) throws Exception {
385            Destination dest = null;
386            synchronized (destinationsMutex) {
387                dest = destinations.get(destination);
388            }
389            if (dest == null) {
390                if (autoCreateDestinations) {
391                    // Try to auto create the destination... re-invoke broker
392                    // from the
393                    // top so that the proper security checks are performed.
394                    try {
395                        context.getBroker().addDestination(context, destination);
396                        dest = addDestination(context, destination);
397                    } catch (DestinationAlreadyExistsException e) {
398                        // if the destination already exists then lets ignore
399                        // this error
400                    }
401                    // We should now have the dest created.
402                    synchronized (destinationsMutex) {
403                        dest = destinations.get(destination);
404                    }
405                }
406                if (dest == null) {
407                    throw new JMSException("The destination " + destination + " does not exist.");
408                }
409            }
410            return dest;
411        }
412    
413        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
414            Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
415            if (sub != null) {
416                sub.processMessageDispatchNotification(messageDispatchNotification);
417            } else {
418                throw new JMSException("Slave broker out of sync with master - Subscription: "
419                        + messageDispatchNotification.getConsumerId()
420                        + " on " + messageDispatchNotification.getDestination()
421                        + " does not exist for dispatch of message: "
422                        + messageDispatchNotification.getMessageId());
423            }
424        }
425        
426        /*
427         * For a Queue/TempQueue, dispatch order is imperative to match acks, so the dispatch is deferred till 
428         * the notification to ensure that the subscription chosen by the master is used. AMQ-2102
429         */ 
430        protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification) throws Exception {
431            Destination dest = null;
432            synchronized (destinationsMutex) {
433                dest = destinations.get(messageDispatchNotification.getDestination());
434            }
435            if (dest != null) {
436                dest.processDispatchNotification(messageDispatchNotification);
437            } else {
438                throw new JMSException(
439                        "Slave broker out of sync with master - Destination: " 
440                                + messageDispatchNotification.getDestination()
441                                + " does not exist for consumer "
442                                + messageDispatchNotification.getConsumerId()
443                                + " with message: "
444                                + messageDispatchNotification.getMessageId());
445            }
446        }
447    
448        public void gc() {
449            for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
450                Subscription sub = iter.next();
451                sub.gc();
452            }
453            synchronized (destinationsMutex) {
454                for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
455                    Destination dest = iter.next();
456                    dest.gc();
457                }
458            }
459        }
460    
    /**
     * Creates the subscription object for a consumer being added to this
     * region. It is called from {@code addConsumer} while the per-consumer
     * guard is held, before the result is stored in {@code subscriptions}.
     *
     * @param context the connection context the consumer is added under
     * @param info describes the consumer
     * @return the subscription to register for the consumer
     * @throws Exception if the subscription cannot be created
     */
    protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
462    
463        protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
464            return destinationFactory.createDestination(context, destination, destinationStatistics);
465        }
466    
467        public boolean isAutoCreateDestinations() {
468            return autoCreateDestinations;
469        }
470    
471        public void setAutoCreateDestinations(boolean autoCreateDestinations) {
472            this.autoCreateDestinations = autoCreateDestinations;
473        }
474        
475        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
476            synchronized (destinationsMutex) {
477                for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
478                    Destination dest = (Destination) iter.next();
479                    dest.addProducer(context, info);
480                }
481            }
482        }
483    
484        /**
485         * Removes a Producer.
486         * @param context the environment the operation is being executed under.
487         * @throws Exception TODO
488         */
489        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
490            synchronized (destinationsMutex) {
491                for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
492                    Destination dest = (Destination)iter.next();
493                    dest.removeProducer(context, info);
494                }
495            }
496        }
497        
498        protected void dispose(ConnectionContext context,Destination dest) throws Exception {
499            dest.dispose(context);
500            dest.stop();
501            destinationFactory.removeDestination(dest);
502        }
503    }