com.bigdata.service.ndx.pipeline
Class AbstractPendingSetSubtask<HS extends AbstractSubtaskStats,M extends AbstractPendingSetMasterTask<? extends AbstractPendingSetMasterStats<L,HS>,E,? extends AbstractPendingSetSubtask,L>,E,L>

java.lang.Object
  extended by com.bigdata.service.ndx.pipeline.AbstractSubtask<HS,M,E,L>
      extended by com.bigdata.service.ndx.pipeline.AbstractPendingSetSubtask<HS,M,E,L>
All Implemented Interfaces:
Callable<HS>
Direct Known Subclasses:
ResourceBufferSubtask

public abstract class AbstractPendingSetSubtask<HS extends AbstractSubtaskStats,M extends AbstractPendingSetMasterTask<? extends AbstractPendingSetMasterStats<L,HS>,E,? extends AbstractPendingSetSubtask,L>,E,L>
extends AbstractSubtask<HS,M,E,L>

Extended to assign chunks of work items to a remote IAsynchronousClientTask, to track the set of outstanding asynchronous operations for a specific client task (the "pending set"), and to close the client task when the sink will not assign any more work to that client.

Version:
$Id: AbstractPendingSetSubtask.java 2265 2009-10-26 12:51:06Z thompsonbry $
Author:
Bryan Thompson

Field Summary
protected  IAsynchronousClientTask<?,E> clientTask
           
 
Fields inherited from class com.bigdata.service.ndx.pipeline.AbstractSubtask
buffer, lastChunkAvailableNanos, lastChunkNanos, locator, log, master, src, stats
 
Constructor Summary
AbstractPendingSetSubtask(M master, L locator, IAsynchronousClientTask<?,E> clientTask, BlockingBuffer<E[]> buffer)
          
 
Method Summary
protected  boolean addPending(E e)
           
protected  void awaitPending()
          Wait until any asynchronous processing for the subtask is done.
protected  void cancelRemoteTask(boolean mayInterruptIfRunning)
          Cancel the remote task.
protected abstract  Set<E> getPendingSet()
          Return the pending set.
 int getPendingSetSize()
           
protected  boolean handleChunk(E[] chunk)
          Submits the chunk of resources for processing by the remote client task.
protected  void notifyClientOfRedirect(L locator, Throwable cause)
          Notify the client that the locator is stale.
protected  boolean removePending(E e)
           
 
Methods inherited from class com.bigdata.service.ndx.pipeline.AbstractSubtask
call, handleRedirect, toString
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

clientTask

protected final IAsynchronousClientTask<?,E> clientTask
Constructor Detail

AbstractPendingSetSubtask

public AbstractPendingSetSubtask(M master,
                                 L locator,
                                 IAsynchronousClientTask<?,E> clientTask,
                                 BlockingBuffer<E[]> buffer)

Parameters:
clientTask - The proxy for the remote client task.
Method Detail

getPendingSet

protected abstract Set<E> getPendingSet()
Return the pending set.
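Because getPendingSet() is abstract, each concrete subclass decides how the pending set is stored. The sketch below is a hypothetical stand-alone pair of classes (PendingSetHolder and ConcurrentPendingSetHolder are illustrative names, not part of the bigdata API). It assumes that completion notifications can arrive on other threads, so it uses a concurrent set from ConcurrentHashMap.newKeySet(), and getPendingSetSize() simply reports that set's size.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the abstract base class: only the pending-set hooks are modeled.
abstract class PendingSetHolder<E> {
    /** Return the pending set (supplied by the subclass). */
    protected abstract Set<E> getPendingSet();

    /** Number of elements still awaiting an asynchronous outcome. */
    public int getPendingSetSize() {
        return getPendingSet().size();
    }
}

// Hypothetical concrete subclass: a concurrent set, since success/failure
// notifications may be delivered on threads other than the submitting one.
class ConcurrentPendingSetHolder<E> extends PendingSetHolder<E> {
    private final Set<E> pending = ConcurrentHashMap.newKeySet();

    @Override
    protected Set<E> getPendingSet() {
        return pending;
    }
}
```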


getPendingSetSize

public int getPendingSetSize()

cancelRemoteTask

protected final void cancelRemoteTask(boolean mayInterruptIfRunning)
                               throws InterruptedException
Description copied from class: AbstractSubtask
Cancel the remote task. This is an extension hook which is used if the remote task accepts chunks for processing and uses an asynchronous notification mechanism to indicate the success or failure of elements. The default implementation is a NOP.

Overrides:
cancelRemoteTask in class AbstractSubtask<HS extends AbstractSubtaskStats,M extends AbstractPendingSetMasterTask<? extends AbstractPendingSetMasterStats<L,HS>,E,? extends AbstractPendingSetSubtask,L>,E,L>
Throws:
InterruptedException
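A minimal sketch of cancelling the remote task, assuming the subtask holds a java.util.concurrent.Future for it (the remoteFuture field and the CancellableSubtaskSketch class are hypothetical). Discarding the pending set after cancellation is also an assumption of this sketch: it presumes that no further success or failure notifications will arrive once the remote task is gone.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

// Hypothetical sketch: the remote client task is represented by a local Future.
class CancellableSubtaskSketch<E> {
    private final Future<?> remoteFuture; // assumed handle on the remote task
    private final Set<E> pending = ConcurrentHashMap.newKeySet();

    CancellableSubtaskSketch(Future<?> remoteFuture) {
        this.remoteFuture = remoteFuture;
    }

    Set<E> pending() {
        return pending;
    }

    /** Cancel the remote task, optionally interrupting it if it is running. */
    void cancelRemoteTask(boolean mayInterruptIfRunning) {
        remoteFuture.cancel(mayInterruptIfRunning);
        // Assumption: a cancelled task sends no more notifications, so the
        // outstanding elements would otherwise stay pending forever.
        pending.clear();
    }
}
```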

awaitPending

protected final void awaitPending()
                           throws InterruptedException
Description copied from class: AbstractSubtask
Wait until any asynchronous processing for the subtask is done. This is an extension hook which is used if the remote task accepts chunks for processing and uses an asynchronous notification mechanism to indicate the success or failure of elements. The default implementation is a NOP.

Overrides:
awaitPending in class AbstractSubtask<HS extends AbstractSubtaskStats,M extends AbstractPendingSetMasterTask<? extends AbstractPendingSetMasterStats<L,HS>,E,? extends AbstractPendingSetSubtask,L>,E,L>
Throws:
InterruptedException

addPending

protected final boolean addPending(E e)

removePending

protected final boolean removePending(E e)
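addPending(E), removePending(E) and awaitPending() cooperate: elements enter the pending set when a chunk is submitted, leave it as the client reports success or failure, and awaitPending() blocks until the set is empty. The sketch below models that handshake with a ReentrantLock and a Condition. PendingSetTracker is a hypothetical class, and the locking scheme is an assumption rather than the bigdata implementation; the boolean results follow java.util.Set.add and Set.remove.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the pending-set bookkeeping.
class PendingSetTracker<E> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition drained = lock.newCondition();
    private final Set<E> pending = new HashSet<>(); // guarded by lock

    /** Returns true iff e was not already pending. */
    boolean addPending(E e) {
        lock.lock();
        try {
            return pending.add(e);
        } finally {
            lock.unlock();
        }
    }

    /** Returns true iff e was pending; wakes waiters once the set becomes empty. */
    boolean removePending(E e) {
        lock.lock();
        try {
            boolean removed = pending.remove(e);
            if (removed && pending.isEmpty()) {
                drained.signalAll();
            }
            return removed;
        } finally {
            lock.unlock();
        }
    }

    /** Blocks until every pending element has been reported as done. */
    void awaitPending() throws InterruptedException {
        lock.lock();
        try {
            while (!pending.isEmpty()) { // loop guards against spurious wake-ups
                drained.await();
            }
        } finally {
            lock.unlock();
        }
    }

    int getPendingSetSize() {
        lock.lock();
        try {
            return pending.size();
        } finally {
            lock.unlock();
        }
    }
}
```

Signalling only when the set becomes empty keeps wake-ups rare, since awaitPending() has nothing to do until the last outstanding element is reported.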

handleChunk

protected boolean handleChunk(E[] chunk)
                       throws ExecutionException,
                              InterruptedException,
                              IOException
Submits the chunk of resources for processing by the remote client task. Clients should accept resources for asynchronous processing, notifying the sink as resources succeed or fail.

Specified by:
handleChunk in class AbstractSubtask<HS extends AbstractSubtaskStats,M extends AbstractPendingSetMasterTask<? extends AbstractPendingSetMasterStats<L,HS>,E,? extends AbstractPendingSetSubtask,L>,E,L>
Parameters:
chunk - A chunk of resources to be processed.
Returns:
true iff the client will not accept additional chunks.
Throws:
IOException - RMI error.
ExecutionException
InterruptedException
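The sketch below illustrates one way to order the work in handleChunk(E[]): every element is registered as pending before the chunk is handed to the client, so a notification that arrives before accept() returns still finds its element. HypotheticalClient and ChunkSubmitterSketch are illustrative names, and the client's boolean result is an assumed way of learning that it will take no more chunks. Rolling back the registration on an IOException is likewise an assumption; the TODO on notifyClientOfRedirect notes that resubmission after client death is not yet handled.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical client interface standing in for the remote client task proxy.
interface HypotheticalClient<E> {
    /** Accepts a chunk for async processing; returns true iff it will take no more chunks. */
    boolean accept(E[] chunk) throws IOException;
}

class ChunkSubmitterSketch<E> {
    private final HypotheticalClient<E> client;
    private final Set<E> pending = ConcurrentHashMap.newKeySet();

    ChunkSubmitterSketch(HypotheticalClient<E> client) {
        this.client = client;
    }

    Set<E> pending() {
        return pending;
    }

    /** Returns true iff the client will not accept additional chunks. */
    boolean handleChunk(E[] chunk) throws IOException {
        // Register first: an async notification may arrive before accept() returns.
        List<E> added = new ArrayList<>();
        for (E e : chunk) {
            if (pending.add(e)) {
                added.add(e);
            }
        }
        try {
            return client.accept(chunk);
        } catch (IOException ex) {
            // Assumption: on an RMI failure nothing was accepted, so undo only
            // the registrations made by this call, then propagate the error.
            pending.removeAll(added);
            throw ex;
        }
    }
}
```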

notifyClientOfRedirect

protected void notifyClientOfRedirect(L locator,
                                      Throwable cause)
Description copied from class: AbstractSubtask
Notify the client that the locator is stale.

Specified by:
notifyClientOfRedirect in class AbstractSubtask<HS extends AbstractSubtaskStats,M extends AbstractPendingSetMasterTask<? extends AbstractPendingSetMasterStats<L,HS>,E,? extends AbstractPendingSetSubtask,L>,E,L>
Parameters:
locator - The locator.
cause - The cause.
TODO:
This should handle the redirect of the pendingSet if the remote client task dies. To be robust, we need to notice client death even if it occurs when we are not invoking client#accept(chunk). Doing that will require some changes to the logic in this class, perhaps only in awaitPending(), which might have to poll the future of the client to make sure that it is still alive (or check with zk, but zk can disconnect clients overly eagerly).
This class ALSO needs to handle the resubmit of the resources associated with the current submit. That will occur only when the remote client task dies while invoking client#accept(chunk). The AbstractPendingSetMasterTask needs to start a new client task for the given ClientLocator, ideally on a different IRemoteExecutor service. If there is no such available service, then it could multiplex multiple client numbers onto the same client, essentially doubling the load for some client. Or we could hash partition based on the number of remaining clients, which would distribute the load evenly over the remaining clients.
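The last option in the TODO, hash partitioning over the remaining clients, can be sketched as follows. This assumes that clients are identified by hypothetical integer ids and that element hashCode() values are stable; PendingSetRedistributor.partition is an illustrative name, not part of the bigdata API. Math.floorMod maps each element to a survivor index in [0, n), even for negative hash codes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: redistribute an orphaned pending set over surviving clients.
final class PendingSetRedistributor {
    private PendingSetRedistributor() {
    }

    /** Maps each surviving client id to the orphaned elements it should receive. */
    static <E> Map<Integer, List<E>> partition(Collection<E> orphaned, List<Integer> survivors) {
        if (survivors.isEmpty()) {
            throw new IllegalArgumentException("no surviving clients");
        }
        Map<Integer, List<E>> assignment = new HashMap<>();
        for (E e : orphaned) {
            // floorMod keeps the index non-negative for negative hash codes.
            int idx = Math.floorMod(e.hashCode(), survivors.size());
            assignment.computeIfAbsent(survivors.get(idx), k -> new ArrayList<>()).add(e);
        }
        return assignment;
    }
}
```

A given element always lands on the same survivor, which keeps resubmission deterministic. The even spread is only expected, not guaranteed, and the scheme ignores the survivors' current load.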


Copyright © 2006-2009 SYSTAP, LLC. All Rights Reserved.