com.bigdata.service.jini.master
Class ResourceBufferTask<H extends ResourceBufferStatistics<L,HS>,E extends Serializable,S extends ResourceBufferSubtask,L extends ClientLocator,HS extends ResourceBufferSubtaskStatistics>

java.lang.Object
  extended by com.bigdata.util.concurrent.AbstractHaltableProcess
      extended by com.bigdata.service.ndx.pipeline.AbstractMasterTask<H,E,S,L>
          extended by com.bigdata.service.ndx.pipeline.AbstractPendingSetMasterTask<H,E,S,L>
              extended by com.bigdata.service.jini.master.ResourceBufferTask<H,E,S,L,HS>
Type Parameters:
H - The generic type of the value returned by Callable.call() for the master.
E - The generic type of the elements in the chunks stored in the BlockingBuffer.
S - The generic type of the subtask implementation class.
L - The generic type of the key used to look up a subtask in the internal map (must be unique and must implement hashCode() and equals() per their contracts).
HS - The generic type of the value returned by Callable.call() for the subtask.
All Implemented Interfaces:
INotifyOutcome<E,L>, IMasterTask<E,H>, Remote, Callable<H>
Direct Known Subclasses:
ResourceBufferTask.M

public abstract class ResourceBufferTask<H extends ResourceBufferStatistics<L,HS>,E extends Serializable,S extends ResourceBufferSubtask,L extends ClientLocator,HS extends ResourceBufferSubtaskStatistics>
extends AbstractPendingSetMasterTask<H,E,S,L>

Task that drains a BlockingBuffer containing resources (strictly, resource identifiers) to be processed by the clients and uses hash partitioning to assign those resources to client tasks for processing.

If the task is interrupted, it will refuse additional writes by closing its BlockingBuffer, cancel any subtasks, and discard any buffered writes.

Version:
$Id$
Author:
Bryan Thompson
TODO:
Isolate the pending set buffer logic and write unit tests for it. This should also cover the acceleration of the final tasks during normal shutdown.

Nested Class Summary
static class ResourceBufferTask.M<E extends Serializable>
          Concrete master hides most of the generic types leaving you with only those that are meaningful to parameterize.
 
Field Summary
protected  int sinkChunkSize
           
protected  long sinkChunkTimeoutNanos
           
protected  int sinkQueueCapacity
           
protected  MappedTaskMaster taskMaster
           
 
Fields inherited from class com.bigdata.service.ndx.pipeline.AbstractPendingSetMasterTask
log, masterProxy
 
Fields inherited from class com.bigdata.service.ndx.pipeline.AbstractMasterTask
buffer, sinkIdleTimeoutNanos, sinkPollTimeoutNanos, src, stats
 
Constructor Summary
ResourceBufferTask(MappedTaskMaster taskMaster, long sinkIdleTimeoutNanos, long sinkPollTimeoutNanos, int sinkQueueCapacity, int sinkChunkSize, long sinkChunkTimeoutNanos, H stats, BlockingBuffer<E[]> buffer)
          
 
Method Summary
protected  Map<E,Collection<L>> getPendingMap()
          Return the pending map.
protected  void handleChunk(E[] chunk, boolean reopen)
          Hash partitions the chunk among the clients.
protected  Map<E,Collection<L>> newPendingMap()
          Return a new pending map instance.
protected  S newSubtask(L locator, BlockingBuffer<E[]> out)
          Factory for a new subtask.
protected  BlockingBuffer<E[]> newSubtaskBuffer()
          Factory for a new buffer for a subtask.
protected  Future<HS> submitSubtask(S subtask)
          Submit the subtask to an Executor.
 String toString()
           
protected  void willShutdown()
          Accelerate shutdown protocol by mapping the pending set across the remaining clients.
 
Methods inherited from class com.bigdata.service.ndx.pipeline.AbstractPendingSetMasterTask
addPending, didFail, didSucceed, error, getFederation, getPendingSetSize, nothingPending, removePending, success
 
Methods inherited from class com.bigdata.service.ndx.pipeline.AbstractMasterTask
addToOutputBuffer, call, getBuffer, getRedirectQueueSize, getSink, getStats, mapOperationOverSubtasks, moveSinkToFinishedQueueAtomically, notifySubtaskDone, redirectChunk
 
Methods inherited from class com.bigdata.util.concurrent.AbstractHaltableProcess
halt, halted
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

taskMaster

protected final MappedTaskMaster taskMaster

sinkQueueCapacity

protected final int sinkQueueCapacity

sinkChunkSize

protected final int sinkChunkSize

sinkChunkTimeoutNanos

protected final long sinkChunkTimeoutNanos
Constructor Detail

ResourceBufferTask

public ResourceBufferTask(MappedTaskMaster taskMaster,
                          long sinkIdleTimeoutNanos,
                          long sinkPollTimeoutNanos,
                          int sinkQueueCapacity,
                          int sinkChunkSize,
                          long sinkChunkTimeoutNanos,
                          H stats,
                          BlockingBuffer<E[]> buffer)

Parameters:
sinkQueueCapacity - The capacity of the internal queue for the per-sink output buffer.
sinkChunkSize - The desired size of the chunks that will be written by the sink.
sinkChunkTimeoutNanos - The maximum amount of time in nanoseconds that a sink will combine smaller chunks so that it can satisfy the desired sinkChunkSize.
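
The interaction between sinkChunkSize and sinkChunkTimeoutNanos can be illustrated with a minimal sketch. It is not the bigdata implementation: the class ChunkCombinerSketch and the method nextChunk are hypothetical names, and a java.util.concurrent.BlockingQueue of lists stands in for the BlockingBuffer of arrays. The sketch assumes the sink keeps combining smaller chunks until it reaches the desired size or the timeout expires, whichever comes first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration only; not part of the bigdata API. */
final class ChunkCombinerSketch {

    /**
     * Combine small chunks from the queue until at least chunkSize elements
     * have been collected or chunkTimeoutNanos has elapsed. The result may
     * exceed chunkSize when the last small chunk overshoots the target.
     */
    static <E> List<E> nextChunk(BlockingQueue<List<E>> queue, int chunkSize,
            long chunkTimeoutNanos) {
        List<E> combined = new ArrayList<>();
        long deadline = System.nanoTime() + chunkTimeoutNanos;
        while (combined.size() < chunkSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // timeout: hand over whatever has been collected
            }
            try {
                List<E> small = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (small == null) {
                    break; // nothing arrived before the deadline
                }
                combined.addAll(small);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
        }
        return combined;
    }

    public static void main(String[] args) {
        BlockingQueue<List<Integer>> queue = new LinkedBlockingQueue<>();
        queue.add(List.of(1, 2));
        queue.add(List.of(3));
        System.out.println(nextChunk(queue, 3, TimeUnit.MILLISECONDS.toNanos(50)));
    }
}
```

A larger timeout favors fuller chunks at the cost of latency; a smaller one hands partially filled chunks to the sink sooner.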
Method Detail

getPendingMap

protected Map<E,Collection<L>> getPendingMap()
Description copied from class: AbstractPendingSetMasterTask
Return the pending map. The pending map reflects the resources which are in process. Resources are added to this collection when they are posted to a client for processing and are removed when the client asynchronously reports success or failure for the resource.

Specified by:
getPendingMap in class AbstractPendingSetMasterTask<H extends ResourceBufferStatistics<L,HS>,E extends Serializable,S extends ResourceBufferSubtask,L extends ClientLocator>
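
The lifecycle of an entry in the pending map, as described above, might be sketched as follows. This is an illustration under stated assumptions, not the class's actual code: PendingMapSketch, posted and reported are hypothetical names, and a ConcurrentHashMap is assumed only because outcomes are reported asynchronously.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical illustration only; not part of the bigdata API. */
final class PendingMapSketch<E, L> {

    // resource -> locators of the clients to which it has been posted
    private final Map<E, Collection<L>> pending = new ConcurrentHashMap<>();

    /** Record that the resource was posted to the given client. */
    void posted(E resource, L client) {
        pending.compute(resource, (key, clients) -> {
            if (clients == null) {
                clients = new LinkedHashSet<>();
            }
            clients.add(client);
            return clients;
        });
    }

    /**
     * Record that some client reported an outcome (success or failure).
     * Returns true if the resource was still pending.
     */
    boolean reported(E resource) {
        return pending.remove(resource) != null;
    }

    /** Number of resources currently in process. */
    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingMapSketch<String, String> sketch = new PendingMapSketch<>();
        sketch.posted("resource-1", "client-A");
        sketch.reported("resource-1");
        System.out.println(sketch.size());
    }
}
```

Mapping each resource to a collection of locators allows the same resource to be outstanding on more than one client, which matters once shutdown is accelerated.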

toString

public String toString()
Overrides:
toString in class Object

willShutdown

protected void willShutdown()
                     throws InterruptedException
Accelerates the shutdown protocol by mapping the pending set across the remaining clients. Each resource in the pending set is assigned to multiple clients. The assignments are made in random orderings to minimize the likelihood that the clients all perform the same work at the same time.

FIXME: The finish-up phase should use round-robin multiple assignment of resources to clients to get done faster.

Overrides:
willShutdown in class AbstractMasterTask<H extends ResourceBufferStatistics<L,HS>,E extends Serializable,S extends ResourceBufferSubtask,L extends ClientLocator>
Throws:
InterruptedException
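
One reading of the random-ordering strategy can be sketched as below. This is an assumption about the intent, not the actual implementation: ShutdownAssignmentSketch and assign are hypothetical names, and the sketch simply gives every remaining client the whole pending set in an independently shuffled order, so that clients working through their lists front to back tend to start on different resources.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical illustration only; not part of the bigdata API. */
final class ShutdownAssignmentSketch {

    /**
     * Give each client its own randomly ordered copy of the pending set.
     * Every resource is therefore assigned to multiple clients.
     */
    static <E, L> Map<L, List<E>> assign(Collection<E> pending,
            Collection<L> clients, Random rnd) {
        Map<L, List<E>> plan = new LinkedHashMap<>();
        for (L client : clients) {
            List<E> order = new ArrayList<>(pending);
            Collections.shuffle(order, rnd); // independent ordering per client
            plan.put(client, order);
        }
        return plan;
    }

    public static void main(String[] args) {
        List<String> pending = List.of("r1", "r2", "r3", "r4");
        List<String> clients = List.of("client-A", "client-B");
        System.out.println(assign(pending, clients, new Random()));
    }
}
```

Whichever client finishes a resource first would cause it to be removed from the pending set; the duplicated work on other clients is the price paid for a faster finish.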

handleChunk

protected void handleChunk(E[] chunk,
                           boolean reopen)
                    throws InterruptedException
Hash partitions the chunk among the clients.

Specified by:
handleChunk in class AbstractMasterTask<H extends ResourceBufferStatistics<L,HS>,E extends Serializable,S extends ResourceBufferSubtask,L extends ClientLocator>
Parameters:
chunk - A chunk.
reopen - When false it is an error if the output buffer has been closed. When true the output buffer will be (re-)opened as necessary. This will be false when invoked by AbstractMasterTask.call() (since the output buffers are not closed until the master's buffer is closed) and should be true if you are handling redirects.
Throws:
InterruptedException
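
Hash partitioning of a chunk can be illustrated with a minimal sketch. The source does not state which hash function or client indexing scheme is used, so the following assumes the simplest form: the resource's hashCode() modulo the number of clients. HashPartitionSketch and partition are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; not part of the bigdata API. */
final class HashPartitionSketch {

    /**
     * Split the chunk into per-client lists keyed by client index.
     * Math.floorMod keeps the index in [0, nclients) even when
     * hashCode() is negative.
     */
    static <E> Map<Integer, List<E>> partition(E[] chunk, int nclients) {
        if (nclients <= 0) {
            throw new IllegalArgumentException("nclients must be positive");
        }
        Map<Integer, List<E>> byClient = new LinkedHashMap<>();
        for (E resource : chunk) {
            int index = Math.floorMod(resource.hashCode(), nclients);
            byClient.computeIfAbsent(index, k -> new ArrayList<>()).add(resource);
        }
        return byClient;
    }

    public static void main(String[] args) {
        String[] chunk = { "a.rdf", "b.rdf", "c.rdf", "d.rdf" };
        System.out.println(partition(chunk, 3));
    }
}
```

Because the assignment depends only on the resource and the client count, the same resource is always routed to the same client while the set of clients is unchanged.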

newSubtask

protected S newSubtask(L locator,
                       BlockingBuffer<E[]> out)
Description copied from class: AbstractMasterTask
Factory for a new subtask.

Specified by:
newSubtask in class AbstractMasterTask<H extends ResourceBufferStatistics<L,HS>,E extends Serializable,S extends ResourceBufferSubtask,L extends ClientLocator>
Parameters:
locator - The unique key for the subtask.
out - The BlockingBuffer on which the master will write for that subtask.
Returns:
The subtask.
TODO:
In order to handle client failure once the master is shutting down we must start a NEW client task. Prior to that it is sufficient to redistribute the work among the remaining clients (in this case we could also start a replacement client task).

newSubtaskBuffer

protected BlockingBuffer<E[]> newSubtaskBuffer()
Factory for a new buffer for a subtask.

The queue capacity, chunk size and chunk timeout are taken from the ctor parameters.

Specified by:
newSubtaskBuffer in class AbstractMasterTask<H extends ResourceBufferStatistics<L,HS>,E extends Serializable,S extends ResourceBufferSubtask,L extends ClientLocator>

submitSubtask

protected Future<HS> submitSubtask(S subtask)
Description copied from class: AbstractMasterTask
Submit the subtask to an Executor.

Specified by:
submitSubtask in class AbstractMasterTask<H extends ResourceBufferStatistics<L,HS>,E extends Serializable,S extends ResourceBufferSubtask,L extends ClientLocator>
Parameters:
subtask - The subtask.
Returns:
The Future.

newPendingMap

protected Map<E,Collection<L>> newPendingMap()
Description copied from class: AbstractPendingSetMasterTask
Return a new pending map instance. The size of this collection places a machine limit on the number of resources which may be processed concurrently. A BigdataMap may be used if sufficient RAM is not available.

Specified by:
newPendingMap in class AbstractPendingSetMasterTask<H extends ResourceBufferStatistics<L,HS>,E extends Serializable,S extends ResourceBufferSubtask,L extends ClientLocator>


Copyright © 2006-2011 SYSTAP, LLC. All Rights Reserved.