JXTA

net.jxta.util
Class JxtaBiDiPipe

java.lang.Object
  extended by net.jxta.util.JxtaBiDiPipe
All Implemented Interfaces:
EventListener, OutputPipeListener, PipeMsgListener

public class JxtaBiDiPipe
extends Object
implements PipeMsgListener, OutputPipeListener

JxtaBiDiPipe is a pair of UnicastPipe channels that implements a bidirectional pipe. By default, JxtaBiDiPipe operates in reliable mode, unless otherwise specified, in addition, messages must not exceed the Endpoint MTU size of 64K, exceed the MTU will lead to unexpected behavior.

It highly recommended that an application message listener is specified, not doing so, may lead to message loss in the event the internal queue is overflowed.

Sending messages vis sendMessage(Message) from within a PipeMsgListener may result in a deadlock due to contention between the sending and receiving portions of BiDi pipes.

JxtaBiDiPipe, whenever possible, will attempt to utilize direct tcp messengers, which leads to improved performance.


Field Summary
protected  Object acceptLock
           
protected  boolean bound
           
protected  boolean closed
           
protected  Object closeLock
           
protected  OutputPipe connectOutpipe
           
protected  Credential credential
           
protected  StructuredDocument credentialDoc
           
protected  boolean dequeued
           
protected  boolean direct
          If true then we are using a reliable direct messenger to the remote peer.
protected  PipeEventListener eventListener
           
protected  Object finalLock
           
protected  PeerGroup group
           
protected  InputPipe inputPipe
           
protected  boolean isReliable
          If true then we are using the underlying end-to-end ACK reliable layer to ensure that messages are received by the remote peer.
protected  int maxRetryTimeout
           
protected  PipeMsgListener msgListener
           
protected  Messenger msgr
           
protected  PipeAdvertisement myPipeAdv
           
protected  OutgoingMsgrAdaptor outgoing
           
static int PIPE_CLOSED_EVENT
          Pipe close Event
protected  PipeAdvertisement pipeAdv
           
protected  PipeService pipeSvc
           
protected  int retryTimeout
           
protected  ReliableInputStream ris
           
protected  ReliableOutputStream ros
           
protected  PipeStateListener stateListener
           
protected  InputStream stream
           
protected  int timeout
           
protected  boolean waiting
           
protected  int windowSize
           
 
Constructor Summary
  JxtaBiDiPipe()
          Creates a new object with a default timeout of #timeout, and no reliability.
protected JxtaBiDiPipe(PeerGroup group, Messenger msgr, PipeAdvertisement pipe, StructuredDocument credDoc, boolean isReliable, boolean direct)
          Creates a bidirectional pipe
  JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener)
          Creates a bidirectional pipe.
  JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener, boolean reliable)
          attempts to create a bidirectional connection to remote peer
  JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, PipeMsgListener msgListener)
          Creates a bidirectional pipe.
 
Method Summary
protected  boolean checkCred(StructuredDocument cred)
          Not implemented yet
 void close()
          Closes this pipe.
protected  void closePipe(boolean fastClose)
           
 void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener)
          Connects to a remote JxtaServerPipe
 void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener, boolean reliable)
          Connects to a remote JxtaServerPipe
 void connect(PeerGroup group, PipeAdvertisement pipeAd)
          Connect to a JxtaServerPipe with default timeout
 void connect(PeerGroup group, PipeAdvertisement pipeAd, int timeout)
          Connects to a remote JxtaBiDiPipe
protected  Message createOpenMessage(PeerGroup group, PipeAdvertisement pipeAd)
          Creates a connection request message
protected  void finalize()
          

Closes the JxtaBiDiPipe.

protected static StructuredDocument getCredDoc(PeerGroup group)
          Obtain the cred doc from the group object.
 StructuredDocument getCredentialDoc()
          get the remote credential doc
protected static Messenger getDirectMessenger(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer)
          A lightweight direct messenger output pipe constructor, note the return type Since all the info needed is available, there's no need for to use the pipe service to resolve the pipe we have all we need to construct a messenger.
 InputPipe getInputPipe()
          Returns an input stream for this socket.
 PipeMsgListener getListener()
          Deprecated. use getMessageListener instead
 int getMaxRetryTimeout()
          Gets the Maximum Retry Timeout of the reliability layer
 Message getMessage(int timeout)
          Gets a message from the queue.
 PipeMsgListener getMessageListener()
          Returns the message listener for this pipe
 PipeAdvertisement getPipeAdvertisement()
          Returns the Assigned PipeAdvertisement
 PipeEventListener getPipeEventListener()
          Returns the Pipe event listener for this pipe
 PipeStateListener getPipeStateListener()
          Returns the Pipe state listener for this pipe
 PeerAdvertisement getRemotePeerAdvertisement()
          Returns remote PeerAdvertisement
 PipeAdvertisement getRemotePipeAdvertisement()
          Returns remote PipeAdvertisement
 int getRetryTimeout()
          Gets the Retry Timeout of the reliability layer
 int getWindowSize()
          When in reliable mode, gets the Reliable library window size
 boolean isBound()
          Returns the binding state of the JxtaServerPipe.
protected static Messenger lightweightOutputPipe(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer)
          A lightweight output pipe constructor, note the return type Since all the info needed is available, there's no need for to use the pipe service to resolve the pipe we have all we need to construct a messenger.
 void outputPipeEvent(OutputPipeEvent event)
          Called when a input pipe has been located for a previously registered pipe.
 void pipeMsgEvent(PipeMsgEvent event)
          Called for each pipe message event that occurs.
 void processIncomingMessage(Message message)
          This method is invoked by the Reliablity library for each incoming data message
 boolean sendMessage(Message msg)
          Send a message

Messenger

 void setCredentialDoc(StructuredDocument doc)
          Sets the connection credential doc.
protected  void setInputPipe(InputPipe inputPipe)
          Sets the inputPipe attribute of the JxtaBiDiPipe object
 void setListener(PipeEventListener eventListener)
          Deprecated. use setPipeEventListener instead
 void setListener(PipeMsgListener msgListener)
          Deprecated. use setMessageListener instead
 void setMaxRetryTimeout(int maxRetryTimeout)
          Gets the Maximum Retry Timeout of the reliability layer
 void setMessageListener(PipeMsgListener msgListener)
          Sets message listener for a pipe spawned by the JxtaServerPipe.
 void setPipeEventListener(PipeEventListener eventListener)
          Sets a Pipe event listener, set listener to null to unset the listener
 void setPipeStateListener(PipeStateListener stateListener)
          Sets a Pipe state listener, set listener to null to unset the listener
 void setReliable(boolean reliable)
          Toggles reliability
protected  void setRemotePeerAdvertisement(PeerAdvertisement peer)
          Sets the remote PeerAdvertisement
protected  void setRemotePipeAdvertisement(PipeAdvertisement pipe)
          Sets the remote PipeAdvertisement
 void setRetryTimeout(int retryTimeout)
          Sets the Retry Timeout of the underlying reliability layer .
 void setWindowSize(int windowSize)
          When in reliable mode, sets the Reliable library window size
 
Methods inherited from class java.lang.Object
clone, equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

timeout

protected int timeout

retryTimeout

protected int retryTimeout

maxRetryTimeout

protected int maxRetryTimeout

windowSize

protected int windowSize

group

protected PeerGroup group

pipeAdv

protected PipeAdvertisement pipeAdv

myPipeAdv

protected PipeAdvertisement myPipeAdv

pipeSvc

protected PipeService pipeSvc

inputPipe

protected InputPipe inputPipe

connectOutpipe

protected OutputPipe connectOutpipe

msgr

protected Messenger msgr

stream

protected InputStream stream

closeLock

protected final Object closeLock

acceptLock

protected final Object acceptLock

finalLock

protected final Object finalLock

closed

protected boolean closed

bound

protected boolean bound

dequeued

protected boolean dequeued

msgListener

protected PipeMsgListener msgListener

eventListener

protected PipeEventListener eventListener

stateListener

protected PipeStateListener stateListener

credential

protected Credential credential

waiting

protected boolean waiting

isReliable

protected boolean isReliable
If true then we are using the underlying end-to-end ACK reliable layer to ensure that messages are received by the remote peer.


ris

protected ReliableInputStream ris

ros

protected ReliableOutputStream ros

direct

protected volatile boolean direct
If true then we are using a reliable direct messenger to the remote peer. We will assume that messages which are sent successfully will be received successfully.


outgoing

protected OutgoingMsgrAdaptor outgoing

credentialDoc

protected StructuredDocument credentialDoc

PIPE_CLOSED_EVENT

public static final int PIPE_CLOSED_EVENT
Pipe close Event

See Also:
Constant Field Values
Constructor Detail

JxtaBiDiPipe

protected JxtaBiDiPipe(PeerGroup group,
                       Messenger msgr,
                       PipeAdvertisement pipe,
                       StructuredDocument credDoc,
                       boolean isReliable,
                       boolean direct)
                throws IOException
Creates a bidirectional pipe

Parameters:
group - group context
msgr - lightweight output pipe
pipe - PipeAdvertisement
isReliable - Whether the connection is reliable or not
credDoc - Credential StructuredDocument
direct - indicates a direct messenger pipe
Throws:
IOException - if an io error occurs

JxtaBiDiPipe

public JxtaBiDiPipe()
Creates a new object with a default timeout of #timeout, and no reliability.


JxtaBiDiPipe

public JxtaBiDiPipe(PeerGroup group,
                    PipeAdvertisement pipeAd,
                    PipeMsgListener msgListener)
             throws IOException
Creates a bidirectional pipe. Attempts to create a bidirectional connection to remote peer within default timeout of #timeout.

Parameters:
group - group context
pipeAd - PipeAdvertisement
msgListener - application PipeMsgListener
Throws:
IOException - if an io error occurs

JxtaBiDiPipe

public JxtaBiDiPipe(PeerGroup group,
                    PipeAdvertisement pipeAd,
                    int timeout,
                    PipeMsgListener msgListener)
             throws IOException
Creates a bidirectional pipe. Attempts to create a bidirectional connection to remote peer within specified timeout of #timeout.

Parameters:
group - group context
timeout - The number of milliseconds within which the JxtaBiDiPipe must be successfully created. An exception will be thrown if the pipe cannot be created in the alotted time. A timeout value of 0 (zero) specifies an infinite timeout.
pipeAd - PipeAdvertisement
msgListener - application PipeMsgListener
Throws:
IOException - if an io error occurs

JxtaBiDiPipe

public JxtaBiDiPipe(PeerGroup group,
                    PipeAdvertisement pipeAd,
                    int timeout,
                    PipeMsgListener msgListener,
                    boolean reliable)
             throws IOException
attempts to create a bidirectional connection to remote peer

Parameters:
group - group context
pipeAd - PipeAdvertisement
timeout - The number of milliseconds within which the JxtaBiDiPipe must be successfully created. An exception will be thrown if the pipe cannot be created in the allotted time. A timeout value of 0 (zero) specifies an infinite timeout.
msgListener - application PipeMsgListener
reliable - if true, the reliability is assumed
Throws:
IOException - if an io error occurs
Method Detail

connect

public void connect(PeerGroup group,
                    PipeAdvertisement pipeAd)
             throws IOException
Connect to a JxtaServerPipe with default timeout

Parameters:
group - group context
pipeAd - PipeAdvertisement
Throws:
IOException - if an io error occurs

connect

public void connect(PeerGroup group,
                    PipeAdvertisement pipeAd,
                    int timeout)
             throws IOException
Connects to a remote JxtaBiDiPipe

Parameters:
group - group context
pipeAd - PipeAdvertisement
timeout - timeout in ms, also reset object default timeout to that of timeout
Throws:
IOException - if an io error occurs

connect

public void connect(PeerGroup group,
                    PeerID peerid,
                    PipeAdvertisement pipeAd,
                    int timeout,
                    PipeMsgListener msgListener)
             throws IOException
Connects to a remote JxtaServerPipe

Parameters:
group - group context
peerid - peer to connect to
pipeAd - PipeAdvertisement
timeout - timeout in ms, also reset object default timeout to that of timeout
msgListener - application PipeMsgListener
Throws:
IOException - if an io error occurs

connect

public void connect(PeerGroup group,
                    PeerID peerid,
                    PipeAdvertisement pipeAd,
                    int timeout,
                    PipeMsgListener msgListener,
                    boolean reliable)
             throws IOException
Connects to a remote JxtaServerPipe

Parameters:
group - group context
peerid - peer to connect to
pipeAd - PipeAdvertisement
timeout - timeout in ms, also reset object default timeout to that of timeout
msgListener - application PipeMsgListener
reliable - Reliable connection
Throws:
IOException - if an io error occurs

setReliable

public void setReliable(boolean reliable)
                 throws IOException
Toggles reliability

Parameters:
reliable - Toggles reliability to reliable
Throws:
IOException - if pipe is bound

getCredDoc

protected static StructuredDocument getCredDoc(PeerGroup group)
Obtain the cred doc from the group object.

Parameters:
group - group context
Returns:
The credDoc value

getCredentialDoc

public StructuredDocument getCredentialDoc()
get the remote credential doc

Returns:
Credential StructuredDocument

setCredentialDoc

public void setCredentialDoc(StructuredDocument doc)
Sets the connection credential doc. If no credentials are set, the default group credential are used.

Parameters:
doc - Credential StructuredDocument

createOpenMessage

protected Message createOpenMessage(PeerGroup group,
                                    PipeAdvertisement pipeAd)
                             throws IOException
Creates a connection request message

Parameters:
group - group context
pipeAd - pipe advertisement
Returns:
the Message object
Throws:
IOException - if an io error occurs

isBound

public boolean isBound()
Returns the binding state of the JxtaServerPipe.

Returns:
true if the ServerSocket successfully bound to an address

getInputPipe

public InputPipe getInputPipe()
                       throws IOException
Returns an input stream for this socket.

Returns:
a stream for reading from this socket.
Throws:
IOException - if an I/O error occurs when creating the input stream.

getRemotePeerAdvertisement

public PeerAdvertisement getRemotePeerAdvertisement()
Returns remote PeerAdvertisement

Returns:
remote PeerAdvertisement

getRemotePipeAdvertisement

public PipeAdvertisement getRemotePipeAdvertisement()
Returns remote PipeAdvertisement

Returns:
remote PipeAdvertisement

setRemotePeerAdvertisement

protected void setRemotePeerAdvertisement(PeerAdvertisement peer)
Sets the remote PeerAdvertisement

Parameters:
peer - Remote PeerAdvertisement

setRemotePipeAdvertisement

protected void setRemotePipeAdvertisement(PipeAdvertisement pipe)
Sets the remote PipeAdvertisement

Parameters:
pipe - PipeAdvertisement

close

public void close()
           throws IOException
Closes this pipe.

Throws:
IOException - if an I/O error occurs when closing this socket.

closePipe

protected void closePipe(boolean fastClose)
                  throws IOException
Throws:
IOException

setInputPipe

protected void setInputPipe(InputPipe inputPipe)
Sets the inputPipe attribute of the JxtaBiDiPipe object

Parameters:
inputPipe - The new inputPipe value

pipeMsgEvent

public void pipeMsgEvent(PipeMsgEvent event)
Called for each pipe message event that occurs.

Specified by:
pipeMsgEvent in interface PipeMsgListener
Parameters:
event - The event being received.

getMaxRetryTimeout

public int getMaxRetryTimeout()
Gets the Maximum Retry Timeout of the reliability layer

Returns:
The maximum retry Timeout value

setMaxRetryTimeout

public void setMaxRetryTimeout(int maxRetryTimeout)
Gets the Maximum Retry Timeout of the reliability layer

Parameters:
maxRetryTimeout - The new maximum retry timeout value
Throws:
IllegalArgumentException - if maxRetryTimeout exceeds jxta platform maximum retry timeout

getRetryTimeout

public int getRetryTimeout()
Gets the Retry Timeout of the reliability layer

Returns:
The retry Timeout value

setRetryTimeout

public void setRetryTimeout(int retryTimeout)
                     throws IOException
Sets the Retry Timeout of the underlying reliability layer . In reliable mode it is possible for this call to block trying to obtain a lock on reliable input stream

Parameters:
retryTimeout - The new retry timeout value
Throws:
IOException - if an I/O error occurs

getWindowSize

public int getWindowSize()
When in reliable mode, gets the Reliable library window size

Returns:
The windowSize value

setWindowSize

public void setWindowSize(int windowSize)
                   throws IOException
When in reliable mode, sets the Reliable library window size

Parameters:
windowSize - The new window size value
Throws:
IOException - if an I/O error occurs

processIncomingMessage

public void processIncomingMessage(Message message)
This method is invoked by the Reliablity library for each incoming data message

Parameters:
message - Incoming message

sendMessage

public boolean sendMessage(Message msg)
                    throws IOException
Send a message

Messenger

Parameters:
msg - Message to send to the remote side
Returns:
true if message was successfully enqueued
Throws:
IOException - if the underlying messenger breaks, either due to a physical address change, reliability issue.
See Also:
Message

outputPipeEvent

public void outputPipeEvent(OutputPipeEvent event)
Called when a input pipe has been located for a previously registered pipe. The event contains an OutputPipe which can be used to communicate with the remote peer.

Specified by:
outputPipeEvent in interface OutputPipeListener
Parameters:
event - the event

getDirectMessenger

protected static Messenger getDirectMessenger(PeerGroup group,
                                              PipeAdvertisement pipeAdv,
                                              PeerAdvertisement peer)
A lightweight direct messenger output pipe constructor, note the return type Since all the info needed is available, there's no need for to use the pipe service to resolve the pipe we have all we need to construct a messenger.

Parameters:
group - group context
pipeAdv - Remote Pipe Advertisement
peer - Remote Peer advertisement
Returns:
Messenger

lightweightOutputPipe

protected static Messenger lightweightOutputPipe(PeerGroup group,
                                                 PipeAdvertisement pipeAdv,
                                                 PeerAdvertisement peer)
A lightweight output pipe constructor, note the return type Since all the info needed is available, there's no need for to use the pipe service to resolve the pipe we have all we need to construct a messenger.

Parameters:
group - group context
pipeAdv - Remote Pipe Advertisement
peer - Remote Peer advertisement
Returns:
Messenger

checkCred

protected boolean checkCred(StructuredDocument cred)
Not implemented yet

Parameters:
cred - the credential document
Returns:
always returns true

getListener

@Deprecated
public PipeMsgListener getListener()
Deprecated. use getMessageListener instead

Returns the message listener for this pipe

Returns:
PipeMsgListener

getMessageListener

public PipeMsgListener getMessageListener()
Returns the message listener for this pipe

Returns:
PipeMsgListener

setListener

@Deprecated
public void setListener(PipeMsgListener msgListener)
Deprecated. use setMessageListener instead

Sets message listener for a pipe spawned by the JxtaServerPipe. There is a window where a message could arrive prior to listener being registered therefore a message queue is created to queue messages, once a listener is registered these messages will be dequeued by calling the listener until the queue is empty

Parameters:
msgListener - New value of property listener.

setMessageListener

public void setMessageListener(PipeMsgListener msgListener)
Sets message listener for a pipe spawned by the JxtaServerPipe. There is a window where a message could arrive prior to listener being registered therefore a message queue is created to queue messages, once a listener is registered these messages will be dequeued by calling the listener until the queue is empty.

Sending messages vis sendMessage(Message) from within a PipeMsgListener may result in a deadlock due to contention between the sending and receiving portions of BiDi pipes.

Parameters:
msgListener - New value of property listener.

setListener

@Deprecated
public void setListener(PipeEventListener eventListener)
Deprecated. use setPipeEventListener instead

Sets a Pipe event listener, set listener to null to unset the listener

Parameters:
eventListener - New value of property listener.

setPipeEventListener

public void setPipeEventListener(PipeEventListener eventListener)
Sets a Pipe event listener, set listener to null to unset the listener

Parameters:
eventListener - New value of property listener.

getPipeEventListener

public PipeEventListener getPipeEventListener()
Returns the Pipe event listener for this pipe

Returns:
PipeMsgListener

setPipeStateListener

public void setPipeStateListener(PipeStateListener stateListener)
Sets a Pipe state listener, set listener to null to unset the listener

Parameters:
stateListener - New value of property listener.

getPipeStateListener

public PipeStateListener getPipeStateListener()
Returns the Pipe state listener for this pipe

Returns:
PipeMsgListener

getMessage

public Message getMessage(int timeout)
                   throws InterruptedException
Gets a message from the queue. If no Object is immediately available, then wait the specified amount of time for a message to be inserted.

Parameters:
timeout - Amount of time to wait in milliseconds for an object to be available. Per Java convention, a timeout of zero (0) means wait an infinite amount of time. Negative values mean do not wait at all.
Returns:
The next message in the queue. if a listener is registered calls to this method will return null
Throws:
InterruptedException - if the operation is interrupted before the timeout interval is completed.

getPipeAdvertisement

public PipeAdvertisement getPipeAdvertisement()
Returns the Assigned PipeAdvertisement

Returns:
the Assigned PipeAdvertisement

finalize

protected void finalize()
                 throws Throwable

Closes the JxtaBiDiPipe.

Overrides:
finalize in class Object
Throws:
Throwable

JXSE