001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.net.URI; 022 import java.net.URISyntaxException; 023 import java.net.UnknownHostException; 024 import java.util.ArrayList; 025 import java.util.HashMap; 026 import java.util.Iterator; 027 import java.util.List; 028 import java.util.Map; 029 import java.util.Set; 030 import java.util.concurrent.CopyOnWriteArrayList; 031 import java.util.concurrent.CountDownLatch; 032 import java.util.concurrent.TimeUnit; 033 import java.util.concurrent.atomic.AtomicBoolean; 034 035 import javax.annotation.PostConstruct; 036 import javax.annotation.PreDestroy; 037 import javax.management.MalformedObjectNameException; 038 import javax.management.ObjectName; 039 040 import org.apache.activemq.ActiveMQConnectionMetaData; 041 import org.apache.activemq.ConfigurationException; 042 import org.apache.activemq.Service; 043 import org.apache.activemq.advisory.AdvisoryBroker; 044 import org.apache.activemq.broker.cluster.ConnectionSplitBroker; 045 import org.apache.activemq.broker.ft.MasterConnector; 046 import 
org.apache.activemq.broker.jmx.AnnotatedMBean; 047 import org.apache.activemq.broker.jmx.BrokerView; 048 import org.apache.activemq.broker.jmx.ConnectorView; 049 import org.apache.activemq.broker.jmx.ConnectorViewMBean; 050 import org.apache.activemq.broker.jmx.FTConnectorView; 051 import org.apache.activemq.broker.jmx.JmsConnectorView; 052 import org.apache.activemq.broker.jmx.ManagedRegionBroker; 053 import org.apache.activemq.broker.jmx.ManagementContext; 054 import org.apache.activemq.broker.jmx.NetworkConnectorView; 055 import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; 056 import org.apache.activemq.broker.jmx.ProxyConnectorView; 057 import org.apache.activemq.broker.region.CompositeDestinationInterceptor; 058 import org.apache.activemq.broker.region.Destination; 059 import org.apache.activemq.broker.region.DestinationFactory; 060 import org.apache.activemq.broker.region.DestinationFactoryImpl; 061 import org.apache.activemq.broker.region.DestinationInterceptor; 062 import org.apache.activemq.broker.region.RegionBroker; 063 import org.apache.activemq.broker.region.policy.PolicyMap; 064 import org.apache.activemq.broker.region.virtual.MirroredQueue; 065 import org.apache.activemq.broker.region.virtual.VirtualDestination; 066 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; 067 import org.apache.activemq.broker.region.virtual.VirtualTopic; 068 import org.apache.activemq.command.ActiveMQDestination; 069 import org.apache.activemq.command.BrokerId; 070 import org.apache.activemq.kaha.Store; 071 import org.apache.activemq.kaha.StoreFactory; 072 import org.apache.activemq.network.ConnectionFilter; 073 import org.apache.activemq.network.DiscoveryNetworkConnector; 074 import org.apache.activemq.network.NetworkConnector; 075 import org.apache.activemq.network.jms.JmsConnector; 076 import org.apache.activemq.proxy.ProxyConnector; 077 import org.apache.activemq.security.MessageAuthorizationPolicy; 078 import 
org.apache.activemq.security.SecurityContext; 079 import org.apache.activemq.selector.SelectorParser; 080 import org.apache.activemq.store.PersistenceAdapter; 081 import org.apache.activemq.store.PersistenceAdapterFactory; 082 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; 083 import org.apache.activemq.store.memory.MemoryPersistenceAdapter; 084 import org.apache.activemq.thread.TaskRunnerFactory; 085 import org.apache.activemq.transport.TransportFactory; 086 import org.apache.activemq.transport.TransportServer; 087 import org.apache.activemq.transport.vm.VMTransportFactory; 088 import org.apache.activemq.usage.SystemUsage; 089 import org.apache.activemq.util.DefaultIOExceptionHandler; 090 import org.apache.activemq.util.IOExceptionHandler; 091 import org.apache.activemq.util.IOExceptionSupport; 092 import org.apache.activemq.util.IOHelper; 093 import org.apache.activemq.util.JMXSupport; 094 import org.apache.activemq.util.ServiceStopper; 095 import org.apache.activemq.util.URISupport; 096 import org.apache.commons.logging.Log; 097 import org.apache.commons.logging.LogFactory; 098 /** 099 * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a 100 * number of transport connectors, network connectors and a bunch of properties 101 * which can be used to configure the broker as its lazily created. 
102 * 103 * @version $Revision: 1.1 $ 104 * @org.apache.xbean.XBean 105 */ 106 public class BrokerService implements Service { 107 protected CountDownLatch slaveStartSignal = new CountDownLatch(1); 108 public static final String DEFAULT_PORT = "61616"; 109 public static final String LOCAL_HOST_NAME; 110 public static final String DEFAULT_BROKER_NAME = "localhost"; 111 private static final Log LOG = LogFactory.getLog(BrokerService.class); 112 private static final long serialVersionUID = 7353129142305630237L; 113 private boolean useJmx = true; 114 private boolean enableStatistics = true; 115 private boolean persistent = true; 116 private boolean populateJMSXUserID; 117 private boolean useShutdownHook = true; 118 private boolean useLoggingForShutdownErrors; 119 private boolean shutdownOnMasterFailure; 120 private boolean shutdownOnSlaveFailure; 121 private boolean waitForSlave; 122 private long waitForSlaveTimeout = 600000L; 123 private boolean passiveSlave; 124 private String brokerName = DEFAULT_BROKER_NAME; 125 private File dataDirectoryFile; 126 private File tmpDataDirectory; 127 private Broker broker; 128 private BrokerView adminView; 129 private ManagementContext managementContext; 130 private ObjectName brokerObjectName; 131 private TaskRunnerFactory taskRunnerFactory; 132 private TaskRunnerFactory persistenceTaskRunnerFactory; 133 private SystemUsage systemUsage; 134 private SystemUsage producerSystemUsage; 135 private SystemUsage consumerSystemUsaage; 136 private PersistenceAdapter persistenceAdapter; 137 private PersistenceAdapterFactory persistenceFactory; 138 protected DestinationFactory destinationFactory; 139 private MessageAuthorizationPolicy messageAuthorizationPolicy; 140 private List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>(); 141 private List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>(); 142 private List<ProxyConnector> proxyConnectors = new 
CopyOnWriteArrayList<ProxyConnector>(); 143 private List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>(); 144 private List<Service> services = new ArrayList<Service>(); 145 private MasterConnector masterConnector; 146 private String masterConnectorURI; 147 private transient Thread shutdownHook; 148 private String[] transportConnectorURIs; 149 private String[] networkConnectorURIs; 150 private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges 151 // to other jms messaging 152 // systems 153 private boolean deleteAllMessagesOnStartup; 154 private boolean advisorySupport = true; 155 private URI vmConnectorURI; 156 private PolicyMap destinationPolicy; 157 private AtomicBoolean started = new AtomicBoolean(false); 158 private AtomicBoolean stopped = new AtomicBoolean(false); 159 private BrokerPlugin[] plugins; 160 private boolean keepDurableSubsActive = true; 161 private boolean useVirtualTopics = true; 162 private boolean useMirroredQueues = false; 163 private boolean useTempMirroredQueues = true; 164 private BrokerId brokerId; 165 private DestinationInterceptor[] destinationInterceptors; 166 private ActiveMQDestination[] destinations; 167 private Store tempDataStore; 168 private int persistenceThreadPriority = Thread.MAX_PRIORITY; 169 private boolean useLocalHostBrokerName; 170 private CountDownLatch stoppedLatch = new CountDownLatch(1); 171 private CountDownLatch startedLatch = new CountDownLatch(1); 172 private boolean supportFailOver; 173 private Broker regionBroker; 174 private int producerSystemUsagePortion = 60; 175 private int consumerSystemUsagePortion = 40; 176 private boolean splitSystemUsageForProducersConsumers; 177 private boolean monitorConnectionSplits = false; 178 private int taskRunnerPriority = Thread.NORM_PRIORITY; 179 private boolean dedicatedTaskRunner; 180 private boolean cacheTempDestinations = false;// useful for failover 181 private int timeBeforePurgeTempDestinations = 5000; 182 private List<Runnable> 
shutdownHooks = new ArrayList<Runnable>(); 183 private boolean systemExitOnShutdown; 184 private int systemExitOnShutdownExitCode; 185 private SslContext sslContext; 186 private boolean forceStart = false; 187 private IOExceptionHandler ioExceptionHandler; 188 189 static { 190 String localHostName = "localhost"; 191 try { 192 localHostName = java.net.InetAddress.getLocalHost().getHostName(); 193 } catch (UnknownHostException e) { 194 LOG.error("Failed to resolve localhost"); 195 } 196 LOCAL_HOST_NAME = localHostName; 197 } 198 199 @Override 200 public String toString() { 201 return "BrokerService[" + getBrokerName() + "]"; 202 } 203 204 /** 205 * Adds a new transport connector for the given bind address 206 * 207 * @return the newly created and added transport connector 208 * @throws Exception 209 */ 210 public TransportConnector addConnector(String bindAddress) throws Exception { 211 return addConnector(new URI(bindAddress)); 212 } 213 214 /** 215 * Adds a new transport connector for the given bind address 216 * 217 * @return the newly created and added transport connector 218 * @throws Exception 219 */ 220 public TransportConnector addConnector(URI bindAddress) throws Exception { 221 return addConnector(createTransportConnector(bindAddress)); 222 } 223 224 /** 225 * Adds a new transport connector for the given TransportServer transport 226 * 227 * @return the newly created and added transport connector 228 * @throws Exception 229 */ 230 public TransportConnector addConnector(TransportServer transport) throws Exception { 231 return addConnector(new TransportConnector(transport)); 232 } 233 234 /** 235 * Adds a new transport connector 236 * 237 * @return the transport connector 238 * @throws Exception 239 */ 240 public TransportConnector addConnector(TransportConnector connector) throws Exception { 241 transportConnectors.add(connector); 242 return connector; 243 } 244 245 /** 246 * Stops and removes a transport connector from the broker. 
247 * 248 * @param connector 249 * @return true if the connector has been previously added to the broker 250 * @throws Exception 251 */ 252 public boolean removeConnector(TransportConnector connector) throws Exception { 253 boolean rc = transportConnectors.remove(connector); 254 if (rc) { 255 unregisterConnectorMBean(connector); 256 } 257 return rc; 258 } 259 260 /** 261 * Adds a new network connector using the given discovery address 262 * 263 * @return the newly created and added network connector 264 * @throws Exception 265 */ 266 public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { 267 return addNetworkConnector(new URI(discoveryAddress)); 268 } 269 270 /** 271 * Adds a new proxy connector using the given bind address 272 * 273 * @return the newly created and added network connector 274 * @throws Exception 275 */ 276 public ProxyConnector addProxyConnector(String bindAddress) throws Exception { 277 return addProxyConnector(new URI(bindAddress)); 278 } 279 280 /** 281 * Adds a new network connector using the given discovery address 282 * 283 * @return the newly created and added network connector 284 * @throws Exception 285 */ 286 public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { 287 if (!isAdvisorySupport()) { 288 throw new javax.jms.IllegalStateException( 289 "Networks require advisory messages to function - advisories are currently disabled"); 290 } 291 NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); 292 return addNetworkConnector(connector); 293 } 294 295 /** 296 * Adds a new proxy connector using the given bind address 297 * 298 * @return the newly created and added network connector 299 * @throws Exception 300 */ 301 public ProxyConnector addProxyConnector(URI bindAddress) throws Exception { 302 ProxyConnector connector = new ProxyConnector(); 303 connector.setBind(bindAddress); 304 connector.setRemote(new URI("fanout:multicast://default")); 305 return 
addProxyConnector(connector); 306 } 307 308 /** 309 * Adds a new network connector to connect this broker to a federated 310 * network 311 */ 312 public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { 313 connector.setBrokerService(this); 314 URI uri = getVmConnectorURI(); 315 Map<String, String> map = new HashMap<String, String>(URISupport.parseParamters(uri)); 316 map.put("network", "true"); 317 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 318 connector.setLocalUri(uri); 319 // Set a connection filter so that the connector does not establish loop 320 // back connections. 321 connector.setConnectionFilter(new ConnectionFilter() { 322 public boolean connectTo(URI location) { 323 List<TransportConnector> transportConnectors = getTransportConnectors(); 324 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) { 325 try { 326 TransportConnector tc = iter.next(); 327 if (location.equals(tc.getConnectUri())) { 328 return false; 329 } 330 } catch (Throwable e) { 331 } 332 } 333 return true; 334 } 335 }); 336 networkConnectors.add(connector); 337 if (isUseJmx()) { 338 registerNetworkConnectorMBean(connector); 339 } 340 return connector; 341 } 342 343 /** 344 * Removes the given network connector without stopping it. 
The caller 345 * should call {@link NetworkConnector#stop()} to close the connector 346 */ 347 public boolean removeNetworkConnector(NetworkConnector connector) { 348 boolean answer = networkConnectors.remove(connector); 349 if (answer) { 350 unregisterNetworkConnectorMBean(connector); 351 } 352 return answer; 353 } 354 355 public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception { 356 URI uri = getVmConnectorURI(); 357 connector.setLocalUri(uri); 358 proxyConnectors.add(connector); 359 if (isUseJmx()) { 360 registerProxyConnectorMBean(connector); 361 } 362 return connector; 363 } 364 365 public JmsConnector addJmsConnector(JmsConnector connector) throws Exception { 366 connector.setBrokerService(this); 367 jmsConnectors.add(connector); 368 if (isUseJmx()) { 369 registerJmsConnectorMBean(connector); 370 } 371 return connector; 372 } 373 374 public JmsConnector removeJmsConnector(JmsConnector connector) { 375 if (jmsConnectors.remove(connector)) { 376 return connector; 377 } 378 return null; 379 } 380 381 /** 382 * @return Returns the masterConnectorURI. 383 */ 384 public String getMasterConnectorURI() { 385 return masterConnectorURI; 386 } 387 388 /** 389 * @param masterConnectorURI 390 * The masterConnectorURI to set. 391 */ 392 public void setMasterConnectorURI(String masterConnectorURI) { 393 this.masterConnectorURI = masterConnectorURI; 394 } 395 396 /** 397 * @return true if this Broker is a slave to a Master 398 */ 399 public boolean isSlave() { 400 return (masterConnector != null && masterConnector.isSlave()) || 401 (masterConnector != null && masterConnector.isStoppedBeforeStart()); 402 } 403 404 public void masterFailed() { 405 if (shutdownOnMasterFailure) { 406 LOG.fatal("The Master has failed ... 
shutting down"); 407 try { 408 stop(); 409 } catch (Exception e) { 410 LOG.error("Failed to stop for master failure", e); 411 } 412 } else { 413 LOG.warn("Master Failed - starting all connectors"); 414 try { 415 startAllConnectors(); 416 broker.nowMasterBroker(); 417 } catch (Exception e) { 418 LOG.error("Failed to startAllConnectors", e); 419 } 420 } 421 } 422 423 public boolean isStarted() { 424 return started.get(); 425 } 426 427 public void start(boolean force) throws Exception { 428 forceStart = force; 429 stopped.set(false); 430 started.set(false); 431 start(); 432 } 433 434 // Service interface 435 // ------------------------------------------------------------------------- 436 437 /** 438 * 439 * @throws Exception 440 * @org. apache.xbean.InitMethod 441 */ 442 @PostConstruct 443 public void start() throws Exception { 444 if (stopped.get() || !started.compareAndSet(false, true)) { 445 // lets just ignore redundant start() calls 446 // as its way too easy to not be completely sure if start() has been 447 // called or not with the gazillion of different configuration 448 // mechanisms 449 // throw new IllegalStateException("Allready started."); 450 return; 451 } 452 try { 453 if (systemExitOnShutdown && useShutdownHook) { 454 throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)"); 455 } 456 processHelperProperties(); 457 if (isUseJmx()) { 458 startManagementContext(); 459 } 460 getPersistenceAdapter().setUsageManager(getProducerSystemUsage()); 461 getPersistenceAdapter().setBrokerName(getBrokerName()); 462 LOG.info("Using Persistence Adapter: " + getPersistenceAdapter()); 463 if (deleteAllMessagesOnStartup) { 464 deleteAllMessages(); 465 } 466 getPersistenceAdapter().start(); 467 startDestinations(); 468 addShutdownHook(); 469 getBroker().start(); 470 if (isUseJmx()) { 471 if (getManagementContext().isCreateConnector() && 
!getManagementContext().isConnectorStarted()) { 472 // try to restart management context 473 // typical for slaves that use the same ports as master 474 managementContext.stop(); 475 startManagementContext(); 476 } 477 ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; 478 managedBroker.setContextBroker(broker); 479 adminView.setBroker(managedBroker); 480 } 481 BrokerRegistry.getInstance().bind(getBrokerName(), this); 482 // see if there is a MasterBroker service and if so, configure 483 // it and start it. 484 for (Service service : services) { 485 if (service instanceof MasterConnector) { 486 configureService(service); 487 service.start(); 488 } 489 } 490 if (!isSlave() && (this.masterConnector == null || isShutdownOnMasterFailure() == false)) { 491 startAllConnectors(); 492 } 493 if (!stopped.get()) { 494 if (isUseJmx() && masterConnector != null) { 495 registerFTConnectorMBean(masterConnector); 496 } 497 } 498 brokerId = broker.getBrokerId(); 499 if (ioExceptionHandler == null) { 500 setIoExceptionHandler(new DefaultIOExceptionHandler()); 501 } 502 LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started"); 503 getBroker().brokerServiceStarted(); 504 startedLatch.countDown(); 505 } catch (Exception e) { 506 LOG.error("Failed to start ActiveMQ JMS Message Broker. 
Reason: " + e, e); 507 try { 508 if (!stopped.get()) { 509 stop(); 510 } 511 } catch (Exception ex) { 512 LOG.warn("Failed to stop broker after failure in start ", ex); 513 } 514 throw e; 515 } 516 } 517 518 /** 519 * 520 * @throws Exception 521 * @org.apache .xbean.DestroyMethod 522 */ 523 @PreDestroy 524 public void stop() throws Exception { 525 if (!started.get()) { 526 return; 527 } 528 529 if (systemExitOnShutdown) { 530 new Thread() { 531 public void run() { 532 System.exit(systemExitOnShutdownExitCode); 533 } 534 }.start(); 535 } 536 537 LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down"); 538 removeShutdownHook(); 539 ServiceStopper stopper = new ServiceStopper(); 540 if (services != null) { 541 for (Service service : services) { 542 stopper.stop(service); 543 } 544 } 545 stopAllConnectors(stopper); 546 // remove any VMTransports connected 547 // this has to be done after services are stopped, 548 // to avoid timimg issue with discovery (spinning up a new instance) 549 BrokerRegistry.getInstance().unbind(getBrokerName()); 550 VMTransportFactory.stopped(getBrokerName()); 551 if (broker != null) { 552 stopper.stop(broker); 553 } 554 if (tempDataStore != null) { 555 tempDataStore.close(); 556 } 557 stopper.stop(persistenceAdapter); 558 if (isUseJmx()) { 559 stopper.stop(getManagementContext()); 560 } 561 // Clear SelectorParser cache to free memory 562 SelectorParser.clearCache(); 563 stopped.set(true); 564 stoppedLatch.countDown(); 565 if (masterConnectorURI == null) { 566 // master start has not finished yet 567 if (slaveStartSignal.getCount() == 1) { 568 started.set(false); 569 slaveStartSignal.countDown(); 570 } 571 } else { 572 for (Service service : services) { 573 if (service instanceof MasterConnector) { 574 MasterConnector mConnector = (MasterConnector) service; 575 if (!mConnector.isSlave()) { 576 // means should be slave but not connected to master yet 577 started.set(false); 578 
mConnector.stopBeforeConnected(); 579 } 580 } 581 } 582 } 583 LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped"); 584 synchronized (shutdownHooks) { 585 for (Runnable hook : shutdownHooks) { 586 try { 587 hook.run(); 588 } catch (Throwable e) { 589 stopper.onException(hook, e); 590 } 591 } 592 } 593 594 stopper.throwFirstException(); 595 } 596 597 public boolean checkQueueSize(String queueName) { 598 long count = 0; 599 long queueSize = 0; 600 Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap(); 601 for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) { 602 if (entry.getKey().isQueue()) { 603 if (entry.getValue().getName().matches(queueName)) { 604 queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount(); 605 count = queueSize; 606 if (queueSize > 0) { 607 LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:" 608 + queueSize); 609 } 610 } 611 } 612 } 613 return count == 0; 614 } 615 616 /** 617 * This method (both connectorName and queueName are using regex to match) 618 * 1. stop the connector (supposed the user input the connector which the 619 * clients connect to) 2. to check whether there is any pending message on 620 * the queues defined by queueName 3. supposedly, after stop the connector, 621 * client should failover to other broker and pending messages should be 622 * forwarded. if no pending messages, the method finally call stop to stop 623 * the broker. 
624 * 625 * @param connectorName 626 * @param queueName 627 * @param timeout 628 * @param pollInterval 629 * @throws Exception 630 */ 631 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) 632 throws Exception { 633 if (isUseJmx()) { 634 if (connectorName == null || queueName == null || timeout <= 0) { 635 throw new Exception( 636 "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully."); 637 } 638 if (pollInterval <= 0) { 639 pollInterval = 30; 640 } 641 LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:" 642 + timeout + " pollInterval:" + pollInterval); 643 TransportConnector connector; 644 for (int i = 0; i < transportConnectors.size(); i++) { 645 connector = transportConnectors.get(i); 646 if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) { 647 connector.stop(); 648 } 649 } 650 long start = System.currentTimeMillis(); 651 while (System.currentTimeMillis() - start < timeout * 1000) { 652 // check quesize until it gets zero 653 if (checkQueueSize(queueName)) { 654 stop(); 655 break; 656 } else { 657 Thread.sleep(pollInterval * 1000); 658 } 659 } 660 if (stopped.get()) { 661 LOG.info("Successfully stop the broker."); 662 } else { 663 LOG.info("There is still pending message on the queue. 
Please check and stop the broker manually."); 664 } 665 } 666 } 667 668 /** 669 * A helper method to block the caller thread until the broker has been 670 * stopped 671 */ 672 public void waitUntilStopped() { 673 while (isStarted() && !stopped.get()) { 674 try { 675 stoppedLatch.await(); 676 } catch (InterruptedException e) { 677 // ignore 678 } 679 } 680 } 681 682 /** 683 * A helper method to block the caller thread until the broker has been 684 * started 685 */ 686 public void waitUntilStarted() { 687 boolean waitSucceeded = false; 688 while (isStarted() && !stopped.get() && !waitSucceeded) { 689 try { 690 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS); 691 } catch (InterruptedException ignore) { 692 } 693 } 694 } 695 696 // Properties 697 // ------------------------------------------------------------------------- 698 /** 699 * Returns the message broker 700 */ 701 public Broker getBroker() throws Exception { 702 if (broker == null) { 703 LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker (" 704 + getBrokerName() + ") is starting"); 705 LOG.info("For help or more information please see: http://activemq.apache.org/"); 706 broker = createBroker(); 707 } 708 return broker; 709 } 710 711 /** 712 * Returns the administration view of the broker; used to create and destroy 713 * resources such as queues and topics. Note this method returns null if JMX 714 * is disabled. 
715 */ 716 public BrokerView getAdminView() throws Exception { 717 if (adminView == null) { 718 // force lazy creation 719 getBroker(); 720 } 721 return adminView; 722 } 723 724 public void setAdminView(BrokerView adminView) { 725 this.adminView = adminView; 726 } 727 728 public String getBrokerName() { 729 return brokerName; 730 } 731 732 /** 733 * Sets the name of this broker; which must be unique in the network 734 * 735 * @param brokerName 736 */ 737 public void setBrokerName(String brokerName) { 738 if (brokerName == null) { 739 throw new NullPointerException("The broker name cannot be null"); 740 } 741 String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_"); 742 if (!str.equals(brokerName)) { 743 LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str); 744 } 745 this.brokerName = str.trim(); 746 } 747 748 public PersistenceAdapterFactory getPersistenceFactory() { 749 if (persistenceFactory == null) { 750 persistenceFactory = createPersistenceFactory(); 751 } 752 return persistenceFactory; 753 } 754 755 public File getDataDirectoryFile() { 756 if (dataDirectoryFile == null) { 757 dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory()); 758 } 759 return dataDirectoryFile; 760 } 761 762 public File getBrokerDataDirectory() { 763 String brokerDir = getBrokerName(); 764 return new File(getDataDirectoryFile(), brokerDir); 765 } 766 767 /** 768 * Sets the directory in which the data files will be stored by default for 769 * the JDBC and Journal persistence adaptors. 770 * 771 * @param dataDirectory 772 * the directory to store data files 773 */ 774 public void setDataDirectory(String dataDirectory) { 775 setDataDirectoryFile(new File(dataDirectory)); 776 } 777 778 /** 779 * Sets the directory in which the data files will be stored by default for 780 * the JDBC and Journal persistence adaptors. 
781 * 782 * @param dataDirectoryFile 783 * the directory to store data files 784 */ 785 public void setDataDirectoryFile(File dataDirectoryFile) { 786 this.dataDirectoryFile = dataDirectoryFile; 787 } 788 789 /** 790 * @return the tmpDataDirectory 791 */ 792 public File getTmpDataDirectory() { 793 if (tmpDataDirectory == null) { 794 tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage"); 795 } 796 return tmpDataDirectory; 797 } 798 799 /** 800 * @param tmpDataDirectory 801 * the tmpDataDirectory to set 802 */ 803 public void setTmpDataDirectory(File tmpDataDirectory) { 804 this.tmpDataDirectory = tmpDataDirectory; 805 } 806 807 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { 808 this.persistenceFactory = persistenceFactory; 809 } 810 811 public void setDestinationFactory(DestinationFactory destinationFactory) { 812 this.destinationFactory = destinationFactory; 813 } 814 815 public boolean isPersistent() { 816 return persistent; 817 } 818 819 /** 820 * Sets whether or not persistence is enabled or disabled. 821 */ 822 public void setPersistent(boolean persistent) { 823 this.persistent = persistent; 824 } 825 826 public boolean isPopulateJMSXUserID() { 827 return populateJMSXUserID; 828 } 829 830 /** 831 * Sets whether or not the broker should populate the JMSXUserID header. 
832 */ 833 public void setPopulateJMSXUserID(boolean populateJMSXUserID) { 834 this.populateJMSXUserID = populateJMSXUserID; 835 } 836 837 public SystemUsage getSystemUsage() { 838 try { 839 if (systemUsage == null) { 840 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore()); 841 systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default 842 // 64 843 // Meg 844 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 10 845 // Gb 846 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 847 // GB 848 addService(this.systemUsage); 849 } 850 return systemUsage; 851 } catch (IOException e) { 852 LOG.fatal("Cannot create SystemUsage", e); 853 throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage()); 854 } 855 } 856 857 public void setSystemUsage(SystemUsage memoryManager) { 858 if (this.systemUsage != null) { 859 removeService(this.systemUsage); 860 } 861 this.systemUsage = memoryManager; 862 addService(this.systemUsage); 863 } 864 865 /** 866 * @return the consumerUsageManager 867 * @throws IOException 868 */ 869 public SystemUsage getConsumerSystemUsage() throws IOException { 870 if (this.consumerSystemUsaage == null) { 871 if (splitSystemUsageForProducersConsumers) { 872 this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer"); 873 float portion = consumerSystemUsagePortion / 100f; 874 this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion); 875 addService(this.consumerSystemUsaage); 876 } else { 877 consumerSystemUsaage = getSystemUsage(); 878 } 879 } 880 return this.consumerSystemUsaage; 881 } 882 883 /** 884 * @param consumerSystemUsaage 885 * the storeSystemUsage to set 886 */ 887 public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) { 888 if (this.consumerSystemUsaage != null) { 889 removeService(this.consumerSystemUsaage); 890 } 891 this.consumerSystemUsaage = consumerSystemUsaage; 892 addService(this.consumerSystemUsaage); 
893 } 894 895 /** 896 * @return the producerUsageManager 897 * @throws IOException 898 */ 899 public SystemUsage getProducerSystemUsage() throws IOException { 900 if (producerSystemUsage == null) { 901 if (splitSystemUsageForProducersConsumers) { 902 producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); 903 float portion = producerSystemUsagePortion / 100f; 904 producerSystemUsage.getMemoryUsage().setUsagePortion(portion); 905 addService(producerSystemUsage); 906 } else { 907 producerSystemUsage = getSystemUsage(); 908 } 909 } 910 return producerSystemUsage; 911 } 912 913 /** 914 * @param producerUsageManager 915 * the producerUsageManager to set 916 */ 917 public void setProducerSystemUsage(SystemUsage producerUsageManager) { 918 if (this.producerSystemUsage != null) { 919 removeService(this.producerSystemUsage); 920 } 921 this.producerSystemUsage = producerUsageManager; 922 addService(this.producerSystemUsage); 923 } 924 925 public PersistenceAdapter getPersistenceAdapter() throws IOException { 926 if (persistenceAdapter == null) { 927 persistenceAdapter = createPersistenceAdapter(); 928 configureService(persistenceAdapter); 929 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 930 } 931 return persistenceAdapter; 932 } 933 934 /** 935 * Sets the persistence adaptor implementation to use for this broker 936 * 937 * @throws IOException 938 */ 939 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { 940 this.persistenceAdapter = persistenceAdapter; 941 configureService(this.persistenceAdapter); 942 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 943 } 944 945 public TaskRunnerFactory getTaskRunnerFactory() { 946 if (taskRunnerFactory == null) { 947 taskRunnerFactory = new TaskRunnerFactory("BrokerService", getTaskRunnerPriority(), true, 1000, 948 isDedicatedTaskRunner()); 949 } 950 return taskRunnerFactory; 951 } 952 953 public void 
setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 954 this.taskRunnerFactory = taskRunnerFactory; 955 } 956 957 public TaskRunnerFactory getPersistenceTaskRunnerFactory() { 958 if (taskRunnerFactory == null) { 959 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, 960 true, 1000, isDedicatedTaskRunner()); 961 } 962 return persistenceTaskRunnerFactory; 963 } 964 965 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) { 966 this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory; 967 } 968 969 public boolean isUseJmx() { 970 return useJmx; 971 } 972 973 public boolean isEnableStatistics() { 974 return enableStatistics; 975 } 976 977 /** 978 * Sets whether or not the Broker's services enable statistics or not. 979 */ 980 public void setEnableStatistics(boolean enableStatistics) { 981 this.enableStatistics = enableStatistics; 982 } 983 984 /** 985 * Sets whether or not the Broker's services should be exposed into JMX or 986 * not. 
987 */ 988 public void setUseJmx(boolean useJmx) { 989 this.useJmx = useJmx; 990 } 991 992 public ObjectName getBrokerObjectName() throws IOException { 993 if (brokerObjectName == null) { 994 brokerObjectName = createBrokerObjectName(); 995 } 996 return brokerObjectName; 997 } 998 999 /** 1000 * Sets the JMX ObjectName for this broker 1001 */ 1002 public void setBrokerObjectName(ObjectName brokerObjectName) { 1003 this.brokerObjectName = brokerObjectName; 1004 } 1005 1006 public ManagementContext getManagementContext() { 1007 if (managementContext == null) { 1008 managementContext = new ManagementContext(); 1009 } 1010 return managementContext; 1011 } 1012 1013 public void setManagementContext(ManagementContext managementContext) { 1014 this.managementContext = managementContext; 1015 } 1016 1017 public NetworkConnector getNetworkConnectorByName(String connectorName) { 1018 for (NetworkConnector connector : networkConnectors) { 1019 if (connector.getName().equals(connectorName)) { 1020 return connector; 1021 } 1022 } 1023 return null; 1024 } 1025 1026 public String[] getNetworkConnectorURIs() { 1027 return networkConnectorURIs; 1028 } 1029 1030 public void setNetworkConnectorURIs(String[] networkConnectorURIs) { 1031 this.networkConnectorURIs = networkConnectorURIs; 1032 } 1033 1034 public TransportConnector getConnectorByName(String connectorName) { 1035 for (TransportConnector connector : transportConnectors) { 1036 if (connector.getName().equals(connectorName)) { 1037 return connector; 1038 } 1039 } 1040 return null; 1041 } 1042 1043 public Map<String, String> getTransportConnectorURIsAsMap() { 1044 Map<String, String> answer = new HashMap<String, String>(); 1045 for (TransportConnector connector : transportConnectors) { 1046 try { 1047 URI uri = connector.getConnectUri(); 1048 String scheme = uri.getScheme(); 1049 if (scheme != null) { 1050 answer.put(scheme.toLowerCase(), uri.toString()); 1051 } 1052 } catch (Exception e) { 1053 LOG.debug("Failed to read URI 
to build transportURIsAsMap", e); 1054 } 1055 } 1056 return answer; 1057 } 1058 1059 public String[] getTransportConnectorURIs() { 1060 return transportConnectorURIs; 1061 } 1062 1063 public void setTransportConnectorURIs(String[] transportConnectorURIs) { 1064 this.transportConnectorURIs = transportConnectorURIs; 1065 } 1066 1067 /** 1068 * @return Returns the jmsBridgeConnectors. 1069 */ 1070 public JmsConnector[] getJmsBridgeConnectors() { 1071 return jmsBridgeConnectors; 1072 } 1073 1074 /** 1075 * @param jmsConnectors 1076 * The jmsBridgeConnectors to set. 1077 */ 1078 public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) { 1079 this.jmsBridgeConnectors = jmsConnectors; 1080 } 1081 1082 public Service[] getServices() { 1083 return (Service[]) services.toArray(new Service[0]); 1084 } 1085 1086 /** 1087 * Sets the services associated with this broker such as a 1088 * {@link MasterConnector} 1089 */ 1090 public void setServices(Service[] services) { 1091 this.services.clear(); 1092 if (services != null) { 1093 for (int i = 0; i < services.length; i++) { 1094 this.services.add(services[i]); 1095 } 1096 } 1097 } 1098 1099 /** 1100 * Adds a new service so that it will be started as part of the broker 1101 * lifecycle 1102 */ 1103 public void addService(Service service) { 1104 services.add(service); 1105 } 1106 1107 public void removeService(Service service) { 1108 services.remove(service); 1109 } 1110 1111 public boolean isUseLoggingForShutdownErrors() { 1112 return useLoggingForShutdownErrors; 1113 } 1114 1115 /** 1116 * Sets whether or not we should use commons-logging when reporting errors 1117 * when shutting down the broker 1118 */ 1119 public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) { 1120 this.useLoggingForShutdownErrors = useLoggingForShutdownErrors; 1121 } 1122 1123 public boolean isUseShutdownHook() { 1124 return useShutdownHook; 1125 } 1126 1127 /** 1128 * Sets whether or not we should use a shutdown handler to 
close down the
 * broker cleanly if the JVM is terminated. It is recommended you leave this
 * enabled.
 */
public void setUseShutdownHook(boolean useShutdownHook) {
    this.useShutdownHook = useShutdownHook;
}

public boolean isAdvisorySupport() {
    return advisorySupport;
}

/**
 * Allows the support of advisory messages to be disabled for performance
 * reasons.
 */
public void setAdvisorySupport(boolean advisorySupport) {
    this.advisorySupport = advisorySupport;
}

/**
 * Returns a copy of the transport connector list.
 */
public List<TransportConnector> getTransportConnectors() {
    List<TransportConnector> copy = new ArrayList<TransportConnector>(transportConnectors);
    return copy;
}

/**
 * Sets the transport connectors which this broker will listen on for new
 * clients
 *
 * @org.apache.xbean.Property
 *                            nestedType="org.apache.activemq.broker.TransportConnector"
 */
public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
    // Each connector goes through addConnector; existing ones are not cleared here.
    for (TransportConnector connector : transportConnectors) {
        addConnector(connector);
    }
}

/**
 * Returns a copy of the network connector list.
 */
public List<NetworkConnector> getNetworkConnectors() {
    List<NetworkConnector> copy = new ArrayList<NetworkConnector>(networkConnectors);
    return copy;
}

/**
 * Returns a copy of the proxy connector list.
 */
public List<ProxyConnector> getProxyConnectors() {
    List<ProxyConnector> copy = new ArrayList<ProxyConnector>(proxyConnectors);
    return copy;
}

/**
 * Sets the network connectors which this broker will use to connect to
 * other brokers in a federated network
 *
 * @org.apache.xbean.Property
 *                            nestedType="org.apache.activemq.network.NetworkConnector"
 */
public void setNetworkConnectors(List networkConnectors) throws Exception {
    // The parameter is a raw List, so each element is cast individually.
    for (Object element : networkConnectors) {
        NetworkConnector connector = (NetworkConnector) element;
        addNetworkConnector(connector);
    }
}

1188 /** 1189 * Sets the network connectors which this broker will use to connect to 1190 * other brokers in a federated network 1191 */ 1192 public void setProxyConnectors(List proxyConnectors) throws Exception { 1193 for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) { 1194 ProxyConnector connector = (ProxyConnector) iter.next(); 1195 addProxyConnector(connector); 1196 } 1197 } 1198 1199 public PolicyMap getDestinationPolicy() { 1200 return destinationPolicy; 1201 } 1202 1203 /** 1204 * Sets the destination specific policies available either for exact 1205 * destinations or for wildcard areas of destinations. 1206 */ 1207 public void setDestinationPolicy(PolicyMap policyMap) { 1208 this.destinationPolicy = policyMap; 1209 } 1210 1211 public BrokerPlugin[] getPlugins() { 1212 return plugins; 1213 } 1214 1215 /** 1216 * Sets a number of broker plugins to install such as for security 1217 * authentication or authorization 1218 */ 1219 public void setPlugins(BrokerPlugin[] plugins) { 1220 this.plugins = plugins; 1221 } 1222 1223 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1224 return messageAuthorizationPolicy; 1225 } 1226 1227 /** 1228 * Sets the policy used to decide if the current connection is authorized to 1229 * consume a given message 1230 */ 1231 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1232 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1233 } 1234 1235 /** 1236 * Delete all messages from the persistent store 1237 * 1238 * @throws IOException 1239 */ 1240 public void deleteAllMessages() throws IOException { 1241 getPersistenceAdapter().deleteAllMessages(); 1242 } 1243 1244 public boolean isDeleteAllMessagesOnStartup() { 1245 return deleteAllMessagesOnStartup; 1246 } 1247 1248 /** 1249 * Sets whether or not all messages are deleted on startup - mostly only 1250 * useful for testing. 
1251 */ 1252 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) { 1253 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; 1254 } 1255 1256 public URI getVmConnectorURI() { 1257 if (vmConnectorURI == null) { 1258 try { 1259 vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_")); 1260 } catch (URISyntaxException e) { 1261 LOG.error("Badly formed URI from " + getBrokerName(), e); 1262 } 1263 } 1264 return vmConnectorURI; 1265 } 1266 1267 public void setVmConnectorURI(URI vmConnectorURI) { 1268 this.vmConnectorURI = vmConnectorURI; 1269 } 1270 1271 /** 1272 * @return Returns the shutdownOnMasterFailure. 1273 */ 1274 public boolean isShutdownOnMasterFailure() { 1275 return shutdownOnMasterFailure; 1276 } 1277 1278 /** 1279 * @param shutdownOnMasterFailure 1280 * The shutdownOnMasterFailure to set. 1281 */ 1282 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) { 1283 this.shutdownOnMasterFailure = shutdownOnMasterFailure; 1284 } 1285 1286 public boolean isKeepDurableSubsActive() { 1287 return keepDurableSubsActive; 1288 } 1289 1290 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 1291 this.keepDurableSubsActive = keepDurableSubsActive; 1292 } 1293 1294 public boolean isUseVirtualTopics() { 1295 return useVirtualTopics; 1296 } 1297 1298 /** 1299 * Sets whether or not <a 1300 * href="http://activemq.apache.org/virtual-destinations.html">Virtual 1301 * Topics</a> should be supported by default if they have not been 1302 * explicitly configured. 
1303 */ 1304 public void setUseVirtualTopics(boolean useVirtualTopics) { 1305 this.useVirtualTopics = useVirtualTopics; 1306 } 1307 1308 public DestinationInterceptor[] getDestinationInterceptors() { 1309 return destinationInterceptors; 1310 } 1311 1312 public boolean isUseMirroredQueues() { 1313 return useMirroredQueues; 1314 } 1315 1316 /** 1317 * Sets whether or not <a 1318 * href="http://activemq.apache.org/mirrored-queues.html">Mirrored 1319 * Queues</a> should be supported by default if they have not been 1320 * explicitly configured. 1321 */ 1322 public void setUseMirroredQueues(boolean useMirroredQueues) { 1323 this.useMirroredQueues = useMirroredQueues; 1324 } 1325 1326 /** 1327 * Sets the destination interceptors to use 1328 */ 1329 public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) { 1330 this.destinationInterceptors = destinationInterceptors; 1331 } 1332 1333 public ActiveMQDestination[] getDestinations() { 1334 return destinations; 1335 } 1336 1337 /** 1338 * Sets the destinations which should be loaded/created on startup 1339 */ 1340 public void setDestinations(ActiveMQDestination[] destinations) { 1341 this.destinations = destinations; 1342 } 1343 1344 /** 1345 * @return the tempDataStore 1346 */ 1347 public synchronized Store getTempDataStore() { 1348 if (tempDataStore == null) { 1349 if (!isPersistent()) { 1350 return null; 1351 } 1352 boolean result = true; 1353 boolean empty = true; 1354 try { 1355 File directory = getTmpDataDirectory(); 1356 if (directory.exists() && directory.isDirectory()) { 1357 File[] files = directory.listFiles(); 1358 if (files != null && files.length > 0) { 1359 empty = false; 1360 for (int i = 0; i < files.length; i++) { 1361 File file = files[i]; 1362 if (!file.isDirectory()) { 1363 result &= file.delete(); 1364 } 1365 } 1366 } 1367 } 1368 if (!empty) { 1369 String str = result ? 
"Successfully deleted" : "Failed to delete"; 1370 LOG.info(str + " temporary storage"); 1371 } 1372 tempDataStore = StoreFactory.open(getTmpDataDirectory(), "rw"); 1373 } catch (IOException e) { 1374 throw new RuntimeException(e); 1375 } 1376 } 1377 return tempDataStore; 1378 } 1379 1380 /** 1381 * @param tempDataStore 1382 * the tempDataStore to set 1383 */ 1384 public void setTempDataStore(Store tempDataStore) { 1385 this.tempDataStore = tempDataStore; 1386 } 1387 1388 public int getPersistenceThreadPriority() { 1389 return persistenceThreadPriority; 1390 } 1391 1392 public void setPersistenceThreadPriority(int persistenceThreadPriority) { 1393 this.persistenceThreadPriority = persistenceThreadPriority; 1394 } 1395 1396 /** 1397 * @return the useLocalHostBrokerName 1398 */ 1399 public boolean isUseLocalHostBrokerName() { 1400 return this.useLocalHostBrokerName; 1401 } 1402 1403 /** 1404 * @param useLocalHostBrokerName 1405 * the useLocalHostBrokerName to set 1406 */ 1407 public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) { 1408 this.useLocalHostBrokerName = useLocalHostBrokerName; 1409 if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) { 1410 brokerName = LOCAL_HOST_NAME; 1411 } 1412 } 1413 1414 /** 1415 * @return the supportFailOver 1416 */ 1417 public boolean isSupportFailOver() { 1418 return this.supportFailOver; 1419 } 1420 1421 /** 1422 * @param supportFailOver 1423 * the supportFailOver to set 1424 */ 1425 public void setSupportFailOver(boolean supportFailOver) { 1426 this.supportFailOver = supportFailOver; 1427 } 1428 1429 /** 1430 * Looks up and lazily creates if necessary the destination for the given 1431 * JMS name 1432 */ 1433 public Destination getDestination(ActiveMQDestination destination) throws Exception { 1434 return getBroker().addDestination(getAdminConnectionContext(), destination); 1435 } 1436 1437 public void removeDestination(ActiveMQDestination destination) throws 
Exception { 1438 getBroker().removeDestination(getAdminConnectionContext(), destination, 0); 1439 } 1440 1441 public int getProducerSystemUsagePortion() { 1442 return producerSystemUsagePortion; 1443 } 1444 1445 public void setProducerSystemUsagePortion(int producerSystemUsagePortion) { 1446 this.producerSystemUsagePortion = producerSystemUsagePortion; 1447 } 1448 1449 public int getConsumerSystemUsagePortion() { 1450 return consumerSystemUsagePortion; 1451 } 1452 1453 public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) { 1454 this.consumerSystemUsagePortion = consumerSystemUsagePortion; 1455 } 1456 1457 public boolean isSplitSystemUsageForProducersConsumers() { 1458 return splitSystemUsageForProducersConsumers; 1459 } 1460 1461 public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) { 1462 this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers; 1463 } 1464 1465 public boolean isMonitorConnectionSplits() { 1466 return monitorConnectionSplits; 1467 } 1468 1469 public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { 1470 this.monitorConnectionSplits = monitorConnectionSplits; 1471 } 1472 1473 public int getTaskRunnerPriority() { 1474 return taskRunnerPriority; 1475 } 1476 1477 public void setTaskRunnerPriority(int taskRunnerPriority) { 1478 this.taskRunnerPriority = taskRunnerPriority; 1479 } 1480 1481 public boolean isDedicatedTaskRunner() { 1482 return dedicatedTaskRunner; 1483 } 1484 1485 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { 1486 this.dedicatedTaskRunner = dedicatedTaskRunner; 1487 } 1488 1489 public boolean isCacheTempDestinations() { 1490 return cacheTempDestinations; 1491 } 1492 1493 public void setCacheTempDestinations(boolean cacheTempDestinations) { 1494 this.cacheTempDestinations = cacheTempDestinations; 1495 } 1496 1497 public int getTimeBeforePurgeTempDestinations() { 1498 return timeBeforePurgeTempDestinations; 1499 
} 1500 1501 public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) { 1502 this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations; 1503 } 1504 1505 public boolean isUseTempMirroredQueues() { 1506 return useTempMirroredQueues; 1507 } 1508 1509 public void setUseTempMirroredQueues(boolean useTempMirroredQueues) { 1510 this.useTempMirroredQueues = useTempMirroredQueues; 1511 } 1512 1513 // 1514 // Implementation methods 1515 // ------------------------------------------------------------------------- 1516 /** 1517 * Handles any lazy-creation helper properties which are added to make 1518 * things easier to configure inside environments such as Spring 1519 * 1520 * @throws Exception 1521 */ 1522 protected void processHelperProperties() throws Exception { 1523 boolean masterServiceExists = false; 1524 if (transportConnectorURIs != null) { 1525 for (int i = 0; i < transportConnectorURIs.length; i++) { 1526 String uri = transportConnectorURIs[i]; 1527 addConnector(uri); 1528 } 1529 } 1530 if (networkConnectorURIs != null) { 1531 for (int i = 0; i < networkConnectorURIs.length; i++) { 1532 String uri = networkConnectorURIs[i]; 1533 addNetworkConnector(uri); 1534 } 1535 } 1536 if (jmsBridgeConnectors != null) { 1537 for (int i = 0; i < jmsBridgeConnectors.length; i++) { 1538 addJmsConnector(jmsBridgeConnectors[i]); 1539 } 1540 } 1541 for (Service service : services) { 1542 if (service instanceof MasterConnector) { 1543 masterServiceExists = true; 1544 break; 1545 } 1546 } 1547 if (masterConnectorURI != null) { 1548 if (masterServiceExists) { 1549 throw new IllegalStateException( 1550 "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property"); 1551 } else { 1552 addService(new MasterConnector(masterConnectorURI)); 1553 } 1554 } 1555 } 1556 1557 protected void stopAllConnectors(ServiceStopper stopper) { 1558 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); 
iter.hasNext();) { 1559 NetworkConnector connector = iter.next(); 1560 unregisterNetworkConnectorMBean(connector); 1561 stopper.stop(connector); 1562 } 1563 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 1564 ProxyConnector connector = iter.next(); 1565 stopper.stop(connector); 1566 } 1567 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 1568 JmsConnector connector = iter.next(); 1569 stopper.stop(connector); 1570 } 1571 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 1572 TransportConnector connector = iter.next(); 1573 stopper.stop(connector); 1574 } 1575 } 1576 1577 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { 1578 try { 1579 ObjectName objectName = createConnectorObjectName(connector); 1580 connector = connector.asManagedConnector(getManagementContext(), objectName); 1581 ConnectorViewMBean view = new ConnectorView(connector); 1582 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1583 return connector; 1584 } catch (Throwable e) { 1585 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e); 1586 } 1587 } 1588 1589 protected void unregisterConnectorMBean(TransportConnector connector) throws IOException { 1590 if (isUseJmx()) { 1591 try { 1592 ObjectName objectName = createConnectorObjectName(connector); 1593 getManagementContext().unregisterMBean(objectName); 1594 } catch (Throwable e) { 1595 throw IOExceptionSupport.create( 1596 "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e); 1597 } 1598 } 1599 } 1600 1601 protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 1602 return adaptor; 1603 } 1604 1605 protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 1606 if (isUseJmx()) { 1607 } 
1608 } 1609 1610 private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { 1611 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1612 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName=" 1613 + JMXSupport.encodeObjectNamePart(connector.getName())); 1614 } 1615 1616 protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { 1617 NetworkConnectorViewMBean view = new NetworkConnectorView(connector); 1618 try { 1619 ObjectName objectName = createNetworkConnectorObjectName(connector); 1620 connector.setObjectName(objectName); 1621 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1622 } catch (Throwable e) { 1623 throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); 1624 } 1625 } 1626 1627 protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) 1628 throws MalformedObjectNameException { 1629 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1630 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector," 1631 + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); 1632 } 1633 1634 protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { 1635 if (isUseJmx()) { 1636 try { 1637 ObjectName objectName = createNetworkConnectorObjectName(connector); 1638 getManagementContext().unregisterMBean(objectName); 1639 } catch (Exception e) { 1640 LOG.error("Network Connector could not be unregistered from JMX: " + e, e); 1641 } 1642 } 1643 } 1644 1645 protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { 1646 ProxyConnectorView view = new ProxyConnectorView(connector); 1647 try { 1648 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + 
"BrokerName=" 1649 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector," 1650 + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); 1651 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1652 } catch (Throwable e) { 1653 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 1654 } 1655 } 1656 1657 protected void registerFTConnectorMBean(MasterConnector connector) throws IOException { 1658 FTConnectorView view = new FTConnectorView(connector); 1659 try { 1660 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1661 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector"); 1662 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1663 } catch (Throwable e) { 1664 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 1665 } 1666 } 1667 1668 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { 1669 JmsConnectorView view = new JmsConnectorView(connector); 1670 try { 1671 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1672 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector," 1673 + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); 1674 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1675 } catch (Throwable e) { 1676 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 1677 } 1678 } 1679 1680 /** 1681 * Factory method to create a new broker 1682 * 1683 * @throws Exception 1684 * @throws 1685 * @throws 1686 */ 1687 protected Broker createBroker() throws Exception { 1688 regionBroker = createRegionBroker(); 1689 Broker broker = addInterceptors(regionBroker); 1690 // Add a filter that will stop access to the broker 
once stopped 1691 broker = new MutableBrokerFilter(broker) { 1692 Broker old; 1693 1694 public void stop() throws Exception { 1695 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { 1696 // Just ignore additional stop actions. 1697 public void stop() throws Exception { 1698 } 1699 }); 1700 old.stop(); 1701 } 1702 1703 public void start() throws Exception { 1704 if (forceStart && old != null) { 1705 this.next.set(old); 1706 } 1707 getNext().start(); 1708 } 1709 }; 1710 return broker; 1711 } 1712 1713 /** 1714 * Factory method to create the core region broker onto which interceptors 1715 * are added 1716 * 1717 * @throws Exception 1718 */ 1719 protected Broker createRegionBroker() throws Exception { 1720 if (destinationInterceptors == null) { 1721 destinationInterceptors = createDefaultDestinationInterceptor(); 1722 } 1723 configureServices(destinationInterceptors); 1724 DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); 1725 if (destinationFactory == null) { 1726 destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter()); 1727 } 1728 return createRegionBroker(destinationInterceptor); 1729 } 1730 1731 protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException { 1732 RegionBroker regionBroker; 1733 if (isUseJmx()) { 1734 regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), 1735 getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor); 1736 } else { 1737 regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, 1738 destinationInterceptor); 1739 } 1740 destinationFactory.setRegionBroker(regionBroker); 1741 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); 1742 regionBroker.setBrokerName(getBrokerName()); 1743 
regionBroker.getDestinationStatistics().setEnabled(enableStatistics); 1744 return regionBroker; 1745 } 1746 1747 /** 1748 * Create the default destination interceptor 1749 */ 1750 protected DestinationInterceptor[] createDefaultDestinationInterceptor() { 1751 List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>(); 1752 if (isUseVirtualTopics()) { 1753 VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); 1754 VirtualTopic virtualTopic = new VirtualTopic(); 1755 virtualTopic.setName("VirtualTopic.>"); 1756 VirtualDestination[] virtualDestinations = { virtualTopic }; 1757 interceptor.setVirtualDestinations(virtualDestinations); 1758 answer.add(interceptor); 1759 } 1760 if (isUseMirroredQueues()) { 1761 MirroredQueue interceptor = new MirroredQueue(); 1762 answer.add(interceptor); 1763 } 1764 DestinationInterceptor[] array = new DestinationInterceptor[answer.size()]; 1765 answer.toArray(array); 1766 return array; 1767 } 1768 1769 /** 1770 * Strategy method to add interceptors to the broker 1771 * 1772 * @throws IOException 1773 */ 1774 protected Broker addInterceptors(Broker broker) throws Exception { 1775 broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); 1776 if (isAdvisorySupport()) { 1777 broker = new AdvisoryBroker(broker); 1778 } 1779 broker = new CompositeDestinationBroker(broker); 1780 if (isPopulateJMSXUserID()) { 1781 broker = new UserIDBroker(broker); 1782 } 1783 if (isMonitorConnectionSplits()) { 1784 broker = new ConnectionSplitBroker(broker); 1785 } 1786 if (plugins != null) { 1787 for (int i = 0; i < plugins.length; i++) { 1788 BrokerPlugin plugin = plugins[i]; 1789 broker = plugin.installPlugin(broker); 1790 } 1791 } 1792 return broker; 1793 } 1794 1795 protected PersistenceAdapter createPersistenceAdapter() throws IOException { 1796 if (isPersistent()) { 1797 return getPersistenceFactory().createPersistenceAdapter(); 1798 } else { 1799 return new 
MemoryPersistenceAdapter(); 1800 } 1801 } 1802 1803 protected AMQPersistenceAdapterFactory createPersistenceFactory() { 1804 AMQPersistenceAdapterFactory factory = new AMQPersistenceAdapterFactory(); 1805 factory.setDataDirectory(getBrokerDataDirectory()); 1806 factory.setTaskRunnerFactory(getPersistenceTaskRunnerFactory()); 1807 factory.setBrokerName(getBrokerName()); 1808 return factory; 1809 } 1810 1811 protected ObjectName createBrokerObjectName() throws IOException { 1812 try { 1813 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1814 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker"); 1815 } catch (Throwable e) { 1816 throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e); 1817 } 1818 } 1819 1820 protected TransportConnector createTransportConnector(URI brokerURI) throws Exception { 1821 TransportServer transport = TransportFactory.bind(this, brokerURI); 1822 return new TransportConnector(transport); 1823 } 1824 1825 /** 1826 * Extracts the port from the options 1827 */ 1828 protected Object getPort(Map options) { 1829 Object port = options.get("port"); 1830 if (port == null) { 1831 port = DEFAULT_PORT; 1832 LOG.warn("No port specified so defaulting to: " + port); 1833 } 1834 return port; 1835 } 1836 1837 protected void addShutdownHook() { 1838 if (useShutdownHook) { 1839 shutdownHook = new Thread("ActiveMQ ShutdownHook") { 1840 public void run() { 1841 containerShutdown(); 1842 } 1843 }; 1844 Runtime.getRuntime().addShutdownHook(shutdownHook); 1845 } 1846 } 1847 1848 protected void removeShutdownHook() { 1849 if (shutdownHook != null) { 1850 try { 1851 Runtime.getRuntime().removeShutdownHook(shutdownHook); 1852 } catch (Exception e) { 1853 LOG.debug("Caught exception, must be shutting down: " + e); 1854 } 1855 } 1856 } 1857 1858 /** 1859 * Causes a clean shutdown of the container when the VM is being shut down 1860 */ 1861 protected void containerShutdown() { 1862 try { 1863 
stop(); 1864 } catch (IOException e) { 1865 Throwable linkedException = e.getCause(); 1866 if (linkedException != null) { 1867 logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException); 1868 } else { 1869 logError("Failed to shut down: " + e, e); 1870 } 1871 if (!useLoggingForShutdownErrors) { 1872 e.printStackTrace(System.err); 1873 } 1874 } catch (Exception e) { 1875 logError("Failed to shut down: " + e, e); 1876 } 1877 } 1878 1879 protected void logError(String message, Throwable e) { 1880 if (useLoggingForShutdownErrors) { 1881 LOG.error("Failed to shut down: " + e); 1882 } else { 1883 System.err.println("Failed to shut down: " + e); 1884 } 1885 } 1886 1887 /** 1888 * Starts any configured destinations on startup 1889 */ 1890 protected void startDestinations() throws Exception { 1891 if (destinations != null) { 1892 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 1893 for (int i = 0; i < destinations.length; i++) { 1894 ActiveMQDestination destination = destinations[i]; 1895 getBroker().addDestination(adminConnectionContext, destination); 1896 } 1897 } 1898 } 1899 1900 /** 1901 * Returns the broker's administration connection context used for 1902 * configuring the broker at startup 1903 */ 1904 public ConnectionContext getAdminConnectionContext() throws Exception { 1905 ConnectionContext adminConnectionContext = getBroker().getAdminConnectionContext(); 1906 if (adminConnectionContext == null) { 1907 adminConnectionContext = createAdminConnectionContext(); 1908 getBroker().setAdminConnectionContext(adminConnectionContext); 1909 } 1910 return adminConnectionContext; 1911 } 1912 1913 /** 1914 * Factory method to create the new administration connection context 1915 * object. 
Note this method is here rather than inside a default broker 1916 * implementation to ensure that the broker reference inside it is the outer 1917 * most interceptor 1918 */ 1919 protected ConnectionContext createAdminConnectionContext() throws Exception { 1920 ConnectionContext context = new ConnectionContext(); 1921 context.setBroker(getBroker()); 1922 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 1923 return context; 1924 } 1925 1926 protected void waitForSlave() { 1927 try { 1928 if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) { 1929 throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds."); 1930 } 1931 } catch (InterruptedException e) { 1932 LOG.error("Exception waiting for slave:" + e); 1933 } 1934 } 1935 1936 protected void slaveConnectionEstablished() { 1937 slaveStartSignal.countDown(); 1938 } 1939 1940 protected void startManagementContext() throws Exception { 1941 getManagementContext().start(); 1942 adminView = new BrokerView(this, null); 1943 ObjectName objectName = getBrokerObjectName(); 1944 AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName); 1945 } 1946 1947 /** 1948 * Start all transport and network connections, proxies and bridges 1949 * 1950 * @throws Exception 1951 */ 1952 protected void startAllConnectors() throws Exception { 1953 if (!isSlave()) { 1954 Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations(); 1955 List<TransportConnector> al = new ArrayList<TransportConnector>(); 1956 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 1957 TransportConnector connector = iter.next(); 1958 connector.setBrokerService(this); 1959 al.add(startTransportConnector(connector)); 1960 } 1961 if (al.size() > 0) { 1962 // let's clear the transportConnectors list and replace it with 1963 // the started transportConnector instances 1964 
this.transportConnectors.clear(); 1965 setTransportConnectors(al); 1966 } 1967 URI uri = getVmConnectorURI(); 1968 Map<String, String> map = new HashMap<String, String>(URISupport.parseParamters(uri)); 1969 map.put("network", "true"); 1970 map.put("async", "false"); 1971 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 1972 if (isWaitForSlave()) { 1973 waitForSlave(); 1974 } 1975 if (!stopped.get()) { 1976 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 1977 NetworkConnector connector = iter.next(); 1978 connector.setLocalUri(uri); 1979 connector.setBrokerName(getBrokerName()); 1980 connector.setDurableDestinations(durableDestinations); 1981 connector.start(); 1982 } 1983 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 1984 ProxyConnector connector = iter.next(); 1985 connector.start(); 1986 } 1987 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 1988 JmsConnector connector = iter.next(); 1989 connector.start(); 1990 } 1991 for (Service service : services) { 1992 configureService(service); 1993 service.start(); 1994 } 1995 } 1996 } 1997 } 1998 1999 protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception { 2000 connector.setTaskRunnerFactory(getTaskRunnerFactory()); 2001 MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy(); 2002 if (policy != null) { 2003 connector.setMessageAuthorizationPolicy(policy); 2004 } 2005 if (isUseJmx()) { 2006 connector = registerConnectorMBean(connector); 2007 } 2008 connector.getStatistics().setEnabled(enableStatistics); 2009 connector.start(); 2010 return connector; 2011 } 2012 2013 /** 2014 * Perform any custom dependency injection 2015 */ 2016 protected void configureServices(Object[] services) { 2017 for (Object service : services) { 2018 configureService(service); 2019 } 2020 } 2021 2022 /** 2023 * Perform any custom dependency 
injection 2024 */ 2025 protected void configureService(Object service) { 2026 if (service instanceof BrokerServiceAware) { 2027 BrokerServiceAware serviceAware = (BrokerServiceAware) service; 2028 serviceAware.setBrokerService(this); 2029 } 2030 if (masterConnector == null) { 2031 if (service instanceof MasterConnector) { 2032 masterConnector = (MasterConnector) service; 2033 supportFailOver = true; 2034 } 2035 } 2036 } 2037 2038 public void handleIOException(IOException exception) { 2039 if (ioExceptionHandler != null) { 2040 ioExceptionHandler.handle(exception); 2041 } else { 2042 LOG.info("Ignoring IO exception, " + exception, exception); 2043 } 2044 } 2045 2046 /** 2047 * Starts all destiantions in persistence store. This includes all inactive 2048 * destinations 2049 */ 2050 protected void startDestinationsInPersistenceStore(Broker broker) throws Exception { 2051 Set destinations = destinationFactory.getDestinations(); 2052 if (destinations != null) { 2053 Iterator iter = destinations.iterator(); 2054 ConnectionContext adminConnectionContext = broker.getAdminConnectionContext(); 2055 if (adminConnectionContext == null) { 2056 ConnectionContext context = new ConnectionContext(); 2057 context.setBroker(broker); 2058 adminConnectionContext = context; 2059 broker.setAdminConnectionContext(adminConnectionContext); 2060 } 2061 while (iter.hasNext()) { 2062 ActiveMQDestination destination = (ActiveMQDestination) iter.next(); 2063 broker.addDestination(adminConnectionContext, destination); 2064 } 2065 } 2066 } 2067 2068 public Broker getRegionBroker() { 2069 return regionBroker; 2070 } 2071 2072 public void setRegionBroker(Broker regionBroker) { 2073 this.regionBroker = regionBroker; 2074 } 2075 2076 public void addShutdownHook(Runnable hook) { 2077 synchronized (shutdownHooks) { 2078 shutdownHooks.add(hook); 2079 } 2080 } 2081 2082 public void removeShutdownHook(Runnable hook) { 2083 synchronized (shutdownHooks) { 2084 shutdownHooks.remove(hook); 2085 } 2086 } 2087 
2088 public boolean isSystemExitOnShutdown() { 2089 return systemExitOnShutdown; 2090 } 2091 2092 public void setSystemExitOnShutdown(boolean systemExitOnShutdown) { 2093 this.systemExitOnShutdown = systemExitOnShutdown; 2094 } 2095 2096 public int getSystemExitOnShutdownExitCode() { 2097 return systemExitOnShutdownExitCode; 2098 } 2099 2100 public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) { 2101 this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode; 2102 } 2103 2104 public SslContext getSslContext() { 2105 return sslContext; 2106 } 2107 2108 public void setSslContext(SslContext sslContext) { 2109 this.sslContext = sslContext; 2110 } 2111 2112 public boolean isShutdownOnSlaveFailure() { 2113 return shutdownOnSlaveFailure; 2114 } 2115 2116 public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) { 2117 this.shutdownOnSlaveFailure = shutdownOnSlaveFailure; 2118 } 2119 2120 public boolean isWaitForSlave() { 2121 return waitForSlave; 2122 } 2123 2124 public void setWaitForSlave(boolean waitForSlave) { 2125 this.waitForSlave = waitForSlave; 2126 } 2127 2128 public long getWaitForSlaveTimeout() { 2129 return this.waitForSlaveTimeout; 2130 } 2131 2132 public void setWaitForSlaveTimeout(long waitForSlaveTimeout) { 2133 this.waitForSlaveTimeout = waitForSlaveTimeout; 2134 } 2135 2136 public CountDownLatch getSlaveStartSignal() { 2137 return slaveStartSignal; 2138 } 2139 2140 /** 2141 * Get the passiveSlave 2142 * @return the passiveSlave 2143 */ 2144 public boolean isPassiveSlave() { 2145 return this.passiveSlave; 2146 } 2147 2148 /** 2149 * Set the passiveSlave 2150 * @param passiveSlave the passiveSlave to set 2151 */ 2152 public void setPassiveSlave(boolean passiveSlave) { 2153 this.passiveSlave = passiveSlave; 2154 } 2155 2156 public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) { 2157 ioExceptionHandler.setBrokerService(this); 2158 this.ioExceptionHandler = ioExceptionHandler; 2159 } 2160 2161 
2162 }