001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.udp;
018    
019    import java.io.EOFException;
020    import java.io.IOException;
021    import java.net.BindException;
022    import java.net.DatagramSocket;
023    import java.net.InetAddress;
024    import java.net.InetSocketAddress;
025    import java.net.SocketAddress;
026    import java.net.SocketException;
027    import java.net.URI;
028    import java.net.UnknownHostException;
029    import java.nio.channels.AsynchronousCloseException;
030    import java.nio.channels.DatagramChannel;
031    
032    import org.apache.activemq.Service;
033    import org.apache.activemq.command.Command;
034    import org.apache.activemq.command.Endpoint;
035    import org.apache.activemq.openwire.OpenWireFormat;
036    import org.apache.activemq.transport.Transport;
037    import org.apache.activemq.transport.TransportThreadSupport;
038    import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
039    import org.apache.activemq.transport.reliable.ReplayBuffer;
040    import org.apache.activemq.transport.reliable.ReplayStrategy;
041    import org.apache.activemq.transport.reliable.Replayer;
042    import org.apache.activemq.util.IntSequenceGenerator;
043    import org.apache.activemq.util.ServiceStopper;
044    import org.apache.commons.logging.Log;
045    import org.apache.commons.logging.LogFactory;
046    
047    /**
048     * An implementation of the {@link Transport} interface using raw UDP
049     * 
050     * @version $Revision: 891816 $
051     */
052    public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
053        private static final Log LOG = LogFactory.getLog(UdpTransport.class);
054    
055        private static final int MAX_BIND_ATTEMPTS = 50;
056        private static final long BIND_ATTEMPT_DELAY = 100;
057    
058        private CommandChannel commandChannel;
059        private OpenWireFormat wireFormat;
060        private ByteBufferPool bufferPool;
061        private ReplayStrategy replayStrategy = new ExceptionIfDroppedReplayStrategy();
062        private ReplayBuffer replayBuffer;
063        private int datagramSize = 4 * 1024;
064        private SocketAddress targetAddress;
065        private SocketAddress originalTargetAddress;
066        private DatagramChannel channel;
067        private boolean trace;
068        private boolean useLocalHost = true;
069        private int port;
070        private int minmumWireFormatVersion;
071        private String description;
072        private IntSequenceGenerator sequenceGenerator;
073        private boolean replayEnabled = true;
074    
075        protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
076            this.wireFormat = wireFormat;
077        }
078    
079        public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
080            this(wireFormat);
081            this.targetAddress = createAddress(remoteLocation);
082            description = remoteLocation.toString() + "@";
083        }
084    
085        public UdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException {
086            this(wireFormat);
087            this.targetAddress = socketAddress;
088            this.description = getProtocolName() + "ServerConnection@";
089        }
090    
091        /**
092         * Used by the server transport
093         */
094        public UdpTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException {
095            this(wireFormat);
096            this.port = port;
097            this.targetAddress = null;
098            this.description = getProtocolName() + "Server@";
099        }
100    
101        /**
102         * Creates a replayer for working with the reliable transport
103         */
104        public Replayer createReplayer() throws IOException {
105            if (replayEnabled) {
106                return getCommandChannel();
107            }
108            return null;
109        }
110    
111        /**
112         * A one way asynchronous send
113         */
114        public void oneway(Object command) throws IOException {
115            oneway(command, targetAddress);
116        }
117    
118        /**
119         * A one way asynchronous send to a given address
120         */
121        public void oneway(Object command, SocketAddress address) throws IOException {
122            if (LOG.isDebugEnabled()) {
123                LOG.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command);
124            }
125            checkStarted();
126            commandChannel.write((Command)command, address);
127        }
128    
129        /**
130         * @return pretty print of 'this'
131         */
132        public String toString() {
133            if (description != null) {
134                return description + port;
135            } else {
136                return getProtocolUriScheme() + targetAddress + "@" + port;
137            }
138        }
139    
140        /**
141         * reads packets from a Socket
142         */
143        public void run() {
144            LOG.trace("Consumer thread starting for: " + toString());
145            while (!isStopped()) {
146                try {
147                    Command command = commandChannel.read();
148                    doConsume(command);
149                } catch (AsynchronousCloseException e) {
150                    // DatagramChannel closed
151                    try {
152                        stop();
153                    } catch (Exception e2) {
154                        LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
155                    }
156                } catch (SocketException e) {
157                    // DatagramSocket closed
158                    LOG.debug("Socket closed: " + e, e);
159                    try {
160                        stop();
161                    } catch (Exception e2) {
162                        LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
163                    }
164                } catch (EOFException e) {
165                    // DataInputStream closed
166                    LOG.debug("Socket closed: " + e, e);
167                    try {
168                        stop();
169                    } catch (Exception e2) {
170                        LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
171                    }
172                } catch (Exception e) {
173                    try {
174                        stop();
175                    } catch (Exception e2) {
176                        LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
177                    }
178                    if (e instanceof IOException) {
179                        onException((IOException)e);
180                    } else {
181                        LOG.error("Caught: " + e, e);
182                        e.printStackTrace();
183                    }
184                }
185            }
186        }
187    
188        /**
189         * We have received the WireFormatInfo from the server on the actual channel
190         * we should use for all future communication with the server, so lets set
191         * the target to be the actual channel that the server has chosen for us to
192         * talk on.
193         */
194        public void setTargetEndpoint(Endpoint newTarget) {
195            if (newTarget instanceof DatagramEndpoint) {
196                DatagramEndpoint endpoint = (DatagramEndpoint)newTarget;
197                SocketAddress address = endpoint.getAddress();
198                if (address != null) {
199                    if (originalTargetAddress == null) {
200                        originalTargetAddress = targetAddress;
201                    }
202                    targetAddress = address;
203                    commandChannel.setTargetAddress(address);
204                }
205            }
206        }
207    
208        // Properties
209        // -------------------------------------------------------------------------
210        public boolean isTrace() {
211            return trace;
212        }
213    
214        public void setTrace(boolean trace) {
215            this.trace = trace;
216        }
217    
218        public int getDatagramSize() {
219            return datagramSize;
220        }
221    
222        public void setDatagramSize(int datagramSize) {
223            this.datagramSize = datagramSize;
224        }
225    
226        public boolean isUseLocalHost() {
227            return useLocalHost;
228        }
229    
230        /**
231         * Sets whether 'localhost' or the actual local host name should be used to
232         * make local connections. On some operating systems such as Macs its not
233         * possible to connect as the local host name so localhost is better.
234         */
235        public void setUseLocalHost(boolean useLocalHost) {
236            this.useLocalHost = useLocalHost;
237        }
238    
239        public CommandChannel getCommandChannel() throws IOException {
240            if (commandChannel == null) {
241                commandChannel = createCommandChannel();
242            }
243            return commandChannel;
244        }
245    
246        /**
247         * Sets the implementation of the command channel to use.
248         */
249        public void setCommandChannel(CommandDatagramChannel commandChannel) {
250            this.commandChannel = commandChannel;
251        }
252    
253        public ReplayStrategy getReplayStrategy() {
254            return replayStrategy;
255        }
256    
257        /**
258         * Sets the strategy used to replay missed datagrams
259         */
260        public void setReplayStrategy(ReplayStrategy replayStrategy) {
261            this.replayStrategy = replayStrategy;
262        }
263    
264        public int getPort() {
265            return port;
266        }
267    
268        /**
269         * Sets the port to connect on
270         */
271        public void setPort(int port) {
272            this.port = port;
273        }
274    
275        public int getMinmumWireFormatVersion() {
276            return minmumWireFormatVersion;
277        }
278    
279        public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
280            this.minmumWireFormatVersion = minmumWireFormatVersion;
281        }
282    
283        public OpenWireFormat getWireFormat() {
284            return wireFormat;
285        }
286    
287        public IntSequenceGenerator getSequenceGenerator() {
288            if (sequenceGenerator == null) {
289                sequenceGenerator = new IntSequenceGenerator();
290            }
291            return sequenceGenerator;
292        }
293    
294        public void setSequenceGenerator(IntSequenceGenerator sequenceGenerator) {
295            this.sequenceGenerator = sequenceGenerator;
296        }
297    
298        public boolean isReplayEnabled() {
299            return replayEnabled;
300        }
301    
302        /**
303         * Sets whether or not replay should be enabled when using the reliable
304         * transport. i.e. should we maintain a buffer of messages that can be
305         * replayed?
306         */
307        public void setReplayEnabled(boolean replayEnabled) {
308            this.replayEnabled = replayEnabled;
309        }
310    
311        public ByteBufferPool getBufferPool() {
312            if (bufferPool == null) {
313                bufferPool = new DefaultBufferPool();
314            }
315            return bufferPool;
316        }
317    
318        public void setBufferPool(ByteBufferPool bufferPool) {
319            this.bufferPool = bufferPool;
320        }
321    
322        public ReplayBuffer getReplayBuffer() {
323            return replayBuffer;
324        }
325    
326        public void setReplayBuffer(ReplayBuffer replayBuffer) throws IOException {
327            this.replayBuffer = replayBuffer;
328            getCommandChannel().setReplayBuffer(replayBuffer);
329        }
330    
331        // Implementation methods
332        // -------------------------------------------------------------------------
333    
334        /**
335         * Creates an address from the given URI
336         */
337        protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {
338            String host = resolveHostName(remoteLocation.getHost());
339            return new InetSocketAddress(host, remoteLocation.getPort());
340        }
341    
342        protected String resolveHostName(String host) throws UnknownHostException {
343            String localName = InetAddress.getLocalHost().getHostName();
344            if (localName != null && isUseLocalHost()) {
345                if (localName.equals(host)) {
346                    return "localhost";
347                }
348            }
349            return host;
350        }
351    
352        protected void doStart() throws Exception {
353            getCommandChannel().start();
354    
355            super.doStart();
356        }
357    
358        protected CommandChannel createCommandChannel() throws IOException {
359            SocketAddress localAddress = createLocalAddress();
360            channel = DatagramChannel.open();
361    
362            channel = connect(channel, targetAddress);
363    
364            DatagramSocket socket = channel.socket();
365            bind(socket, localAddress);
366            if (port == 0) {
367                port = socket.getLocalPort();
368            }
369    
370            return createCommandDatagramChannel();
371        }
372    
373        protected CommandChannel createCommandDatagramChannel() {
374            return new CommandDatagramChannel(this, getWireFormat(), getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(), getChannel(), getBufferPool());
375        }
376    
377        protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException {
378            channel.configureBlocking(true);
379    
380            if (LOG.isDebugEnabled()) {
381                LOG.debug("Binding to address: " + localAddress);
382            }
383    
384            //
385            // We have noticed that on some platfoms like linux, after you close
386            // down
387            // a previously bound socket, it can take a little while before we can
388            // bind it again.
389            // 
390            for (int i = 0; i < MAX_BIND_ATTEMPTS; i++) {
391                try {
392                    socket.bind(localAddress);
393                    return;
394                } catch (BindException e) {
395                    if (i + 1 == MAX_BIND_ATTEMPTS) {
396                        throw e;
397                    }
398                    try {
399                        Thread.sleep(BIND_ATTEMPT_DELAY);
400                    } catch (InterruptedException e1) {
401                        Thread.currentThread().interrupt();
402                        throw e;
403                    }
404                }
405            }
406    
407        }
408    
409        protected DatagramChannel connect(DatagramChannel channel, SocketAddress targetAddress2) throws IOException {
410            // TODO
411            // connect to default target address to avoid security checks each time
412            // channel = channel.connect(targetAddress);
413    
414            return channel;
415        }
416    
417        protected SocketAddress createLocalAddress() {
418            return new InetSocketAddress(port);
419        }
420    
421        protected void doStop(ServiceStopper stopper) throws Exception {
422            if (channel != null) {
423                channel.close();
424            }
425        }
426    
427        protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() {
428            return new DatagramHeaderMarshaller();
429        }
430    
431        protected String getProtocolName() {
432            return "Udp";
433        }
434    
435        protected String getProtocolUriScheme() {
436            return "udp://";
437        }
438    
439        protected SocketAddress getTargetAddress() {
440            return targetAddress;
441        }
442    
443        protected DatagramChannel getChannel() {
444            return channel;
445        }
446    
447        protected void setChannel(DatagramChannel channel) {
448            this.channel = channel;
449        }
450    
451        public InetSocketAddress getLocalSocketAddress() {
452            if (channel == null) {
453                return null;
454            } else {
455                return (InetSocketAddress)channel.socket().getLocalSocketAddress();
456            }
457        }
458    
459        public String getRemoteAddress() {
460            if (targetAddress != null) {
461                return "" + targetAddress;
462            }
463            return null;
464        }
465    
466        public int getReceiveCounter() {
467            if (commandChannel == null) {
468                return 0;
469            }
470            return commandChannel.getReceiveCounter();
471        }
472    }