001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    package org.apache.activemq.util.osgi;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.InputStreamReader;
022    import java.io.BufferedReader;
023    import java.util.Properties;
024    import java.util.ArrayList;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.ConcurrentMap;
027    import java.net.URL;
028    
029    import org.apache.activemq.util.FactoryFinder;
030    import org.apache.activemq.util.FactoryFinder.ObjectFactory;
031    import org.apache.commons.logging.LogFactory;
032    import org.apache.commons.logging.Log;
033    
034    import org.osgi.framework.Bundle;
035    import org.osgi.framework.BundleActivator;
036    import org.osgi.framework.BundleContext;
037    import org.osgi.framework.BundleEvent;
038    import org.osgi.framework.SynchronousBundleListener;
039    
040    /**
041     * An OSGi bundle activator for ActiveMQ which adapts the {@link org.apache.activemq.util.FactoryFinder}
042     * to the OSGi environment.
043     *
044     */
045    public class Activator implements BundleActivator, SynchronousBundleListener, ObjectFactory {
046    
047        private static final Log LOG = LogFactory.getLog(Activator.class);
048    
049        private final ConcurrentHashMap<String, Class> serviceCache = new ConcurrentHashMap<String, Class>();
050        private final ConcurrentMap<Long, BundleWrapper> bundleWrappers = new ConcurrentHashMap<Long, BundleWrapper>();
051        private BundleContext bundleContext;
052    
053        // ================================================================
054        // BundleActivator interface impl
055        // ================================================================
056    
057        public synchronized void start(BundleContext bundleContext) throws Exception {
058    
059            // This is how we replace the default FactoryFinder strategy
060            // with one that is more compatible in an OSGi env.
061            FactoryFinder.setObjectFactory(this);
062    
063            debug("activating");
064            this.bundleContext = bundleContext;
065            debug("checking existing bundles");
066            for (Bundle bundle : bundleContext.getBundles()) {
067                if (bundle.getState() == Bundle.RESOLVED || bundle.getState() == Bundle.STARTING ||
068                    bundle.getState() == Bundle.ACTIVE || bundle.getState() == Bundle.STOPPING) {
069                    register(bundle);
070                }
071            }
072            debug("activated");
073        }
074    
075    
076        public synchronized void stop(BundleContext bundleContext) throws Exception {
077            debug("deactivating");
078            bundleContext.removeBundleListener(this);
079            while (!bundleWrappers.isEmpty()) {
080                unregister(bundleWrappers.keySet().iterator().next());
081            }
082            debug("deactivated");
083            this.bundleContext = null;
084        }
085    
086        // ================================================================
087        // SynchronousBundleListener interface impl
088        // ================================================================
089    
090        public void bundleChanged(BundleEvent event) {
091            if (event.getType() == BundleEvent.RESOLVED) {
092                register(event.getBundle());
093            } else if (event.getType() == BundleEvent.UNRESOLVED || event.getType() == BundleEvent.UNINSTALLED) {
094                unregister(event.getBundle().getBundleId());
095            }
096        }
097    
098        protected void register(final Bundle bundle) {
099            debug("checking bundle " + bundle.getBundleId());
100            if( !isImportingUs(bundle) ) {
101                debug("The bundle does not import us: "+ bundle.getBundleId());
102                return;
103            }
104            bundleWrappers.put(bundle.getBundleId(), new BundleWrapper(bundle));
105        }
106    
107        /**
108         * When bundles unload.. we remove them thier cached Class entries from the
109         * serviceCache.  Future service lookups for the service will fail.
110         *
111         * TODO: consider a way to get the Broker release any references to
112         * instances of the service.
113         *
114         * @param bundleId
115         */
116        protected void unregister(long bundleId) {
117            BundleWrapper bundle = bundleWrappers.remove(bundleId);
118            if (bundle != null) {
119                for (String path : bundle.cachedServices) {
120                    debug("unregistering service for key: " +path );
121                    serviceCache.remove(path);
122                }
123            }
124        }
125    
126        // ================================================================
127        // ObjectFactory interface impl
128        // ================================================================
129    
130        public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException {
131            Class clazz = serviceCache.get(path);
132            if (clazz == null) {
133                StringBuffer warnings = new StringBuffer();
134                // We need to look for a bundle that has that class.
135                int wrrningCounter=1;
136                for (BundleWrapper wrapper : bundleWrappers.values()) {
137                    URL resource = wrapper.bundle.getResource(path);
138                    if( resource == null ) {
139                        continue;
140                    }
141    
142                    Properties properties = loadProperties(resource);
143    
144                    String className = properties.getProperty("class");
145                    if (className == null) {
146                        warnings.append("("+(wrrningCounter++)+") Invalid sevice file in bundle "+wrapper+": 'class' property not defined.");
147                        continue;
148                    }
149    
150                    try {
151                        clazz = wrapper.bundle.loadClass(className);
152                    } catch (ClassNotFoundException e) {
153                        warnings.append("("+(wrrningCounter++)+") Bundle "+wrapper+" could not load "+className+": "+e);
154                        continue;
155                    }
156    
157                    // Yay.. the class was found.  Now cache it.
158                    serviceCache.put(path, clazz);
159                    wrapper.cachedServices.add(path);
160                    break;
161                }
162    
163                if( clazz == null ) {
164                    // Since OSGi is such a tricky enviorment to work in.. lets give folks the
165                    // most information we can in the error message.
166                    String msg = "Service not found: '" + path + "'";
167                    if (warnings.length()!= 0) {
168                        msg += ", "+warnings;
169                    }
170                    throw new IOException(msg);
171                }
172            }
173            return clazz.newInstance();
174        }
175    
176        // ================================================================
177        // Internal Helper Methods
178        // ================================================================
179    
180        private void debug(Object msg) {
181            LOG.debug(msg);
182        }
183    
184        private Properties loadProperties(URL resource) throws IOException {
185            InputStream in = resource.openStream();
186            try {
187                BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"));
188                Properties properties = new Properties();
189                properties.load(in);
190                return properties;
191            } finally {
192                try {
193                    in.close();
194                } catch (Exception e) {
195                }
196            }
197        }
198    
199        private boolean isImportingUs(Bundle bundle) {
200            try {
201                // If that bundle can load our classes.. then it must be importing us.
202                return bundle.loadClass(Activator.class.getName())==Activator.class;
203            } catch (ClassNotFoundException e) {
204                return false;
205            }
206        }
207    
208        private static class BundleWrapper {
209            private final Bundle bundle;
210            private final ArrayList<String> cachedServices = new ArrayList<String>();
211    
212            public BundleWrapper(Bundle bundle) {
213                this.bundle = bundle;
214            }
215        }
216    }