001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.util.ArrayList; 020 import java.util.HashMap; 021 import java.util.Iterator; 022 import java.util.List; 023 import java.util.Map; 024 import java.util.Set; 025 import java.util.concurrent.ConcurrentHashMap; 026 import javax.jms.JMSException; 027 import org.apache.activemq.broker.ConnectionContext; 028 import org.apache.activemq.broker.ConsumerBrokerExchange; 029 import org.apache.activemq.broker.DestinationAlreadyExistsException; 030 import org.apache.activemq.broker.ProducerBrokerExchange; 031 import org.apache.activemq.command.ActiveMQDestination; 032 import org.apache.activemq.command.ConsumerId; 033 import org.apache.activemq.command.ConsumerInfo; 034 import org.apache.activemq.command.Message; 035 import org.apache.activemq.command.MessageAck; 036 import org.apache.activemq.command.MessageDispatchNotification; 037 import org.apache.activemq.command.MessagePull; 038 import org.apache.activemq.command.ProducerInfo; 039 import org.apache.activemq.command.RemoveSubscriptionInfo; 040 import org.apache.activemq.command.Response; 041 import org.apache.activemq.filter.DestinationFilter; 042 import org.apache.activemq.filter.DestinationMap; 043 import org.apache.activemq.security.SecurityContext; 044 import org.apache.activemq.thread.TaskRunnerFactory; 045 import org.apache.activemq.usage.SystemUsage; 046 import org.apache.commons.logging.Log; 047 import org.apache.commons.logging.LogFactory; 048 049 /** 050 * @version $Revision: 1.14 $ 051 */ 052 public abstract class AbstractRegion implements Region { 053 054 private static final Log LOG = LogFactory.getLog(AbstractRegion.class); 055 056 protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 057 protected final DestinationMap destinationMap = new DestinationMap(); 058 protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>(); 059 protected final SystemUsage usageManager; 060 protected final DestinationFactory destinationFactory; 061 protected final DestinationStatistics destinationStatistics; 062 protected final RegionBroker broker; 063 protected boolean autoCreateDestinations = true; 064 protected final TaskRunnerFactory taskRunnerFactory; 065 protected final Object destinationsMutex = new Object(); 066 protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>(); 067 protected boolean started; 068 069 public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, 070 DestinationFactory destinationFactory) { 071 if (broker == null) { 072 throw new IllegalArgumentException("null broker"); 073 } 074 this.broker = broker; 075 this.destinationStatistics = destinationStatistics; 076 this.usageManager = memoryManager; 077 this.taskRunnerFactory = taskRunnerFactory; 078 if (broker == null) { 079 throw new IllegalArgumentException("null destinationFactory"); 080 } 081 this.destinationFactory = destinationFactory; 082 } 083 084 public final void start() throws Exception { 085 started = true; 086 087 Set<ActiveMQDestination> inactiveDests = getInactiveDestinations(); 088 for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) { 089 ActiveMQDestination dest = iter.next(); 090 091 ConnectionContext context = new ConnectionContext(); 092 context.setBroker(broker.getBrokerService().getBroker()); 093 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 094 context.getBroker().addDestination(context, dest); 095 } 096 synchronized (destinationsMutex) { 097 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) { 098 Destination dest = i.next(); 099 dest.start(); 100 } 101 } 102 } 103 104 public void stop() throws Exception { 105 started = false; 106 synchronized (destinationsMutex) { 107 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) { 108 Destination dest = i.next(); 109 dest.stop(); 110 } 111 } 112 destinations.clear(); 113 } 114 115 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { 116 LOG.debug(broker.getBrokerName() + " adding destination: " + destination); 117 synchronized (destinationsMutex) { 118 Destination dest = destinations.get(destination); 119 if (dest == null) { 120 dest = createDestination(context, destination); 121 // intercept if there is a valid interceptor defined 122 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); 123 if (destinationInterceptor != null) { 124 dest = destinationInterceptor.intercept(dest); 125 } 126 dest.start(); 127 destinations.put(destination, dest); 128 destinationMap.put(destination, dest); 129 addSubscriptionsForDestination(context, dest); 130 } 131 return dest; 132 } 133 } 134 135 public Map<ConsumerId, Subscription> getSubscriptions() { 136 return subscriptions; 137 } 138 139 protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { 140 141 List<Subscription> rc = new ArrayList<Subscription>(); 142 // Add all consumers that are interested in the destination. 143 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 144 Subscription sub = iter.next(); 145 if (sub.matches(dest.getActiveMQDestination())) { 146 dest.addSubscription(context, sub); 147 rc.add(sub); 148 } 149 } 150 return rc; 151 152 } 153 154 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 155 156 // No timeout.. then try to shut down right way, fails if there are 157 // current subscribers. 158 if (timeout == 0) { 159 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 160 Subscription sub = iter.next(); 161 if (sub.matches(destination)) { 162 throw new JMSException("Destination still has an active subscription: " + destination); 163 } 164 } 165 } 166 167 if (timeout > 0) { 168 // TODO: implement a way to notify the subscribers that we want to 169 // take the down 170 // the destination and that they should un-subscribe.. Then wait up 171 // to timeout time before 172 // dropping the subscription. 173 } 174 175 LOG.debug("Removing destination: " + destination); 176 177 synchronized (destinationsMutex) { 178 Destination dest = destinations.remove(destination); 179 if (dest != null) { 180 // timeout<0 or we timed out, we now force any remaining 181 // subscriptions to un-subscribe. 182 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 183 Subscription sub = iter.next(); 184 if (sub.matches(destination)) { 185 dest.removeSubscription(context, sub, 0l); 186 } 187 } 188 destinationMap.removeAll(destination); 189 dispose(context,dest); 190 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); 191 if (destinationInterceptor != null) { 192 destinationInterceptor.remove(dest); 193 } 194 195 } else { 196 LOG.debug("Destination doesn't exist: " + dest); 197 } 198 } 199 } 200 201 /** 202 * Provide an exact or wildcard lookup of destinations in the region 203 * 204 * @return a set of matching destination objects. 205 */ 206 public Set<Destination> getDestinations(ActiveMQDestination destination) { 207 synchronized (destinationsMutex) { 208 return destinationMap.get(destination); 209 } 210 } 211 212 public Map<ActiveMQDestination, Destination> getDestinationMap() { 213 synchronized (destinationsMutex) { 214 return new HashMap<ActiveMQDestination, Destination>(destinations); 215 } 216 } 217 218 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 219 LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: " + info.getDestination()); 220 ActiveMQDestination destination = info.getDestination(); 221 if (destination != null && !destination.isPattern() && !destination.isComposite()) { 222 // lets auto-create the destination 223 lookup(context, destination); 224 } 225 226 Object addGuard; 227 synchronized (consumerChangeMutexMap) { 228 addGuard = consumerChangeMutexMap.get(info.getConsumerId()); 229 if (addGuard == null) { 230 addGuard = new Object(); 231 consumerChangeMutexMap.put(info.getConsumerId(), addGuard); 232 } 233 } 234 synchronized (addGuard) { 235 Subscription o = subscriptions.get(info.getConsumerId()); 236 if (o != null) { 237 LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this."); 238 return o; 239 } 240 241 // We may need to add some destinations that are in persistent store 242 // but not active 243 // in the broker. 244 // 245 // TODO: think about this a little more. This is good cause 246 // destinations are not loaded into 247 // memory until a client needs to use the queue, but a management 248 // agent viewing the 249 // broker will not see a destination that exists in persistent 250 // store. We may want to 251 // eagerly load all destinations into the broker but have an 252 // inactive state for the 253 // destination which has reduced memory usage. 254 // 255 DestinationFilter.parseFilter(info.getDestination()); 256 257 Subscription sub = createSubscription(context, info); 258 259 subscriptions.put(info.getConsumerId(), sub); 260 261 // At this point we're done directly manipulating subscriptions, 262 // but we need to retain the synchronized block here. Consider 263 // otherwise what would happen if at this point a second 264 // thread added, then removed, as would be allowed with 265 // no mutex held. Remove is only essentially run once 266 // so everything after this point would be leaked. 267 268 // Add the subscription to all the matching queues. 269 // But copy the matches first - to prevent deadlocks 270 List<Destination>addList = new ArrayList<Destination>(); 271 synchronized(destinationsMutex) { 272 for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { 273 Destination dest = (Destination)iter.next(); 274 addList.add(dest); 275 } 276 } 277 278 for (Destination dest:addList) { 279 dest.addSubscription(context, sub); 280 } 281 282 if (info.isBrowser()) { 283 ((QueueBrowserSubscription)sub).destinationsAdded(); 284 } 285 286 return sub; 287 } 288 } 289 290 /** 291 * Get all the Destinations that are in storage 292 * 293 * @return Set of all stored destinations 294 */ 295 public Set getDurableDestinations() { 296 return destinationFactory.getDestinations(); 297 } 298 299 /** 300 * @return all Destinations that don't have active consumers 301 */ 302 protected Set<ActiveMQDestination> getInactiveDestinations() { 303 Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations(); 304 synchronized (destinationsMutex) { 305 inactiveDests.removeAll(destinations.keySet()); 306 } 307 return inactiveDests; 308 } 309 310 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 311 LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: " + info.getDestination()); 312 313 Subscription sub = subscriptions.remove(info.getConsumerId()); 314 //The sub could be removed elsewhere - see ConnectionSplitBroker 315 if (sub != null) { 316 317 // remove the subscription from all the matching queues. 318 List<Destination> removeList = new ArrayList<Destination>(); 319 synchronized (destinationsMutex) { 320 for (Iterator iter = destinationMap.get(info.getDestination()) 321 .iterator(); iter.hasNext();) { 322 Destination dest = (Destination) iter.next(); 323 removeList.add(dest); 324 325 } 326 } 327 for(Destination dest:removeList) { 328 dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId()); 329 } 330 331 destroySubscription(sub); 332 } 333 synchronized (consumerChangeMutexMap) { 334 consumerChangeMutexMap.remove(info.getConsumerId()); 335 } 336 } 337 338 protected void destroySubscription(Subscription sub) { 339 sub.destroy(); 340 } 341 342 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 343 throw new JMSException("Invalid operation."); 344 } 345 346 public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 347 final ConnectionContext context = producerExchange.getConnectionContext(); 348 349 if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) { 350 final Destination regionDestination = lookup(context, messageSend.getDestination()); 351 producerExchange.setRegionDestination(regionDestination); 352 } 353 354 producerExchange.getRegionDestination().send(producerExchange, messageSend); 355 } 356 357 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 358 Subscription sub = consumerExchange.getSubscription(); 359 if (sub == null) { 360 sub = subscriptions.get(ack.getConsumerId()); 361 if (sub == null) { 362 if (!consumerExchange.getConnectionContext().isInRecoveryMode()) { 363 LOG.warn("Ack for non existent subscription, ack:" + ack); 364 throw new IllegalArgumentException( 365 "The subscription does not exist: " 366 + ack.getConsumerId()); 367 } else { 368 return; 369 } 370 } 371 consumerExchange.setSubscription(sub); 372 } 373 sub.acknowledge(consumerExchange.getConnectionContext(), ack); 374 } 375 376 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 377 Subscription sub = subscriptions.get(pull.getConsumerId()); 378 if (sub == null) { 379 throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId()); 380 } 381 return sub.pullMessage(context, pull); 382 } 383 384 protected Destination lookup(ConnectionContext context, ActiveMQDestination destination) throws Exception { 385 Destination dest = null; 386 synchronized (destinationsMutex) { 387 dest = destinations.get(destination); 388 } 389 if (dest == null) { 390 if (autoCreateDestinations) { 391 // Try to auto create the destination... re-invoke broker 392 // from the 393 // top so that the proper security checks are performed. 394 try { 395 context.getBroker().addDestination(context, destination); 396 dest = addDestination(context, destination); 397 } catch (DestinationAlreadyExistsException e) { 398 // if the destination already exists then lets ignore 399 // this error 400 } 401 // We should now have the dest created. 402 synchronized (destinationsMutex) { 403 dest = destinations.get(destination); 404 } 405 } 406 if (dest == null) { 407 throw new JMSException("The destination " + destination + " does not exist."); 408 } 409 } 410 return dest; 411 } 412 413 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 414 Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId()); 415 if (sub != null) { 416 sub.processMessageDispatchNotification(messageDispatchNotification); 417 } else { 418 throw new JMSException("Slave broker out of sync with master - Subscription: " 419 + messageDispatchNotification.getConsumerId() 420 + " on " + messageDispatchNotification.getDestination() 421 + " does not exist for dispatch of message: " 422 + messageDispatchNotification.getMessageId()); 423 } 424 } 425 426 /* 427 * For a Queue/TempQueue, dispatch order is imperative to match acks, so the dispatch is deferred till 428 * the notification to ensure that the subscription chosen by the master is used. AMQ-2102 429 */ 430 protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification) throws Exception { 431 Destination dest = null; 432 synchronized (destinationsMutex) { 433 dest = destinations.get(messageDispatchNotification.getDestination()); 434 } 435 if (dest != null) { 436 dest.processDispatchNotification(messageDispatchNotification); 437 } else { 438 throw new JMSException( 439 "Slave broker out of sync with master - Destination: " 440 + messageDispatchNotification.getDestination() 441 + " does not exist for consumer " 442 + messageDispatchNotification.getConsumerId() 443 + " with message: " 444 + messageDispatchNotification.getMessageId()); 445 } 446 } 447 448 public void gc() { 449 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 450 Subscription sub = iter.next(); 451 sub.gc(); 452 } 453 synchronized (destinationsMutex) { 454 for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) { 455 Destination dest = iter.next(); 456 dest.gc(); 457 } 458 } 459 } 460 461 protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception; 462 463 protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { 464 return destinationFactory.createDestination(context, destination, destinationStatistics); 465 } 466 467 public boolean isAutoCreateDestinations() { 468 return autoCreateDestinations; 469 } 470 471 public void setAutoCreateDestinations(boolean autoCreateDestinations) { 472 this.autoCreateDestinations = autoCreateDestinations; 473 } 474 475 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{ 476 synchronized (destinationsMutex) { 477 for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { 478 Destination dest = (Destination) iter.next(); 479 dest.addProducer(context, info); 480 } 481 } 482 } 483 484 /** 485 * Removes a Producer. 486 * @param context the environment the operation is being executed under. 487 * @throws Exception TODO 488 */ 489 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{ 490 synchronized (destinationsMutex) { 491 for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { 492 Destination dest = (Destination)iter.next(); 493 dest.removeProducer(context, info); 494 } 495 } 496 } 497 498 protected void dispose(ConnectionContext context,Destination dest) throws Exception { 499 dest.dispose(context); 500 dest.stop(); 501 destinationFactory.removeDestination(dest); 502 } 503 }