001    /**
002     * Copyright (C) 2012 FuseSource, Inc.
003     * http://fusesource.com
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     *    http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.fusesource.hawtdispatch.transport;
019    
020    import org.fusesource.hawtdispatch.*;
021    
022    import java.io.IOException;
023    import java.net.*;
024    import java.nio.channels.SelectionKey;
025    import java.nio.channels.ServerSocketChannel;
026    import java.nio.channels.SocketChannel;
027    import java.util.concurrent.Executor;
028    
029    /**
030     * A TCP based implementation of {@link TransportServer}
031     *
032     * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
033     */
034    
035    public class TcpTransportServer implements TransportServer {
036    
037        protected final String bindScheme;
038        protected final InetSocketAddress bindAddress;
039        protected int backlog = 100;
040        protected ServerSocketChannel channel;
041        protected TransportServerListener listener;
042        protected DispatchQueue dispatchQueue;
043        protected DispatchSource acceptSource;
044        protected int receiveBufferSize = 64*1024;
045        protected int sendBufferSize = 64*1204;
046        protected Executor blockingExecutor;
047    
048        public TcpTransportServer(URI location) throws UnknownHostException {
049            bindScheme = location.getScheme();
050            String host = location.getHost();
051            host = (host == null || host.length() == 0) ? "::" : host;
052            bindAddress = new InetSocketAddress(InetAddress.getByName(host), location.getPort());
053        }
054    
055        public void setTransportServerListener(TransportServerListener listener) {
056            this.listener = listener;
057        }
058    
059        public InetSocketAddress getSocketAddress() {
060            return (InetSocketAddress) channel.socket().getLocalSocketAddress();
061        }
062    
063        public DispatchQueue getDispatchQueue() {
064            return dispatchQueue;
065        }
066    
067        public void setDispatchQueue(DispatchQueue dispatchQueue) {
068            this.dispatchQueue = dispatchQueue;
069        }
070    
071        public void suspend() {
072            acceptSource.suspend();
073        }
074    
075        public void resume() {
076            acceptSource.resume();
077        }
078    
079        @Deprecated
080        public void start(Runnable onCompleted) throws Exception {
081            start(new TaskWrapper(onCompleted));
082        }
083        @Deprecated
084        public void stop(Runnable onCompleted) throws Exception {
085            stop(new TaskWrapper(onCompleted));
086        }
087    
088        public void start(Task onCompleted) throws Exception {
089    
090            try {
091                channel = ServerSocketChannel.open();
092                channel.configureBlocking(false);
093                try {
094                    channel.socket().setReceiveBufferSize(receiveBufferSize);
095                } catch (SocketException ignore) {
096                }
097                try {
098                    channel.socket().setReceiveBufferSize(sendBufferSize);
099                } catch (SocketException ignore) {
100                }
101                channel.socket().bind(bindAddress, backlog);
102            } catch (IOException e) {
103                throw new IOException("Failed to bind to server socket: " + bindAddress + " due to: " + e);
104            }
105    
106            acceptSource = Dispatch.createSource(channel, SelectionKey.OP_ACCEPT, dispatchQueue);
107            acceptSource.setEventHandler(new Task() {
108                public void run() {
109                    try {
110                        SocketChannel client = channel.accept();
111                        while( client!=null ) {
112                            handleSocket(client);
113                            client = channel.accept();
114                        }
115                    } catch (Exception e) {
116                        listener.onAcceptError(e);
117                    }
118                }
119            });
120            acceptSource.setCancelHandler(new Task() {
121                public void run() {
122                    try {
123                        channel.close();
124                    } catch (IOException e) {
125                    }
126                }
127            });
128            acceptSource.resume();
129            if( onCompleted!=null ) {
130                dispatchQueue.execute(onCompleted);
131            }
132        }
133    
134        public String getBoundAddress() {
135            try {
136                return new URI(bindScheme, null, bindAddress.getAddress().getHostAddress(), channel.socket().getLocalPort(), null, null, null).toString();
137            } catch (URISyntaxException e) {
138                throw new RuntimeException(e);
139            }
140        }
141    
142        public void stop(final Task onCompleted) throws Exception {
143            if( acceptSource.isCanceled() ) {
144                onCompleted.run();
145            } else {
146                acceptSource.setCancelHandler(new Task() {
147                    public void run() {
148                        try {
149                            channel.close();
150                        } catch (IOException e) {
151                        }
152                        onCompleted.run();
153                    }
154                });
155                acceptSource.cancel();
156            }
157        }
158    
159        public int getBacklog() {
160            return backlog;
161        }
162    
163        public void setBacklog(int backlog) {
164            this.backlog = backlog;
165        }
166    
167        protected final void handleSocket(SocketChannel socket) throws Exception {
168            TcpTransport transport = createTransport();
169            transport.connected(socket);
170            listener.onAccept(transport);
171        }
172    
173        protected TcpTransport createTransport() {
174            final TcpTransport rc = new TcpTransport();
175            rc.setBlockingExecutor(blockingExecutor);
176            rc.setDispatchQueue(dispatchQueue);
177            return rc;
178        }
179    
180        /**
181         * @return pretty print of this
182         */
183        public String toString() {
184            return getBoundAddress();
185        }
186    
187        public int getReceiveBufferSize() {
188            return receiveBufferSize;
189        }
190    
191        public void setReceiveBufferSize(int receiveBufferSize) {
192            this.receiveBufferSize = receiveBufferSize;
193            if( channel!=null ) {
194                try {
195                    channel.socket().setReceiveBufferSize(receiveBufferSize);
196                } catch (SocketException ignore) {
197                }
198            }
199        }
200    
201        public int getSendBufferSize() {
202            return sendBufferSize;
203        }
204    
205        public void setSendBufferSize(int sendBufferSize) {
206            this.sendBufferSize = sendBufferSize;
207            if( channel!=null ) {
208                try {
209                    channel.socket().setReceiveBufferSize(sendBufferSize);
210                } catch (SocketException ignore) {
211                }
212            }
213        }
214    
215        public Executor getBlockingExecutor() {
216            return blockingExecutor;
217        }
218    
219        public void setBlockingExecutor(Executor blockingExecutor) {
220            this.blockingExecutor = blockingExecutor;
221        }
222    
223    }