com.bigdata.service.ndx.pipeline
Class AbstractPendingSetMasterTask<H extends AbstractPendingSetMasterStats<L,? extends AbstractSubtaskStats>,E,S extends AbstractPendingSetSubtask,L>

java.lang.Object
  extended by com.bigdata.util.concurrent.AbstractHaltableProcess
      extended by com.bigdata.service.ndx.pipeline.AbstractMasterTask<H,E,S,L>
          extended by com.bigdata.service.ndx.pipeline.AbstractPendingSetMasterTask<H,E,S,L>
All Implemented Interfaces:
INotifyOutcome<E,L>, IMasterTask<E,H>, Remote, Callable<H>
Direct Known Subclasses:
ResourceBufferTask

public abstract class AbstractPendingSetMasterTask<H extends AbstractPendingSetMasterStats<L,? extends AbstractSubtaskStats>,E,S extends AbstractPendingSetSubtask,L>
extends AbstractMasterTask<H,E,S,L>
implements INotifyOutcome<E,L>

Extends the master task to track outstanding asynchronous operations on work items.

Clients notify the AbstractPendingSetSubtask as each operation completes. The subtask notifies the master, which clears the entry from its pending map and from every other subtask that had been tasked with the same work item. This lets a subtask terminate as soon as its work is complete, regardless of which subtask actually performed the work. The master will not terminate until all outstanding asynchronous operations (the pending set) are complete.
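The notification flow described above can be sketched with a small in-memory model. This is an illustration only: PendingSetFlowSketch, Sink, assign and the String-typed work items and locators are hypothetical names, and the sketch does not reproduce the bigdata classes, their locking, or their remote proxies.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the notification flow; not the bigdata classes.
class PendingSetFlowSketch {

    // Stand-in for a subtask: the work items outstanding on its client.
    static final class Sink {
        final String locator;
        final Set<String> pending = new HashSet<>();

        Sink(String locator) {
            this.locator = locator;
        }

        // A sink may terminate once nothing is outstanding on it.
        boolean isDone() {
            return pending.isEmpty();
        }
    }

    // Master state: work item -> locators of the sinks tasked with it.
    private final Map<String, Collection<String>> pendingMap = new HashMap<>();
    private final Map<String, Sink> sinks = new HashMap<>();

    Sink sink(String locator) {
        return sinks.computeIfAbsent(locator, Sink::new);
    }

    // Record that the work item was posted to the client behind this locator.
    void assign(String item, String locator) {
        pendingMap.computeIfAbsent(item, k -> new LinkedHashSet<>()).add(locator);
        sink(locator).pending.add(item);
    }

    // Any client reported success: clear the item from the master and from every sink.
    void success(String item) {
        Collection<String> locators = pendingMap.remove(item);
        if (locators == null) {
            return; // already completed by another client
        }
        for (String locator : locators) {
            sink(locator).pending.remove(item);
        }
    }

    // The master may terminate only when this returns true.
    boolean nothingPending() {
        return pendingMap.isEmpty();
    }

    public static void main(String[] args) {
        PendingSetFlowSketch master = new PendingSetFlowSketch();
        master.assign("resource-1", "A");
        master.assign("resource-1", "B");
        master.success("resource-1"); // reported by the client behind sink A
        System.out.println(master.sink("B").isDone() + " " + master.nothingPending());
    }
}
```

In this model, sink B becomes idle even though its own client never reported, which is the early-termination behaviour the description refers to.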

Version:
$Id: AbstractPendingSetMasterTask.java 2265 2009-10-26 12:51:06Z thompsonbry $
Author:
Bryan Thompson

Field Summary
protected static org.apache.log4j.Logger log
          Log may be used to see just success/error reporting for the master without the log information from the base class.
protected  INotifyOutcome<E,L> masterProxy
          A proxy for this class which is used by the client task to send asynchronous notifications.
 
Fields inherited from class com.bigdata.service.ndx.pipeline.AbstractMasterTask
buffer, sinkIdleTimeoutNanos, sinkPollTimeoutNanos, src, stats
 
Constructor Summary
AbstractPendingSetMasterTask(JiniFederation<?> fed, H stats, BlockingBuffer<E[]> buffer, long sinkIdleTimeoutNanos, long sinkPollTimeoutNanos)
           
 
Method Summary
protected  boolean addPending(E e, AbstractPendingSetSubtask sink, L locator)
          Add a work item to the pending set.
protected  void didFail(E resource, Throwable cause)
          Hook provides notification if all outstanding work requests for the resource have failed.
protected  void didSucceed(E e)
          Hook provides notification the first time work for the resource has been successfully completed for any set of concurrent outstanding work requests and may be extended if necessary.
 void error(E resource, L locator, Throwable cause)
          The resource is removed from the pending set for the sink associated with that locator.
 JiniFederation<?> getFederation()
           
protected abstract  Map<E,Collection<L>> getPendingMap()
          Return the pending map.
 int getPendingSetSize()
           
protected abstract  Map<E,Collection<L>> newPendingMap()
          Return a new pending map instance.
protected  boolean nothingPending()
          Extension hook for implementations where the clients accept work for asynchronous processing and notify the master as work items complete successfully or fail.
protected  boolean removePending(E e, L locator, Throwable cause)
          Remove a work item from the pending set.
 void success(E e, L locator)
          The resource is removed from the pending map and from the pending set for each sink for which there is an outstanding request for that resource.
 
Methods inherited from class com.bigdata.service.ndx.pipeline.AbstractMasterTask
addToOutputBuffer, call, getBuffer, getRedirectQueueSize, getSink, getStats, handleChunk, mapOperationOverSubtasks, moveSinkToFinishedQueueAtomically, newSubtask, newSubtaskBuffer, notifySubtaskDone, redirectChunk, submitSubtask, willShutdown
 
Methods inherited from class com.bigdata.util.concurrent.AbstractHaltableProcess
halt, halted
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

log

protected static final transient org.apache.log4j.Logger log
Log may be used to see just success/error reporting for the master without the log information from the base class.


masterProxy

protected final INotifyOutcome<E,L> masterProxy
A proxy for this class which is used by the client task to send asynchronous notifications.

Constructor Detail

AbstractPendingSetMasterTask

public AbstractPendingSetMasterTask(JiniFederation<?> fed,
                                    H stats,
                                    BlockingBuffer<E[]> buffer,
                                    long sinkIdleTimeoutNanos,
                                    long sinkPollTimeoutNanos)
Parameters:
stats -
buffer -
sinkIdleTimeoutNanos -
sinkPollTimeoutNanos -
Method Detail

getFederation

public JiniFederation<?> getFederation()

getPendingMap

protected abstract Map<E,Collection<L>> getPendingMap()
Return the pending map. The pending map reflects the resources which are in process. Resources are added to this collection when they are posted to a client for processing and are removed when the client asynchronously reports success or failure for the resource.


nothingPending

protected final boolean nothingPending()
Description copied from class: AbstractMasterTask
Extension hook for implementations where the clients accept work for asynchronous processing and notify the master as work items complete successfully or fail. The AbstractMasterTask will not terminate unless this method returns true when queried while holding the AbstractMasterTask.lock. A true return indicates that there are no pending work items. The default implementation returns true.

The work performed by the client must be idempotent (it must be safe to re-perform the operation): pending work items may be in an unknown state; the master may submit the same work item to multiple clients (where that is permitted by the locator semantics); the client task may fail before the work item is complete; and a failed client can cause work items associated with that client to be posted to another client.

To handle master termination, the pending set must track outstanding work items. Those work items should be removed from the pending set as soon as any client has successfully completed that work item (since the work is idempotent).

To handle client failure, the subtask must track the pending set for its client. If the client dies, then the subtask must handle the failure by placing all pending work items for that client (including any in the chunk for the current request) onto the AbstractMasterTask.redirectQueue.

Overrides:
nothingPending in class AbstractMasterTask<H extends AbstractPendingSetMasterStats<L,? extends AbstractSubtaskStats>,E,S extends AbstractPendingSetSubtask,L>
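The client-failure rule in the nothingPending description (move everything the dead client still owed, plus the chunk in flight, onto the redirect queue) can be sketched as follows. RedirectOnClientDeathSketch and its methods are hypothetical names, and a plain java.util.Queue stands in for AbstractMasterTask.redirectQueue, whose real type is not shown on this page.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical stand-in for a subtask; not the bigdata AbstractPendingSetSubtask.
class RedirectOnClientDeathSketch {

    // Work items accepted by this subtask's client but not yet reported done.
    final Set<String> pending = new LinkedHashSet<>();

    void accepted(String item) {
        pending.add(item);
    }

    void completed(String item) {
        pending.remove(item);
    }

    // On client death, hand back everything still owed, including the chunk in flight.
    void clientDied(List<String> currentChunk, Queue<String> redirectQueue) {
        pending.addAll(currentChunk);
        redirectQueue.addAll(pending);
        pending.clear();
    }

    public static void main(String[] args) {
        RedirectOnClientDeathSketch subtask = new RedirectOnClientDeathSketch();
        subtask.accepted("r1");
        subtask.accepted("r2");
        subtask.completed("r1");
        Queue<String> redirectQueue = new ArrayDeque<>(); // assumption: stand-in queue
        subtask.clientDied(Arrays.asList("r3"), redirectQueue);
        System.out.println(redirectQueue);
    }
}
```

Because the work is required to be idempotent, it is safe for the redirected items to be performed again by another client even if the dead client had partly finished them.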

getPendingSetSize

public final int getPendingSetSize()

addPending

protected boolean addPending(E e,
                             AbstractPendingSetSubtask sink,
                             L locator)
Add a work item to the pending set.

Note: This method is written such that a BigdataMap could be used as the implementation object. (The tuple is always updated by an insert when its value's state is changed.)

Parameters:
e - The work item.
sink - The subtask (sink) tasked with that work item.
locator - The locator of the subtask/client that will process that work item.
Returns:
true iff the pending set did not contain an entry for that work item. Since entries are cleared from the map when work is successfully complete or if all pending operations fail for a work item, a true return does not conclusively indicate a new work item.
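A minimal sketch of the documented return contract, assuming a plain java.util.Map as the pending map. AddPendingSketch is a hypothetical helper, not the bigdata implementation, and it omits the sink argument. The final put mirrors the note about always re-inserting the tuple so that a persistent map would observe the changed value.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;

// Hypothetical helper illustrating the add contract; not the bigdata method.
class AddPendingSketch {

    // Returns true iff the map had no entry for the work item on entry.
    static <E, L> boolean addPending(Map<E, Collection<L>> pendingMap, E e, L locator) {
        Collection<L> locators = pendingMap.get(e);
        final boolean wasAbsent = (locators == null);
        if (wasAbsent) {
            locators = new LinkedHashSet<>();
        }
        locators.add(locator);
        // Re-insert after changing the value so a persistent map sees the update.
        pendingMap.put(e, locators);
        return wasAbsent;
    }

    public static void main(String[] args) {
        Map<String, Collection<String>> pendingMap = new HashMap<>();
        System.out.println(addPending(pendingMap, "r1", "A")); // no entry yet
        System.out.println(addPending(pendingMap, "r1", "B")); // entry already present
        System.out.println(pendingMap.get("r1"));
    }
}
```

As the Returns text notes, a true result only says that no entry existed at that moment; the same work item may have been added and cleared earlier.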

removePending

protected boolean removePending(E e,
                                L locator,
                                Throwable cause)
Remove a work item from the pending set.

Note: This method is written such that a BigdataMap could be used as the implementation object. (The tuple is always updated by an insert when its value's state is changed.)

Parameters:
e - The work item.
locator - The subtask / client locator.
cause - null unless an error is being reported.
Returns:
true iff the work item was cleared from the pending set (present on entry but cleared on exit).
TODO:
unit tests for the add/remove pending methods since they are a bit complex internally.
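The removal contract can be sketched in the same spirit. The branch structure below is an assumption inferred from the descriptions of success and error on this page (a null cause clears the whole entry; a non-null cause removes only that locator and clears the entry once no locators remain). RemovePendingSketch is a hypothetical helper, not the bigdata implementation.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;

// Hypothetical helper illustrating the remove contract; not the bigdata method.
class RemovePendingSketch {

    // Returns true iff the entry was present on entry and is cleared on exit.
    static <E, L> boolean removePending(Map<E, Collection<L>> pendingMap,
                                        E e, L locator, Throwable cause) {
        Collection<L> locators = pendingMap.get(e);
        if (locators == null) {
            return false; // not present on entry
        }
        if (cause == null) {
            pendingMap.remove(e); // success: the work item is done everywhere
            return true;
        }
        locators.remove(locator); // error: only this request has failed
        if (locators.isEmpty()) {
            pendingMap.remove(e); // every outstanding request has failed
            return true;
        }
        // Re-insert after changing the value so a persistent map sees the update.
        pendingMap.put(e, locators);
        return false;
    }

    public static void main(String[] args) {
        Map<String, Collection<String>> pendingMap = new HashMap<>();
        Collection<String> locators = new LinkedHashSet<>();
        locators.add("A");
        locators.add("B");
        pendingMap.put("r1", locators);
        RuntimeException failure = new RuntimeException("client failed");
        System.out.println(removePending(pendingMap, "r1", "A", failure));
        System.out.println(removePending(pendingMap, "r1", "B", failure));
    }
}
```

Checks of this shape (one failure leaves the entry, the last failure clears it, a success clears it at once) are the kind of unit test the TODO asks for.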

newPendingMap

protected abstract Map<E,Collection<L>> newPendingMap()
Return a new pending map instance. The size of this collection places a machine limit on the number of resources which may be processed concurrently. A BigdataMap may be used if sufficient RAM is not available.
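One way a subclass might satisfy the two abstract pending-map methods is to create the map once and return the same instance afterwards. The sketch below uses a hypothetical stand-in base class (PendingMapOwnerSketch) rather than AbstractPendingSetMasterTask itself, and an in-memory LinkedHashMap. Whether the real base class calls newPendingMap() itself, and what synchronization the map needs, are not stated on this page and are left as assumptions.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in declaring the two abstract methods; not the bigdata base class.
abstract class PendingMapOwnerSketch<E, L> {

    protected abstract Map<E, Collection<L>> newPendingMap();

    protected abstract Map<E, Collection<L>> getPendingMap();
}

// In-memory variant: every tracked resource costs heap until it is cleared.
class InMemoryPendingMapOwner<E, L> extends PendingMapOwnerSketch<E, L> {

    // Created once; getPendingMap() always returns this same instance.
    private final Map<E, Collection<L>> pendingMap = newPendingMap();

    @Override
    protected Map<E, Collection<L>> newPendingMap() {
        return new LinkedHashMap<>();
    }

    @Override
    protected Map<E, Collection<L>> getPendingMap() {
        return pendingMap;
    }
}
```

A disk-backed map such as the BigdataMap mentioned above could be returned from newPendingMap() instead when the pending set does not fit in RAM.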


success

public final void success(E e,
                          L locator)
The resource is removed from the pending map and from the pending set for each sink for which there is an outstanding request for that resource. didSucceed(Object) will be invoked the first time a request succeeds for that resource.

Specified by:
success in interface INotifyOutcome<E,L>
Parameters:
e - The resource identifier.
locator - The client locator.

error

public final void error(E resource,
                        L locator,
                        Throwable cause)
The resource is removed from the pending set for the sink associated with that locator. If there are no more outstanding requests for that resource in the pending map, then the resource is removed from the pending map as well. didFail(Object, Throwable) will be invoked if no requests remain for that resource in the pending map.

Specified by:
error in interface INotifyOutcome<E,L>
Parameters:
resource - The resource identifier.
locator - The client locator.
cause - The exception.

didSucceed

protected void didSucceed(E e)
Hook provides notification the first time work for the resource completes successfully among any set of concurrent outstanding work requests. It may be extended if necessary. The final outcome for each resource is not retained. Therefore, if the same resource is resubmitted after its successful completion or its failure, this method may be invoked again for that resource. The default implementation logs the event at INFO level.


didFail

protected void didFail(E resource,
                       Throwable cause)
Hook provides notification if all outstanding work requests for the resource have failed. There may be more than one request to perform the same work. This method is not invoked until all such requests have failed and is not invoked if any of those requests succeeds. Note that work requests MUST be idempotent. The default implementation logs the event at ERROR level.

Parameters:
resource - The resource.
cause - The exception.
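The interplay between success, error and the two hooks can be illustrated with a small model. OutcomeHooksSketch is a hypothetical class with String-typed resources and locators. It records hook calls in a list instead of logging and does not reproduce the real master's locking or per-sink bookkeeping.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of when the hooks fire; not the bigdata master task.
class OutcomeHooksSketch {

    private final Map<String, Set<String>> pendingMap = new HashMap<>();
    final List<String> events = new ArrayList<>();

    void addPending(String resource, String locator) {
        pendingMap.computeIfAbsent(resource, k -> new LinkedHashSet<>()).add(locator);
    }

    // First success for a pending resource clears it and fires didSucceed.
    final void success(String resource, String locator) {
        if (pendingMap.remove(resource) != null) {
            didSucceed(resource);
        }
    }

    // A failure fires didFail only once no requests remain for the resource.
    final void error(String resource, String locator, Throwable cause) {
        Set<String> locators = pendingMap.get(resource);
        if (locators == null) {
            return; // already resolved
        }
        locators.remove(locator);
        if (locators.isEmpty()) {
            pendingMap.remove(resource);
            didFail(resource, cause);
        }
    }

    protected void didSucceed(String resource) {
        events.add("succeeded:" + resource);
    }

    protected void didFail(String resource, Throwable cause) {
        events.add("failed:" + resource);
    }

    public static void main(String[] args) {
        OutcomeHooksSketch master = new OutcomeHooksSketch();
        master.addPending("r1", "A");
        master.addPending("r1", "B");
        master.error("r1", "A", new RuntimeException("A failed")); // no hook yet
        master.success("r1", "B"); // fires didSucceed; didFail is never called
        System.out.println(master.events);
    }
}
```

Since outcomes are not retained, resubmitting a resource after it has been cleared starts a fresh entry, so either hook can fire again for the same resource.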


Copyright © 2006-2009 SYSTAP, LLC. All Rights Reserved.