001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.nio;
018    
019    import java.io.EOFException;
020    import java.io.IOException;
021    import java.io.InterruptedIOException;
022    import java.io.OutputStream;
023    import java.nio.ByteBuffer;
024    import java.nio.channels.WritableByteChannel;
025    
026    /**
027     * An optimized buffered outputstream for Tcp
028     * 
029     * @version $Revision: 1.1.1.1 $
030     */
031    
032    public class NIOOutputStream extends OutputStream {
033    
034        private static final int BUFFER_SIZE = 8192;
035    
036        private final WritableByteChannel out;
037        private final byte[] buffer;
038        private final ByteBuffer byteBuffer;
039    
040        private int count;
041        private boolean closed;
042    
043        /**
044         * Constructor
045         * 
046         * @param out
047         */
048        public NIOOutputStream(WritableByteChannel out) {
049            this(out, BUFFER_SIZE);
050        }
051    
052        /**
053         * Creates a new buffered output stream to write data to the specified
054         * underlying output stream with the specified buffer size.
055         * 
056         * @param out the underlying output stream.
057         * @param size the buffer size.
058         * @throws IllegalArgumentException if size <= 0.
059         */
060        public NIOOutputStream(WritableByteChannel out, int size) {
061            this.out = out;
062            if (size <= 0) {
063                throw new IllegalArgumentException("Buffer size <= 0");
064            }
065            buffer = new byte[size];
066            byteBuffer = ByteBuffer.wrap(buffer);
067        }
068    
069        /**
070         * write a byte on to the stream
071         * 
072         * @param b - byte to write
073         * @throws IOException
074         */
075        public void write(int b) throws IOException {
076            checkClosed();
077            if (availableBufferToWrite() < 1) {
078                flush();
079            }
080            buffer[count++] = (byte)b;
081        }
082    
083        /**
084         * write a byte array to the stream
085         * 
086         * @param b the byte buffer
087         * @param off the offset into the buffer
088         * @param len the length of data to write
089         * @throws IOException
090         */
091        public void write(byte b[], int off, int len) throws IOException {
092            checkClosed();
093            if (availableBufferToWrite() < len) {
094                flush();
095            }
096            if (buffer.length >= len) {
097                System.arraycopy(b, off, buffer, count, len);
098                count += len;
099            } else {
100                write(ByteBuffer.wrap(b, off, len));
101            }
102        }
103    
104        /**
105         * flush the data to the output stream This doesn't call flush on the
106         * underlying outputstream, because Tcp is particularly efficent at doing
107         * this itself ....
108         * 
109         * @throws IOException
110         */
111        public void flush() throws IOException {
112            if (count > 0 && out != null) {
113                byteBuffer.position(0);
114                byteBuffer.limit(count);
115                write(byteBuffer);
116                count = 0;
117            }
118        }
119    
120        /**
121         * close this stream
122         * 
123         * @throws IOException
124         */
125        public void close() throws IOException {
126            super.close();
127            closed = true;
128        }
129    
130        /**
131         * Checks that the stream has not been closed
132         * 
133         * @throws IOException
134         */
135        protected void checkClosed() throws IOException {
136            if (closed) {
137                throw new EOFException("Cannot write to the stream any more it has already been closed");
138            }
139        }
140    
141        /**
142         * @return the amount free space in the buffer
143         */
144        private int availableBufferToWrite() {
145            return buffer.length - count;
146        }
147    
148        protected void write(ByteBuffer data) throws IOException {
149            int remaining = data.remaining();
150            int lastRemaining = remaining - 1;
151            long delay = 1;
152            while (remaining > 0) {
153    
154                // We may need to do a little bit of sleeping to avoid a busy loop.
155                // Slow down if no data was written out..
156                if (remaining == lastRemaining) {
157                    try {
158                        // Use exponential rollback to increase sleep time.
159                        Thread.sleep(delay);
160                        delay *= 2;
161                        if (delay > 1000) {
162                            delay = 1000;
163                        }
164                    } catch (InterruptedException e) {
165                        throw new InterruptedIOException();
166                    }
167                } else {
168                    delay = 1;
169                }
170                lastRemaining = remaining;
171    
172                // Since the write is non-blocking, all the data may not have been
173                // written.
174                out.write(data);
175                remaining = data.remaining();
176            }
177        }
178    
179    }