/**
 * Copyright (C) 2012 FuseSource, Inc.
 * http://fusesource.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.fusesource.hawtdispatch.example.discovery;

import org.fusesource.hawtdispatch.*;

import static org.fusesource.hawtdispatch.Dispatch.*;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

/**
 * An example of a network of servers which advertise connection information to each other.
 *
 * <p>Frames on the wire are UTF-8 strings terminated by a single zero byte.</p>
 */
public class EchoNetJava {

    public static void main(String[] args) throws Exception {
        run();
    }

    /**
     * Starts three servers on ports 4444, 5555 and 6666, asks them to connect
     * to each other (and to port 3333, which this method never starts a server
     * on), then blocks on standard input.
     */
    public static void run() throws Exception {
        Server a = new Server(4444).start();
        Server b = new Server(5555).start();
        Server c = new Server(6666).start();

        Thread.sleep(200);

        a.connect(3333);
        a.connect(b);
        b.connect(c);
        System.in.read();
    }

    /**
     * Listens on a local port and opens outgoing connections to other servers.
     * The accept handler and every connect task are dispatched on {@link #queue}.
     */
    static class Server {
        final int port;
        final URI me;
        final ServerSocketChannel serverChannel;
        // URIs an outgoing connection has already been started for.
        final ArrayList<URI> seen = new ArrayList<URI>();
        final DispatchQueue queue;
        final DispatchSource accept_source;

        /**
         * Binds a non-blocking server socket to {@code port} and prepares (but
         * does not resume) the accept source; call {@link #start()} to begin
         * accepting.
         *
         * @param port local TCP port to listen on
         * @throws Exception if the socket cannot be opened, bound or configured
         */
        public Server(int port) throws Exception {
            this.port = port;
            this.me = URI.create("conn://localhost:" + port);
            this.serverChannel = ServerSocketChannel.open();
            try {
                serverChannel.socket().bind(new InetSocketAddress(port));
                serverChannel.configureBlocking(false);
            } catch (IOException e) {
                // Do not leak the channel when the port cannot be bound.
                try {
                    serverChannel.close();
                } catch (IOException ignored) {
                    // the bind/configure failure is the one worth reporting
                }
                throw e;
            }
            queue = createQueue(me.toString());
            accept_source = createSource(serverChannel, SelectionKey.OP_ACCEPT, queue);

            accept_source.setEventHandler(new Task() {
                public void run() {
                    // We are the server side of this connection: we first wait
                    // for the client to send its address, then send our list of
                    // known addresses followed by the literal "end" marker.
                    // Afterwards we only read the client's heartbeat.
                    SocketChannel client = null;
                    try {
                        client = serverChannel.accept();
                        if (client == null) {
                            // Non-blocking accept with nothing pending.
                            return;
                        }

                        InetSocketAddress address = (InetSocketAddress) client.socket().getRemoteSocketAddress();
                        trace("accept " + address.getPort());
                        client.configureBlocking(false);

                        // Server sessions start by reading the client's greeting
                        Session session = new Session(Server.this, client, address);
                        session.start_read_greeting();
                    } catch (Exception e) {
                        trace("accept FAILED: " + e);
                        close_quietly(client);
                    }
                }
            });

            accept_source.setCancelHandler(new Task() {
                public void run() {
                    try {
                        serverChannel.close();
                    } catch (IOException ignored) {
                        // best effort: the server is shutting down anyway
                    }
                }
            });
            trace("Listening");
        }

        /** Resumes the accept source and returns this server for chaining. */
        public Server start() {
            accept_source.resume();
            return this;
        }

        /** Suspends the accept source. */
        public void stop() {
            accept_source.suspend();
        }

        /** Cancels the accept source; its cancel handler closes the server channel. */
        public void close() {
            accept_source.cancel();
        }

        /** Connects to the port another server in this process listens on. */
        public void connect(Server s) {
            connect(s.port);
        }

        /** Connects to {@code conn://localhost:<port>}. */
        public void connect(int port) {
            connect(URI.create("conn://localhost:" + port));
        }

        /**
         * Asynchronously opens a connection to {@code uri} unless it is this
         * server's own URI or a connection to it was already started. On
         * success a client {@link Session} is created and sends the greeting.
         *
         * @param uri address to connect to; its host and port are used
         */
        public void connect(final URI uri) {
            queue.execute(new Task() {
                public void run() {
                    if (me.equals(uri) || seen.contains(uri)) {
                        return;
                    }

                    final String host = uri.getHost();
                    final int remotePort = uri.getPort();
                    if (host == null || remotePort < 0 || remotePort > 0xFFFF) {
                        // URIs may come from peers; InetSocketAddress would
                        // throw an unchecked exception for these.
                        trace("ignoring " + uri);
                        return;
                    }

                    SocketChannel opened = null;
                    try {
                        trace("open " + uri);

                        opened = SocketChannel.open();
                        opened.configureBlocking(false);
                        final SocketChannel socketChannel = opened;

                        final InetSocketAddress address = new InetSocketAddress(host, remotePort);
                        boolean connected = socketChannel.connect(address);
                        seen.add(uri);

                        if (connected) {
                            // A non-blocking connect may complete immediately
                            // (e.g. a local connection); no OP_CONNECT event
                            // will be signalled in that case.
                            finish_connect(socketChannel, address, uri);
                            return;
                        }

                        final DispatchSource connect_source = createSource(socketChannel, SelectionKey.OP_CONNECT, queue);
                        connect_source.setEventHandler(new Task() {
                            public void run() {
                                connect_source.cancel();
                                finish_connect(socketChannel, address, uri);
                            }
                        });
                        connect_source.resume();
                    } catch (IOException e) {
                        e.printStackTrace();
                        close_quietly(opened);
                    }
                }
            });
        }

        /**
         * Completes the connection and starts a client session on it; on
         * failure the channel is closed instead of being leaked.
         */
        private void finish_connect(SocketChannel socketChannel, InetSocketAddress address, URI uri) {
            try {
                socketChannel.finishConnect();
                trace("connected " + uri);
                Session session = new Session(this, socketChannel, address, uri);
                session.start_write_greeting();
            } catch (IOException e) {
                trace("connect to " + uri + " FAILED.");
                close_quietly(socketChannel);
            }
        }

        /** Closes {@code channel} if it is non-null, ignoring any I/O error. */
        private static void close_quietly(SocketChannel channel) {
            if (channel == null) {
                return;
            }
            try {
                channel.close();
            } catch (IOException ignored) {
                // nothing useful can be done about a failed close
            }
        }

        /** Prints {@code str} prefixed with this server's port. */
        public void trace(String str) {
            System.out.println(String.format("%5d - %s", port, str));
        }

    }

    /**
     * One connection between two servers. All read, write and timer handlers
     * are dispatched on {@link #queue}.
     *
     * NOTE(review): {@link #start_read_client_listings()} is not called
     * anywhere in this file, so the listing a client sends after the server's
     * "end" marker is consumed by the heartbeat reader instead.
     */
    static class Session {

        Server server;
        SocketChannel channel;
        InetSocketAddress address;
        URI uri;

        ByteBuffer read_buffer = ByteBuffer.allocate(1024);

        DispatchQueue queue;
        DispatchSource read_source;
        DispatchSource write_source;
        // Copy of server.seen taken when the session was constructed.
        ArrayList<URI> seen;
        // URIs the remote server listed to us.
        ArrayList<URI> listed = new ArrayList<URI>();

        // Set once close() has started tearing the session down.
        private boolean closed;

        /**
         * @param server  owning server
         * @param channel connected, non-blocking channel to the peer
         * @param address remote socket address
         * @param uri     URI naming the peer; used for the queue label and tracing
         */
        public Session(Server server, SocketChannel channel, InetSocketAddress address, URI uri) {
            this.server = server;
            this.channel = channel;
            this.address = address;
            this.uri = uri;

            this.queue = createQueue(uri.toString());
            this.read_source = createSource(channel, SelectionKey.OP_READ, queue);
            this.write_source = createSource(channel, SelectionKey.OP_WRITE, queue);
            this.seen = new ArrayList<URI>(server.seen);
        }

        /** Same as the four-argument constructor, deriving the URI from {@code address}. */
        public Session(Server server, SocketChannel channel, InetSocketAddress address) {
            this(server, channel, address, URI.create("conn://" + address.getHostName() + ":" + address.getPort()));
        }

        /** Server side: start waiting for the client's greeting frame. */
        public void start_read_greeting() {
            read_source.setEventHandler(read_greeting());
            read_source.resume();
        }

        /**
         * Returns the handler that reads the client's greeting (its URI),
         * replies with our seen URIs followed by "end", then switches to
         * reading heartbeats.
         */
        public Task read_greeting() {
            return new Task() {
                public void run() {
                    try {
                        String message = read_frame();
                        if (message != null) {
                            // stop looking for read events..
                            read_source.suspend();
                            URI peer = URI.create(message);
                            trace("welcome");

                            // Send them our seen uris..
                            ArrayList<Object> list = new ArrayList<Object>(seen);
                            list.remove(server.me);
                            list.remove(peer);
                            list.add("end");

                            start_write_data(new Task() {
                                public void run() {
                                    start_read_hearbeat();
                                }
                            }, list.toArray(new Object[list.size()]));
                        }
                    } catch (IOException e) {
                        on_failure(e);
                    }
                }
            };
        }

        /**
         * Client side: send our own URI, then start reading the server's listing.
         *
         * @throws IOException if the greeting cannot be encoded
         */
        public void start_write_greeting() throws IOException {
            trace("hello");
            start_write_data(new Task() {
                public void run() {
                    start_read_server_listings();
                }
            }, server.me);
        }

        /** Client side: start reading the URIs the server lists. */
        public void start_read_server_listings() {
            read_source.setEventHandler(read_server_listings());
            read_source.resume();
        }

        /**
         * Returns the handler that connects to every URI the server lists and,
         * on "end", sends back our own seen URIs and starts the heartbeat.
         */
        public Task read_server_listings() {
            return new Task() {
                public void run() {
                    try {
                        // Drain every complete frame already buffered: several
                        // frames can arrive in one read and no further read
                        // event fires for data that was already consumed.
                        String message = read_frame();
                        while (message != null) {
                            if ("end".equals(message)) {
                                // Send them our seen uris..
                                ArrayList<Object> list = new ArrayList<Object>(seen);
                                list.removeAll(listed);
                                list.remove(server.me);
                                list.add("end");
                                start_write_data(new Task() {
                                    public void run() {
                                        start_write_hearbeat();
                                    }
                                }, list.toArray(new Object[list.size()]));
                                break;
                            }
                            URI listing = URI.create(message);
                            listed.add(listing);
                            server.connect(listing);
                            message = next_frame();
                        }
                    } catch (IOException e) {
                        on_failure(e);
                    }
                }
            };
        }

        /** Start reading URIs listed by the client. */
        public void start_read_client_listings() {
            read_source.setEventHandler(read_clientlistings());
            read_source.resume();
        }

        /**
         * Returns the handler that connects to every URI the client lists and,
         * on "end", switches to reading heartbeats.
         */
        public Task read_clientlistings() {
            return new Task() {
                public void run() {
                    try {
                        String message = read_frame();
                        while (message != null) {
                            if ("end".equals(message)) {
                                start_read_hearbeat();
                                break;
                            }
                            server.connect(URI.create(message));
                            message = next_frame();
                        }
                    } catch (IOException e) {
                        on_failure(e);
                    }
                }
            };
        }

        /** Schedules a "ping" frame one second from now; each completed ping schedules the next. */
        public void start_write_hearbeat() {
            queue.executeAfter(1, TimeUnit.SECONDS, new Task() {
                public void run() {
                    if (closed) {
                        // The session was torn down while the timer was pending.
                        return;
                    }
                    try {
                        trace("ping");
                        start_write_data(new Task() {
                            public void run() {
                                start_write_hearbeat();
                            }
                        }, "ping");
                    } catch (IOException e) {
                        on_failure(e);
                    }
                }
            });
        }

        /** Start reading heartbeat frames from the peer. */
        public void start_read_hearbeat() {
            read_source.setEventHandler(read_hearbeat());
            read_source.resume();
        }

        /** Returns the handler that traces "pong" for every frame received. */
        public Task read_hearbeat() {
            return new Task() {
                public void run() {
                    try {
                        String message = read_frame();
                        while (message != null) {
                            trace("pong");
                            message = next_frame();
                        }
                    } catch (IOException e) {
                        on_failure(e);
                    }
                }
            };
        }

        /**
         * Encodes each element of {@code list} as a UTF-8, zero-terminated
         * frame and starts writing them; {@code onDone} runs once everything
         * has been written.
         *
         * @param onDone task to run after the last byte is written
         * @param list   values to send; {@code toString()} of each is the frame payload
         * @throws IOException if the payload cannot be encoded
         */
        public void start_write_data(Task onDone, Object... list) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            for (Object next : list) {
                baos.write(next.toString().getBytes("UTF-8"));
                baos.write(0);
            }
            ByteBuffer buffer = ByteBuffer.wrap(baos.toByteArray());
            write_source.setEventHandler(write_data(buffer, onDone));
            write_source.resume();
        }

        /**
         * Returns the handler that writes {@code buffer} to the channel and,
         * once it is drained, suspends the write source and runs {@code onDone}.
         */
        public Task write_data(final ByteBuffer buffer, final Task onDone) {
            return new Task() {
                public void run() {
                    try {
                        channel.write(buffer);
                        if (!buffer.hasRemaining()) {
                            write_source.suspend();
                            onDone.run();
                        }
                    } catch (IOException e) {
                        on_failure(e);
                    }
                }
            };
        }

        /**
         * Reads whatever the channel has available and returns the first
         * complete frame, or {@code null} if none is complete yet.
         *
         * @throws EOFException if the peer closed the connection
         * @throws IOException  on read failure, or if the buffer is full and
         *                      still holds no frame delimiter
         */
        public String read_frame() throws IOException {
            if (channel.read(read_buffer) == -1) {
                throw new EOFException();
            }
            String frame = next_frame();
            if (frame == null) {
                if (!read_buffer.hasRemaining()) {
                    // No room left to ever receive the delimiter.
                    throw new IOException("frame exceeds " + read_buffer.capacity() + " bytes");
                }
                trace(" --- ");
            }
            return frame;
        }

        /**
         * Extracts the next complete frame from the bytes already buffered,
         * without touching the channel; returns {@code null} if there is none.
         */
        private String next_frame() throws IOException {
            byte[] buf = read_buffer.array();
            int filled = read_buffer.position();
            int endPos = eof(buf, 0, filled);
            if (endPos < 0) {
                return null;
            }
            // Frames are written as UTF-8, so decode them the same way.
            String rc = new String(buf, 0, endPos, "UTF-8");
            // Bytes after the delimiter; the delimiter itself is dropped.
            int leftover = filled - endPos - 1;
            System.arraycopy(buf, endPos + 1, buf, 0, leftover);
            read_buffer.position(leftover);
            return rc;
        }

        /**
         * Returns the index of the first zero byte in {@code data} within
         * {@code [offset, pos)}, or -1 if there is none.
         */
        public int eof(byte[] data, int offset, int pos) {
            for (int i = offset; i < pos; i++) {
                if (data[i] == 0) {
                    return i;
                }
            }
            return -1;
        }

        /** Reports the failure and tears the session down so its handlers stop firing. */
        private void on_failure(IOException e) {
            e.printStackTrace();
            close();
        }

        /**
         * Cancels the read source; its cancel handler cancels the write source,
         * whose cancel handler closes the channel.
         *
         * NOTE(review): relies on HawtDispatch running cancel handlers even for
         * a source that is suspended at the time of the cancel — confirm.
         */
        private void close() {
            if (closed) {
                return;
            }
            closed = true;
            write_source.setCancelHandler(new Task() {
                public void run() {
                    try {
                        channel.close();
                    } catch (IOException ignored) {
                        // the session is already being discarded
                    }
                }
            });
            read_source.setCancelHandler(new Task() {
                public void run() {
                    write_source.cancel();
                }
            });
            read_source.cancel();
        }

        /** Prints {@code str} prefixed with the server's port and the peer URI's port. */
        public void trace(String str) {
            System.out.println(String.format("%5d %5d - %s", server.port, uri.getPort(), str));
        }

    }

}