/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.nio;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

import javax.net.SocketFactory;

import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;

/**
 * An implementation of the {@link Transport} interface using raw tcp/ip,
 * where inbound data is read from a non-blocking {@link SocketChannel}.
 * <p>
 * Inbound bytes are handled as a sequence of frames. Each frame starts with
 * a 4 byte size prefix holding the number of bytes that follow it. Once a
 * complete frame (prefix included) has been buffered it is handed to the
 * wire format for unmarshalling and the resulting command is consumed.
 *
 * @version $Revision$
 */
public class NIOTransport extends TcpTransport {

    /** Number of bytes in the size prefix that starts every frame. */
    private static final int SIZE_PREFIX_LENGTH = 4;

    /** Capacity of the reusable buffer that holds frames small enough to fit in it. */
    private static final int INPUT_BUFFER_SIZE = 8 * 1024;

    /** Buffer size handed to the {@link NIOOutputStream} used for outbound data. */
    private static final int OUTPUT_BUFFER_SIZE = 16 * 1024;

    /** Value of {@link #nextFrameSize} while the size prefix is still being read. */
    private static final int FRAME_SIZE_UNKNOWN = -1;

    private SocketChannel channel;

    /** Selector registration for {@link #channel}; stays null until {@link #initializeStreams()} runs. */
    private SelectorSelection selection;

    /** Reusable buffer; receives every size prefix and any frame that fits in it. */
    private ByteBuffer inputBuffer;

    /** Buffer currently being filled: {@link #inputBuffer} or a one-off buffer for an oversized frame. */
    private ByteBuffer currentBuffer;

    /** Total size (prefix included) of the frame being read, or {@link #FRAME_SIZE_UNKNOWN}. */
    private int nextFrameSize;

    public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
        super(wireFormat, socketFactory, remoteLocation, localLocation);
    }

    public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
        super(wireFormat, socket);
    }

    /**
     * Switches the socket's channel to non-blocking mode, registers it with
     * the {@link SelectorManager} and prepares the read buffer and the output
     * stream.
     *
     * @throws IOException if the channel cannot be configured or registered
     */
    protected void initializeStreams() throws IOException {
        channel = socket.getChannel();
        channel.configureBlocking(false);

        // listen for events telling us when the socket is readable.
        selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
            public void onSelect(SelectorSelection selection) {
                serviceRead();
            }

            public void onError(SelectorSelection selection, Throwable error) {
                if (error instanceof IOException) {
                    onException((IOException)error);
                } else {
                    onException(IOExceptionSupport.create(error));
                }
            }
        });

        // Start out waiting for the size prefix of the first frame.
        inputBuffer = ByteBuffer.allocate(INPUT_BUFFER_SIZE);
        currentBuffer = inputBuffer;
        nextFrameSize = FRAME_SIZE_UNKNOWN;
        currentBuffer.limit(SIZE_PREFIX_LENGTH);

        // Send the data via the channel
        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, OUTPUT_BUFFER_SIZE));
    }

    /**
     * Drains whatever the channel currently has to offer, dispatching every
     * frame that becomes complete. Called from the selector listener's
     * {@code onSelect} callback. Any failure is reported through
     * {@code onException} instead of being thrown.
     */
    private void serviceRead() {
        try {
            while (true) {

                int readSize = channel.read(currentBuffer);
                if (readSize == -1) {
                    onException(new EOFException());
                    selection.close();
                    break;
                }
                if (readSize == 0) {
                    // Nothing more available right now; wait for the next select.
                    break;
                }

                if (currentBuffer.hasRemaining()) {
                    // Prefix or frame body still incomplete.
                    continue;
                }

                if (nextFrameSize == FRAME_SIZE_UNKNOWN) {
                    prepareForFrameBody();
                } else {
                    dispatchFrame();
                }
            }

        } catch (IOException e) {
            onException(e);
        } catch (Throwable e) {
            onException(IOExceptionSupport.create(e));
        }
    }

    /**
     * Decodes the size prefix that was just read and arranges for the rest of
     * the frame to be read into a buffer big enough to hold it.
     *
     * @throws IOException if the declared frame size is not positive or is so
     *         large that adding the prefix length would overflow an int
     */
    private void prepareForFrameBody() throws IOException {
        assert inputBuffer == currentBuffer;

        inputBuffer.flip();
        int payloadSize = inputBuffer.getInt();

        // An empty frame would leave the buffer full, so channel.read() would
        // keep returning 0 and the frame would never be dispatched; negative
        // or overflowing sizes cannot describe a real frame either.
        if (payloadSize <= 0 || payloadSize > Integer.MAX_VALUE - SIZE_PREFIX_LENGTH) {
            throw new IOException("Invalid frame size: " + payloadSize);
        }
        nextFrameSize = payloadSize + SIZE_PREFIX_LENGTH;

        if (nextFrameSize > inputBuffer.capacity()) {
            // The frame is too big for the reusable buffer, so allocate a
            // one-off buffer of the right size and copy the prefix into it
            // exactly as it arrived on the wire.
            currentBuffer = ByteBuffer.allocate(nextFrameSize);
            currentBuffer.putInt(payloadSize);
        } else {
            inputBuffer.limit(nextFrameSize);
        }
    }

    /**
     * Unmarshals the completed frame held in {@link #currentBuffer}, consumes
     * the resulting command and resets the state to read the next size prefix.
     *
     * @throws IOException if the wire format fails to unmarshal the frame
     */
    private void dispatchFrame() throws IOException {
        currentBuffer.flip();

        Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
        doConsume((Command)command);

        nextFrameSize = FRAME_SIZE_UNKNOWN;
        inputBuffer.clear();
        inputBuffer.limit(SIZE_PREFIX_LENGTH);
        currentBuffer = inputBuffer;
    }

    /**
     * Connects the transport, then enables read notifications on the
     * selector registration.
     */
    protected void doStart() throws Exception {
        connect();
        selection.setInterestOps(SelectionKey.OP_READ);
        selection.enable();
    }

    /**
     * Closes the selector registration, if one was ever created, and then
     * delegates to the superclass.
     */
    protected void doStop(ServiceStopper stopper) throws Exception {
        // selection is only assigned in initializeStreams(); guard so that
        // stopping a transport that never got that far still reaches
        // super.doStop() instead of failing with a NullPointerException.
        if (selection != null) {
            selection.close();
        }
        super.doStop(stopper);
    }
}