001    /**
002     * Copyright (C) 2012 FuseSource, Inc.
003     * http://fusesource.com
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     *    http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.fusesource.hawtdispatch.transport;
019    
020    import org.fusesource.hawtdispatch.*;
021    
022    import java.io.EOFException;
023    import java.io.IOException;
024    import java.net.SocketAddress;
025    import java.net.URI;
026    import java.nio.channels.ReadableByteChannel;
027    import java.nio.channels.WritableByteChannel;
028    import java.util.LinkedList;
029    import java.util.concurrent.Executor;
030    import java.util.concurrent.atomic.AtomicBoolean;
031    
032    /**
033     *
034     * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
035     */
036    public class PipeTransport implements Transport {
037        static private final Object EOF_TOKEN = new Object();
038    
039        final private PipeTransportServer server;
040        PipeTransport peer;
041        private TransportListener listener;
042        private SocketAddress remoteAddress;
043        private AtomicBoolean stopping = new AtomicBoolean();
044        private String name;
045        private boolean marshal;
046        private boolean trace;
047    
048        private DispatchQueue dispatchQueue;
049        private CustomDispatchSource<Object,LinkedList<Object>> dispatchSource;
050        private boolean connected;
051    
052        private long writeCounter = 0;
053        private long readCounter = 0;
054        private ProtocolCodec protocolCodec;
055    
056        public PipeTransport(PipeTransportServer server) {
057            this.server = server;
058        }
059    
060        public DispatchQueue getDispatchQueue() {
061            return dispatchQueue;
062        }
063        public void setDispatchQueue(DispatchQueue queue) {
064            this.dispatchQueue = queue;
065        }
066    
067        @Deprecated
068        public void start(final Runnable onCompleted) {
069            start(new TaskWrapper(onCompleted));
070        }
071        public void start(final Task onCompleted) {
072            if (dispatchQueue == null) {
073                throw new IllegalArgumentException("dispatchQueue is not set");
074            }
075            server.dispatchQueue.execute(new Task(){
076                public void run() {
077                    dispatchSource = Dispatch.createSource(EventAggregators.linkedList(), dispatchQueue);
078                    dispatchSource.setEventHandler(new Task() {
079                        public void run() {
080                            try {
081                                final LinkedList<Object> commands = dispatchSource.getData();
082                                for (Object o : commands) {
083    
084                                    if (o == EOF_TOKEN) {
085                                        throw new EOFException();
086                                    }
087                                    readCounter++;
088                                    listener.onTransportCommand(o);
089                                }
090    
091                                // let the peer know that they have been processed.
092                                peer.dispatchQueue.execute(new Task() {
093                                    public void run() {
094                                        outbound -= commands.size();
095                                        drainInbound();
096                                    }
097                                });
098                            } catch (IOException e) {
099                                listener.onTransportFailure(e);
100                            }
101    
102                        }
103                    });
104                    if( peer.dispatchSource != null ) {
105                        fireConnected();
106                        peer.fireConnected();
107                    }
108                    if( onCompleted!=null ) {
109                        onCompleted.run();
110                    }
111    
112                }
113            });
114        }
115    
116        private void fireConnected() {
117            dispatchQueue.execute(new Task() {
118                public void run() {
119                    connected = true;
120                    dispatchSource.resume();
121                    listener.onTransportConnected();
122                    drainInbound();
123                }
124            });
125        }
126    
127        public void flush() {
128            listener.onRefill();
129        }
130    
131        @Deprecated
132        public void stop(final Runnable onCompleted) {
133            stop(new TaskWrapper(onCompleted));
134        }
135        public void stop(Task onCompleted)  {
136            if( connected ) {
137                peer.dispatchSource.merge(EOF_TOKEN);
138            }
139            if( dispatchSource!=null ) {
140                dispatchSource.setCancelHandler(onCompleted);
141                dispatchSource.cancel();
142            }
143            setDispatchQueue(null);
144        }
145    
146        static final class OneWay {
147            final Object command;
148            final Retained retained;
149    
150            public OneWay(Object command, Retained retained) {
151                this.command = command;
152                this.retained = retained;
153            }
154        }
155    
156        int outbound = 0;
157        int maxOutbound = 100;
158    
159        public boolean full() {
160            return outbound >= maxOutbound;
161        }
162    
163        public boolean offer(Object command) {
164            if( !connected ) {
165                return false;
166            }
167            if( full() ) {
168                return false;
169            } else {
170                transmit(command);
171                return true;
172            }
173        }
174    
175        public void drainInbound() {
176            if( !full() ) {
177                listener.onRefill();
178            }
179        }
180    
181        private void transmit(Object command) {
182            writeCounter++;
183            outbound++;
184            peer.dispatchSource.merge(command);
185        }
186    
187        /**
188         * @return The number of objects sent by the transport.
189         */
190        public long getWriteCounter() {
191            return writeCounter;
192        }
193    
194        /**
195         * @return The number of objects received by the transport.
196         */
197        public long getReadCounter() {
198            return readCounter;
199        }
200    
201        public SocketAddress getLocalAddress() {
202            return remoteAddress;
203        }
204    
205        public SocketAddress getRemoteAddress() {
206            return remoteAddress;
207        }
208    
209        public void suspendRead() {
210            dispatchSource.suspend();
211        }
212    
213        public void resumeRead() {
214            dispatchSource.resume();
215        }
216    
217        public void setRemoteAddress(final String remoteAddress) {
218            this.remoteAddress = new SocketAddress() {
219                @Override
220                public String toString() {
221                    return remoteAddress;
222                }
223            };
224            if (name == null) {
225                name = remoteAddress;
226            }
227        }
228    
229        public void setName(String name) {
230            this.name = name;
231        }
232    
233        public TransportListener getTransportListener() {
234            return listener;
235        }
236        public void setTransportListener(TransportListener transportListener) {
237            this.listener = transportListener;
238        }
239    
240        public ProtocolCodec getProtocolCodec() {
241            return protocolCodec;
242        }
243        public void setProtocolCodec(ProtocolCodec protocolCodec) {
244            this.protocolCodec = protocolCodec;
245        }
246    
247    
248        public boolean isTrace() {
249            return trace;
250        }
251    
252        public void setTrace(boolean trace) {
253            this.trace = trace;
254        }
255    
256        public boolean isMarshal() {
257            return marshal;
258        }
259        public void setMarshal(boolean marshall) {
260            this.marshal = marshall;
261        }
262    
263        public boolean isConnected() {
264            return !stopping.get();
265        }
266        public boolean isClosed() {
267            return false;
268        }
269    
270        public Executor getBlockingExecutor() {
271            return null;
272        }
273        public void setBlockingExecutor(Executor blockingExecutor) {
274        }
275    
276        public ReadableByteChannel getReadChannel() {
277            return null;
278        }
279    
280        public WritableByteChannel getWriteChannel() {
281            return null;
282        }
283    }