001    /**
002     * Copyright (C) 2012 FuseSource, Inc.
003     * http://fusesource.com
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     *    http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.fusesource.hawtdispatch.transport;
018    
019    import org.fusesource.hawtdispatch.DispatchQueue;
020    import org.fusesource.hawtdispatch.Task;
021    
022    import java.net.*;
023    import java.nio.channels.DatagramChannel;
024    import java.util.concurrent.Executor;
025    
026    /**
027     * <p>
028     * </p>
029     *
030     * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
031     */
032    public class UdpTransportServer extends ServiceBase implements TransportServer {
033    
034        private final String bindScheme;
035        private final InetSocketAddress bindAddress;
036    
037        private DatagramChannel channel;
038        private TransportServerListener listener;
039        private DispatchQueue dispatchQueue;
040        private Executor blockingExecutor;
041    
042        public UdpTransportServer(URI location) throws UnknownHostException {
043            bindScheme = location.getScheme();
044            String host = location.getHost();
045            host = (host == null || host.length() == 0) ? "::" : host;
046            bindAddress = new InetSocketAddress(InetAddress.getByName(host), location.getPort());
047        }
048    
049        private  UdpTransport transport;
050    
051        public void setTransportServerListener(TransportServerListener listener) {
052            this.listener = listener;
053        }
054    
055        public InetSocketAddress getSocketAddress() {
056            return (InetSocketAddress) channel.socket().getLocalSocketAddress();
057        }
058    
059        public DispatchQueue getDispatchQueue() {
060            return dispatchQueue;
061        }
062    
063        public void setDispatchQueue(DispatchQueue dispatchQueue) {
064            this.dispatchQueue = dispatchQueue;
065        }
066    
067        @Override
068        protected void _start(Task onCompleted) {
069            accept();
070            if( onCompleted!=null ) {
071                dispatchQueue.execute(onCompleted);
072            }
073        }
074    
075        private void queueAccept() {
076            dispatchQueue.execute(new Task() {
077                public void run() {
078                    accept();
079                }
080            });
081        }
082    
083        private void accept() {
084            if (getServiceState().isStarted() || getServiceState().isStarting()) {
085                try {
086                    UdpTransport udpTransport = createTransport();
087                    transport = udpTransport;
088                    transport.onDispose = new Task() {
089                        public void run() {
090                            queueAccept();
091                        }
092                    };
093                    channel = DatagramChannel.open();
094                    channel.socket().bind(bindAddress);
095                    transport.connected(channel);
096                    listener.onAccept(transport);
097                } catch (Exception e) {
098                    listener.onAcceptError(e);
099                }
100            }
101        }
102    
103        protected UdpTransport createTransport() {
104            final UdpTransport transport = new UdpTransport();
105            transport.setBlockingExecutor(blockingExecutor);
106            transport.setDispatchQueue(dispatchQueue);
107            return transport;
108        }
109    
110        @Override
111        protected void _stop(Task onCompleted) {
112            transport.stop(onCompleted);
113        }
114    
115        public void suspend() {
116            dispatchQueue.suspend();
117        }
118    
119        public void resume() {
120            dispatchQueue.resume();
121        }
122    
123        public String getBoundAddress() {
124            try {
125                String host = bindAddress.getAddress().getHostAddress();
126                int port = channel.socket().getLocalPort();
127                return new URI(bindScheme, null, host, port, null, null, null).toString();
128            } catch (URISyntaxException e) {
129                throw new RuntimeException(e);
130            }
131        }
132    
133        /**
134         * @return pretty print of this
135         */
136        public String toString() {
137            return getBoundAddress();
138        }
139    
140        public Executor getBlockingExecutor() {
141            return blockingExecutor;
142        }
143    
144        public void setBlockingExecutor(Executor blockingExecutor) {
145            this.blockingExecutor = blockingExecutor;
146        }
147    }