001    /**
002     * Copyright (C) 2012 FuseSource, Inc.
003     * http://fusesource.com
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     *    http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.fusesource.hawtdispatch.example.discovery;
019    
020    import org.fusesource.hawtdispatch.*;
021    
022    import static org.fusesource.hawtdispatch.Dispatch.*;
023    
024    import java.io.ByteArrayOutputStream;
025    import java.io.EOFException;
026    import java.io.IOException;
027    import java.net.InetSocketAddress;
028    import java.net.URI;
029    import java.nio.ByteBuffer;
030    import java.nio.channels.*;
031    import java.util.ArrayList;
032    import java.util.concurrent.TimeUnit;
033    
034    /**
035     * An example of a networks of servers which advertise connection information to each other.
036     */
037    public class EchoNetJava {
038        public static void main(String[] args) throws Exception {
039            run();
040        }
041    
042        public static void run() throws Exception {
043            Server a = new Server(4444).start();
044            Server b = new Server(5555).start();
045            Server c = new Server(6666).start();
046    
047            Thread.sleep(200);
048    
049            a.connect(3333);
050            a.connect(b);
051            b.connect(c);
052            System.in.read();
053        }
054    
055        static class Server {
056            final int port;
057            final URI me;
058            final ServerSocketChannel serverChannel;
059            final ArrayList<URI> seen = new ArrayList<URI>();
060            final DispatchQueue queue;
061            final DispatchSource accept_source;
062    
063    
064            public Server(int port) throws Exception {
065                this.port = port;
066                this.me = URI.create("conn://localhost:" + port);
067                this.serverChannel = ServerSocketChannel.open();
068                serverChannel.socket().bind(new InetSocketAddress(port));
069                serverChannel.configureBlocking(false);
070                queue = createQueue(me.toString());
071                accept_source = createSource(serverChannel, SelectionKey.OP_ACCEPT, queue);
072    
073                accept_source.setEventHandler(new Task() {
074                    public void run() {
075                        // we are a server
076    
077                        // when you are a server, we must first listen for the
078                        // address of the client before sending data.
079    
080                        // once they send us their address, we will send our
081                        // full list of known addresses, followed by our own
082                        // address to signal that we are done.
083    
084                        // Afterward we will only pulls our heartbeat
085                        SocketChannel client = null;
086                        try {
087                            client = serverChannel.accept();
088    
089                            InetSocketAddress address = (InetSocketAddress) client.socket().getRemoteSocketAddress();
090                            trace("accept " + address.getPort());
091                            client.configureBlocking(false);
092    
093                            // Server sessions start by reading the client's greeting
094                            Session session = new Session(Server.this, client, address);
095                            session.start_read_greeting();
096    
097                        } catch (Exception e) {
098                            try {
099                                client.close();
100                            } catch (IOException e1) {
101                            }
102                        }
103    
104                    }
105                });
106    
107                accept_source.setCancelHandler(new Task() {
108                    public void run() {
109                        try {
110                            serverChannel.close();
111                        } catch (Throwable e) {
112                        }
113                    }
114                });
115                trace("Listening");
116            }
117    
118            public Server start() {
119                accept_source.resume();
120                return this;
121            }
122    
123            public void stop() {
124                accept_source.suspend();
125            }
126    
127            public void close() {
128                accept_source.cancel();
129            }
130    
131            public void connect(Server s) {
132                connect(s.port);
133            }
134    
135            public void connect(int port) {
136                connect(URI.create("conn://localhost:" + port));
137            }
138    
139            public void connect(final URI uri) {
140                queue.execute(new Task() {
141                    public void run() {
142                        if (me.equals(uri) || seen.contains(uri))
143                            return;
144    
145                        try {
146                            int port = uri.getPort();
147                            String host = uri.getHost();
148    
149                            trace("open " + uri);
150    
151                            final SocketChannel socketChannel = SocketChannel.open();
152                            socketChannel.configureBlocking(false);
153    
154                            final InetSocketAddress address = new InetSocketAddress(host, port);
155    
156                            socketChannel.connect(address);
157    
158                            final DispatchSource connect_source = createSource(socketChannel, SelectionKey.OP_CONNECT, queue);
159                            connect_source.setEventHandler(new Task() {
160                                public void run() {
161                                    connect_source.cancel();
162                                    try {
163                                        socketChannel.finishConnect();
164                                        trace("connected " + uri);
165                                        Session session = new Session(Server.this, socketChannel, address, uri);
166                                        session.start_write_greeting();
167                                    } catch (IOException e) {
168                                        trace("connect to " + uri + " FAILED.");
169                                    }
170                                }
171                            });
172                            connect_source.resume();
173                            seen.add(uri);
174    
175                        } catch (IOException e) {
176                            e.printStackTrace();
177                        }
178                    }
179                });
180            }
181    
182            public void trace(String str) {
183                System.out.println(String.format("%5d       - %s", port, str));
184            }
185    
186        }
187    
188        static class Session {
189    
190            Server server;
191            SocketChannel channel;
192            InetSocketAddress address;
193            URI uri;
194    
195            ByteBuffer read_buffer = ByteBuffer.allocate(1024);
196    
197            DispatchQueue queue;
198            DispatchSource read_source;
199            DispatchSource write_source;
200            ArrayList<URI> seen;
201            ArrayList<URI> listed = new ArrayList<URI>();
202    
203    
204            public Session(Server server, SocketChannel channel, InetSocketAddress address, URI uri) {
205                this.server = server;
206                this.channel = channel;
207                this.address = address;
208                this.uri = uri;
209    
210                this.queue = createQueue(uri.toString());
211                this.read_source = createSource(channel, SelectionKey.OP_READ, queue);
212                this.write_source = createSource(channel, SelectionKey.OP_WRITE, queue);
213                this.seen = new ArrayList<URI>(server.seen);
214    
215            }
216    
217            public Session(Server server, SocketChannel channel, InetSocketAddress address) {
218                this(server, channel, address, URI.create("conn://" + address.getHostName() + ":" + address.getPort()));
219            }
220    
221    
222            public void start_read_greeting() {
223                read_source.setEventHandler(read_greeting());
224                read_source.resume();
225            }
226    
227    
228            public Task read_greeting() {
229                return new Task() {
230                    public void run() {
231                        try {
232                            String message = read_frame();
233                            if (message != null) {
234                                // stop looking for read events..
235                                read_source.suspend();
236                                URI uri = URI.create(message);
237                                trace("welcome");
238    
239                                // Send them our seen uris..
240                                ArrayList<Object> list = new ArrayList<Object>(seen);
241                                list.remove(server.me);
242                                list.remove(uri);
243                                list.add("end");
244    
245                                start_write_data(new Task() {
246                                    public void run() {
247                                        start_read_hearbeat();
248                                    }
249                                }, list.toArray(new Object[list.size()]));
250                            }
251                        } catch (IOException e) {
252                            e.printStackTrace();
253                        }
254                    }
255                };
256            }
257    
258            public void start_write_greeting() throws IOException {
259                trace("hello");
260                start_write_data(new Task() {
261                    public void run() {
262                        start_read_server_listings();
263                    }
264                }, server.me);
265            }
266    
267            public void start_read_server_listings() {
268                read_source.setEventHandler(read_server_listings());
269                read_source.resume();
270            }
271    
272    
273            public Task read_server_listings() {
274                return new Task() {
275                    public void run() {
276                        try {
277                            String message = read_frame();
278                            if (message != null) {
279                                if (!message.equals("end")) {
280                                    URI uri = URI.create(message);
281                                    listed.add(uri);
282                                    server.connect(uri);
283                                } else {
284                                    // Send them our seen uris..
285                                    ArrayList<Object> list = new ArrayList<Object>(seen);
286                                    list.removeAll(listed);
287                                    list.remove(server.me);
288                                    list.add("end");
289                                    start_write_data(new Task(){
290                                        public void run() {
291                                            start_write_hearbeat();
292                                        }
293                                    }, list.toArray(new Object[list.size()]));
294                                }
295                            }
296                        } catch (IOException e) {
297                            e.printStackTrace();
298                        }
299                    }
300                };
301            }
302    
303            public void start_read_client_listings() {
304                read_source.setEventHandler(read_clientlistings());
305                read_source.resume();
306            }
307    
308            public Task read_clientlistings() {
309                return new Task() {
310                    public void run() {
311                        try {
312                            String message = read_frame();
313                            if (message != null) {
314                                if (!message.equals("end")) {
315                                    server.connect(URI.create(message));
316                                } else {
317                                    start_read_hearbeat();
318                                }
319                            }
320                        } catch (IOException e) {
321                            e.printStackTrace();
322                        }
323                    }
324                };
325            }
326    
327            public void start_write_hearbeat() {
328                queue.executeAfter(1, TimeUnit.SECONDS, new Task() {
329                    public void run() {
330                        try {
331                            trace("ping");
332                            start_write_data(new Task() {
333                                public void run() {
334                                    start_write_hearbeat();
335                                }
336                            }, "ping");
337                        } catch (IOException e) {
338                            e.printStackTrace();
339                        }
340                    }
341                });
342            }
343    
344    
345            public void start_read_hearbeat() {
346                read_source.setEventHandler(read_hearbeat());
347                read_source.resume();
348            }
349    
350            public Task read_hearbeat() {
351                return new Task() {
352                    public void run() {
353                        try {
354                            String message = read_frame();
355                            if (message != null) {
356                                trace("pong");
357                            }
358                        } catch (IOException e) {
359                            e.printStackTrace();
360                        }
361                    }
362                };
363            }
364    
365            public void start_write_data(Task onDone, Object... list) throws IOException {
366                ByteArrayOutputStream baos = new ByteArrayOutputStream();
367                for (Object next : list) {
368                    baos.write(next.toString().getBytes("UTF-8"));
369                    baos.write(0);
370                }
371                ByteBuffer buffer = ByteBuffer.wrap(baos.toByteArray());
372                write_source.setEventHandler(write_data(buffer, onDone));
373                write_source.resume();
374            }
375    
376            public Task write_data(final ByteBuffer buffer, final Task onDone) {
377                return new Task() {
378                    public void run() {
379                        try {
380                            channel.write(buffer);
381                            if (buffer.remaining() == 0) {
382                                write_source.suspend();
383                                onDone.run();
384                            }
385                        } catch (IOException e) {
386                            e.printStackTrace();
387                        }
388                    }
389                };
390            }
391    
392            public String read_frame() throws IOException {
393                if (channel.read(read_buffer) == -1) {
394                    throw new EOFException();
395                }
396                byte[] buf = read_buffer.array();
397                int endPos = eof(buf, 0, read_buffer.position());
398                if (endPos < 0) {
399                    trace(" --- ");
400                    return null;
401                }
402                String rc = new String(buf, 0, endPos);
403                int newPos = read_buffer.position() - endPos;
404                System.arraycopy(buf, endPos + 1, buf, 0, newPos);
405                read_buffer.position(newPos);
406                return rc;
407            }
408    
409            public int eof(byte[] data, int offset, int pos) {
410                int i = offset;
411                while (i < pos) {
412                    if (data[i] == 0) {
413                        return i;
414                    }
415                    i++;
416                }
417                return -1;
418            }
419    
420            public void trace(String str) {
421                System.out.println(String.format("%5d %5d - %s", server.port, uri.getPort(), str));
422            }
423    
424    
425        }
426    
427    }