001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.discovery.multicast;
018    
019    import java.io.IOException;
020    import java.net.DatagramPacket;
021    import java.net.InetAddress;
022    import java.net.InetSocketAddress;
023    import java.net.MulticastSocket;
024    import java.net.NetworkInterface;
025    import java.net.SocketAddress;
026    import java.net.SocketTimeoutException;
027    import java.net.URI;
028    import java.util.Iterator;
029    import java.util.Map;
030    import java.util.concurrent.ConcurrentHashMap;
031    import java.util.concurrent.ExecutorService;
032    import java.util.concurrent.LinkedBlockingQueue;
033    import java.util.concurrent.ThreadFactory;
034    import java.util.concurrent.ThreadPoolExecutor;
035    import java.util.concurrent.TimeUnit;
036    import java.util.concurrent.atomic.AtomicBoolean;
037    
038    import org.apache.activemq.command.DiscoveryEvent;
039    import org.apache.activemq.transport.discovery.DiscoveryAgent;
040    import org.apache.activemq.transport.discovery.DiscoveryListener;
041    import org.apache.commons.logging.Log;
042    import org.apache.commons.logging.LogFactory;
043    
044    /**
045     * A {@link DiscoveryAgent} using a multicast address and heartbeat packets
046     * encoded using any wireformat, but openwire by default.
047     * 
048     * @version $Revision$
049     */
050    public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
051    
052        public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
053        public static final String DEFAULT_HOST_STR = "default"; 
054        public static final String DEFAULT_HOST_IP  = System.getProperty("activemq.partition.discovery", "239.255.2.3"); 
055        public static final int    DEFAULT_PORT  = 6155; 
056            
057        private static final Log LOG = LogFactory.getLog(MulticastDiscoveryAgent.class);
058        private static final String TYPE_SUFFIX = "ActiveMQ-4.";
059        private static final String ALIVE = "alive.";
060        private static final String DEAD = "dead.";
061        private static final String DELIMITER = "%";
062        private static final int BUFF_SIZE = 8192;
063        private static final int DEFAULT_IDLE_TIME = 500;
064        private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;
065    
066        private long initialReconnectDelay = 1000 * 5;
067        private long maxReconnectDelay = 1000 * 30;
068        private long backOffMultiplier = 2;
069        private boolean useExponentialBackOff;
070        private int maxReconnectAttempts;
071    
072        private int timeToLive = 1;
073        private boolean loopBackMode;
074        private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
075        private String group = "default";
076        private URI discoveryURI;
077        private InetAddress inetAddress;
078        private SocketAddress sockAddress;
079        private DiscoveryListener discoveryListener;
080        private String selfService;
081        private MulticastSocket mcast;
082        private Thread runner;
083        private long keepAliveInterval = DEFAULT_IDLE_TIME;
084        private String mcInterface;
085        private String mcNetworkInterface;
086        private long lastAdvertizeTime;
087        private AtomicBoolean started = new AtomicBoolean(false);
088        private boolean reportAdvertizeFailed = true;
089        private ExecutorService executor = null;
090    
091        class RemoteBrokerData {
092            final String brokerName;
093            final String service;
094            long lastHeartBeat;
095            long recoveryTime;
096            int failureCount;
097            boolean failed;
098    
099            public RemoteBrokerData(String brokerName, String service) {
100                this.brokerName = brokerName;
101                this.service = service;
102                this.lastHeartBeat = System.currentTimeMillis();
103            }
104    
105            public synchronized void updateHeartBeat() {
106                lastHeartBeat = System.currentTimeMillis();
107    
108                // Consider that the broker recovery has succeeded if it has not
109                // failed in 60 seconds.
110                if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
111                    if (LOG.isDebugEnabled()) {
112                        LOG.debug("I now think that the " + service + " service has recovered.");
113                    }
114                    failureCount = 0;
115                    recoveryTime = 0;
116                }
117            }
118    
119            public synchronized long getLastHeartBeat() {
120                return lastHeartBeat;
121            }
122    
123            public synchronized boolean markFailed() {
124                if (!failed) {
125                    failed = true;
126                    failureCount++;
127    
128                    long reconnectDelay;
129                    if (!useExponentialBackOff) {
130                        reconnectDelay = initialReconnectDelay;
131                    } else {
132                        reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount);
133                        if (reconnectDelay > maxReconnectDelay) {
134                            reconnectDelay = maxReconnectDelay;
135                        }
136                    }
137    
138                    if (LOG.isDebugEnabled()) {
139                        LOG.debug("Remote failure of " + service + " while still receiving multicast advertisements.  Advertising events will be suppressed for " + reconnectDelay
140                                  + " ms, the current failure count is: " + failureCount);
141                    }
142    
143                    recoveryTime = System.currentTimeMillis() + reconnectDelay;
144                    return true;
145                }
146                return false;
147            }
148    
149            /**
150             * @return true if this broker is marked failed and it is now the right
151             *         time to start recovery.
152             */
153            public synchronized boolean doRecovery() {
154                if (!failed) {
155                    return false;
156                }
157    
158                // Are we done trying to recover this guy?
159                if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
160                    if (LOG.isDebugEnabled()) {
161                        LOG.debug("Max reconnect attempts of the " + service + " service has been reached.");
162                    }
163                    return false;
164                }
165    
166                // Is it not yet time?
167                if (System.currentTimeMillis() < recoveryTime) {
168                    return false;
169                }
170    
171                if (LOG.isDebugEnabled()) {
172                    LOG.debug("Resuming event advertisement of the " + service + " service.");
173                }
174                failed = false;
175                return true;
176            }
177    
178            public boolean isFailed() {
179                return failed;
180            }
181        }
182    
183        /**
184         * Set the discovery listener
185         * 
186         * @param listener
187         */
188        public void setDiscoveryListener(DiscoveryListener listener) {
189            this.discoveryListener = listener;
190        }
191    
192        /**
193         * register a service
194         */
195        public void registerService(String name) throws IOException {
196            this.selfService = name;
197            if (started.get()) {
198                doAdvertizeSelf();
199            }
200        }
201    
202        /**
203         * @return Returns the loopBackMode.
204         */
205        public boolean isLoopBackMode() {
206            return loopBackMode;
207        }
208    
209        /**
210         * @param loopBackMode The loopBackMode to set.
211         */
212        public void setLoopBackMode(boolean loopBackMode) {
213            this.loopBackMode = loopBackMode;
214        }
215    
216        /**
217         * @return Returns the timeToLive.
218         */
219        public int getTimeToLive() {
220            return timeToLive;
221        }
222    
223        /**
224         * @param timeToLive The timeToLive to set.
225         */
226        public void setTimeToLive(int timeToLive) {
227            this.timeToLive = timeToLive;
228        }
229    
230        /**
231         * @return the discoveryURI
232         */
233        public URI getDiscoveryURI() {
234            return discoveryURI;
235        }
236    
237        /**
238         * Set the discoveryURI
239         * 
240         * @param discoveryURI
241         */
242        public void setDiscoveryURI(URI discoveryURI) {
243            this.discoveryURI = discoveryURI;
244        }
245    
246        public long getKeepAliveInterval() {
247            return keepAliveInterval;
248        }
249    
250        public void setKeepAliveInterval(long keepAliveInterval) {
251            this.keepAliveInterval = keepAliveInterval;
252        }
253        
254        public void setInterface(String mcInterface) {
255            this.mcInterface = mcInterface;
256        }
257        
258        public void setNetworkInterface(String mcNetworkInterface) {
259            this.mcNetworkInterface = mcNetworkInterface;    
260        }
261        
262        /**
263         * start the discovery agent
264         * 
265         * @throws Exception
266         */
267        public void start() throws Exception {
268            
269            if (started.compareAndSet(false, true)) {               
270                                    
271                if (group == null || group.length() == 0) {
272                    throw new IOException("You must specify a group to discover");
273                }
274                String type = getType();
275                if (!type.endsWith(".")) {
276                    LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
277                    type += ".";
278                }
279                
280                if (discoveryURI == null) {
281                    discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
282                }
283                
284                if (LOG.isTraceEnabled()) 
285                            LOG.trace("start - discoveryURI = " + discoveryURI);                                      
286                      
287                      String myHost = discoveryURI.getHost();
288                      int    myPort = discoveryURI.getPort(); 
289                         
290                      if( DEFAULT_HOST_STR.equals(myHost) ) 
291                            myHost = DEFAULT_HOST_IP;                         
292                      
293                      if(myPort < 0 )
294                        myPort = DEFAULT_PORT;                  
295                      
296                      if (LOG.isTraceEnabled()) {
297                            LOG.trace("start - myHost = " + myHost); 
298                            LOG.trace("start - myPort = " + myPort);        
299                            LOG.trace("start - group  = " + group );                                
300                            LOG.trace("start - interface  = " + mcInterface );
301                            LOG.trace("start - network interface  = " + mcNetworkInterface );
302                      }     
303                      
304                this.inetAddress = InetAddress.getByName(myHost);
305                this.sockAddress = new InetSocketAddress(this.inetAddress, myPort);
306                mcast = new MulticastSocket(myPort);
307                mcast.setLoopbackMode(loopBackMode);
308                mcast.setTimeToLive(getTimeToLive());
309                mcast.joinGroup(inetAddress);
310                mcast.setSoTimeout((int)keepAliveInterval);
311                if (mcInterface != null) {
312                    mcast.setInterface(InetAddress.getByName(mcInterface));
313                }
314                if (mcNetworkInterface != null) {
315                    mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface));
316                }
317                runner = new Thread(this);
318                runner.setName(this.toString() + ":" + runner.getName());
319                runner.setDaemon(true);
320                runner.start();
321                doAdvertizeSelf();
322            }
323        }
324    
325        /**
326         * stop the channel
327         * 
328         * @throws Exception
329         */
330        public void stop() throws Exception {
331            if (started.compareAndSet(true, false)) {
332                doAdvertizeSelf();
333                if (mcast != null) {
334                    mcast.close();
335                }
336                if (runner != null) {
337                    runner.interrupt();
338                }
339                getExecutor().shutdownNow();
340            }
341        }
342    
343        public String getType() {
344            return group + "." + TYPE_SUFFIX;
345        }
346    
347        public void run() {
348            byte[] buf = new byte[BUFF_SIZE];
349            DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
350            while (started.get()) {
351                doTimeKeepingServices();
352                try {
353                    mcast.receive(packet);
354                    if (packet.getLength() > 0) {
355                        String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
356                        processData(str);
357                    }
358                } catch (SocketTimeoutException se) {
359                    // ignore
360                } catch (IOException e) {
361                    if (started.get()) {
362                        LOG.error("failed to process packet: " + e);
363                    }
364                }
365            }
366        }
367    
368        private void processData(String str) {
369            if (discoveryListener != null) {
370                if (str.startsWith(getType())) {
371                    String payload = str.substring(getType().length());
372                    if (payload.startsWith(ALIVE)) {
373                        String brokerName = getBrokerName(payload.substring(ALIVE.length()));
374                        String service = payload.substring(ALIVE.length() + brokerName.length() + 2);
375                        processAlive(brokerName, service);
376                    } else {
377                        String brokerName = getBrokerName(payload.substring(DEAD.length()));
378                        String service = payload.substring(DEAD.length() + brokerName.length() + 2);
379                        processDead(service);
380                    }
381                }
382            }
383        }
384    
385        private void doTimeKeepingServices() {
386            if (started.get()) {
387                long currentTime = System.currentTimeMillis();
388                if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) {
389                    doAdvertizeSelf();
390                    lastAdvertizeTime = currentTime;
391                }
392                doExpireOldServices();
393            }
394        }
395    
396        private void doAdvertizeSelf() {
397            if (selfService != null) {
398                String payload = getType();
399                payload += started.get() ? ALIVE : DEAD;
400                payload += DELIMITER + "localhost" + DELIMITER;
401                payload += selfService;
402                try {
403                    byte[] data = payload.getBytes();
404                    DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress);
405                    mcast.send(packet);
406                } catch (IOException e) {
407                    // If a send fails, chances are all subsequent sends will fail
408                    // too.. No need to keep reporting the
409                    // same error over and over.
410                    if (reportAdvertizeFailed) {
411                        reportAdvertizeFailed = false;
412                        LOG.error("Failed to advertise our service: " + payload, e);
413                        if ("Operation not permitted".equals(e.getMessage())) {
414                            LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup.  "
415                                      + "Please make sure that the OS is properly configured to allow multicast traffic over: " + mcast.getLocalAddress());
416                        }
417                    }
418                }
419            }
420        }
421    
422        private void processAlive(String brokerName, String service) {
423            if (selfService == null || !service.equals(selfService)) {
424                RemoteBrokerData data = brokersByService.get(service);
425                if (data == null) {
426                    data = new RemoteBrokerData(brokerName, service);
427                    brokersByService.put(service, data);      
428                    fireServiceAddEvent(data);
429                    doAdvertizeSelf();
430                } else {
431                    data.updateHeartBeat();
432                    if (data.doRecovery()) {
433                        fireServiceAddEvent(data);
434                    }
435                }
436            }
437        }
438    
439        private void processDead(String service) {
440            if (!service.equals(selfService)) {
441                RemoteBrokerData data = brokersByService.remove(service);
442                if (data != null && !data.isFailed()) {
443                    fireServiceRemovedEvent(data);
444                }
445            }
446        }
447    
448        private void doExpireOldServices() {
449            long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); 
450            for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
451                RemoteBrokerData data = i.next();
452                if (data.getLastHeartBeat() < expireTime) {
453                    processDead(data.service);
454                }
455            }
456        }
457    
458        private String getBrokerName(String str) {
459            String result = null;
460            int start = str.indexOf(DELIMITER);
461            if (start >= 0) {
462                int end = str.indexOf(DELIMITER, start + 1);
463                result = str.substring(start + 1, end);
464            }
465            return result;
466        }
467    
468        public void serviceFailed(DiscoveryEvent event) throws IOException {
469            RemoteBrokerData data = brokersByService.get(event.getServiceName());
470            if (data != null && data.markFailed()) {
471                fireServiceRemovedEvent(data);
472            }
473        }
474    
475        private void fireServiceRemovedEvent(RemoteBrokerData data) {
476            if (discoveryListener != null) {
477                final DiscoveryEvent event = new DiscoveryEvent(data.service);
478                event.setBrokerName(data.brokerName);
479    
480                // Have the listener process the event async so that
481                // he does not block this thread since we are doing time sensitive
482                // processing of events.
483                getExecutor().execute(new Runnable() {
484                    public void run() {
485                        DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
486                        if (discoveryListener != null) {
487                            discoveryListener.onServiceRemove(event);
488                        }
489                    }
490                });
491            }
492        }
493    
494        private void fireServiceAddEvent(RemoteBrokerData data) {
495            if (discoveryListener != null) {
496                final DiscoveryEvent event = new DiscoveryEvent(data.service);
497                event.setBrokerName(data.brokerName);
498                
499                // Have the listener process the event async so that
500                // he does not block this thread since we are doing time sensitive
501                // processing of events.
502                getExecutor().execute(new Runnable() {
503                    public void run() {
504                        DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
505                        if (discoveryListener != null) {
506                            discoveryListener.onServiceAdd(event);
507                        }
508                    }
509                });
510            }
511        }
512    
513        private ExecutorService getExecutor() {
514            if (executor == null) {
515                final String threadName = "Notifier-" + this.toString();
516                executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
517                    public Thread newThread(Runnable runable) {
518                        Thread t = new Thread(runable,  threadName);
519                        t.setDaemon(true);
520                        return t;
521                    }
522                });
523            }
524            return executor;
525        }
526    
527        public long getBackOffMultiplier() {
528            return backOffMultiplier;
529        }
530    
531        public void setBackOffMultiplier(long backOffMultiplier) {
532            this.backOffMultiplier = backOffMultiplier;
533        }
534    
535        public long getInitialReconnectDelay() {
536            return initialReconnectDelay;
537        }
538    
539        public void setInitialReconnectDelay(long initialReconnectDelay) {
540            this.initialReconnectDelay = initialReconnectDelay;
541        }
542    
543        public int getMaxReconnectAttempts() {
544            return maxReconnectAttempts;
545        }
546    
547        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
548            this.maxReconnectAttempts = maxReconnectAttempts;
549        }
550    
551        public long getMaxReconnectDelay() {
552            return maxReconnectDelay;
553        }
554    
555        public void setMaxReconnectDelay(long maxReconnectDelay) {
556            this.maxReconnectDelay = maxReconnectDelay;
557        }
558    
559        public boolean isUseExponentialBackOff() {
560            return useExponentialBackOff;
561        }
562    
563        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
564            this.useExponentialBackOff = useExponentialBackOff;
565        }
566    
567        public void setGroup(String group) {
568            this.group = group;
569        }
570        
571        @Override
572        public String toString() {
573            return  "MulticastDiscoveryAgent-"
574                + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
575        }
576    }