001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.jmx; 018 019 import org.apache.activemq.broker.Broker; 020 import org.apache.activemq.broker.BrokerService; 021 import org.apache.activemq.broker.ConnectionContext; 022 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; 023 import org.apache.activemq.broker.region.Destination; 024 import org.apache.activemq.broker.region.DestinationFactory; 025 import org.apache.activemq.broker.region.DestinationFactoryImpl; 026 import org.apache.activemq.broker.region.DestinationInterceptor; 027 import org.apache.activemq.broker.region.Queue; 028 import org.apache.activemq.broker.region.Region; 029 import org.apache.activemq.broker.region.RegionBroker; 030 import org.apache.activemq.broker.region.Subscription; 031 import org.apache.activemq.broker.region.Topic; 032 import org.apache.activemq.broker.region.TopicSubscription; 033 import org.apache.activemq.command.ActiveMQDestination; 034 import org.apache.activemq.command.ActiveMQMessage; 035 import org.apache.activemq.command.ActiveMQTopic; 036 import org.apache.activemq.command.ConsumerInfo; 037 import org.apache.activemq.command.Message; 038 import org.apache.activemq.command.MessageId; 039 import org.apache.activemq.command.SubscriptionInfo; 040 import org.apache.activemq.store.MessageRecoveryListener; 041 import org.apache.activemq.store.PersistenceAdapter; 042 import org.apache.activemq.store.TopicMessageStore; 043 import org.apache.activemq.thread.TaskRunnerFactory; 044 import org.apache.activemq.usage.SystemUsage; 045 import org.apache.activemq.util.JMXSupport; 046 import org.apache.activemq.util.ServiceStopper; 047 import org.apache.activemq.util.SubscriptionKey; 048 import org.apache.commons.logging.Log; 049 import org.apache.commons.logging.LogFactory; 050 import java.io.IOException; 051 import java.util.ArrayList; 052 import java.util.HashMap; 053 import java.util.Hashtable; 054 import java.util.Iterator; 055 import java.util.List; 056 import java.util.Map; 057 import java.util.Set; 058 import java.util.Map.Entry; 059 import java.util.concurrent.ConcurrentHashMap; 060 import java.util.concurrent.CopyOnWriteArraySet; 061 import javax.management.InstanceNotFoundException; 062 import javax.management.MalformedObjectNameException; 063 import javax.management.ObjectName; 064 import javax.management.openmbean.CompositeData; 065 import javax.management.openmbean.CompositeDataSupport; 066 import javax.management.openmbean.CompositeType; 067 import javax.management.openmbean.OpenDataException; 068 import javax.management.openmbean.TabularData; 069 import javax.management.openmbean.TabularDataSupport; 070 import javax.management.openmbean.TabularType; 071 072 public class ManagedRegionBroker extends RegionBroker { 073 private static final Log LOG = LogFactory.getLog(ManagedRegionBroker.class); 074 private final ManagementContext managementContext; 075 private final ObjectName brokerObjectName; 076 private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>(); 077 private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>(); 078 private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>(); 079 private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>(); 080 private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 081 private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 082 private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 083 private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 084 private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 085 private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 086 private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>(); 087 private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>(); 088 private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>(); 089 /* This is the first broker in the broker interceptor chain. */ 090 private Broker contextBroker; 091 092 public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, 093 DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException { 094 super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor); 095 this.managementContext = context; 096 this.brokerObjectName = brokerObjectName; 097 } 098 099 public void start() throws Exception { 100 super.start(); 101 // build all existing durable subscriptions 102 buildExistingSubscriptions(); 103 } 104 105 protected void doStop(ServiceStopper stopper) { 106 super.doStop(stopper); 107 // lets remove any mbeans not yet removed 108 for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) { 109 ObjectName name = iter.next(); 110 try { 111 managementContext.unregisterMBean(name); 112 } catch (InstanceNotFoundException e) { 113 LOG.warn("The MBean: " + name + " is no longer registered with JMX"); 114 } catch (Exception e) { 115 stopper.onException(this, e); 116 } 117 } 118 registeredMBeans.clear(); 119 } 120 121 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 122 return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 123 } 124 125 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 126 return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 127 } 128 129 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 130 return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 131 } 132 133 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 134 return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 135 } 136 137 public void register(ActiveMQDestination destName, Destination destination) { 138 // TODO refactor to allow views for custom destinations 139 try { 140 ObjectName objectName = createObjectName(destName); 141 DestinationView view; 142 if (destination instanceof Queue) { 143 view = new QueueView(this, (Queue)destination); 144 } else if (destination instanceof Topic) { 145 view = new TopicView(this, (Topic)destination); 146 } else { 147 view = null; 148 LOG.warn("JMX View is not supported for custom destination: " + destination); 149 } 150 if (view != null) { 151 registerDestination(objectName, destName, view); 152 } 153 } catch (Exception e) { 154 LOG.error("Failed to register destination " + destName, e); 155 } 156 } 157 158 public void unregister(ActiveMQDestination destName) { 159 try { 160 ObjectName objectName = createObjectName(destName); 161 unregisterDestination(objectName); 162 } catch (Exception e) { 163 LOG.error("Failed to unregister " + destName, e); 164 } 165 } 166 167 public ObjectName registerSubscription(ConnectionContext context, Subscription sub) { 168 String connectionClientId = context.getClientId(); 169 ObjectName brokerJmxObjectName = brokerObjectName; 170 String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName); 171 172 SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName()); 173 try { 174 ObjectName objectName = new ObjectName(objectNameStr); 175 SubscriptionView view; 176 if (sub.getConsumerInfo().isDurable()) { 177 view = new DurableSubscriptionView(this, context.getClientId(), sub); 178 } else { 179 if (sub instanceof TopicSubscription) { 180 view = new TopicSubscriptionView(context.getClientId(), (TopicSubscription)sub); 181 } else { 182 view = new SubscriptionView(context.getClientId(), sub); 183 } 184 } 185 registerSubscription(objectName, sub.getConsumerInfo(), key, view); 186 subscriptionMap.put(sub, objectName); 187 return objectName; 188 } catch (Exception e) { 189 LOG.error("Failed to register subscription " + sub, e); 190 return null; 191 } 192 } 193 194 public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) { 195 Hashtable map = brokerJmxObjectName.getKeyPropertyList(); 196 String brokerDomain = brokerJmxObjectName.getDomain(); 197 String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,"; 198 String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString(); 199 String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName()); 200 String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId); 201 String persistentMode = "persistentMode="; 202 String consumerId = ""; 203 if (sub.getConsumerInfo().isDurable()) { 204 persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName()); 205 } else { 206 persistentMode += "Non-Durable"; 207 if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) { 208 consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString()); 209 } 210 } 211 objectNameStr += persistentMode + ","; 212 objectNameStr += destinationType + ","; 213 objectNameStr += destinationName + ","; 214 objectNameStr += clientId; 215 objectNameStr += consumerId; 216 return objectNameStr; 217 } 218 219 public void unregisterSubscription(Subscription sub) { 220 ObjectName name = subscriptionMap.remove(sub); 221 if (name != null) { 222 try { 223 unregisterSubscription(name); 224 } catch (Exception e) { 225 LOG.error("Failed to unregister subscription " + sub, e); 226 } 227 } 228 } 229 230 protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception { 231 if (dest.isQueue()) { 232 if (dest.isTemporary()) { 233 temporaryQueues.put(key, view); 234 } else { 235 queues.put(key, view); 236 } 237 } else { 238 if (dest.isTemporary()) { 239 temporaryTopics.put(key, view); 240 } else { 241 topics.put(key, view); 242 } 243 } 244 try { 245 AnnotatedMBean.registerMBean(managementContext, view, key); 246 registeredMBeans.add(key); 247 } catch (Throwable e) { 248 LOG.warn("Failed to register MBean: " + key); 249 LOG.debug("Failure reason: " + e, e); 250 } 251 } 252 253 protected void unregisterDestination(ObjectName key) throws Exception { 254 topics.remove(key); 255 queues.remove(key); 256 temporaryQueues.remove(key); 257 temporaryTopics.remove(key); 258 if (registeredMBeans.remove(key)) { 259 try { 260 managementContext.unregisterMBean(key); 261 } catch (Throwable e) { 262 LOG.warn("Failed to unregister MBean: " + key); 263 LOG.debug("Failure reason: " + e, e); 264 } 265 } 266 } 267 268 protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception { 269 ActiveMQDestination dest = info.getDestination(); 270 if (dest.isQueue()) { 271 if (dest.isTemporary()) { 272 temporaryQueueSubscribers.put(key, view); 273 } else { 274 queueSubscribers.put(key, view); 275 } 276 } else { 277 if (dest.isTemporary()) { 278 temporaryTopicSubscribers.put(key, view); 279 } else { 280 if (info.isDurable()) { 281 durableTopicSubscribers.put(key, view); 282 // unregister any inactive durable subs 283 try { 284 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey); 285 if (inactiveName != null) { 286 inactiveDurableTopicSubscribers.remove(inactiveName); 287 registeredMBeans.remove(inactiveName); 288 managementContext.unregisterMBean(inactiveName); 289 } 290 } catch (Throwable e) { 291 LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e); 292 } 293 } else { 294 topicSubscribers.put(key, view); 295 } 296 } 297 } 298 299 try { 300 AnnotatedMBean.registerMBean(managementContext, view, key); 301 registeredMBeans.add(key); 302 } catch (Throwable e) { 303 LOG.warn("Failed to register MBean: " + key); 304 LOG.debug("Failure reason: " + e, e); 305 } 306 307 } 308 309 protected void unregisterSubscription(ObjectName key) throws Exception { 310 queueSubscribers.remove(key); 311 topicSubscribers.remove(key); 312 inactiveDurableTopicSubscribers.remove(key); 313 temporaryQueueSubscribers.remove(key); 314 temporaryTopicSubscribers.remove(key); 315 if (registeredMBeans.remove(key)) { 316 try { 317 managementContext.unregisterMBean(key); 318 } catch (Throwable e) { 319 LOG.warn("Failed to unregister MBean: " + key); 320 LOG.debug("Failure reason: " + e, e); 321 } 322 } 323 DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key); 324 if (view != null) { 325 // need to put this back in the inactive list 326 SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName()); 327 SubscriptionInfo info = new SubscriptionInfo(); 328 info.setClientId(subscriptionKey.getClientId()); 329 info.setSubscriptionName(subscriptionKey.getSubscriptionName()); 330 info.setDestination(new ActiveMQTopic(view.getDestinationName())); 331 addInactiveSubscription(subscriptionKey, info); 332 } 333 } 334 335 protected void buildExistingSubscriptions() throws Exception { 336 Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>(); 337 Set destinations = destinationFactory.getDestinations(); 338 if (destinations != null) { 339 for (Iterator iter = destinations.iterator(); iter.hasNext();) { 340 ActiveMQDestination dest = (ActiveMQDestination)iter.next(); 341 if (dest.isTopic()) { 342 SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest); 343 if (infos != null) { 344 for (int i = 0; i < infos.length; i++) { 345 SubscriptionInfo info = infos[i]; 346 LOG.debug("Restoring durable subscription: " + info); 347 SubscriptionKey key = new SubscriptionKey(info); 348 subscriptions.put(key, info); 349 } 350 } 351 } 352 } 353 } 354 for (Iterator i = subscriptions.entrySet().iterator(); i.hasNext();) { 355 Map.Entry entry = (Entry)i.next(); 356 SubscriptionKey key = (SubscriptionKey)entry.getKey(); 357 SubscriptionInfo info = (SubscriptionInfo)entry.getValue(); 358 addInactiveSubscription(key, info); 359 } 360 } 361 362 protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) { 363 Hashtable map = brokerObjectName.getKeyPropertyList(); 364 try { 365 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=Subscription," + "active=false," 366 + "name=" + JMXSupport.encodeObjectNamePart(key.toString()) + ""); 367 SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info); 368 369 try { 370 AnnotatedMBean.registerMBean(managementContext, view, objectName); 371 registeredMBeans.add(objectName); 372 } catch (Throwable e) { 373 LOG.warn("Failed to register MBean: " + key); 374 LOG.debug("Failure reason: " + e, e); 375 } 376 377 inactiveDurableTopicSubscribers.put(objectName, view); 378 subscriptionKeys.put(key, objectName); 379 } catch (Exception e) { 380 LOG.error("Failed to register subscription " + info, e); 381 } 382 } 383 384 public CompositeData[] browse(SubscriptionView view) throws OpenDataException { 385 List<Message> messages = getSubscriberMessages(view); 386 CompositeData c[] = new CompositeData[messages.size()]; 387 for (int i = 0; i < c.length; i++) { 388 try { 389 c[i] = OpenTypeSupport.convert(messages.get(i)); 390 } catch (Throwable e) { 391 LOG.error("failed to browse : " + view, e); 392 } 393 } 394 return c; 395 } 396 397 public TabularData browseAsTable(SubscriptionView view) throws OpenDataException { 398 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class); 399 List<Message> messages = getSubscriberMessages(view); 400 CompositeType ct = factory.getCompositeType(); 401 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"}); 402 TabularDataSupport rc = new TabularDataSupport(tt); 403 for (int i = 0; i < messages.size(); i++) { 404 rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i)))); 405 } 406 return rc; 407 } 408 409 protected List<Message> getSubscriberMessages(SubscriptionView view) { 410 // TODO It is very dangerous operation for big backlogs 411 if (!(destinationFactory instanceof DestinationFactoryImpl)) { 412 throw new RuntimeException("unsupported by " + destinationFactory); 413 } 414 PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter(); 415 final List<Message> result = new ArrayList<Message>(); 416 try { 417 ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName()); 418 TopicMessageStore store = adapter.createTopicMessageStore(topic); 419 store.recover(new MessageRecoveryListener() { 420 public boolean recoverMessage(Message message) throws Exception { 421 result.add(message); 422 return true; 423 } 424 425 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 426 throw new RuntimeException("Should not be called."); 427 } 428 429 public boolean hasSpace() { 430 return true; 431 } 432 433 public boolean isDuplicate(MessageId id) { 434 return false; 435 } 436 }); 437 } catch (Throwable e) { 438 LOG.error("Failed to browse messages for Subscription " + view, e); 439 } 440 return result; 441 442 } 443 444 protected ObjectName[] getTopics() { 445 Set<ObjectName> set = topics.keySet(); 446 return set.toArray(new ObjectName[set.size()]); 447 } 448 449 protected ObjectName[] getQueues() { 450 Set<ObjectName> set = queues.keySet(); 451 return set.toArray(new ObjectName[set.size()]); 452 } 453 454 protected ObjectName[] getTemporaryTopics() { 455 Set<ObjectName> set = temporaryTopics.keySet(); 456 return set.toArray(new ObjectName[set.size()]); 457 } 458 459 protected ObjectName[] getTemporaryQueues() { 460 Set<ObjectName> set = temporaryQueues.keySet(); 461 return set.toArray(new ObjectName[set.size()]); 462 } 463 464 protected ObjectName[] getTopicSubscribers() { 465 Set<ObjectName> set = topicSubscribers.keySet(); 466 return set.toArray(new ObjectName[set.size()]); 467 } 468 469 protected ObjectName[] getDurableTopicSubscribers() { 470 Set<ObjectName> set = durableTopicSubscribers.keySet(); 471 return set.toArray(new ObjectName[set.size()]); 472 } 473 474 protected ObjectName[] getQueueSubscribers() { 475 Set<ObjectName> set = queueSubscribers.keySet(); 476 return set.toArray(new ObjectName[set.size()]); 477 } 478 479 protected ObjectName[] getTemporaryTopicSubscribers() { 480 Set<ObjectName> set = temporaryTopicSubscribers.keySet(); 481 return set.toArray(new ObjectName[set.size()]); 482 } 483 484 protected ObjectName[] getTemporaryQueueSubscribers() { 485 Set<ObjectName> set = temporaryQueueSubscribers.keySet(); 486 return set.toArray(new ObjectName[set.size()]); 487 } 488 489 protected ObjectName[] getInactiveDurableTopicSubscribers() { 490 Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet(); 491 return set.toArray(new ObjectName[set.size()]); 492 } 493 494 public Broker getContextBroker() { 495 return contextBroker; 496 } 497 498 public void setContextBroker(Broker contextBroker) { 499 this.contextBroker = contextBroker; 500 } 501 502 protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException { 503 // Build the object name for the destination 504 Hashtable map = brokerObjectName.getKeyPropertyList(); 505 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=" 506 + JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()) + "," + "Destination=" 507 + JMXSupport.encodeObjectNamePart(destName.getPhysicalName())); 508 return objectName; 509 } 510 }