001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.util.HashSet;
020    import java.util.Iterator;
021    import java.util.List;
022    import java.util.Set;
023    import java.util.concurrent.ConcurrentHashMap;
024    
025    import javax.jms.InvalidDestinationException;
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.advisory.AdvisorySupport;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.broker.region.policy.PolicyEntry;
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.ConnectionId;
033    import org.apache.activemq.command.ConsumerId;
034    import org.apache.activemq.command.ConsumerInfo;
035    import org.apache.activemq.command.RemoveSubscriptionInfo;
036    import org.apache.activemq.command.SessionId;
037    import org.apache.activemq.command.SubscriptionInfo;
038    import org.apache.activemq.store.TopicMessageStore;
039    import org.apache.activemq.thread.TaskRunnerFactory;
040    import org.apache.activemq.usage.SystemUsage;
041    import org.apache.activemq.util.LongSequenceGenerator;
042    import org.apache.activemq.util.SubscriptionKey;
043    import org.apache.commons.logging.Log;
044    import org.apache.commons.logging.LogFactory;
045    
046    /**
047     * @version $Revision: 1.12 $
048     */
049    public class TopicRegion extends AbstractRegion {
050        private static final Log LOG = LogFactory.getLog(TopicRegion.class);
051        protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
052        private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
053        private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
054        private boolean keepDurableSubsActive;
055    
056        public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
057                           DestinationFactory destinationFactory) {
058            super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
059    
060        }
061    
062        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
063            if (info.isDurable()) {
064                ActiveMQDestination destination = info.getDestination();
065                if (!destination.isPattern()) {
066                    // Make sure the destination is created.
067                    lookup(context, destination);
068                }
069                String clientId = context.getClientId();
070                String subscriptionName = info.getSubscriptionName();
071                SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
072                DurableTopicSubscription sub = durableSubscriptions.get(key);
073                if (sub != null) {
074                    if (sub.isActive()) {
075                        throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
076                    }
077                    // Has the selector changed??
078                    if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
079                        // Remove the consumer first then add it.
080                        durableSubscriptions.remove(key);
081                        synchronized (destinationsMutex) {
082                            for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
083                                Destination dest = iter.next();
084                                //Account for virtual destinations
085                                if (dest instanceof Topic){
086                                    Topic topic = (Topic)dest;
087                                    topic.deleteSubscription(context, key);
088                                }
089                            }
090                        }
091                        super.removeConsumer(context, sub.getConsumerInfo());
092                        super.addConsumer(context, info);
093                        sub = durableSubscriptions.get(key);
094                    } else {
095                        // Change the consumer id key of the durable sub.
096                        if (sub.getConsumerInfo().getConsumerId() != null) {
097                            subscriptions.remove(sub.getConsumerInfo().getConsumerId());
098                        }
099                        subscriptions.put(info.getConsumerId(), sub);
100                    }
101                } else {
102                    super.addConsumer(context, info);
103                    sub = durableSubscriptions.get(key);
104                    if (sub == null) {
105                        throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
106                                               + " subscriberName: " + key.getSubscriptionName());
107                    }
108                }
109                sub.activate(usageManager, context, info);
110                return sub;
111            } else {
112                return super.addConsumer(context, info);
113            }
114        }
115    
116        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
117            if (info.isDurable()) {
118    
119                SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
120                DurableTopicSubscription sub = durableSubscriptions.get(key);
121                if (sub != null) {
122                    sub.deactivate(keepDurableSubsActive);
123                }
124    
125            } else {
126                super.removeConsumer(context, info);
127            }
128        }
129    
130        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
131            SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName());
132            DurableTopicSubscription sub = durableSubscriptions.get(key);
133            if (sub == null) {
134                throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName());
135            }
136            if (sub.isActive()) {
137                throw new JMSException("Durable consumer is in use");
138            }
139    
140            durableSubscriptions.remove(key);
141            synchronized (destinationsMutex) {
142                for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
143                    Destination dest = iter.next();
144                    //Account for virtual destinations
145                    if (dest instanceof Topic){
146                        Topic topic = (Topic)dest;
147                        topic.deleteSubscription(context, key);
148                    }
149                }
150            }
151            super.removeConsumer(context, sub.getConsumerInfo());
152        }
153    
154        public String toString() {
155            return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
156        }
157    
158        @Override
159        protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
160    
161            List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
162            Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
163    
164            TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
165            // Eagerly recover the durable subscriptions
166            if (store != null) {
167                SubscriptionInfo[] infos = store.getAllSubscriptions();
168                for (int i = 0; i < infos.length; i++) {
169    
170                    SubscriptionInfo info = infos[i];
171                    LOG.debug("Restoring durable subscription: " + infos);
172                    SubscriptionKey key = new SubscriptionKey(info);
173    
174                    // A single durable sub may be subscribing to multiple topics.
175                    // so it might exist already.
176                    DurableTopicSubscription sub = durableSubscriptions.get(key);
177                    ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
178                    if (sub == null) {
179                        ConnectionContext c = new ConnectionContext();
180                        c.setBroker(context.getBroker());
181                        c.setClientId(key.getClientId());
182                        c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
183                        sub = (DurableTopicSubscription)createSubscription(c, consumerInfo);
184                    }
185    
186                    if (dupChecker.contains(sub)) {
187                        continue;
188                    }
189    
190                    dupChecker.add(sub);
191                    rc.add(sub);
192                    dest.addSubscription(context, sub);
193                }
194    
195                // Now perhaps there other durable subscriptions (via wild card)
196                // that would match this destination..
197                durableSubscriptions.values();
198                for (Iterator<DurableTopicSubscription> iterator = durableSubscriptions.values().iterator(); iterator.hasNext();) {
199                    DurableTopicSubscription sub = iterator.next();
200                    // Skip over subscriptions that we allready added..
201                    if (dupChecker.contains(sub)) {
202                        continue;
203                    }
204    
205                    if (sub.matches(dest.getActiveMQDestination())) {
206                        rc.add(sub);
207                        dest.addSubscription(context, sub);
208                    }
209                }
210            }
211    
212            return rc;
213        }
214    
215        private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
216            ConsumerInfo rc = new ConsumerInfo();
217            rc.setSelector(info.getSelector());
218            rc.setSubscriptionName(info.getSubscriptionName());
219            rc.setDestination(info.getSubscribedDestination());
220            rc.setConsumerId(createConsumerId());
221            return rc;
222        }
223    
224        private ConsumerId createConsumerId() {
225            return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId());
226        }
227    
228        protected void configureTopic(Topic topic, ActiveMQDestination destination) {
229            if (broker.getDestinationPolicy() != null) {
230                PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
231                if (entry != null) {
232                    entry.configure(topic);
233                }
234            }
235        }
236    
237        protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
238            ActiveMQDestination destination = info.getDestination();
239            
240            if (info.isDurable()) {
241                if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
242                    throw new JMSException("Cannot create a durable subscription for an advisory Topic");
243                }
244                SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
245                DurableTopicSubscription sub = durableSubscriptions.get(key);
246                
247                if (sub == null) {
248                    
249                    sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
250                    if (destination != null && broker.getDestinationPolicy() != null) {
251                        PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
252                        if (entry != null) {
253                            entry.configure(broker, usageManager, sub);
254                        }
255                    }
256                    durableSubscriptions.put(key, sub);
257                } else {
258                    throw new JMSException("That durable subscription is already active.");
259                }
260                return sub;
261            }
262            try {
263                TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
264                // lets configure the subscription depending on the destination
265                if (destination != null && broker.getDestinationPolicy() != null) {
266                    PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
267                    if (entry != null) {
268                        entry.configure(broker, usageManager, answer);
269                    }
270                }
271                answer.init();
272                return answer;
273            } catch (Exception e) {
274                LOG.error("Failed to create TopicSubscription ", e);
275                JMSException jmsEx = new JMSException("Couldn't create TopicSubscription");
276                jmsEx.setLinkedException(e);
277                throw jmsEx;
278            }
279        }
280    
281        /**
282         */
283        private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
284            if (info1.getSelector() != null ^ info2.getSelector() != null) {
285                return true;
286            }
287            if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
288                return true;
289            }
290            return !info1.getDestination().equals(info2.getDestination());
291        }
292    
293        protected Set<ActiveMQDestination> getInactiveDestinations() {
294            Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
295            for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
296                ActiveMQDestination dest = iter.next();
297                if (!dest.isTopic()) {
298                    iter.remove();
299                }
300            }
301            return inactiveDestinations;
302        }
303    
304        public boolean isKeepDurableSubsActive() {
305            return keepDurableSubsActive;
306        }
307    
308        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
309            this.keepDurableSubsActive = keepDurableSubsActive;
310        }
311    
312    }