001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.net.URI;
020    import java.net.URISyntaxException;
021    import java.util.Collection;
022    import java.util.HashMap;
023    import java.util.HashSet;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Set;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CopyOnWriteArrayList;
029    
030    import javax.management.MalformedObjectNameException;
031    import javax.management.ObjectName;
032    
033    import org.apache.activemq.Service;
034    import org.apache.activemq.broker.BrokerService;
035    import org.apache.activemq.broker.jmx.AnnotatedMBean;
036    import org.apache.activemq.broker.jmx.NetworkBridgeView;
037    import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
038    import org.apache.activemq.command.ActiveMQDestination;
039    import org.apache.activemq.command.ConsumerId;
040    import org.apache.activemq.transport.Transport;
041    import org.apache.activemq.transport.TransportFactory;
042    import org.apache.activemq.util.JMXSupport;
043    import org.apache.activemq.util.ServiceStopper;
044    import org.apache.activemq.util.ServiceSupport;
045    import org.apache.commons.logging.Log;
046    import org.apache.commons.logging.LogFactory;
047    
048    /**
049     * @version $Revision$
050     */
051    public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service {
052    
053        private static final Log LOG = LogFactory.getLog(NetworkConnector.class);
054        protected URI localURI;
055        protected ConnectionFilter connectionFilter;
056        protected ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
057        
058        protected ServiceSupport serviceSupport = new ServiceSupport() {
059    
060            protected void doStart() throws Exception {
061                handleStart();
062            }
063    
064            protected void doStop(ServiceStopper stopper) throws Exception {
065                handleStop(stopper);
066            }
067        };
068    
069        private Set<ActiveMQDestination> durableDestinations;
070        private List<ActiveMQDestination> excludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
071        private List<ActiveMQDestination> dynamicallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
072        private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
073        private BrokerService brokerService;
074        private ObjectName objectName;
075        
076        public NetworkConnector() {
077        }
078    
079        public NetworkConnector(URI localURI) {
080            this.localURI = localURI;
081        }
082    
083        public URI getLocalUri() throws URISyntaxException {
084            return localURI;
085        }
086    
087        public void setLocalUri(URI localURI) {
088            this.localURI = localURI;
089        }
090    
091        /**
092         * @return Returns the durableDestinations.
093         */
094        public Set getDurableDestinations() {
095            return durableDestinations;
096        }
097    
098        /**
099         * @param durableDestinations The durableDestinations to set.
100         */
101        public void setDurableDestinations(Set<ActiveMQDestination> durableDestinations) {
102            this.durableDestinations = durableDestinations;
103        }
104    
105        /**
106         * @return Returns the excludedDestinations.
107         */
108        public List<ActiveMQDestination> getExcludedDestinations() {
109            return excludedDestinations;
110        }
111    
112        /**
113         * @param excludedDestinations The excludedDestinations to set.
114         */
115        public void setExcludedDestinations(List<ActiveMQDestination> excludedDestinations) {
116            this.excludedDestinations = excludedDestinations;
117        }
118    
119        public void addExcludedDestination(ActiveMQDestination destiantion) {
120            this.excludedDestinations.add(destiantion);
121        }
122    
123        /**
124         * @return Returns the staticallyIncludedDestinations.
125         */
126        public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
127            return staticallyIncludedDestinations;
128        }
129    
130        /**
131         * @param staticallyIncludedDestinations The staticallyIncludedDestinations
132         *                to set.
133         */
134        public void setStaticallyIncludedDestinations(List<ActiveMQDestination> staticallyIncludedDestinations) {
135            this.staticallyIncludedDestinations = staticallyIncludedDestinations;
136        }
137    
138        public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) {
139            this.staticallyIncludedDestinations.add(destiantion);
140        }
141    
142        /**
143         * @return Returns the dynamicallyIncludedDestinations.
144         */
145        public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
146            return dynamicallyIncludedDestinations;
147        }
148    
149        /**
150         * @param dynamicallyIncludedDestinations The
151         *                dynamicallyIncludedDestinations to set.
152         */
153        public void setDynamicallyIncludedDestinations(List<ActiveMQDestination> dynamicallyIncludedDestinations) {
154            this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
155        }
156    
157        public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) {
158            this.dynamicallyIncludedDestinations.add(destiantion);
159        }
160    
161        public ConnectionFilter getConnectionFilter() {
162            return connectionFilter;
163        }
164    
165        public void setConnectionFilter(ConnectionFilter connectionFilter) {
166            this.connectionFilter = connectionFilter;
167        }
168    
169        // Implementation methods
170        // -------------------------------------------------------------------------
171        protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) {
172            List<ActiveMQDestination> destsList = getDynamicallyIncludedDestinations();
173            ActiveMQDestination dests[] = destsList.toArray(new ActiveMQDestination[destsList.size()]);
174            result.setDynamicallyIncludedDestinations(dests);
175            destsList = getExcludedDestinations();
176            dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
177            result.setExcludedDestinations(dests);
178            destsList = getStaticallyIncludedDestinations();
179            dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
180            result.setStaticallyIncludedDestinations(dests);
181            if (durableDestinations != null) {
182                
183                HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>();
184                for (ActiveMQDestination d : durableDestinations) {
185                    if( d.isTopic() ) {
186                        topics.add(d);
187                    }
188                }
189                
190                ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()];
191                dest = (ActiveMQDestination[])topics.toArray(dest);
192                result.setDurableDestinations(dest);
193            }
194            return result;
195        }
196    
197        protected Transport createLocalTransport() throws Exception {
198            return TransportFactory.connect(localURI);
199        }
200    
201        public void start() throws Exception {
202            serviceSupport.start();
203        }
204    
205        public void stop() throws Exception {
206            serviceSupport.stop();
207        }
208    
209        protected void handleStart() throws Exception {
210            if (localURI == null) {
211                throw new IllegalStateException("You must configure the 'localURI' property");
212            }
213            LOG.info("Network Connector " + getName() + " Started");
214        }
215    
216        protected void handleStop(ServiceStopper stopper) throws Exception {
217            LOG.info("Network Connector " + getName() + " Stopped");
218        }
219    
220        public ObjectName getObjectName() {
221            return objectName;
222        }
223    
224        public void setObjectName(ObjectName objectName) {
225            this.objectName = objectName;
226        }
227    
228        public BrokerService getBrokerService() {
229            return brokerService;
230        }
231    
232        public void setBrokerService(BrokerService brokerService) {
233            this.brokerService = brokerService;
234        }
235    
236        protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
237            if (!getBrokerService().isUseJmx()) {
238                return;
239            }
240            NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
241            try {
242                ObjectName objectName = createNetworkBridgeObjectName(bridge);
243                AnnotatedMBean.registerMBean(getBrokerService().getManagementContext(), view, objectName);
244            } catch (Throwable e) {
245                LOG.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e);
246            }
247        }
248    
249        protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
250            if (!getBrokerService().isUseJmx()) {
251                return;
252            }
253            try {
254                ObjectName objectName = createNetworkBridgeObjectName(bridge);
255                getBrokerService().getManagementContext().unregisterMBean(objectName);
256            } catch (Throwable e) {
257                LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
258            }
259        }
260        
261    
262        @SuppressWarnings("unchecked")
263        protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
264            ObjectName connectorName = getObjectName();
265            Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList());
266            return new ObjectName(connectorName.getDomain() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart((String)map.get("BrokerName")) + "," + "Type=NetworkBridge,"
267                                  + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart((String)map.get("NetworkConnectorName")) + "," + "Name="
268                                  + JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress())));
269        }
270    
271        // ask all the bridges as we can't know to which this consumer is tied
272        public boolean removeDemandSubscription(ConsumerId consumerId) {
273            boolean removeSucceeded = false;
274            for (NetworkBridge bridge : bridges.values()) {
275                if (bridge instanceof DemandForwardingBridgeSupport) {
276                    DemandForwardingBridgeSupport demandBridge = (DemandForwardingBridgeSupport) bridge;
277                    if (demandBridge.removeDemandSubscriptionByLocalId(consumerId)) {
278                        removeSucceeded = true;
279                        break;
280                    }
281                }
282            }
283            return removeSucceeded;
284        }
285        
286        public Collection<NetworkBridge> activeBridges() {
287            return bridges.values();
288        }
289    
290    }