JXTA

net.jxta.endpoint
Class AsyncChannelMessenger

java.lang.Object
  extended by net.jxta.util.AbstractSimpleSelectable
      extended by net.jxta.endpoint.AbstractMessenger
          extended by net.jxta.endpoint.ChannelMessenger
              extended by net.jxta.endpoint.AsyncChannelMessenger
All Implemented Interfaces:
Messenger, SimpleSelectable

public abstract class AsyncChannelMessenger
extends ChannelMessenger

Extends ChannelMessenger behaviour to provide asynchronous message sending via queuing.


Nested Class Summary
protected static class AsyncChannelMessenger.PendingMessage
          The representation of a queued message.
 
Nested classes/interfaces inherited from interface net.jxta.util.SimpleSelectable
SimpleSelectable.IdentityReference
 
Field Summary
 
Fields inherited from class net.jxta.endpoint.ChannelMessenger
InsertedServicePrefix, origService, origServiceParam
 
Fields inherited from class net.jxta.endpoint.AbstractMessenger
DEFAULT_MTU, dstAddress
 
Fields inherited from class net.jxta.util.AbstractSimpleSelectable
identityReference
 
Fields inherited from interface net.jxta.endpoint.Messenger
ANYSTATE, BREAKING, BROKEN, CLOSED, CLOSING, CONNECTED, DISCONNECTED, DISCONNECTING, IDLE, RECONCLOSING, RECONNECTING, RECONSATURATED, RESOLCLOSING, RESOLPENDING, RESOLSATURATED, RESOLVED, RESOLVING, SATURATED, SENDING, SENDINGSATURATED, TERMINAL, UNRESOLVABLE, UNRESOLVED, UNRESOLVING, USABLE
 
Constructor Summary
AsyncChannelMessenger(EndpointAddress baseAddress, PeerGroupID redirection, String origService, String origServiceParam, int queueSize, boolean connected)
          Create a new AsyncChannelMessenger.
 
Method Summary
 void close()
          Close this messenger after processing any pending messages.
protected abstract  void connectImpl()
          We invoke this method to be placed on the list of channels that are waiting for resolution.
protected  void down()
          The implementation invokes this method when it becomes broken.
 Messenger getChannelMessenger(PeerGroupID redirection, String service, String serviceParam)
          If applicable, returns another messenger that will send messages to the same destination address as this one, but with the specified default service and serviceParam, possibly rewriting addresses to ensure delivery through the specified redirection.
 int getState()
          Returns the current state.
protected  AsyncChannelMessenger.PendingMessage peek()
          Here, we behave like a queue to the shared messenger.
protected  boolean poll()
          Removes the message that has just been processed.
protected abstract  void resolPendingImpl()
          This is invoked to inform the implementation that this channel is now in the resolPending or resolSaturated state.
 void resolve()
          Force the messenger to start resolving if it is not resolved yet.
 void sendMessageB(Message msg, String rService, String rServiceParam)
          Simple sending: blocks until the message was accepted for sending or the messenger is not Messenger.USABLE; whichever occurs first.
 boolean sendMessageN(Message msg, String rService, String rServiceParam)
          Sends a message to the destination specified at construction.
protected  int size()
          Returns the number of elements in this collection.
protected abstract  void startImpl()
          We invoke this method to be placed on the list of channels that have messages to send.
protected  void up()
          The implementation will invoke this method when it becomes resolved, after connectImpl was invoked.
 
Methods inherited from class net.jxta.endpoint.ChannelMessenger
effectiveParam, effectiveService, sendMessage, setMessageWatcher
 
Methods inherited from class net.jxta.endpoint.AbstractMessenger
flush, getDestinationAddress, getDestinationAddressObject, getMTU, isClosed, isIdle, isSynchronous, itemChanged, sendMessage, sendMessage, setStateLock, toString, waitState
 
Methods inherited from class net.jxta.util.AbstractSimpleSelectable
getIdentityReference, haveListeners, notifyChange, register, registerListener, unregister, unregisterListener
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface net.jxta.endpoint.Messenger
flush, getDestinationAddress, getDestinationAddressObject, getLogicalDestinationAddress, getMTU, isClosed, isIdle, isSynchronous, sendMessage, sendMessage, waitState
 
Methods inherited from interface net.jxta.util.SimpleSelectable
getIdentityReference, itemChanged, register, unregister
 

Constructor Detail

AsyncChannelMessenger

public AsyncChannelMessenger(EndpointAddress baseAddress,
                             PeerGroupID redirection,
                             String origService,
                             String origServiceParam,
                             int queueSize,
                             boolean connected)
Create a new AsyncChannelMessenger.

Parameters:
baseAddress - The network address messages go to; regardless of service, param, or group.
redirection - Group to which the messages must be redirected. This is used to implement the automatic group segregation which has become a de-facto standard. If not null, the unique portion of the specified groupID is prepended with ChannelMessenger.InsertedServicePrefix and inserted in every message's destination address in place of the original service name, which gets shifted into the beginning of the service parameter. The opposite is done on arrival to restore the original destination address before the message is delivered to the listener in the specified group. Messages that already bear a group redirection are not affected.
origService - The default destination service for messages sent without specifying a different service.
origServiceParam - The default destination service parameter for messages sent without specifying a different service parameter.
queueSize - the queue size that channels should have.
connected - true if the channel is created in the connected state.
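
The address rewriting described for the redirection parameter can be illustrated with a small, self-contained sketch. It does not use the JXTA classes. The prefix value, the "/" separator between the shifted service name and the original parameter, and the names RedirectSketch, redirect and restore are all assumptions made for illustration; the actual implementation may differ.

```java
final class RedirectSketch {
    // Hypothetical stand-in for ChannelMessenger.InsertedServicePrefix; the real value may differ.
    static final String PREFIX = "GroupPrefix:";

    /** Returns {service, param} as they would look after applying the redirection. */
    static String[] redirect(String groupUniqueId, String service, String param) {
        if (groupUniqueId == null || service.startsWith(PREFIX)) {
            // No redirection requested, or the address already bears one: leave it alone.
            return new String[] {service, param};
        }
        // The original service name is shifted to the front of the parameter
        // (the "/" separator is an assumption of this sketch).
        return new String[] {PREFIX + groupUniqueId, service + "/" + param};
    }

    /** Reverses redirect() on arrival, restoring the original {service, param}. */
    static String[] restore(String service, String param) {
        if (!service.startsWith(PREFIX)) {
            return new String[] {service, param};
        }
        int cut = param.indexOf('/');
        if (cut < 0) {
            return new String[] {param, ""};
        }
        return new String[] {param.substring(0, cut), param.substring(cut + 1)};
    }

    public static void main(String[] args) {
        String[] out = redirect("uuid-1234", "PipeService", "pipe42");
        System.out.println(out[0] + " " + out[1]);
        String[] back = restore(out[0], out[1]);
        System.out.println(back[0] + " " + back[1]);
    }
}
```
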
Method Detail

close

public final void close()
Close this messenger after processing any pending messages. This method is not blocking. Upon return, the messenger will be in one of the non Messenger.USABLE states, which means that no message may be sent through it. Any other effect of this method, such as an underlying connection being closed, or all pending messages being processed, may be deferred indefinitely. When the messenger has completely processed the closure request, it will be in one of the Messenger.TERMINAL states (which are also Messenger.IDLE states). Therefore, if one is interested in the outcome of the closure, one may wait for the messenger to be in a Messenger.TERMINAL or Messenger.IDLE state, and check which it is. Messenger.CLOSED denotes success (all outstanding messages have been sent), as opposed to Messenger.UNRESOLVABLE or Messenger.BROKEN.
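
The close-then-wait pattern described above might look as follows. This is a sketch under stated assumptions, not the JXTA API itself: ClosableMessenger is a hypothetical stand-in interface modeled on close() and waitState, the bit values of the state constants are invented for the example, and TERMINAL is assumed to be the union of CLOSED, UNRESOLVABLE and BROKEN.

```java
final class CloseOutcomeSketch {
    // Hypothetical bit values; the real constants are defined by net.jxta.endpoint.Messenger.
    static final int CLOSED = 1;
    static final int UNRESOLVABLE = 1 << 1;
    static final int BROKEN = 1 << 2;
    static final int TERMINAL = CLOSED | UNRESOLVABLE | BROKEN; // assumed composition

    /** Hypothetical stand-in for the part of a messenger used by this sketch. */
    interface ClosableMessenger {
        void close();
        int waitState(int wantedStates, long timeoutMillis) throws InterruptedException;
    }

    /** Requests closure, waits for a terminal state, and reports whether it was a clean close. */
    static boolean closeAndCheck(ClosableMessenger m, long timeoutMillis) {
        m.close(); // non-blocking: only guarantees the messenger is no longer usable
        try {
            int state = m.waitState(TERMINAL, timeoutMillis);
            return state == CLOSED; // anything else means messages may not have been sent
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Test double that jumps straight to the given final state when closed. */
    static ClosableMessenger fakeEndingIn(final int finalState) {
        return new ClosableMessenger() {
            private int state = 0;
            public void close() { state = finalState; }
            public int waitState(int wantedStates, long timeoutMillis) { return state; }
        };
    }

    public static void main(String[] args) {
        System.out.println(closeAndCheck(fakeEndingIn(CLOSED), 1000L));
        System.out.println(closeAndCheck(fakeEndingIn(BROKEN), 1000L));
    }
}
```
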


sendMessageN

public final boolean sendMessageN(Message msg,
                                  String rService,
                                  String rServiceParam)
Sends a message to the destination specified at construction. If a service name and service param are specified, they will replace those specified at construction for the purpose of sending this message only.

This method is identical to Messenger.sendMessage(Message,String,String), except that it does not throw an exception. The invoker has to retrieve a detailed status from the message if needed.

Error Handling:

WARNING: The Message object should not be reused or modified until completely processed. Concurrent modification of a message while a messenger is sending the message will produce incorrect and unpredictable results. If completion is not monitored, the message should never be reused. If necessary, a clone of the message may be provided to Messenger.sendMessageN(net.jxta.endpoint.Message, java.lang.String, java.lang.String):

     messenger.sendMessageN( (Message) myMessage.clone(), theService, theParam );
 

There is no guarantee that a message successfully sent will actually be received.

Parameters:
msg - The message to send.
rService - Optionally replaces the service in the destination address. If null then the destination address's default service will be used. If the empty string ("") is used then no service is included in the destination address.
rServiceParam - Optionally replaces the service param in the destination address. If null then the destination address's default service parameter will be used. If the empty string ("") is used then no service param is included in the destination address.
Returns:
true if the message has been accepted for sending, otherwise false.

sendMessageB

public final void sendMessageB(Message msg,
                               String rService,
                               String rServiceParam)
                        throws IOException
Simple sending: blocks until the message was accepted for sending or the messenger is not Messenger.USABLE; whichever occurs first. If a service name and service param are specified, they will replace those specified at construction for the purpose of sending this message only.

Error Handling:

WARNING: The Message object should not be reused or modified until completely processed. Concurrent modification of a message while a messenger is sending the message will produce incorrect and unpredictable results. If completion is not monitored, the message should never be reused. If necessary, a clone of the message may be provided to Messenger.sendMessageB(net.jxta.endpoint.Message, java.lang.String, java.lang.String):

     messenger.sendMessageB( (Message) myMessage.clone(), theService, theParam );
 

There is no guarantee that a message successfully sent will actually be received.

Parameters:
msg - The message to send.
rService - Optionally replaces the service in the destination address. If null then the destination address's default service will be used. If the empty string ("") is used then no service is included in the destination address.
rServiceParam - Optionally replaces the service param in the destination address. If null then the destination address's default service parameter will be used. If the empty string ("") is used then no service param is included in the destination address.
Throws:
IOException - Thrown if the message cannot be sent.
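
The difference between sendMessageN and sendMessageB can be pictured with a bounded queue. The sketch below is not the JXTA implementation: BoundedSendSketch, sendN, sendB and the 50 ms retry interval are hypothetical, and messages are plain strings. It only shows one way a non-blocking send can report refusal through its return value while a blocking send waits for room and throws IOException once the sender is no longer usable.

```java
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class BoundedSendSketch {
    private final BlockingQueue<String> queue;
    private volatile boolean usable = true;

    BoundedSendSketch(int queueSize) {
        queue = new ArrayBlockingQueue<>(queueSize);
    }

    /** Non-blocking: returns false when the queue is full or the sender is closed; never throws. */
    boolean sendN(String msg) {
        return usable && queue.offer(msg);
    }

    /** Blocking: waits until the message is accepted or the sender stops being usable. */
    void sendB(String msg) throws IOException {
        try {
            while (usable) {
                // Retry periodically so that a concurrent close() is noticed.
                if (queue.offer(msg, 50, TimeUnit.MILLISECONDS)) {
                    return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while waiting for queue space", e);
        }
        throw new IOException("sender is not usable");
    }

    void close() {
        usable = false;
    }

    public static void main(String[] args) throws IOException {
        BoundedSendSketch s = new BoundedSendSketch(1);
        System.out.println(s.sendN("first"));   // accepted: queue has room
        System.out.println(s.sendN("second"));  // refused: queue is full
    }
}
```
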

resolve

public final void resolve()
Force the messenger to start resolving if it is not resolved yet. Any attempt at sending a message has the same effect, but the message may fail as a result, depending upon the method used.


getState

public final int getState()
Returns the current state.

Returns:
one of the legal discrete state values.

getChannelMessenger

public final Messenger getChannelMessenger(PeerGroupID redirection,
                                           String service,
                                           String serviceParam)
If applicable, returns another messenger that will send messages to the same destination address as this one, but with the specified default service and serviceParam, possibly rewriting addresses to ensure delivery through the specified redirection. This is not generally useful to applications and most messengers will return null. This method is needed by the EndpointService when interacting with Messengers provided by Transport modules. If you are not implementing a Transport module, then you can ignore this method.

Important: The channel so obtained is not configured to support the Messenger.sendMessage(Message,String,String, OutgoingMessageEventListener) legacy method. If use of this method is desired, ChannelMessenger.setMessageWatcher(net.jxta.endpoint.ListenerAdaptor) must be used first.

By default a channel refuses to make a channel.

Specified by:
getChannelMessenger in interface Messenger
Overrides:
getChannelMessenger in class ChannelMessenger
Parameters:
redirection - The requested redirection. The resulting channel messenger will use this to force delivery of the message only in the specified group (or possibly descendants, but not parents). If null the local group is assumed. This redirection is applied only to messages that are sent to a service name and service param that do not imply a group redirection.
service - The service to which the resulting channel will send messages, when they are not sent to a specified service.
serviceParam - The service parameter that the resulting channel will use to send messages, when no parameter is specified.
Returns:
a channelMessenger as specified.
See Also:
MessageSender.getMessenger(EndpointAddress,Object)

up

protected void up()
The implementation will invoke this method when it becomes resolved, after connectImpl was invoked.


down

protected void down()
The implementation invokes this method when it becomes broken.


peek

protected AsyncChannelMessenger.PendingMessage peek()
Here, we behave like a queue to the shared messenger. When we report being empty, though, we're automatically removed from the active queues list. We'll go back there the next time we have something to send by calling startImpl.

Returns:
pending message

size

protected int size()
Returns the number of elements in this collection. If this collection contains more than Integer.MAX_VALUE elements, returns Integer.MAX_VALUE.

Returns:
the number of elements in this collection

poll

protected boolean poll()
Removes the message that has just been processed and updates the state (saturated, etc.) accordingly.

Returns:
true if there are more messages after the one we removed.

startImpl

protected abstract void startImpl()
We invoke this method to be placed on the list of channels that have messages to send.

NOTE that it is the shared messenger's responsibility to synchronize so that we cannot be added to the active list just before we get removed due to reporting an empty queue in parallel. So, if we report an empty queue and have a new message to send before the shared messenger removes us from the active list, startImpl will block until the removal is done. Then we'll be added back.

If it cannot be done, it means that the shared messenger is no longer usable. It may call down() in sequence. Out of defensiveness, it should do so without holding its lock.
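
The interplay between peek, poll and startImpl can be sketched as follows. The classes Channel and Shared, the method names, and the use of plain strings as messages are hypothetical; this is one possible way to satisfy the synchronization requirement described above, not the JXTA implementation. Registration and removal both run under the shared object's lock, so a channel that reports an empty queue and then receives a new message is re-added only after its removal has completed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class ActiveListSketch {
    /** Hypothetical channel: a small queue that registers itself when it gets work. */
    static final class Channel {
        private final Deque<String> pending = new ArrayDeque<>();
        private final Shared shared;

        Channel(Shared shared) {
            this.shared = shared;
        }

        void send(String msg) {
            boolean wasEmpty;
            synchronized (this) {
                wasEmpty = pending.isEmpty();
                pending.addLast(msg);
            }
            // Plays the role of startImpl(); called without holding the channel lock.
            if (wasEmpty) {
                shared.start(this);
            }
        }

        synchronized String peek() {
            return pending.peekFirst();
        }

        /** Removes the processed message; returns true if more messages remain. */
        synchronized boolean poll() {
            pending.pollFirst();
            return !pending.isEmpty();
        }
    }

    /** Hypothetical shared messenger holding the list of channels with work to do. */
    static final class Shared {
        private final Set<Channel> active = new LinkedHashSet<>();
        final List<String> sent = new ArrayList<>();

        synchronized void start(Channel c) {
            active.add(c);
        }

        /** Sends one message from the first active channel; returns false when idle. */
        synchronized boolean sendOne() {
            Iterator<Channel> it = active.iterator();
            if (!it.hasNext()) {
                return false;
            }
            Channel c = it.next();
            String msg = c.peek();
            if (msg == null) {
                it.remove();   // channel reported empty: drop it from the active list
                return false;
            }
            sent.add(msg);
            if (!c.poll()) {
                it.remove();   // that was its last message
            }
            return true;
        }
    }
}
```
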


connectImpl

protected abstract void connectImpl()
We invoke this method to be placed on the list of channels that are waiting for resolution.

If it cannot be done, it means that the shared messenger is no longer usable. It may call down() in sequence. Out of defensiveness, it should do so without holding its lock. If the messenger is already resolved it may call up() in sequence. Same wisdom applies. It is a good idea to create channels in the resolved state if the shared messenger is already resolved. That avoids this extra contortion.


resolPendingImpl

protected abstract void resolPendingImpl()
This is invoked to inform the implementation that this channel is now in the resolPending or resolSaturated state. This is specific to this type of channel. The shared messenger must make sure that this channel remains strongly referenced, even though it is not resolved, because there are messages in it. It is valid for an application to let go of a channel after sending a message, even if the channel is not yet resolved. The message will go if/when the channel resolves. This method may be invoked redundantly and even once the channel is no longer among those awaiting resolution. The implementation must be careful to ignore such calls.
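
A shared messenger could meet these requirements roughly as sketched below. The class ResolPendingSketch and its method names are hypothetical, and the choice of a weak set for channels awaiting resolution plus a strong set for channels holding messages is an assumption of this sketch rather than a description of the JXTA code. Redundant calls and calls for channels that are no longer awaiting resolution are ignored.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.WeakHashMap;

final class ResolPendingSketch {
    // Channels waiting for resolution; weakly held so idle ones can be collected.
    private final Set<Object> awaiting =
            Collections.newSetFromMap(new WeakHashMap<Object, Boolean>());
    // Channels that hold queued messages; strongly held until resolution completes.
    private final Set<Object> retained = new HashSet<>();

    /** Plays the role of connectImpl(): the channel asks to be notified of resolution. */
    synchronized void connect(Object channel) {
        awaiting.add(channel);
    }

    /** Plays the role of resolPendingImpl(); returns true only if the call had an effect. */
    synchronized boolean resolPending(Object channel) {
        if (!awaiting.contains(channel)) {
            return false;               // stale call: channel is no longer awaiting resolution
        }
        return retained.add(channel);   // false when the call is redundant
    }

    /** Resolution finished (successfully or not): nobody is waiting any more. */
    synchronized void resolved() {
        awaiting.clear();
        retained.clear();
    }

    synchronized int retainedCount() {
        return retained.size();
    }
}
```
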

