001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.net.URI;
022    import java.net.URISyntaxException;
023    import java.net.UnknownHostException;
024    import java.util.ArrayList;
025    import java.util.HashMap;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.concurrent.CopyOnWriteArrayList;
031    import java.util.concurrent.CountDownLatch;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicBoolean;
034    
035    import javax.annotation.PostConstruct;
036    import javax.annotation.PreDestroy;
037    import javax.management.MalformedObjectNameException;
038    import javax.management.ObjectName;
039    
040    import org.apache.activemq.ActiveMQConnectionMetaData;
041    import org.apache.activemq.ConfigurationException;
042    import org.apache.activemq.Service;
043    import org.apache.activemq.advisory.AdvisoryBroker;
044    import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
045    import org.apache.activemq.broker.ft.MasterConnector;
046    import org.apache.activemq.broker.jmx.AnnotatedMBean;
047    import org.apache.activemq.broker.jmx.BrokerView;
048    import org.apache.activemq.broker.jmx.ConnectorView;
049    import org.apache.activemq.broker.jmx.ConnectorViewMBean;
050    import org.apache.activemq.broker.jmx.FTConnectorView;
051    import org.apache.activemq.broker.jmx.JmsConnectorView;
052    import org.apache.activemq.broker.jmx.ManagedRegionBroker;
053    import org.apache.activemq.broker.jmx.ManagementContext;
054    import org.apache.activemq.broker.jmx.NetworkConnectorView;
055    import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
056    import org.apache.activemq.broker.jmx.ProxyConnectorView;
057    import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
058    import org.apache.activemq.broker.region.Destination;
059    import org.apache.activemq.broker.region.DestinationFactory;
060    import org.apache.activemq.broker.region.DestinationFactoryImpl;
061    import org.apache.activemq.broker.region.DestinationInterceptor;
062    import org.apache.activemq.broker.region.RegionBroker;
063    import org.apache.activemq.broker.region.policy.PolicyMap;
064    import org.apache.activemq.broker.region.virtual.MirroredQueue;
065    import org.apache.activemq.broker.region.virtual.VirtualDestination;
066    import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
067    import org.apache.activemq.broker.region.virtual.VirtualTopic;
068    import org.apache.activemq.command.ActiveMQDestination;
069    import org.apache.activemq.command.BrokerId;
070    import org.apache.activemq.kaha.Store;
071    import org.apache.activemq.kaha.StoreFactory;
072    import org.apache.activemq.network.ConnectionFilter;
073    import org.apache.activemq.network.DiscoveryNetworkConnector;
074    import org.apache.activemq.network.NetworkConnector;
075    import org.apache.activemq.network.jms.JmsConnector;
076    import org.apache.activemq.proxy.ProxyConnector;
077    import org.apache.activemq.security.MessageAuthorizationPolicy;
078    import org.apache.activemq.security.SecurityContext;
079    import org.apache.activemq.selector.SelectorParser;
080    import org.apache.activemq.store.PersistenceAdapter;
081    import org.apache.activemq.store.PersistenceAdapterFactory;
082    import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
083    import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
084    import org.apache.activemq.thread.TaskRunnerFactory;
085    import org.apache.activemq.transport.TransportFactory;
086    import org.apache.activemq.transport.TransportServer;
087    import org.apache.activemq.transport.vm.VMTransportFactory;
088    import org.apache.activemq.usage.SystemUsage;
089    import org.apache.activemq.util.DefaultIOExceptionHandler;
090    import org.apache.activemq.util.IOExceptionHandler;
091    import org.apache.activemq.util.IOExceptionSupport;
092    import org.apache.activemq.util.IOHelper;
093    import org.apache.activemq.util.JMXSupport;
094    import org.apache.activemq.util.ServiceStopper;
095    import org.apache.activemq.util.URISupport;
096    import org.apache.commons.logging.Log;
097    import org.apache.commons.logging.LogFactory;
098    /**
099     * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
100     * number of transport connectors, network connectors and a bunch of properties
101     * which can be used to configure the broker as its lazily created.
102     * 
103     * @version $Revision: 1.1 $
104     * @org.apache.xbean.XBean
105     */
106    public class BrokerService implements Service {
107        protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
108        public static final String DEFAULT_PORT = "61616";
109        public static final String LOCAL_HOST_NAME;
110        public static final String DEFAULT_BROKER_NAME = "localhost";
111        private static final Log LOG = LogFactory.getLog(BrokerService.class);
112        private static final long serialVersionUID = 7353129142305630237L;
113        private boolean useJmx = true;
114        private boolean enableStatistics = true;
115        private boolean persistent = true;
116        private boolean populateJMSXUserID;
117        private boolean useShutdownHook = true;
118        private boolean useLoggingForShutdownErrors;
119        private boolean shutdownOnMasterFailure;
120        private boolean shutdownOnSlaveFailure;
121        private boolean waitForSlave;
122        private long waitForSlaveTimeout = 600000L;
123        private boolean passiveSlave;
124        private String brokerName = DEFAULT_BROKER_NAME;
125        private File dataDirectoryFile;
126        private File tmpDataDirectory;
127        private Broker broker;
128        private BrokerView adminView;
129        private ManagementContext managementContext;
130        private ObjectName brokerObjectName;
131        private TaskRunnerFactory taskRunnerFactory;
132        private TaskRunnerFactory persistenceTaskRunnerFactory;
133        private SystemUsage systemUsage;
134        private SystemUsage producerSystemUsage;
135        private SystemUsage consumerSystemUsaage;
136        private PersistenceAdapter persistenceAdapter;
137        private PersistenceAdapterFactory persistenceFactory;
138        protected DestinationFactory destinationFactory;
139        private MessageAuthorizationPolicy messageAuthorizationPolicy;
140        private List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
141        private List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
142        private List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
143        private List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
144        private List<Service> services = new ArrayList<Service>();
145        private MasterConnector masterConnector;
146        private String masterConnectorURI;
147        private transient Thread shutdownHook;
148        private String[] transportConnectorURIs;
149        private String[] networkConnectorURIs;
150        private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
151        // to other jms messaging
152        // systems
153        private boolean deleteAllMessagesOnStartup;
154        private boolean advisorySupport = true;
155        private URI vmConnectorURI;
156        private PolicyMap destinationPolicy;
157        private AtomicBoolean started = new AtomicBoolean(false);
158        private AtomicBoolean stopped = new AtomicBoolean(false);
159        private BrokerPlugin[] plugins;
160        private boolean keepDurableSubsActive = true;
161        private boolean useVirtualTopics = true;
162        private boolean useMirroredQueues = false;
163        private boolean useTempMirroredQueues = true;
164        private BrokerId brokerId;
165        private DestinationInterceptor[] destinationInterceptors;
166        private ActiveMQDestination[] destinations;
167        private Store tempDataStore;
168        private int persistenceThreadPriority = Thread.MAX_PRIORITY;
169        private boolean useLocalHostBrokerName;
170        private CountDownLatch stoppedLatch = new CountDownLatch(1);
171        private CountDownLatch startedLatch = new CountDownLatch(1);
172        private boolean supportFailOver;
173        private Broker regionBroker;
174        private int producerSystemUsagePortion = 60;
175        private int consumerSystemUsagePortion = 40;
176        private boolean splitSystemUsageForProducersConsumers;
177        private boolean monitorConnectionSplits = false;
178        private int taskRunnerPriority = Thread.NORM_PRIORITY;
179        private boolean dedicatedTaskRunner;
180        private boolean cacheTempDestinations = false;// useful for failover
181        private int timeBeforePurgeTempDestinations = 5000;
182        private List<Runnable> shutdownHooks = new ArrayList<Runnable>();
183        private boolean systemExitOnShutdown;
184        private int systemExitOnShutdownExitCode;
185        private SslContext sslContext;
186        private boolean forceStart = false;
187        private IOExceptionHandler ioExceptionHandler;
188    
189            static {
190            String localHostName = "localhost";
191            try {
192                localHostName = java.net.InetAddress.getLocalHost().getHostName();
193            } catch (UnknownHostException e) {
194                LOG.error("Failed to resolve localhost");
195            }
196            LOCAL_HOST_NAME = localHostName;
197        }
198    
199        @Override
200        public String toString() {
201            return "BrokerService[" + getBrokerName() + "]";
202        }
203    
204        /**
205         * Adds a new transport connector for the given bind address
206         * 
207         * @return the newly created and added transport connector
208         * @throws Exception
209         */
210        public TransportConnector addConnector(String bindAddress) throws Exception {
211            return addConnector(new URI(bindAddress));
212        }
213    
214        /**
215         * Adds a new transport connector for the given bind address
216         * 
217         * @return the newly created and added transport connector
218         * @throws Exception
219         */
220        public TransportConnector addConnector(URI bindAddress) throws Exception {
221            return addConnector(createTransportConnector(bindAddress));
222        }
223    
224        /**
225         * Adds a new transport connector for the given TransportServer transport
226         * 
227         * @return the newly created and added transport connector
228         * @throws Exception
229         */
230        public TransportConnector addConnector(TransportServer transport) throws Exception {
231            return addConnector(new TransportConnector(transport));
232        }
233    
234        /**
235         * Adds a new transport connector
236         * 
237         * @return the transport connector
238         * @throws Exception
239         */
240        public TransportConnector addConnector(TransportConnector connector) throws Exception {
241            transportConnectors.add(connector);
242            return connector;
243        }
244    
245        /**
246         * Stops and removes a transport connector from the broker.
247         * 
248         * @param connector
249         * @return true if the connector has been previously added to the broker
250         * @throws Exception
251         */
252        public boolean removeConnector(TransportConnector connector) throws Exception {
253            boolean rc = transportConnectors.remove(connector);
254            if (rc) {
255                unregisterConnectorMBean(connector);
256            }
257            return rc;
258        }
259    
260        /**
261         * Adds a new network connector using the given discovery address
262         * 
263         * @return the newly created and added network connector
264         * @throws Exception
265         */
266        public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
267            return addNetworkConnector(new URI(discoveryAddress));
268        }
269    
270        /**
271         * Adds a new proxy connector using the given bind address
272         * 
273         * @return the newly created and added network connector
274         * @throws Exception
275         */
276        public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
277            return addProxyConnector(new URI(bindAddress));
278        }
279    
280        /**
281         * Adds a new network connector using the given discovery address
282         * 
283         * @return the newly created and added network connector
284         * @throws Exception
285         */
286        public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
287            if (!isAdvisorySupport()) {
288                throw new javax.jms.IllegalStateException(
289                        "Networks require advisory messages to function - advisories are currently disabled");
290            }
291            NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
292            return addNetworkConnector(connector);
293        }
294    
295        /**
296         * Adds a new proxy connector using the given bind address
297         * 
298         * @return the newly created and added network connector
299         * @throws Exception
300         */
301        public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
302            ProxyConnector connector = new ProxyConnector();
303            connector.setBind(bindAddress);
304            connector.setRemote(new URI("fanout:multicast://default"));
305            return addProxyConnector(connector);
306        }
307    
308        /**
309         * Adds a new network connector to connect this broker to a federated
310         * network
311         */
312        public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
313            connector.setBrokerService(this);
314            URI uri = getVmConnectorURI();
315            Map<String, String> map = new HashMap<String, String>(URISupport.parseParamters(uri));
316            map.put("network", "true");
317            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
318            connector.setLocalUri(uri);
319            // Set a connection filter so that the connector does not establish loop
320            // back connections.
321            connector.setConnectionFilter(new ConnectionFilter() {
322                public boolean connectTo(URI location) {
323                    List<TransportConnector> transportConnectors = getTransportConnectors();
324                    for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
325                        try {
326                            TransportConnector tc = iter.next();
327                            if (location.equals(tc.getConnectUri())) {
328                                return false;
329                            }
330                        } catch (Throwable e) {
331                        }
332                    }
333                    return true;
334                }
335            });
336            networkConnectors.add(connector);
337            if (isUseJmx()) {
338                registerNetworkConnectorMBean(connector);
339            }
340            return connector;
341        }
342    
343        /**
344         * Removes the given network connector without stopping it. The caller
345         * should call {@link NetworkConnector#stop()} to close the connector
346         */
347        public boolean removeNetworkConnector(NetworkConnector connector) {
348            boolean answer = networkConnectors.remove(connector);
349            if (answer) {
350                unregisterNetworkConnectorMBean(connector);
351            }
352            return answer;
353        }
354    
355        public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
356            URI uri = getVmConnectorURI();
357            connector.setLocalUri(uri);
358            proxyConnectors.add(connector);
359            if (isUseJmx()) {
360                registerProxyConnectorMBean(connector);
361            }
362            return connector;
363        }
364    
365        public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
366            connector.setBrokerService(this);
367            jmsConnectors.add(connector);
368            if (isUseJmx()) {
369                registerJmsConnectorMBean(connector);
370            }
371            return connector;
372        }
373    
374        public JmsConnector removeJmsConnector(JmsConnector connector) {
375            if (jmsConnectors.remove(connector)) {
376                return connector;
377            }
378            return null;
379        }
380    
381        /**
382         * @return Returns the masterConnectorURI.
383         */
384        public String getMasterConnectorURI() {
385            return masterConnectorURI;
386        }
387    
388        /**
389         * @param masterConnectorURI
390         *            The masterConnectorURI to set.
391         */
392        public void setMasterConnectorURI(String masterConnectorURI) {
393            this.masterConnectorURI = masterConnectorURI;
394        }
395    
396        /**
397         * @return true if this Broker is a slave to a Master
398         */
399        public boolean isSlave() {
400            return (masterConnector != null && masterConnector.isSlave()) ||
401                (masterConnector != null && masterConnector.isStoppedBeforeStart());
402        }
403    
404        public void masterFailed() {
405            if (shutdownOnMasterFailure) {
406                LOG.fatal("The Master has failed ... shutting down");
407                try {
408                    stop();
409                } catch (Exception e) {
410                    LOG.error("Failed to stop for master failure", e);
411                }
412            } else {
413                LOG.warn("Master Failed - starting all connectors");
414                try {
415                    startAllConnectors();
416                    broker.nowMasterBroker();
417                } catch (Exception e) {
418                    LOG.error("Failed to startAllConnectors", e);
419                }
420            }
421        }
422    
423        public boolean isStarted() {
424            return started.get();
425        }
426    
427        public void start(boolean force) throws Exception {
428            forceStart = force;
429            stopped.set(false);
430            started.set(false);
431            start();
432        }
433    
434        // Service interface
435        // -------------------------------------------------------------------------
436    
437        /**
438         *
439         * @throws Exception
440         * @org. apache.xbean.InitMethod
441         */
442        @PostConstruct
443        public void start() throws Exception {
444            if (stopped.get() || !started.compareAndSet(false, true)) {
445                // lets just ignore redundant start() calls
446                // as its way too easy to not be completely sure if start() has been
447                // called or not with the gazillion of different configuration
448                // mechanisms
449                // throw new IllegalStateException("Allready started.");
450                return;
451            }
452            try {
453                    if (systemExitOnShutdown && useShutdownHook) {
454                            throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
455                    }
456                processHelperProperties();
457                if (isUseJmx()) {
458                    startManagementContext();
459                }
460                getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
461                getPersistenceAdapter().setBrokerName(getBrokerName());
462                LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
463                if (deleteAllMessagesOnStartup) {
464                    deleteAllMessages();
465                }
466                getPersistenceAdapter().start();
467                startDestinations();
468                addShutdownHook();
469                getBroker().start();
470                if (isUseJmx()) {
471                    if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
472                            // try to restart management context
473                            // typical for slaves that use the same ports as master
474                            managementContext.stop();
475                            startManagementContext();
476                    }
477                    ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
478                    managedBroker.setContextBroker(broker);
479                    adminView.setBroker(managedBroker);
480                }
481                BrokerRegistry.getInstance().bind(getBrokerName(), this);
482                // see if there is a MasterBroker service and if so, configure
483                // it and start it.
484                for (Service service : services) {
485                    if (service instanceof MasterConnector) {
486                        configureService(service);
487                        service.start();
488                    }
489                }
490                if (!isSlave() && (this.masterConnector == null || isShutdownOnMasterFailure() == false)) {
491                    startAllConnectors();
492                }
493                if (!stopped.get()) {
494                    if (isUseJmx() && masterConnector != null) {
495                        registerFTConnectorMBean(masterConnector);
496                    }
497                }
498                brokerId = broker.getBrokerId();
499                if (ioExceptionHandler == null) {
500                    setIoExceptionHandler(new DefaultIOExceptionHandler());
501                }
502                LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
503                getBroker().brokerServiceStarted();
504                startedLatch.countDown();
505            } catch (Exception e) {
506                LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
507                try {
508                    if (!stopped.get()) {
509                        stop();
510                    }
511                } catch (Exception ex) {
512                    LOG.warn("Failed to stop broker after failure in start ", ex);
513                }
514                throw e;
515            }
516        }
517    
518        /**
519         *
520         * @throws Exception
521         * @org.apache .xbean.DestroyMethod
522         */
523        @PreDestroy
524        public void stop() throws Exception {
525            if (!started.get()) {
526                return;
527            }
528            
529            if (systemExitOnShutdown) {
530                    new Thread() {
531                            public void run() {
532                                    System.exit(systemExitOnShutdownExitCode);
533                            }
534                    }.start();
535            }
536            
537            LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down");
538            removeShutdownHook();
539            ServiceStopper stopper = new ServiceStopper();
540            if (services != null) {
541                for (Service service : services) {
542                    stopper.stop(service);
543                }
544            }
545            stopAllConnectors(stopper);
546            // remove any VMTransports connected
547            // this has to be done after services are stopped,
548            // to avoid timimg issue with discovery (spinning up a new instance)
549            BrokerRegistry.getInstance().unbind(getBrokerName());
550            VMTransportFactory.stopped(getBrokerName());
551            if (broker != null) {
552                stopper.stop(broker);
553            }
554            if (tempDataStore != null) {
555                tempDataStore.close();
556            }
557            stopper.stop(persistenceAdapter);
558            if (isUseJmx()) {
559                stopper.stop(getManagementContext());
560            }
561            // Clear SelectorParser cache to free memory
562            SelectorParser.clearCache();
563            stopped.set(true);
564            stoppedLatch.countDown();
565            if (masterConnectorURI == null) {
566                // master start has not finished yet
567                if (slaveStartSignal.getCount() == 1) {
568                    started.set(false);
569                    slaveStartSignal.countDown();
570                }
571            } else {
572                for (Service service : services) {
573                    if (service instanceof MasterConnector) {
574                        MasterConnector mConnector = (MasterConnector) service;
575                        if (!mConnector.isSlave()) {
576                            // means should be slave but not connected to master yet
577                            started.set(false);
578                            mConnector.stopBeforeConnected();
579                        }
580                    }
581                }
582            }
583            LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
584            synchronized (shutdownHooks) {
585                for (Runnable hook : shutdownHooks) {
586                    try {
587                        hook.run();
588                    } catch (Throwable e) {
589                        stopper.onException(hook, e);
590                    }
591                }
592            }
593            
594            stopper.throwFirstException();
595        }
596        
597        public boolean checkQueueSize(String queueName) {
598            long count = 0;
599            long queueSize = 0;
600            Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
601            for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
602                if (entry.getKey().isQueue()) {
603                    if (entry.getValue().getName().matches(queueName)) {
604                        queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
605                        count = queueSize;
606                        if (queueSize > 0) {
607                            LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:"
608                                    + queueSize);
609                        }
610                    }
611                }
612            }
613            return count == 0;
614        }
615    
616        /**
617         * This method (both connectorName and queueName are using regex to match)
618         * 1. stop the connector (supposed the user input the connector which the
619         * clients connect to) 2. to check whether there is any pending message on
620         * the queues defined by queueName 3. supposedly, after stop the connector,
621         * client should failover to other broker and pending messages should be
622         * forwarded. if no pending messages, the method finally call stop to stop
623         * the broker.
624         * 
625         * @param connectorName
626         * @param queueName
627         * @param timeout
628         * @param pollInterval
629         * @throws Exception
630         */
631        public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
632                throws Exception {
633            if (isUseJmx()) {
634                if (connectorName == null || queueName == null || timeout <= 0) {
635                    throw new Exception(
636                            "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
637                }
638                if (pollInterval <= 0) {
639                    pollInterval = 30;
640                }
641                LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:"
642                        + timeout + " pollInterval:" + pollInterval);
643                TransportConnector connector;
644                for (int i = 0; i < transportConnectors.size(); i++) {
645                    connector = transportConnectors.get(i);
646                    if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
647                        connector.stop();
648                    }
649                }
650                long start = System.currentTimeMillis();
651                while (System.currentTimeMillis() - start < timeout * 1000) {
652                    // check quesize until it gets zero
653                    if (checkQueueSize(queueName)) {
654                        stop();
655                        break;
656                    } else {
657                        Thread.sleep(pollInterval * 1000);
658                    }
659                }
660                if (stopped.get()) {
661                    LOG.info("Successfully stop the broker.");
662                } else {
663                    LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
664                }
665            }
666        }
667    
668        /**
669         * A helper method to block the caller thread until the broker has been
670         * stopped
671         */
672        public void waitUntilStopped() {
673            while (isStarted() && !stopped.get()) {
674                try {
675                    stoppedLatch.await();
676                } catch (InterruptedException e) {
677                    // ignore
678                }
679            }
680        }
681    
682        /**
683         * A helper method to block the caller thread until the broker has been
684         * started
685         */
686        public void waitUntilStarted() {
687            boolean waitSucceeded = false;
688            while (isStarted() && !stopped.get() && !waitSucceeded) {
689                try {
690                    waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
691                } catch (InterruptedException ignore) {
692                }
693            }
694        }
695    
696        // Properties
697        // -------------------------------------------------------------------------
698        /**
699         * Returns the message broker
700         */
701        public Broker getBroker() throws Exception {
702            if (broker == null) {
703                LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker ("
704                        + getBrokerName() + ") is starting");
705                LOG.info("For help or more information please see: http://activemq.apache.org/");
706                broker = createBroker();
707            }
708            return broker;
709        }
710    
711        /**
712         * Returns the administration view of the broker; used to create and destroy
713         * resources such as queues and topics. Note this method returns null if JMX
714         * is disabled.
715         */
716        public BrokerView getAdminView() throws Exception {
717            if (adminView == null) {
718                // force lazy creation
719                getBroker();
720            }
721            return adminView;
722        }
723    
724        public void setAdminView(BrokerView adminView) {
725            this.adminView = adminView;
726        }
727    
728        public String getBrokerName() {
729            return brokerName;
730        }
731    
732        /**
733         * Sets the name of this broker; which must be unique in the network
734         * 
735         * @param brokerName
736         */
737        public void setBrokerName(String brokerName) {
738            if (brokerName == null) {
739                throw new NullPointerException("The broker name cannot be null");
740            }
741            String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
742            if (!str.equals(brokerName)) {
743                LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
744            }
745            this.brokerName = str.trim();
746        }
747    
748        public PersistenceAdapterFactory getPersistenceFactory() {
749            if (persistenceFactory == null) {
750                persistenceFactory = createPersistenceFactory();
751            }
752            return persistenceFactory;
753        }
754    
755        public File getDataDirectoryFile() {
756            if (dataDirectoryFile == null) {
757                dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
758            }
759            return dataDirectoryFile;
760        }
761    
762        public File getBrokerDataDirectory() {
763            String brokerDir = getBrokerName();
764            return new File(getDataDirectoryFile(), brokerDir);
765        }
766    
767        /**
768         * Sets the directory in which the data files will be stored by default for
769         * the JDBC and Journal persistence adaptors.
770         * 
771         * @param dataDirectory
772         *            the directory to store data files
773         */
774        public void setDataDirectory(String dataDirectory) {
775            setDataDirectoryFile(new File(dataDirectory));
776        }
777    
778        /**
779         * Sets the directory in which the data files will be stored by default for
780         * the JDBC and Journal persistence adaptors.
781         * 
782         * @param dataDirectoryFile
783         *            the directory to store data files
784         */
785        public void setDataDirectoryFile(File dataDirectoryFile) {
786            this.dataDirectoryFile = dataDirectoryFile;
787        }
788    
789        /**
790         * @return the tmpDataDirectory
791         */
792        public File getTmpDataDirectory() {
793            if (tmpDataDirectory == null) {
794                tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
795            }
796            return tmpDataDirectory;
797        }
798    
799        /**
800         * @param tmpDataDirectory
801         *            the tmpDataDirectory to set
802         */
803        public void setTmpDataDirectory(File tmpDataDirectory) {
804            this.tmpDataDirectory = tmpDataDirectory;
805        }
806    
807        public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
808            this.persistenceFactory = persistenceFactory;
809        }
810    
811        public void setDestinationFactory(DestinationFactory destinationFactory) {
812            this.destinationFactory = destinationFactory;
813        }
814    
815        public boolean isPersistent() {
816            return persistent;
817        }
818    
819        /**
820         * Sets whether or not persistence is enabled or disabled.
821         */
822        public void setPersistent(boolean persistent) {
823            this.persistent = persistent;
824        }
825    
826        public boolean isPopulateJMSXUserID() {
827            return populateJMSXUserID;
828        }
829    
830        /**
831         * Sets whether or not the broker should populate the JMSXUserID header.
832         */
833        public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
834            this.populateJMSXUserID = populateJMSXUserID;
835        }
836    
837        public SystemUsage getSystemUsage() {
838            try {
839                if (systemUsage == null) {
840                    systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
841                    systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default
842                                                                             // 64
843                                                                             // Meg
844                    systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 10
845                                                                                    // Gb
846                    systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100
847                                                                                     // GB
848                    addService(this.systemUsage);
849                }
850                return systemUsage;
851            } catch (IOException e) {
852                LOG.fatal("Cannot create SystemUsage", e);
853                throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
854            }
855        }
856    
857        public void setSystemUsage(SystemUsage memoryManager) {
858            if (this.systemUsage != null) {
859                removeService(this.systemUsage);
860            }
861            this.systemUsage = memoryManager;
862            addService(this.systemUsage);
863        }
864    
865        /**
866         * @return the consumerUsageManager
867         * @throws IOException
868         */
869        public SystemUsage getConsumerSystemUsage() throws IOException {
870            if (this.consumerSystemUsaage == null) {
871                if (splitSystemUsageForProducersConsumers) {
872                    this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
873                    float portion = consumerSystemUsagePortion / 100f;
874                    this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
875                    addService(this.consumerSystemUsaage);
876                } else {
877                    consumerSystemUsaage = getSystemUsage();
878                }
879            }
880            return this.consumerSystemUsaage;
881        }
882    
883        /**
884         * @param consumerSystemUsaage
885         *            the storeSystemUsage to set
886         */
887        public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
888            if (this.consumerSystemUsaage != null) {
889                removeService(this.consumerSystemUsaage);
890            }
891            this.consumerSystemUsaage = consumerSystemUsaage;
892            addService(this.consumerSystemUsaage);
893        }
894    
895        /**
896         * @return the producerUsageManager
897         * @throws IOException
898         */
899        public SystemUsage getProducerSystemUsage() throws IOException {
900            if (producerSystemUsage == null) {
901                if (splitSystemUsageForProducersConsumers) {
902                    producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
903                    float portion = producerSystemUsagePortion / 100f;
904                    producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
905                    addService(producerSystemUsage);
906                } else {
907                    producerSystemUsage = getSystemUsage();
908                }
909            }
910            return producerSystemUsage;
911        }
912    
913        /**
914         * @param producerUsageManager
915         *            the producerUsageManager to set
916         */
917        public void setProducerSystemUsage(SystemUsage producerUsageManager) {
918            if (this.producerSystemUsage != null) {
919                removeService(this.producerSystemUsage);
920            }
921            this.producerSystemUsage = producerUsageManager;
922            addService(this.producerSystemUsage);
923        }
924    
925        public PersistenceAdapter getPersistenceAdapter() throws IOException {
926            if (persistenceAdapter == null) {
927                persistenceAdapter = createPersistenceAdapter();
928                configureService(persistenceAdapter);
929                this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
930            }
931            return persistenceAdapter;
932        }
933    
934        /**
935         * Sets the persistence adaptor implementation to use for this broker
936         * 
937         * @throws IOException
938         */
939        public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
940            this.persistenceAdapter = persistenceAdapter;
941            configureService(this.persistenceAdapter);
942            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
943        }
944    
945        public TaskRunnerFactory getTaskRunnerFactory() {
946            if (taskRunnerFactory == null) {
947                taskRunnerFactory = new TaskRunnerFactory("BrokerService", getTaskRunnerPriority(), true, 1000,
948                        isDedicatedTaskRunner());
949            }
950            return taskRunnerFactory;
951        }
952    
953        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
954            this.taskRunnerFactory = taskRunnerFactory;
955        }
956    
957        public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
958            if (taskRunnerFactory == null) {
959                persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
960                        true, 1000, isDedicatedTaskRunner());
961            }
962            return persistenceTaskRunnerFactory;
963        }
964    
965        public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
966            this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
967        }
968    
969        public boolean isUseJmx() {
970            return useJmx;
971        }
972    
973        public boolean isEnableStatistics() {
974            return enableStatistics;
975        }
976    
977        /**
978         * Sets whether or not the Broker's services enable statistics or not.
979         */
980        public void setEnableStatistics(boolean enableStatistics) {
981            this.enableStatistics = enableStatistics;
982        }
983    
984        /**
985         * Sets whether or not the Broker's services should be exposed into JMX or
986         * not.
987         */
988        public void setUseJmx(boolean useJmx) {
989            this.useJmx = useJmx;
990        }
991    
992        public ObjectName getBrokerObjectName() throws IOException {
993            if (brokerObjectName == null) {
994                brokerObjectName = createBrokerObjectName();
995            }
996            return brokerObjectName;
997        }
998    
999        /**
1000         * Sets the JMX ObjectName for this broker
1001         */
1002        public void setBrokerObjectName(ObjectName brokerObjectName) {
1003            this.brokerObjectName = brokerObjectName;
1004        }
1005    
1006        public ManagementContext getManagementContext() {
1007            if (managementContext == null) {
1008                managementContext = new ManagementContext();
1009            }
1010            return managementContext;
1011        }
1012    
1013        public void setManagementContext(ManagementContext managementContext) {
1014            this.managementContext = managementContext;
1015        }
1016    
1017        public NetworkConnector getNetworkConnectorByName(String connectorName) {
1018            for (NetworkConnector connector : networkConnectors) {
1019                if (connector.getName().equals(connectorName)) {
1020                    return connector;
1021                }
1022            }
1023            return null;
1024        }
1025    
1026        public String[] getNetworkConnectorURIs() {
1027            return networkConnectorURIs;
1028        }
1029    
1030        public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
1031            this.networkConnectorURIs = networkConnectorURIs;
1032        }
1033    
1034        public TransportConnector getConnectorByName(String connectorName) {
1035            for (TransportConnector connector : transportConnectors) {
1036                if (connector.getName().equals(connectorName)) {
1037                    return connector;
1038                }
1039            }
1040            return null;
1041        }
1042        
1043        public Map<String, String> getTransportConnectorURIsAsMap() {
1044            Map<String, String> answer = new HashMap<String, String>();
1045            for (TransportConnector connector : transportConnectors) {
1046                try {
1047                    URI uri = connector.getConnectUri();
1048                    String scheme = uri.getScheme();
1049                    if (scheme != null) {
1050                        answer.put(scheme.toLowerCase(), uri.toString());
1051                    }
1052                } catch (Exception e) {
1053                    LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1054                }
1055            }
1056            return answer;
1057        }
1058    
1059        public String[] getTransportConnectorURIs() {
1060            return transportConnectorURIs;
1061        }
1062    
1063        public void setTransportConnectorURIs(String[] transportConnectorURIs) {
1064            this.transportConnectorURIs = transportConnectorURIs;
1065        }
1066    
1067        /**
1068         * @return Returns the jmsBridgeConnectors.
1069         */
1070        public JmsConnector[] getJmsBridgeConnectors() {
1071            return jmsBridgeConnectors;
1072        }
1073    
1074        /**
1075         * @param jmsConnectors
1076         *            The jmsBridgeConnectors to set.
1077         */
1078        public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1079            this.jmsBridgeConnectors = jmsConnectors;
1080        }
1081    
1082        public Service[] getServices() {
1083            return (Service[]) services.toArray(new Service[0]);
1084        }
1085    
1086        /**
1087         * Sets the services associated with this broker such as a
1088         * {@link MasterConnector}
1089         */
1090        public void setServices(Service[] services) {
1091            this.services.clear();
1092            if (services != null) {
1093                for (int i = 0; i < services.length; i++) {
1094                    this.services.add(services[i]);
1095                }
1096            }
1097        }
1098    
1099        /**
1100         * Adds a new service so that it will be started as part of the broker
1101         * lifecycle
1102         */
1103        public void addService(Service service) {
1104            services.add(service);
1105        }
1106    
1107        public void removeService(Service service) {
1108            services.remove(service);
1109        }
1110    
1111        public boolean isUseLoggingForShutdownErrors() {
1112            return useLoggingForShutdownErrors;
1113        }
1114    
1115        /**
1116         * Sets whether or not we should use commons-logging when reporting errors
1117         * when shutting down the broker
1118         */
1119        public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1120            this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1121        }
1122    
1123        public boolean isUseShutdownHook() {
1124            return useShutdownHook;
1125        }
1126    
1127        /**
1128         * Sets whether or not we should use a shutdown handler to close down the
1129         * broker cleanly if the JVM is terminated. It is recommended you leave this
1130         * enabled.
1131         */
1132        public void setUseShutdownHook(boolean useShutdownHook) {
1133            this.useShutdownHook = useShutdownHook;
1134        }
1135    
1136        public boolean isAdvisorySupport() {
1137            return advisorySupport;
1138        }
1139    
1140        /**
1141         * Allows the support of advisory messages to be disabled for performance
1142         * reasons.
1143         */
1144        public void setAdvisorySupport(boolean advisorySupport) {
1145            this.advisorySupport = advisorySupport;
1146        }
1147    
1148        public List<TransportConnector> getTransportConnectors() {
1149            return new ArrayList<TransportConnector>(transportConnectors);
1150        }
1151    
1152        /**
1153         * Sets the transport connectors which this broker will listen on for new
1154         * clients
1155         * 
1156         * @org.apache.xbean.Property 
1157         *                            nestedType="org.apache.activemq.broker.TransportConnector"
1158         */
1159        public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1160            for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
1161                TransportConnector connector = iter.next();
1162                addConnector(connector);
1163            }
1164        }
1165    
1166        public List<NetworkConnector> getNetworkConnectors() {
1167            return new ArrayList<NetworkConnector>(networkConnectors);
1168        }
1169    
1170        public List<ProxyConnector> getProxyConnectors() {
1171            return new ArrayList<ProxyConnector>(proxyConnectors);
1172        }
1173    
1174        /**
1175         * Sets the network connectors which this broker will use to connect to
1176         * other brokers in a federated network
1177         * 
1178         * @org.apache.xbean.Property 
1179         *                            nestedType="org.apache.activemq.network.NetworkConnector"
1180         */
1181        public void setNetworkConnectors(List networkConnectors) throws Exception {
1182            for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
1183                NetworkConnector connector = (NetworkConnector) iter.next();
1184                addNetworkConnector(connector);
1185            }
1186        }
1187    
1188        /**
1189         * Sets the network connectors which this broker will use to connect to
1190         * other brokers in a federated network
1191         */
1192        public void setProxyConnectors(List proxyConnectors) throws Exception {
1193            for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
1194                ProxyConnector connector = (ProxyConnector) iter.next();
1195                addProxyConnector(connector);
1196            }
1197        }
1198    
1199        public PolicyMap getDestinationPolicy() {
1200            return destinationPolicy;
1201        }
1202    
1203        /**
1204         * Sets the destination specific policies available either for exact
1205         * destinations or for wildcard areas of destinations.
1206         */
1207        public void setDestinationPolicy(PolicyMap policyMap) {
1208            this.destinationPolicy = policyMap;
1209        }
1210    
1211        public BrokerPlugin[] getPlugins() {
1212            return plugins;
1213        }
1214    
1215        /**
1216         * Sets a number of broker plugins to install such as for security
1217         * authentication or authorization
1218         */
1219        public void setPlugins(BrokerPlugin[] plugins) {
1220            this.plugins = plugins;
1221        }
1222    
1223        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1224            return messageAuthorizationPolicy;
1225        }
1226    
1227        /**
1228         * Sets the policy used to decide if the current connection is authorized to
1229         * consume a given message
1230         */
1231        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1232            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1233        }
1234    
1235        /**
1236         * Delete all messages from the persistent store
1237         * 
1238         * @throws IOException
1239         */
1240        public void deleteAllMessages() throws IOException {
1241            getPersistenceAdapter().deleteAllMessages();
1242        }
1243    
1244        public boolean isDeleteAllMessagesOnStartup() {
1245            return deleteAllMessagesOnStartup;
1246        }
1247    
1248        /**
1249         * Sets whether or not all messages are deleted on startup - mostly only
1250         * useful for testing.
1251         */
1252        public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1253            this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1254        }
1255    
1256        public URI getVmConnectorURI() {
1257            if (vmConnectorURI == null) {
1258                try {
1259                    vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
1260                } catch (URISyntaxException e) {
1261                    LOG.error("Badly formed URI from " + getBrokerName(), e);
1262                }
1263            }
1264            return vmConnectorURI;
1265        }
1266    
1267        public void setVmConnectorURI(URI vmConnectorURI) {
1268            this.vmConnectorURI = vmConnectorURI;
1269        }
1270    
1271        /**
1272         * @return Returns the shutdownOnMasterFailure.
1273         */
1274        public boolean isShutdownOnMasterFailure() {
1275            return shutdownOnMasterFailure;
1276        }
1277    
1278        /**
1279         * @param shutdownOnMasterFailure
1280         *            The shutdownOnMasterFailure to set.
1281         */
1282        public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1283            this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1284        }
1285    
1286        public boolean isKeepDurableSubsActive() {
1287            return keepDurableSubsActive;
1288        }
1289    
1290        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1291            this.keepDurableSubsActive = keepDurableSubsActive;
1292        }
1293    
1294        public boolean isUseVirtualTopics() {
1295            return useVirtualTopics;
1296        }
1297    
1298        /**
1299         * Sets whether or not <a
1300         * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1301         * Topics</a> should be supported by default if they have not been
1302         * explicitly configured.
1303         */
1304        public void setUseVirtualTopics(boolean useVirtualTopics) {
1305            this.useVirtualTopics = useVirtualTopics;
1306        }
1307    
1308        public DestinationInterceptor[] getDestinationInterceptors() {
1309            return destinationInterceptors;
1310        }
1311    
1312        public boolean isUseMirroredQueues() {
1313            return useMirroredQueues;
1314        }
1315    
1316        /**
1317         * Sets whether or not <a
1318         * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1319         * Queues</a> should be supported by default if they have not been
1320         * explicitly configured.
1321         */
1322        public void setUseMirroredQueues(boolean useMirroredQueues) {
1323            this.useMirroredQueues = useMirroredQueues;
1324        }
1325    
1326        /**
1327         * Sets the destination interceptors to use
1328         */
1329        public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1330            this.destinationInterceptors = destinationInterceptors;
1331        }
1332    
1333        public ActiveMQDestination[] getDestinations() {
1334            return destinations;
1335        }
1336    
1337        /**
1338         * Sets the destinations which should be loaded/created on startup
1339         */
1340        public void setDestinations(ActiveMQDestination[] destinations) {
1341            this.destinations = destinations;
1342        }
1343    
1344        /**
1345         * @return the tempDataStore
1346         */
1347        public synchronized Store getTempDataStore() {
1348            if (tempDataStore == null) {
1349                if (!isPersistent()) {
1350                    return null;
1351                }
1352                boolean result = true;
1353                boolean empty = true;
1354                try {
1355                    File directory = getTmpDataDirectory();
1356                    if (directory.exists() && directory.isDirectory()) {
1357                        File[] files = directory.listFiles();
1358                        if (files != null && files.length > 0) {
1359                            empty = false;
1360                            for (int i = 0; i < files.length; i++) {
1361                                File file = files[i];
1362                                if (!file.isDirectory()) {
1363                                    result &= file.delete();
1364                                }
1365                            }
1366                        }
1367                    }
1368                    if (!empty) {
1369                        String str = result ? "Successfully deleted" : "Failed to delete";
1370                        LOG.info(str + " temporary storage");
1371                    }
1372                    tempDataStore = StoreFactory.open(getTmpDataDirectory(), "rw");
1373                } catch (IOException e) {
1374                    throw new RuntimeException(e);
1375                }
1376            }
1377            return tempDataStore;
1378        }
1379    
1380        /**
1381         * @param tempDataStore
1382         *            the tempDataStore to set
1383         */
1384        public void setTempDataStore(Store tempDataStore) {
1385            this.tempDataStore = tempDataStore;
1386        }
1387    
1388        public int getPersistenceThreadPriority() {
1389            return persistenceThreadPriority;
1390        }
1391    
1392        public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1393            this.persistenceThreadPriority = persistenceThreadPriority;
1394        }
1395    
1396        /**
1397         * @return the useLocalHostBrokerName
1398         */
1399        public boolean isUseLocalHostBrokerName() {
1400            return this.useLocalHostBrokerName;
1401        }
1402    
1403        /**
1404         * @param useLocalHostBrokerName
1405         *            the useLocalHostBrokerName to set
1406         */
1407        public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1408            this.useLocalHostBrokerName = useLocalHostBrokerName;
1409            if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1410                brokerName = LOCAL_HOST_NAME;
1411            }
1412        }
1413    
1414        /**
1415         * @return the supportFailOver
1416         */
1417        public boolean isSupportFailOver() {
1418            return this.supportFailOver;
1419        }
1420    
1421        /**
1422         * @param supportFailOver
1423         *            the supportFailOver to set
1424         */
1425        public void setSupportFailOver(boolean supportFailOver) {
1426            this.supportFailOver = supportFailOver;
1427        }
1428    
1429        /**
1430         * Looks up and lazily creates if necessary the destination for the given
1431         * JMS name
1432         */
1433        public Destination getDestination(ActiveMQDestination destination) throws Exception {
1434            return getBroker().addDestination(getAdminConnectionContext(), destination);
1435        }
1436    
1437        public void removeDestination(ActiveMQDestination destination) throws Exception {
1438            getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1439        }
1440    
1441        public int getProducerSystemUsagePortion() {
1442            return producerSystemUsagePortion;
1443        }
1444    
1445        public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1446            this.producerSystemUsagePortion = producerSystemUsagePortion;
1447        }
1448    
1449        public int getConsumerSystemUsagePortion() {
1450            return consumerSystemUsagePortion;
1451        }
1452    
1453        public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1454            this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1455        }
1456    
1457        public boolean isSplitSystemUsageForProducersConsumers() {
1458            return splitSystemUsageForProducersConsumers;
1459        }
1460    
1461        public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
1462            this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1463        }
1464    
1465        public boolean isMonitorConnectionSplits() {
1466            return monitorConnectionSplits;
1467        }
1468    
1469        public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1470            this.monitorConnectionSplits = monitorConnectionSplits;
1471        }
1472    
1473        public int getTaskRunnerPriority() {
1474            return taskRunnerPriority;
1475        }
1476    
1477        public void setTaskRunnerPriority(int taskRunnerPriority) {
1478            this.taskRunnerPriority = taskRunnerPriority;
1479        }
1480    
1481        public boolean isDedicatedTaskRunner() {
1482            return dedicatedTaskRunner;
1483        }
1484    
1485        public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1486            this.dedicatedTaskRunner = dedicatedTaskRunner;
1487        }
1488    
1489        public boolean isCacheTempDestinations() {
1490            return cacheTempDestinations;
1491        }
1492    
1493        public void setCacheTempDestinations(boolean cacheTempDestinations) {
1494            this.cacheTempDestinations = cacheTempDestinations;
1495        }
1496    
1497        public int getTimeBeforePurgeTempDestinations() {
1498            return timeBeforePurgeTempDestinations;
1499        }
1500    
1501        public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
1502            this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1503        }
1504    
1505        public boolean isUseTempMirroredQueues() {
1506            return useTempMirroredQueues;
1507        }
1508    
1509        public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1510            this.useTempMirroredQueues = useTempMirroredQueues;
1511        }
1512    
1513        //
1514        // Implementation methods
1515        // -------------------------------------------------------------------------
1516        /**
1517         * Handles any lazy-creation helper properties which are added to make
1518         * things easier to configure inside environments such as Spring
1519         * 
1520         * @throws Exception
1521         */
1522        protected void processHelperProperties() throws Exception {
1523            boolean masterServiceExists = false;
1524            if (transportConnectorURIs != null) {
1525                for (int i = 0; i < transportConnectorURIs.length; i++) {
1526                    String uri = transportConnectorURIs[i];
1527                    addConnector(uri);
1528                }
1529            }
1530            if (networkConnectorURIs != null) {
1531                for (int i = 0; i < networkConnectorURIs.length; i++) {
1532                    String uri = networkConnectorURIs[i];
1533                    addNetworkConnector(uri);
1534                }
1535            }
1536            if (jmsBridgeConnectors != null) {
1537                for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1538                    addJmsConnector(jmsBridgeConnectors[i]);
1539                }
1540            }
1541            for (Service service : services) {
1542                if (service instanceof MasterConnector) {
1543                    masterServiceExists = true;
1544                    break;
1545                }
1546            }
1547            if (masterConnectorURI != null) {
1548                if (masterServiceExists) {
1549                    throw new IllegalStateException(
1550                            "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
1551                } else {
1552                    addService(new MasterConnector(masterConnectorURI));
1553                }
1554            }
1555        }
1556    
1557        protected void stopAllConnectors(ServiceStopper stopper) {
1558            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1559                NetworkConnector connector = iter.next();
1560                unregisterNetworkConnectorMBean(connector);
1561                stopper.stop(connector);
1562            }
1563            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1564                ProxyConnector connector = iter.next();
1565                stopper.stop(connector);
1566            }
1567            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1568                JmsConnector connector = iter.next();
1569                stopper.stop(connector);
1570            }
1571            for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1572                TransportConnector connector = iter.next();
1573                stopper.stop(connector);
1574            }
1575        }
1576    
1577        protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
1578            try {
1579                ObjectName objectName = createConnectorObjectName(connector);
1580                connector = connector.asManagedConnector(getManagementContext(), objectName);
1581                ConnectorViewMBean view = new ConnectorView(connector);
1582                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1583                return connector;
1584            } catch (Throwable e) {
1585                throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1586            }
1587        }
1588    
1589        protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
1590            if (isUseJmx()) {
1591                try {
1592                    ObjectName objectName = createConnectorObjectName(connector);
1593                    getManagementContext().unregisterMBean(objectName);
1594                } catch (Throwable e) {
1595                    throw IOExceptionSupport.create(
1596                            "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
1597                }
1598            }
1599        }
1600    
1601        protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1602            return adaptor;
1603        }
1604    
1605        protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1606            if (isUseJmx()) {
1607            }
1608        }
1609    
1610        private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
1611            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1612                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName="
1613                    + JMXSupport.encodeObjectNamePart(connector.getName()));
1614        }
1615    
1616        protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
1617            NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
1618            try {
1619                ObjectName objectName = createNetworkConnectorObjectName(connector);
1620                connector.setObjectName(objectName);
1621                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1622            } catch (Throwable e) {
1623                throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
1624            }
1625        }
1626    
1627        protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector)
1628                throws MalformedObjectNameException {
1629            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1630                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1631                    + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1632        }
1633    
1634        protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
1635            if (isUseJmx()) {
1636                try {
1637                    ObjectName objectName = createNetworkConnectorObjectName(connector);
1638                    getManagementContext().unregisterMBean(objectName);
1639                } catch (Exception e) {
1640                    LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
1641                }
1642            }
1643        }
1644    
1645        protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
1646            ProxyConnectorView view = new ProxyConnectorView(connector);
1647            try {
1648                ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1649                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector,"
1650                        + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1651                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1652            } catch (Throwable e) {
1653                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1654            }
1655        }
1656    
1657        protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
1658            FTConnectorView view = new FTConnectorView(connector);
1659            try {
1660                ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1661                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
1662                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1663            } catch (Throwable e) {
1664                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1665            }
1666        }
1667    
1668        protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
1669            JmsConnectorView view = new JmsConnectorView(connector);
1670            try {
1671                ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1672                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector,"
1673                        + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1674                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1675            } catch (Throwable e) {
1676                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1677            }
1678        }
1679    
1680        /**
1681         * Factory method to create a new broker
1682         * 
1683         * @throws Exception
1684         * @throws
1685         * @throws
1686         */
1687        protected Broker createBroker() throws Exception {
1688            regionBroker = createRegionBroker();
1689            Broker broker = addInterceptors(regionBroker);
1690            // Add a filter that will stop access to the broker once stopped
1691            broker = new MutableBrokerFilter(broker) {
1692                Broker old;
1693    
1694                public void stop() throws Exception {
1695                    old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
1696                        // Just ignore additional stop actions.
1697                        public void stop() throws Exception {
1698                        }
1699                    });
1700                    old.stop();
1701                }
1702    
1703                public void start() throws Exception {
1704                    if (forceStart && old != null) {
1705                        this.next.set(old);
1706                    }
1707                    getNext().start();
1708                }
1709            };
1710            return broker;
1711        }
1712    
1713        /**
1714         * Factory method to create the core region broker onto which interceptors
1715         * are added
1716         * 
1717         * @throws Exception
1718         */
1719        protected Broker createRegionBroker() throws Exception {
1720            if (destinationInterceptors == null) {
1721                destinationInterceptors = createDefaultDestinationInterceptor();
1722            }
1723            configureServices(destinationInterceptors);
1724            DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
1725            if (destinationFactory == null) {
1726                destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
1727            }
1728            return createRegionBroker(destinationInterceptor);
1729        }
1730    
1731        protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
1732            RegionBroker regionBroker;
1733            if (isUseJmx()) {
1734                regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
1735                        getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor);
1736            } else {
1737                regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
1738                        destinationInterceptor);
1739            }
1740            destinationFactory.setRegionBroker(regionBroker);
1741            regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
1742            regionBroker.setBrokerName(getBrokerName());
1743            regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
1744            return regionBroker;
1745        }
1746    
1747        /**
1748         * Create the default destination interceptor
1749         */
1750        protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
1751            List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
1752            if (isUseVirtualTopics()) {
1753                VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
1754                VirtualTopic virtualTopic = new VirtualTopic();
1755                virtualTopic.setName("VirtualTopic.>");
1756                VirtualDestination[] virtualDestinations = { virtualTopic };
1757                interceptor.setVirtualDestinations(virtualDestinations);
1758                answer.add(interceptor);
1759            }
1760            if (isUseMirroredQueues()) {
1761                MirroredQueue interceptor = new MirroredQueue();
1762                answer.add(interceptor);
1763            }
1764            DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
1765            answer.toArray(array);
1766            return array;
1767        }
1768    
1769        /**
1770         * Strategy method to add interceptors to the broker
1771         * 
1772         * @throws IOException
1773         */
1774        protected Broker addInterceptors(Broker broker) throws Exception {
1775            broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
1776            if (isAdvisorySupport()) {
1777                broker = new AdvisoryBroker(broker);
1778            }
1779            broker = new CompositeDestinationBroker(broker);
1780            if (isPopulateJMSXUserID()) {
1781                broker = new UserIDBroker(broker);
1782            }
1783            if (isMonitorConnectionSplits()) {
1784                broker = new ConnectionSplitBroker(broker);
1785            }
1786            if (plugins != null) {
1787                for (int i = 0; i < plugins.length; i++) {
1788                    BrokerPlugin plugin = plugins[i];
1789                    broker = plugin.installPlugin(broker);
1790                }
1791            }
1792            return broker;
1793        }
1794    
1795        protected PersistenceAdapter createPersistenceAdapter() throws IOException {
1796            if (isPersistent()) {
1797                return getPersistenceFactory().createPersistenceAdapter();
1798            } else {
1799                return new MemoryPersistenceAdapter();
1800            }
1801        }
1802    
1803        protected AMQPersistenceAdapterFactory createPersistenceFactory() {
1804            AMQPersistenceAdapterFactory factory = new AMQPersistenceAdapterFactory();
1805            factory.setDataDirectory(getBrokerDataDirectory());
1806            factory.setTaskRunnerFactory(getPersistenceTaskRunnerFactory());
1807            factory.setBrokerName(getBrokerName());
1808            return factory;
1809        }
1810    
1811        protected ObjectName createBrokerObjectName() throws IOException {
1812            try {
1813                return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1814                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
1815            } catch (Throwable e) {
1816                throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
1817            }
1818        }
1819    
1820        protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
1821            TransportServer transport = TransportFactory.bind(this, brokerURI);
1822            return new TransportConnector(transport);
1823        }
1824    
1825        /**
1826         * Extracts the port from the options
1827         */
1828        protected Object getPort(Map options) {
1829            Object port = options.get("port");
1830            if (port == null) {
1831                port = DEFAULT_PORT;
1832                LOG.warn("No port specified so defaulting to: " + port);
1833            }
1834            return port;
1835        }
1836    
1837        protected void addShutdownHook() {
1838            if (useShutdownHook) {
1839                shutdownHook = new Thread("ActiveMQ ShutdownHook") {
1840                    public void run() {
1841                        containerShutdown();
1842                    }
1843                };
1844                Runtime.getRuntime().addShutdownHook(shutdownHook);
1845            }
1846        }
1847    
1848        protected void removeShutdownHook() {
1849            if (shutdownHook != null) {
1850                try {
1851                    Runtime.getRuntime().removeShutdownHook(shutdownHook);
1852                } catch (Exception e) {
1853                    LOG.debug("Caught exception, must be shutting down: " + e);
1854                }
1855            }
1856        }
1857    
1858        /**
1859         * Causes a clean shutdown of the container when the VM is being shut down
1860         */
1861        protected void containerShutdown() {
1862            try {
1863                stop();
1864            } catch (IOException e) {
1865                Throwable linkedException = e.getCause();
1866                if (linkedException != null) {
1867                    logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
1868                } else {
1869                    logError("Failed to shut down: " + e, e);
1870                }
1871                if (!useLoggingForShutdownErrors) {
1872                    e.printStackTrace(System.err);
1873                }
1874            } catch (Exception e) {
1875                logError("Failed to shut down: " + e, e);
1876            }
1877        }
1878    
1879        protected void logError(String message, Throwable e) {
1880            if (useLoggingForShutdownErrors) {
1881                LOG.error("Failed to shut down: " + e);
1882            } else {
1883                System.err.println("Failed to shut down: " + e);
1884            }
1885        }
1886    
1887        /**
1888         * Starts any configured destinations on startup
1889         */
1890        protected void startDestinations() throws Exception {
1891            if (destinations != null) {
1892                ConnectionContext adminConnectionContext = getAdminConnectionContext();
1893                for (int i = 0; i < destinations.length; i++) {
1894                    ActiveMQDestination destination = destinations[i];
1895                    getBroker().addDestination(adminConnectionContext, destination);
1896                }
1897            }
1898        }
1899    
1900        /**
1901         * Returns the broker's administration connection context used for
1902         * configuring the broker at startup
1903         */
1904        public ConnectionContext getAdminConnectionContext() throws Exception {
1905            ConnectionContext adminConnectionContext = getBroker().getAdminConnectionContext();
1906            if (adminConnectionContext == null) {
1907                adminConnectionContext = createAdminConnectionContext();
1908                getBroker().setAdminConnectionContext(adminConnectionContext);
1909            }
1910            return adminConnectionContext;
1911        }
1912    
1913        /**
1914         * Factory method to create the new administration connection context
1915         * object. Note this method is here rather than inside a default broker
1916         * implementation to ensure that the broker reference inside it is the outer
1917         * most interceptor
1918         */
1919        protected ConnectionContext createAdminConnectionContext() throws Exception {
1920            ConnectionContext context = new ConnectionContext();
1921            context.setBroker(getBroker());
1922            context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
1923            return context;
1924        }
1925    
1926        protected void waitForSlave() {
1927            try {
1928                if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
1929                    throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds."); 
1930                }
1931            } catch (InterruptedException e) {
1932                LOG.error("Exception waiting for slave:" + e);
1933            }
1934        }
1935    
1936        protected void slaveConnectionEstablished() {
1937            slaveStartSignal.countDown();
1938        }
1939        
1940        protected void startManagementContext() throws Exception {
1941            getManagementContext().start();
1942            adminView = new BrokerView(this, null);
1943            ObjectName objectName = getBrokerObjectName();
1944            AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
1945        }
1946    
1947        /**
1948         * Start all transport and network connections, proxies and bridges
1949         * 
1950         * @throws Exception
1951         */
1952        protected void startAllConnectors() throws Exception {
1953            if (!isSlave()) {
1954                Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
1955                List<TransportConnector> al = new ArrayList<TransportConnector>();
1956                for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1957                    TransportConnector connector = iter.next();
1958                    connector.setBrokerService(this);
1959                    al.add(startTransportConnector(connector));
1960                }
1961                if (al.size() > 0) {
1962                    // let's clear the transportConnectors list and replace it with
1963                    // the started transportConnector instances
1964                    this.transportConnectors.clear();
1965                    setTransportConnectors(al);
1966                }
1967                URI uri = getVmConnectorURI();
1968                Map<String, String> map = new HashMap<String, String>(URISupport.parseParamters(uri));
1969                map.put("network", "true");
1970                map.put("async", "false");
1971                uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
1972                if (isWaitForSlave()) {
1973                    waitForSlave();
1974                }
1975                if (!stopped.get()) {
1976                    for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1977                        NetworkConnector connector = iter.next();
1978                        connector.setLocalUri(uri);
1979                        connector.setBrokerName(getBrokerName());
1980                        connector.setDurableDestinations(durableDestinations);
1981                        connector.start();
1982                    }
1983                    for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1984                        ProxyConnector connector = iter.next();
1985                        connector.start();
1986                    }
1987                    for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1988                        JmsConnector connector = iter.next();
1989                        connector.start();
1990                    }
1991                    for (Service service : services) {
1992                        configureService(service);
1993                        service.start();
1994                    }
1995                }
1996            }
1997        }
1998    
1999        protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2000            connector.setTaskRunnerFactory(getTaskRunnerFactory());
2001            MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2002            if (policy != null) {
2003                connector.setMessageAuthorizationPolicy(policy);
2004            }
2005            if (isUseJmx()) {
2006                connector = registerConnectorMBean(connector);
2007            }
2008            connector.getStatistics().setEnabled(enableStatistics);
2009            connector.start();
2010            return connector;
2011        }
2012    
2013        /**
2014         * Perform any custom dependency injection
2015         */
2016        protected void configureServices(Object[] services) {
2017            for (Object service : services) {
2018                configureService(service);
2019            }
2020        }
2021    
2022        /**
2023         * Perform any custom dependency injection
2024         */
2025        protected void configureService(Object service) {
2026            if (service instanceof BrokerServiceAware) {
2027                BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2028                serviceAware.setBrokerService(this);
2029            }
2030            if (masterConnector == null) {
2031                if (service instanceof MasterConnector) {
2032                    masterConnector = (MasterConnector) service;
2033                    supportFailOver = true;
2034                }
2035            }
2036        }
2037        
2038        public void handleIOException(IOException exception) {
2039            if (ioExceptionHandler != null) {
2040                ioExceptionHandler.handle(exception);
2041             } else {
2042                LOG.info("Ignoring IO exception, " + exception, exception);
2043             }
2044        }
2045    
2046        /**
2047         * Starts all destiantions in persistence store. This includes all inactive
2048         * destinations
2049         */
2050        protected void startDestinationsInPersistenceStore(Broker broker) throws Exception {
2051            Set destinations = destinationFactory.getDestinations();
2052            if (destinations != null) {
2053                Iterator iter = destinations.iterator();
2054                ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
2055                if (adminConnectionContext == null) {
2056                    ConnectionContext context = new ConnectionContext();
2057                    context.setBroker(broker);
2058                    adminConnectionContext = context;
2059                    broker.setAdminConnectionContext(adminConnectionContext);
2060                }
2061                while (iter.hasNext()) {
2062                    ActiveMQDestination destination = (ActiveMQDestination) iter.next();
2063                    broker.addDestination(adminConnectionContext, destination);
2064                }
2065            }
2066        }
2067    
2068        public Broker getRegionBroker() {
2069            return regionBroker;
2070        }
2071    
2072        public void setRegionBroker(Broker regionBroker) {
2073            this.regionBroker = regionBroker;
2074        }
2075    
2076        public void addShutdownHook(Runnable hook) {
2077            synchronized (shutdownHooks) {
2078                shutdownHooks.add(hook);
2079            }
2080        }
2081    
2082        public void removeShutdownHook(Runnable hook) {
2083            synchronized (shutdownHooks) {
2084                shutdownHooks.remove(hook);
2085            }
2086        }
2087    
2088        public boolean isSystemExitOnShutdown() {
2089            return systemExitOnShutdown;
2090        }
2091    
2092        public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2093            this.systemExitOnShutdown = systemExitOnShutdown;
2094        }
2095    
2096        public int getSystemExitOnShutdownExitCode() {
2097            return systemExitOnShutdownExitCode;
2098        }
2099    
2100        public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
2101            this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
2102        }
2103    
2104        public SslContext getSslContext() {
2105            return sslContext;
2106        }
2107    
2108        public void setSslContext(SslContext sslContext) {
2109            this.sslContext = sslContext;
2110        }
2111    
2112        public boolean isShutdownOnSlaveFailure() {
2113            return shutdownOnSlaveFailure;
2114        }
2115    
2116        public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2117            this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2118        }
2119    
2120        public boolean isWaitForSlave() {
2121            return waitForSlave;
2122        }
2123    
2124        public void setWaitForSlave(boolean waitForSlave) {
2125            this.waitForSlave = waitForSlave;
2126        }
2127      
2128        public long getWaitForSlaveTimeout() {
2129            return this.waitForSlaveTimeout;
2130        }
2131        
2132        public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
2133            this.waitForSlaveTimeout = waitForSlaveTimeout;
2134        }
2135        
2136        public CountDownLatch getSlaveStartSignal() {
2137            return slaveStartSignal;
2138        }
2139    
2140        /**
2141         * Get the passiveSlave
2142         * @return the passiveSlave
2143         */
2144        public boolean isPassiveSlave() {
2145            return this.passiveSlave;
2146        }
2147    
2148        /**
2149         * Set the passiveSlave
2150         * @param passiveSlave the passiveSlave to set
2151         */
2152        public void setPassiveSlave(boolean passiveSlave) {
2153            this.passiveSlave = passiveSlave;
2154        }
2155        
2156        public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2157            ioExceptionHandler.setBrokerService(this);
2158            this.ioExceptionHandler = ioExceptionHandler;
2159        }
2160        
2161       
2162    }