JXTA

net.jxta.impl.util.pipe.reliable
Class ReliableOutputStream

java.lang.Object
  extended by java.io.OutputStream
      extended by net.jxta.impl.util.pipe.reliable.ReliableOutputStream
All Implemented Interfaces:
Closeable, Flushable, Incoming

public class ReliableOutputStream
extends OutputStream
implements Incoming

Accepts data and packages it into messages for sending to the remote peer. Each message is kept in a retry queue until the remote peer acknowledges receipt.
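
The retry-queue idea described above can be illustrated with a minimal sketch. This is not the JXTA implementation: the class `RetryQueueSketch`, its `Entry` type, and the methods `send`, `acknowledge`, and `pending` are hypothetical names chosen for illustration, and the actual network transmission is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not the JXTA ReliableOutputStream code. */
class RetryQueueSketch {
    /** One queued message: its sequence number and a copy of its payload. */
    static final class Entry {
        final int seqnum;
        final byte[] payload;

        Entry(int seqnum, byte[] payload) {
            this.seqnum = seqnum;
            this.payload = payload;
        }
    }

    private final List<Entry> retryQueue = new ArrayList<>();
    private int nextSeqnum = 1; // assumption: numbering starts at 1

    /** Packages the data as a message, keeps it for retransmission, and returns its sequence number. */
    synchronized int send(byte[] data) {
        Entry entry = new Entry(nextSeqnum++, data.clone());
        retryQueue.add(entry);
        // A real sender would hand the message to the transport here.
        return entry.seqnum;
    }

    /** Drops every queued message whose sequence number is at or below the acknowledged one. */
    synchronized void acknowledge(int seqnum) {
        retryQueue.removeIf(entry -> entry.seqnum <= seqnum);
    }

    /** Number of messages still waiting for an acknowledgement. */
    synchronized int pending() {
        return retryQueue.size();
    }
}
```

In this sketch a message stays in the queue, and so remains eligible for retransmission, until `acknowledge` is called with a sequence number that covers it.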


Field Summary
protected  List<net.jxta.impl.util.pipe.reliable.ReliableOutputStream.RetrQElt> retrQ
          The collection of messages available for re-transmission.
 
Constructor Summary
ReliableOutputStream(Outgoing outgoing)
          Constructor for the ReliableOutputStream object
ReliableOutputStream(Outgoing outgoing, FlowControl fc)
          Constructor for the ReliableOutputStream object
 
Method Summary
 void ackReceived(int seqnum, int[] sackList)
          Process an ACK Message.
 void close()
          
 void flush()
          
 long getLingerDelay()
           
 int getMaxAck()
          Gets the maxAck attribute of the ReliableOutputStream object
 int getSeqNumber()
          Gets the seqNumber attribute of the ReliableOutputStream object
 void hardClose()
          We have received a close request from the remote peer.
 boolean isClosed()
          Returns the state of the stream
 boolean isQueueEmpty()
          Gets the queueEmpty attribute of the ReliableOutputStream object.
protected  boolean isQueueFull()
          Gets the queueFull attribute of the ReliableOutputStream object
 void recv(Message msg)
          Process an incoming message.
 int send(Message msg)
          Serialize a JXTA message as a reliable message.
 void setLingerDelay(long linger)
           
 int setSendBufferSize()
          Return the size of the buffers we are using for accumulating writes.
 void setSendBufferSize(int size)
          Set the size of the buffers we will use for accumulating writes.
 boolean waitQueueEmpty(long timeout)
          Waits for the retransmit queue to become empty.
 void waitQueueEvent(long timeout)
          Wait for activity on the retry queue.
 void write(byte[] b, int off, int len)
          
 void write(int b)
          
 
Methods inherited from class java.io.OutputStream
write
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

retrQ

protected final List<net.jxta.impl.util.pipe.reliable.ReliableOutputStream.RetrQElt> retrQ
The collection of messages available for re-transmission.

Constructor Detail

ReliableOutputStream

public ReliableOutputStream(Outgoing outgoing)
Constructor for the ReliableOutputStream object

Parameters:
outgoing - the outgoing object

ReliableOutputStream

public ReliableOutputStream(Outgoing outgoing,
                            FlowControl fc)
Constructor for the ReliableOutputStream object

Parameters:
outgoing - the outgoing object
fc - the flow-control object

Method Detail

close

public void close()
           throws IOException

Specified by:
close in interface Closeable
Overrides:
close in class OutputStream
Throws:
IOException

getLingerDelay

public long getLingerDelay()

setLingerDelay

public void setLingerDelay(long linger)

setSendBufferSize

public int setSendBufferSize()
Return the size of the buffers we are using for accumulating writes.

Returns:
size of our write buffers.

setSendBufferSize

public void setSendBufferSize(int size)
                       throws IOException
Set the size of the buffers we will use for accumulating writes.

Parameters:
size - The desired size of write buffers.
Throws:
IOException - if an I/O error occurs. In particular, an IOException is thrown if the output stream is closed.
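
To illustrate what "accumulating writes" into a buffer of a given size can look like, here is a minimal sketch. It is not the JXTA implementation: the class `BufferedMessageStreamSketch` and its `messages` list are hypothetical, and flushing pending bytes before resizing is an assumption of this sketch. Only the rule that resizing a closed stream raises an `IOException` is taken from the description above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; not the JXTA ReliableOutputStream code. */
class BufferedMessageStreamSketch extends OutputStream {
    private byte[] buffer;
    private int used;
    private boolean closed;

    /** Stand-in for the transport: each element is the payload of one packaged message. */
    final List<byte[]> messages = new ArrayList<>();

    BufferedMessageStreamSketch(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        buffer = new byte[size];
    }

    /** Changes the buffer size; fails on a closed stream, as the documentation states. */
    void setSendBufferSize(int size) throws IOException {
        if (closed) {
            throw new IOException("stream is closed");
        }
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        flush(); // assumption: pending bytes are sent before the buffer is replaced
        buffer = new byte[size];
    }

    @Override
    public void write(int b) throws IOException {
        if (closed) {
            throw new IOException("stream is closed");
        }
        buffer[used++] = (byte) b;
        if (used == buffer.length) {
            flush(); // a full buffer becomes one message
        }
    }

    /** Packages whatever has accumulated so far into one message. */
    @Override
    public void flush() {
        if (used > 0) {
            messages.add(Arrays.copyOf(buffer, used));
            used = 0;
        }
    }

    @Override
    public void close() {
        flush();
        closed = true;
    }
}
```

With a buffer size of 4, writing six bytes in this sketch yields one 4-byte message immediately and a 2-byte message on the next `flush()`.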

hardClose

public void hardClose()
We have received a close request from the remote peer. We must stop retransmissions immediately.


isClosed

public boolean isClosed()
Returns the state of the stream

Returns:
true if closed

flush

public void flush()
           throws IOException

Specified by:
flush in interface Flushable
Overrides:
flush in class OutputStream
Throws:
IOException

write

public void write(int b)
           throws IOException

Specified by:
write in class OutputStream
Throws:
IOException

write

public void write(byte[] b,
                  int off,
                  int len)
           throws IOException

Overrides:
write in class OutputStream
Throws:
IOException

send

public int send(Message msg)
         throws IOException
Serialize a JXTA message as a reliable message.

This method bypasses the built-in buffering and ignores the MTU size.

Parameters:
msg - message to send
Returns:
message sequence number
Throws:
IOException - if an I/O error occurs

getMaxAck

public int getMaxAck()
Gets the maxAck attribute of the ReliableOutputStream object

Returns:
The maxAck value

getSeqNumber

public int getSeqNumber()
Gets the seqNumber attribute of the ReliableOutputStream object

Returns:
The seqNumber value

isQueueFull

protected boolean isQueueFull()
Gets the queueFull attribute of the ReliableOutputStream object

Returns:
The queueFull value

isQueueEmpty

public boolean isQueueEmpty()
Gets the queueEmpty attribute of the ReliableOutputStream object.

Returns:
true if the queue is empty, otherwise false.

waitQueueEmpty

public boolean waitQueueEmpty(long timeout)
                       throws InterruptedException
Waits for the retransmit queue to become empty.

Parameters:
timeout - The relative time in milliseconds to wait for the queue to become empty.
Returns:
true if the queue is empty, otherwise false.
Throws:
InterruptedException - if interrupted
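
A timed wait of this kind is commonly built on a monitor with a deadline loop. The following is a minimal sketch under stated assumptions, not the JXTA implementation: the class `QueueEmptyWaitSketch`, its `add` and `ackOne` methods, and the `pending` counter are hypothetical, and treating a timeout of 0 as "do not wait" is a choice made here, not documented behavior.

```java
/** Hypothetical illustration only; not the JXTA ReliableOutputStream code. */
class QueueEmptyWaitSketch {
    private final Object lock = new Object();
    private int pending; // number of unacknowledged messages

    /** Records one more message awaiting acknowledgement. */
    void add() {
        synchronized (lock) {
            pending++;
        }
    }

    /** Records one acknowledgement and wakes waiters once nothing is pending. */
    void ackOne() {
        synchronized (lock) {
            if (pending > 0) {
                pending--;
            }
            if (pending == 0) {
                lock.notifyAll();
            }
        }
    }

    /**
     * Waits up to timeoutMillis for the queue to drain.
     * Returns true if the queue is empty, otherwise false.
     */
    boolean waitQueueEmpty(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        synchronized (lock) {
            // Loop to guard against spurious wakeups and early notifications.
            while (pending > 0) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    break; // timed out
                }
                // Round up so that we never call wait(0), which would block forever.
                lock.wait((remainingNanos + 999_999L) / 1_000_000L);
            }
            return pending == 0;
        }
    }
}
```

The boolean result lets a caller distinguish a drained queue from an expired timeout, which matches the documented return value.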

waitQueueEvent

public void waitQueueEvent(long timeout)
                    throws InterruptedException
Wait for activity on the retry queue.

Parameters:
timeout - timeout in milliseconds
Throws:
InterruptedException - when interrupted

recv

public void recv(Message msg)
Process an incoming message.

Specified by:
recv in interface Incoming
Parameters:
msg - message to process

ackReceived

public void ackReceived(int seqnum,
                        int[] sackList)
Process an ACK message. We remove ACKed messages from the retry queue. Only messages received in sequence are acknowledged.

The seqnum is the sequential ACK: the largest sequence number the recipient has received in sequence.

The sackList lists the sequence numbers of all received messages held in the ACK sender's input queue. All of them are higher than the sequential ACK seqnum.

Recipients are passive and send an ACK only upon receipt of an in-sequence message.

They depend on our retransmission timeout (RTO) to fill holes in the message sequence.

Parameters:
seqnum - message sequence number
sackList - array of message sequence numbers
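
The combination of a sequential ACK and a selective ACK list can be sketched as follows. This is an illustration under stated assumptions, not the JXTA implementation: the class `AckProcessingSketch` and its `enqueue` method are hypothetical, and the sketch assumes that selectively acknowledged messages may also be dropped from the retry queue. The returned list is an addition for illustration (the documented method returns void) and shows the holes left for the retransmission timeout to fill.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical illustration only; not the JXTA ReliableOutputStream code. */
class AckProcessingSketch {
    /** Retry queue keyed by sequence number, kept in ascending order. */
    private final TreeMap<Integer, byte[]> retryQueue = new TreeMap<>();

    /** Stores a sent message until it is acknowledged. */
    void enqueue(int seqnum, byte[] payload) {
        retryQueue.put(seqnum, payload);
    }

    /**
     * Applies a sequential ACK plus a selective ACK list.
     * Returns the sequence numbers that are still unacknowledged.
     */
    List<Integer> ackReceived(int seqnum, int[] sackList) {
        // Sequential ACK: everything at or below seqnum has arrived in order.
        retryQueue.headMap(seqnum, true).clear();

        // Selective ACKs: messages received out of order, above seqnum.
        // Assumption of this sketch: they no longer need retransmission.
        for (int sacked : sackList) {
            if (sacked > seqnum) {
                retryQueue.remove(sacked);
            }
        }

        // Whatever remains are the holes a retransmission timeout would fill.
        return new ArrayList<>(retryQueue.keySet());
    }
}
```

For example, with messages 1 through 6 queued, an ACK with seqnum 2 and sackList {4, 6} leaves messages 3 and 5 in this sketch's retry queue.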
