001    /**
002     * Copyright (C) 2012 FuseSource, Inc.
003     * http://fusesource.com
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     *    http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.fusesource.hawtdispatch.transport;
019    
020    import org.fusesource.hawtdispatch.*;
021    
022    import java.net.InetSocketAddress;
023    import java.net.URI;
024    import java.util.LinkedList;
025    import java.util.concurrent.Executor;
026    import java.util.concurrent.atomic.AtomicInteger;
027    
028    /**
029     *
030     * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
031     */
032    public class PipeTransportServer implements TransportServer {
033    
034        protected String connectURI;
035        protected TransportServerListener listener;
036        protected String name;
037        protected boolean marshal;
038        protected final AtomicInteger connectionCounter = new AtomicInteger();
039        DispatchQueue dispatchQueue;
040    
041        private CustomDispatchSource<PipeTransport,LinkedList<PipeTransport>> acceptSource;
042    
043    
044        public String getBoundAddress() {
045            return connectURI;
046        }
047    
048        public InetSocketAddress getSocketAddress() {
049            return null;
050        }
051    
052        public DispatchQueue getDispatchQueue() {
053            return dispatchQueue;
054        }
055    
056        public void setDispatchQueue(DispatchQueue queue) {
057            dispatchQueue = queue;
058        }
059    
060        public void suspend() {
061            acceptSource.suspend();
062        }
063    
064        public void resume() {
065            acceptSource.resume();
066        }
067    
068        public void setTransportServerListener(TransportServerListener listener) {
069            this.listener = listener;
070        }
071    
072        @Deprecated
073        public void start(Runnable onCompleted) throws Exception {
074            start(new TaskWrapper(onCompleted));
075        }
076        @Deprecated
077        public void stop(Runnable onCompleted) throws Exception {
078            stop(new TaskWrapper(onCompleted));
079        }
080    
081        public void start(Task onCompleted) throws Exception {
082            acceptSource = Dispatch.createSource(EventAggregators.<PipeTransport>linkedList(), dispatchQueue);
083            acceptSource.setEventHandler(new Task() {
084                public void run() {
085                    LinkedList<PipeTransport> transports = acceptSource.getData();
086                    for (PipeTransport transport : transports) {
087                        try {
088                            listener.onAccept(transport);
089                        } catch (Exception e) {
090                            listener.onAcceptError(e);
091                        }
092                    }
093                }
094            });
095            acceptSource.resume();
096            if( onCompleted!=null ) {
097                dispatchQueue.execute(onCompleted);
098            }
099        }
100    
101        public void stop(Task onCompleted) throws Exception {
102            PipeTransportRegistry.unbind(this);
103            acceptSource.setCancelHandler(onCompleted);
104            acceptSource.cancel();
105        }
106    
107        public void setConnectURI(String connectURI) {
108            this.connectURI = connectURI;
109        }
110    
111        public void setName(String name) {
112            this.name = name;
113        }
114    
115        public String getName() {
116            return name;
117        }
118    
119        public PipeTransport connect() {
120            int connectionId = connectionCounter.incrementAndGet();
121            String remoteAddress = connectURI.toString() + "#" + connectionId;
122            assert this.listener != null : "Server does not have an accept listener";
123    
124            PipeTransport clientTransport = createClientTransport();
125            PipeTransport serverTransport = createServerTransport();
126            clientTransport.peer = serverTransport;
127            serverTransport.peer = clientTransport;
128    
129            clientTransport.setRemoteAddress(remoteAddress);
130            serverTransport.setRemoteAddress(remoteAddress);
131    
132            serverTransport.setMarshal(marshal);
133            this.acceptSource.merge(serverTransport);
134            return clientTransport;
135        }
136    
137        protected PipeTransport createClientTransport() {
138            return new PipeTransport(this);
139        }
140        
141        protected PipeTransport createServerTransport() {
142            return new PipeTransport(this);
143        }
144    
145        public boolean isMarshal() {
146            return marshal;
147        }
148    
149        public void setMarshal(boolean marshal) {
150            this.marshal = marshal;
151        }
152    
153        public Executor getBlockingExecutor() {
154            return null;
155        }
156    
157        public void setBlockingExecutor(Executor blockingExecutor) {
158        }
159    }