com.bigdata.service.ndx.pipeline
Class AbstractMasterTask<H extends AbstractMasterStats<L,? extends AbstractSubtaskStats>,E,S extends AbstractSubtask,L>

java.lang.Object
  extended by com.bigdata.util.concurrent.AbstractHaltableProcess
      extended by com.bigdata.service.ndx.pipeline.AbstractMasterTask<H,E,S,L>
Type Parameters:
H - The generic type of the value returned by Callable.call() for the master.
E - The generic type of the elements in the chunks stored in the BlockingBuffer.
S - The generic type of the subtask implementation class.
L - The generic type of the locator object used to lookup a subtask in the internal map (must be unique and must implement hashCode() and equals() per their contracts).
All Implemented Interfaces:
IMasterTask<E,H>, Callable<H>
Direct Known Subclasses:
AbstractPendingSetMasterTask, IndexWriteTask

public abstract class AbstractMasterTask<H extends AbstractMasterStats<L,? extends AbstractSubtaskStats>,E,S extends AbstractSubtask,L>
extends AbstractHaltableProcess
implements Callable<H>, IMasterTask<E,H>

Abstract base class for a master task which consumes chunks of elements written onto a BlockingBuffer and distributes those chunks to subtasks according to some abstraction which is not defined by this class.

Design discussion

The asynchronous write API exposes a blocking buffer to the application which accepts concurrent writes of KVO[] chunks, in which each KVO represents a tuple to be written on a scale-out index. The buffer is drained by a master task, which transfers chunks to sink tasks, each of which writes on a specific index partition.

The master is provisioned with a CTOR which is used to convert the KVOs into unsigned byte[] keys and byte[] values for writes on the scale-out index. The master may be provisioned with a mechanism to filter out duplicate tuples.

Writes on the index partitions are asynchronous with respect to the application. However, a KVOC / KVOLatch combination can be used to coordinate notification when some set of tuples of interest have been successfully written onto the scale-out index. This combination can also be used to pass back values from the write operation if they are assigned by side-effect onto the KVO.obj reference.

The asynchronous write implementation is divided into a master, with an input queue and a redirect queue, and sinks, each of which has an input queue and writes chunks onto a specific index partition for the scale-out index. The input queues for the master and the sinks are bounded. The redirect input queue is unbounded. The master and each sink are assigned their own worker threads.

The master transfers chunks from its input queue to the sinks. It first polls the redirect queue for a chunk. If that queue is empty, it polls the input queue. If neither queue yields a chunk, it polls again. The master stops polling the input queue once the input queue is closed, but it continues to drain the redirect queue until all sinks are done or the master is canceled.
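The polling order described above can be sketched with java.util.concurrent queues. This is a minimal, hypothetical illustration, not the bigdata implementation: MasterPollSketch, the String[] chunks, and the BooleanSupplier flags are stand-ins, and the final termination test is simplified (the master actually makes that decision under a lock, as described below).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the master's polling discipline (not the bigdata code). */
final class MasterPollSketch {

    /**
     * Drains chunks until the input queue is closed and empty, the redirect queue is
     * empty, and allSinksDone reports true. Returns the chunks in handling order.
     */
    static List<String[]> drain(BlockingQueue<String[]> redirectQueue,
                                BlockingQueue<String[]> inputQueue,
                                BooleanSupplier inputClosed,
                                BooleanSupplier allSinksDone,
                                long pollNanos) throws InterruptedException {
        List<String[]> handled = new ArrayList<>();
        while (true) {
            // Redirected chunks always take priority over new application chunks.
            String[] chunk = redirectQueue.poll();
            if (chunk == null) {
                boolean inputExhausted = inputClosed.getAsBoolean() && inputQueue.isEmpty();
                // Nothing signals arrivals, so wait briefly on whichever queue may still yield work.
                chunk = inputExhausted
                        ? redirectQueue.poll(pollNanos, TimeUnit.NANOSECONDS)
                        : inputQueue.poll(pollNanos, TimeUnit.NANOSECONDS);
                if (chunk == null && inputExhausted
                        && redirectQueue.isEmpty() && allSinksDone.getAsBoolean()) {
                    return handled; // simplified; the real decision is made while holding a lock
                }
            }
            if (chunk != null) {
                handled.add(chunk); // a real master would split and dispatch the chunk here
            }
        }
    }
}
```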

Note: The requirement for polling arises because: (a) we are not coordinating signals for the arrival of a chunk on the input or redirect queues; and (b) a chunk can be redirected at any time if there is an outstanding write by a sink on an index partition.

The atomic decision to terminate the master is made using a lock. The lock is specific to the life cycle of the sinks. The lock is held when a sink is created. When a sink terminates, its last action is to grab the lock and signal the subtaskDone Condition. The master terminates when, while holding the lock, it observes that no sinks are running AND the redirect queue is empty. Since chunks are placed onto the redirect queue by sinks (and there are no sinks running) and by the master (which is not issuing a redirect since it is running its termination logic) these criteria are sufficient for termination. However, the sink must ensure that its buffer is closed before it terminates, even if it terminates by exception, so that an attempt to transfer a chunk to the sink will not block forever.
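A minimal sketch of the termination decision, assuming a ReentrantLock and a Condition named subtaskDone. TerminationSketch, sinkCreated(), sinkTerminated(), and tryTerminate() are hypothetical names. The sketch only shows how the two criteria (no running sinks AND an empty redirect queue) are checked while holding the lock; it does not model closing the sink's buffer.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of the lock-based termination decision. */
final class TerminationSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition subtaskDone = lock.newCondition();
    private final ConcurrentLinkedQueue<Object> redirectQueue = new ConcurrentLinkedQueue<>();
    private int runningSinks; // guarded by lock

    /** Sinks are created while the lock is held. */
    void sinkCreated() {
        lock.lock();
        try {
            runningSinks++;
        } finally {
            lock.unlock();
        }
    }

    /** A terminating sink's last action: take the lock and signal subtaskDone. */
    void sinkTerminated() {
        lock.lock();
        try {
            runningSinks--;
            subtaskDone.signalAll();
        } finally {
            lock.unlock();
        }
    }

    void redirect(Object chunk) {
        redirectQueue.add(chunk);
    }

    /**
     * Returns true iff, while holding the lock, no sinks are running and the redirect
     * queue is empty. Returns false if redirects must be handled or the wait times out.
     */
    boolean tryTerminate(long timeoutNanos) throws InterruptedException {
        lock.lock();
        try {
            long remaining = timeoutNanos;
            while (true) {
                if (!redirectQueue.isEmpty()) {
                    return false; // the master must handle the redirected chunks first
                }
                if (runningSinks == 0) {
                    return true;
                }
                if (remaining <= 0) {
                    return false;
                }
                remaining = subtaskDone.awaitNanos(remaining);
            }
        } finally {
            lock.unlock();
        }
    }
}
```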

Once the master is holding a chunk, it splits the chunk into a set of dense chunks correlated with the locators of the index partitions on which those chunks will be written. The split operation is NOT atomic, but it is consistent in the following sense. If a Split is identified based on old information, then the chunk will be directed to an index partition which no longer exists. An attempt to write on that index partition will result in a stale locator exception, which is handled.
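The split step can be illustrated under simplifying assumptions: the chunk is sorted by key, keys are ints rather than unsigned byte[] keys, and each index partition is identified by its position in a sorted array of left separator keys whose first entry is the minimum key. SplitSketch and its Split record (Java 16+) are hypothetical and are not the ISplitter API. Each split is a dense, contiguous range of the chunk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: split a sorted chunk into dense runs per index partition. */
final class SplitSketch {

    /** Tuples a[fromIndex, toIndex) belong to the partition at index `partition`. */
    record Split(int partition, int fromIndex, int toIndex) {}

    /** The owning partition is the last i such that separators[i] <= key. */
    static int partitionOf(int[] separators, int key) {
        int pos = Arrays.binarySearch(separators, key);
        // When not found, binarySearch returns -(insertionPoint) - 1.
        return pos >= 0 ? pos : -pos - 2;
    }

    static List<Split> split(int[] separators, int[] sortedKeys) {
        List<Split> splits = new ArrayList<>();
        int from = 0;
        while (from < sortedKeys.length) {
            int p = partitionOf(separators, sortedKeys[from]);
            int to = from + 1;
            // Extend the run while keys stay in the same partition.
            while (to < sortedKeys.length && partitionOf(separators, sortedKeys[to]) == p) {
                to++;
            }
            splits.add(new Split(p, from, to));
            from = to;
        }
        return splits;
    }
}
```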

Once the chunk has been split, the split chunks are transferred to the appropriate sink(s). Since the master is not holding any locks, a blocking put() may be used to transfer the chunk to the sink.

The sink drains chunks from its input queue. If the input queue is empty and the idle timeout expires before a chunk is transferred to the input queue, then the sink will be asynchronously closed and an IdleTimeoutException will be set on the input queue. If the master attempts to transfer a chunk to the sink's input queue after the idle timeout, then an exception wrapping the idle timeout exception will be thrown. The master handles the wrapped idle timeout exception by re-opening the sink and retrying the transfer of the chunk to the (new) sink's input queue. After the sink closes its input queue on an idle timeout, it continues to drain the input queue until it is empty, at which point the sink terminates (this handles the case where the master concurrently transferred a chunk to the sink's input queue before it was closed by the idle timeout).
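The sink side of the idle timeout protocol might look like the following sketch. ClosableQueue stands in for the sink's BlockingBuffer and IdleTimeoutException for the real exception; both, and runSink(), are hypothetical. The sink keeps draining after it closes its queue, which covers a transfer that raced with the close.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a sink that closes itself after an idle timeout. */
final class IdleSinkSketch {

    static final class IdleTimeoutException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    /** Hypothetical bounded queue that can be closed with a cause. */
    static final class ClosableQueue<T> {
        private final LinkedBlockingQueue<T> queue;
        private volatile Throwable closedCause;

        ClosableQueue(int capacity) {
            queue = new LinkedBlockingQueue<>(capacity);
        }

        /** Throws an exception wrapping the close cause once the queue is closed. */
        void put(T item) throws InterruptedException {
            Throwable cause = closedCause;
            if (cause != null) {
                throw new IllegalStateException("sink input queue is closed", cause);
            }
            queue.put(item);
        }

        void close(Throwable cause) { closedCause = cause; }

        boolean isClosed() { return closedCause != null; }

        T poll(long nanos) throws InterruptedException { return queue.poll(nanos, TimeUnit.NANOSECONDS); }

        T pollNow() { return queue.poll(); }
    }

    /** Runs the sink loop and returns the number of chunks consumed before terminating. */
    static int runSink(ClosableQueue<String> in, long idleTimeoutNanos, long pollTimeoutNanos)
            throws InterruptedException {
        int consumed = 0;
        long lastChunkNanos = System.nanoTime();
        while (!in.isClosed()) {
            String chunk = in.poll(pollTimeoutNanos);
            if (chunk != null) {
                consumed++; // a real sink would combine and write the chunk
                lastChunkNanos = System.nanoTime();
            } else if (System.nanoTime() - lastChunkNanos >= idleTimeoutNanos) {
                in.close(new IdleTimeoutException()); // close on idle timeout
            }
        }
        // Keep draining: the master may have transferred a chunk just before the close.
        while (in.pollNow() != null) {
            consumed++;
        }
        return consumed;
    }
}
```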

The sink combines chunks drained from its input queue until the target chunk size for a write is achieved or until the chunk timeout for the sink is exceeded. The sink then writes on the index partition corresponding to its locator. This write occurs in the thread assigned to the sink and the sink will block during the write request.
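A hedged sketch of the combining step: the sink keeps polling until it has at least targetSize elements or the chunk timeout expires. ChunkCombinerSketch and its parameters are hypothetical; here the timeout is measured from the start of each combine call, which may differ from the real sink's policy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of combining small chunks into one write. */
final class ChunkCombinerSketch {

    static List<String> combine(BlockingQueue<String[]> in, int targetSize, long chunkTimeoutNanos)
            throws InterruptedException {
        List<String> combined = new ArrayList<>();
        long deadline = System.nanoTime() + chunkTimeoutNanos;
        while (combined.size() < targetSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // chunk timeout exceeded: write whatever has been combined
            }
            String[] chunk = in.poll(remaining, TimeUnit.NANOSECONDS);
            if (chunk == null) {
                break; // nothing arrived before the deadline
            }
            combined.addAll(Arrays.asList(chunk));
        }
        return combined; // the sink would now write this onto its index partition
    }
}
```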

If a stale locator exception is received by the sink in response to a write, it will: (a) notify the client of the stale locator exception; (b) close the input queue, setting the stale locator exception as the cause; (c) place the chunk for that write onto the master's (unbounded) redirect queue; and (d) drain its input queue and transfer the chunks to the master's redirect queue.
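Steps (a) through (d) can be sketched as follows. The Sink class, StaleLocatorException, and the notifyClient callback are hypothetical stand-ins. This sketch assumes a single thread; a real implementation must also ensure that a chunk transferred by the master concurrently with the close is not lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical sketch of a sink's reaction to a stale locator exception. */
final class StaleLocatorSketch {

    static final class StaleLocatorException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        StaleLocatorException(String message) {
            super(message);
        }
    }

    static final class Sink {
        final BlockingQueue<String[]> input = new LinkedBlockingQueue<>();
        volatile Throwable closedCause;

        void onStaleLocator(StaleLocatorException ex,
                            String[] failedChunk,
                            Consumer<StaleLocatorException> notifyClient,
                            Queue<String[]> masterRedirectQueue) {
            notifyClient.accept(ex);              // (a) notify the client
            closedCause = ex;                     // (b) close the input queue with ex as the cause
            masterRedirectQueue.add(failedChunk); // (c) redirect the chunk for the failed write
            List<String[]> remaining = new ArrayList<>();
            input.drainTo(remaining);             // (d) drain the input queue...
            masterRedirectQueue.addAll(remaining); //    ...and redirect what was left
        }
    }
}
```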

If the master attempts to transfer a chunk to the input queue for a sink which has closed its input queue in response to a stale locator exception, then an exception will be thrown with the stale locator exception as the inner cause. The master will trap that exception and place the chunk on the redirect queue instead.
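On the master side, the two kinds of closed sink call for different handling: an idle timeout means the sink should be re-opened and the transfer retried, while a stale locator means the chunk belongs on the redirect queue. The sketch below, with hypothetical SinkBuffer, getSink(), and transfer() names, distinguishes the two by the cause of the wrapping exception.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch of how the master reacts when a transfer hits a closed sink. */
final class MasterTransferSketch {

    static final class IdleTimeoutException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    static final class StaleLocatorException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    /** Hypothetical sink buffer that rejects chunks once closed, wrapping the close cause. */
    static final class SinkBuffer {
        final List<String[]> accepted = new ArrayList<>();
        Throwable closedCause;

        void put(String[] chunk) {
            if (closedCause != null) {
                throw new IllegalStateException("sink buffer is closed", closedCause);
            }
            accepted.add(chunk);
        }
    }

    final Map<String, SinkBuffer> sinks = new HashMap<>();
    final Queue<String[]> redirectQueue = new ArrayDeque<>();

    /** Returns the sink for locator; when reopen is true, a closed sink is replaced. */
    SinkBuffer getSink(String locator, boolean reopen) {
        SinkBuffer sink = sinks.computeIfAbsent(locator, k -> new SinkBuffer());
        if (reopen && sink.closedCause != null) {
            sink = new SinkBuffer(); // stands in for a new buffer drained by a new subtask
            sinks.put(locator, sink);
        }
        return sink;
    }

    void transfer(String locator, String[] chunk) {
        try {
            getSink(locator, false).put(chunk);
        } catch (IllegalStateException ex) {
            Throwable cause = ex.getCause();
            if (cause instanceof IdleTimeoutException) {
                getSink(locator, true).put(chunk); // re-open the sink and retry
            } else if (cause instanceof StaleLocatorException) {
                redirectQueue.add(chunk);          // the partition moved: redirect the chunk
            } else {
                throw ex;
            }
        }
    }
}
```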

If the sink RMI is successful, the sink will invoke the optional result handler and touch each tuple in the chunk using KVO#done(). These protocols can be used to pass results from asynchronous writes back to the application.
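The notification protocol can be illustrated with a java.util.concurrent.CountDownLatch standing in for KVOLatch and a hypothetical Tuple class standing in for KVO. The application waits on the latch while a sink thread invokes the optional result handler and then calls done() on each tuple. LatchSketch and onWriteSuccess() are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch of result passing and completion notification. */
final class LatchSketch {

    /** Hypothetical stand-in for a KVO that counts down a shared latch when done. */
    static final class Tuple {
        final String key;
        final CountDownLatch latch;
        volatile Object result; // may be assigned by side effect, as with KVO.obj

        Tuple(String key, CountDownLatch latch) {
            this.key = key;
            this.latch = latch;
        }

        void done() {
            latch.countDown();
        }
    }

    /** What a sink might do after a successful write of a chunk. */
    static void onWriteSuccess(Tuple[] chunk, Consumer<Tuple[]> resultHandler) {
        if (resultHandler != null) {
            resultHandler.accept(chunk); // optional result handler
        }
        for (Tuple t : chunk) {
            t.done(); // touch each tuple
        }
    }
}
```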

Version:
$Id: AbstractMasterTask.java 2265 2009-10-26 12:51:06Z thompsonbry $
Author:
Bryan Thompson
See Also:
ISplitter
TODO:
Update javadoc to reflect that the master no longer waits for a closed sink in getSink(Object, boolean) but instead places the sink onto the finishedSubtaskQueue.

Field Summary
protected  BlockingBuffer<E[]> buffer
          The top-level buffer on which the application is writing.
protected static org.apache.log4j.Logger log
           
protected  long sinkIdleTimeoutNanos
          The timeout in nanoseconds before closing an idle output sink.
protected  long sinkPollTimeoutNanos
          The time in nanoseconds that the sink will wait inside of the IAsynchronousIterator when it polls the iterator for a chunk.
protected  IAsynchronousIterator<E[]> src
          The iterator draining the buffer.
 H stats
          Statistics for this (and perhaps other) masters.
 
Constructor Summary
AbstractMasterTask(H stats, BlockingBuffer<E[]> buffer, long sinkIdleTimeoutNanos, long sinkPollTimeoutNanos)
           
 
Method Summary
protected  void addToOutputBuffer(L locator, E[] a, int fromIndex, int toIndex, boolean reopen)
          Resolves the output buffer onto which the split must be written and adds the data to that output buffer.
 H call()
           
 BlockingBuffer<E[]> getBuffer()
          The top-level buffer on which the application is writing.
 int getRedirectQueueSize()
          The #of chunks on the master's redirectQueue.
protected  S getSink(L locator, boolean reopen)
          Return the sink for the locator.
 H getStats()
          The statistics.
protected abstract  void handleChunk(E[] chunk, boolean reopen)
          Handle the next chunk of elements from the buffer.
 void mapOperationOverSubtasks(SubtaskOp<S> op)
          Maps an operation across the subtasks.
protected  void moveSinkToFinishedQueueAtomically(L locator, AbstractSubtask sink)
          Transfer a sink from sinks to finishedSubtaskQueue.
protected abstract  S newSubtask(L locator, BlockingBuffer<E[]> out)
          Factory for a new subtask.
protected abstract  BlockingBuffer<E[]> newSubtaskBuffer()
          Factory for a new buffer for a subtask.
protected  boolean nothingPending()
          Extension hook for implementations where the clients accept work for asynchronous processing and notify the master as work items complete successfully or fail.
protected  void notifySubtaskDone(AbstractSubtask subtask)
          Notify the master that a subtask is done.
protected  void redirectChunk(E[] chunk)
          Places a chunk onto the master's redirectQueue.
protected abstract  Future<? extends AbstractSubtaskStats> submitSubtask(S subtask)
          Submit the subtask to an Executor.
protected  void willShutdown()
          Extension hook invoked when the master's buffer is exhausted by awaitAll().
 
Methods inherited from class com.bigdata.util.concurrent.AbstractHaltableProcess
halt, halted
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

log

protected static final transient org.apache.log4j.Logger log

buffer

protected final BlockingBuffer<E[]> buffer
The top-level buffer on which the application is writing.


src

protected final IAsynchronousIterator<E[]> src
The iterator draining the buffer.

Note: DO NOT close this iterator from within call() as that would cause this task to interrupt itself!


stats

public final H stats
Statistics for this (and perhaps other) masters.


sinkIdleTimeoutNanos

protected final long sinkIdleTimeoutNanos
The timeout in nanoseconds before closing an idle output sink.


sinkPollTimeoutNanos

protected final long sinkPollTimeoutNanos
The time in nanoseconds that the sink will wait inside of the IAsynchronousIterator when it polls the iterator for a chunk. If this value is too large then the sink will block for noticeable lengths of time and will be less responsive to interrupts. Something in the 10s of milliseconds is appropriate.

Constructor Detail

AbstractMasterTask

public AbstractMasterTask(H stats,
                          BlockingBuffer<E[]> buffer,
                          long sinkIdleTimeoutNanos,
                          long sinkPollTimeoutNanos)
Parameters:
stats - Statistics for the master.
buffer - The buffer on which data is written by the application and from which it is drained by the master.
sinkIdleTimeoutNanos - The time in nanoseconds after which an idle sink will be closed. Any buffered writes are flushed when the sink is closed. This must be greater than or equal to the sinkChunkTimeout; otherwise the sink will decide that it is idle when it was just waiting for enough data to prepare a full chunk.
sinkPollTimeoutNanos - The time in nanoseconds that the sink will wait inside of the IAsynchronousIterator when it polls the iterator for a chunk. If this value is too large then the sink will block for noticeable lengths of time and will be less responsive to interrupts. Something in the 10s of milliseconds is appropriate.
TODO:
sinkQueueCapacity, sinkChunkSize, and sinkChunkTimeoutNanos should be arguments for this class and a default newSubtaskBuffer() implementation should be provided. The unit tests for the AbstractMasterTask need to be updated for that change, as do the other derived classes.
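The constraint on sinkIdleTimeoutNanos can be enforced with a simple check. SinkTimeouts is hypothetical, and sinkChunkTimeoutNanos is not an argument of this constructor (see the TODO above); the example values are illustrative, not defaults.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical holder that enforces sinkIdleTimeoutNanos >= sinkChunkTimeoutNanos. */
final class SinkTimeouts {
    final long sinkIdleTimeoutNanos;
    final long sinkPollTimeoutNanos;
    final long sinkChunkTimeoutNanos;

    SinkTimeouts(long sinkIdleTimeoutNanos, long sinkPollTimeoutNanos, long sinkChunkTimeoutNanos) {
        if (sinkIdleTimeoutNanos < sinkChunkTimeoutNanos) {
            // Otherwise a sink waiting to fill a chunk would wrongly decide that it is idle.
            throw new IllegalArgumentException("sinkIdleTimeoutNanos must be >= sinkChunkTimeoutNanos");
        }
        this.sinkIdleTimeoutNanos = sinkIdleTimeoutNanos;
        this.sinkPollTimeoutNanos = sinkPollTimeoutNanos;
        this.sinkChunkTimeoutNanos = sinkChunkTimeoutNanos;
    }

    /** Illustrative values: 10s idle, 50ms poll (tens of milliseconds), 1s chunk timeout. */
    static SinkTimeouts example() {
        return new SinkTimeouts(TimeUnit.SECONDS.toNanos(10),
                TimeUnit.MILLISECONDS.toNanos(50),
                TimeUnit.SECONDS.toNanos(1));
    }
}
```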
Method Detail

getRedirectQueueSize

public final int getRedirectQueueSize()
The #of chunks on the master's redirectQueue.


redirectChunk

protected final void redirectChunk(E[] chunk)
                            throws InterruptedException
Places a chunk onto the master's redirectQueue.

Parameters:
chunk - The chunk.
Throws:
InterruptedException

getBuffer

public BlockingBuffer<E[]> getBuffer()
Description copied from interface: IMasterTask
The top-level buffer on which the application is writing.

Specified by:
getBuffer in interface IMasterTask<E,H>

mapOperationOverSubtasks

public void mapOperationOverSubtasks(SubtaskOp<S> op)
                              throws InterruptedException,
                                     ExecutionException
Maps an operation across the subtasks.

Parameters:
op - The operation, which should be lightweight.
Throws:
InterruptedException
ExecutionException - if a subtask throws an exception.

notifySubtaskDone

protected void notifySubtaskDone(AbstractSubtask subtask)
                          throws InterruptedException
Notify the master that a subtask is done. The subtask is placed onto the finishedSubtaskQueue. The master polls that queue in call() and awaitAll() and checks the Future of each finished subtask using drainFutures(). If a Future reports an error, then the master is halted. This is how we ensure that all subtasks complete normally.

Parameters:
subtask - The subtask.
Throws:
InterruptedException
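The check performed on finished subtasks can be sketched as a loop over a queue of Futures. drainFutures() here is a hypothetical stand-in that returns the first failure instead of halting the master.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical sketch of checking the Futures of finished subtasks. */
final class DrainFuturesSketch {

    /** Returns the first failure cause found, or null if every subtask completed normally. */
    static Throwable drainFutures(Queue<Future<?>> finished) throws InterruptedException {
        Future<?> f;
        while ((f = finished.poll()) != null) {
            try {
                f.get(); // the subtask is done or about to return, so this should not block long
            } catch (ExecutionException ex) {
                return ex.getCause(); // the master would halt with this cause
            } catch (CancellationException ex) {
                return ex;
            }
        }
        return null;
    }
}
```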

getStats

public H getStats()
Description copied from interface: IMasterTask
The statistics.

Specified by:
getStats in interface IMasterTask<E,H>

call

public H call()
       throws Exception
Specified by:
call in interface Callable<H>
Throws:
Exception

handleChunk

protected abstract void handleChunk(E[] chunk,
                                    boolean reopen)
                             throws InterruptedException
Handle the next chunk of elements from the buffer.

Parameters:
chunk - A chunk.
reopen - When false it is an error if the output buffer has been closed. When true the output buffer will be (re-)opened as necessary. This will be false when invoked by call() (since the output buffers are not closed until the master's buffer is closed) and should be true if you are handling redirects.
Throws:
InterruptedException

nothingPending

protected boolean nothingPending()
Extension hook for implementations where the clients accept work for asynchronous processing and notify the master as work items complete successfully or fail. The AbstractMasterTask will not terminate unless this method returns true when queried while holding the lock. A true return indicates that there are no pending work items. The default implementation returns true.

The work performed by the client must be idempotent (it must be safe to re-perform the operation). Pending work items may be in an unknown state, the master may submit the same work item to multiple clients (where that is permitted by the locator semantics), the client task may fail before the work item is complete, and a failed client can cause work items associated with that client to be posted to another client.

To handle master termination, the pending set must track outstanding work items. These work items should be removed from the pending set as soon as any client has successfully completed them (since the work is idempotent).

To handle client failure, the subtask must track the pending set for its client. If the client dies, then the subtask must handle the failure by placing all pending work items for that client (including any in the chunk for the current request) onto the redirectQueue.
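The bookkeeping described above might be sketched as follows. PendingSetSketch is hypothetical; its synchronized methods stand in for the master's lock, and clients are identified by String names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical pending-set bookkeeping for idempotent work items. */
final class PendingSetSketch<W> {
    private final Set<W> pending = new HashSet<>();
    private final Map<String, Set<W>> pendingByClient = new HashMap<>();

    /** Records that work was submitted to client (the same item may go to several clients). */
    synchronized void submitted(String client, W work) {
        pending.add(work);
        pendingByClient.computeIfAbsent(client, c -> new HashSet<>()).add(work);
    }

    /** Any client completing an item clears it everywhere, since the work is idempotent. */
    synchronized void completed(W work) {
        pending.remove(work);
        for (Set<W> items : pendingByClient.values()) {
            items.remove(work);
        }
    }

    /** On client failure, returns that client's outstanding items so they can be redirected. */
    synchronized List<W> clientFailed(String client) {
        Set<W> items = pendingByClient.remove(client);
        if (items == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(items);
    }

    /** What a nothingPending() override might report. */
    synchronized boolean nothingPending() {
        return pending.isEmpty();
    }
}
```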


willShutdown

protected void willShutdown()
                     throws InterruptedException
Extension hook invoked when the master's buffer is exhausted by awaitAll(). The default implementation is a NOP.

Throws:
InterruptedException

getSink

protected S getSink(L locator,
                    boolean reopen)
                                     throws InterruptedException
Return the sink for the locator. The sink is created if it does not exist using newSubtaskBuffer() and newSubtask(Object, BlockingBuffer).

Note: The caller is single threaded since this is invoked from the master's thread. This code depends on that assumption.

Parameters:
locator - The locator (unique subtask key).
reopen - true IFF a closed buffer should be re-opened (in fact, this causes a new buffer to be created and the new buffer will be drained by a new IndexPartitionWriteTask).
Returns:
The sink for that locator.
Throws:
IllegalArgumentException - if the argument is null.
InterruptedException - if interrupted.
RuntimeException - if AbstractHaltableProcess.halted()

newSubtaskBuffer

protected abstract BlockingBuffer<E[]> newSubtaskBuffer()
Factory for a new buffer for a subtask.


newSubtask

protected abstract S newSubtask(L locator,
                                BlockingBuffer<E[]> out)
Factory for a new subtask.

Parameters:
locator - The unique key for the subtask.
out - The BlockingBuffer on which the master will write for that subtask.
Returns:
The subtask.

submitSubtask

protected abstract Future<? extends AbstractSubtaskStats> submitSubtask(S subtask)
Submit the subtask to an Executor.

Parameters:
subtask - The subtask.
Returns:
The Future.

moveSinkToFinishedQueueAtomically

protected void moveSinkToFinishedQueueAtomically(L locator,
                                                 AbstractSubtask sink)
                                          throws InterruptedException
Transfer a sink from sinks to finishedSubtaskQueue. The entry for the locator is removed from sinks atomically IFF the given sink is still associated with the given locator in that map.

This is done atomically using the lock. This is invoked both by AbstractSubtask.call() (when it is preparing to exit call()) and by getSink(Object, boolean) (when it discovers that the sink for a locator is closed, but not yet finished with its work).

Note: The sink MUST be atomically transferred from sinks to finishedSubtaskQueue and awaitAll() MUST verify that both are empty in order for the termination condition to be atomic.

Parameters:
locator - The locator.
sink - The sink.
Throws:
InterruptedException
TODO:
this method really should be private. It is exposed for one of the unit tests.
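The atomic transfer can be sketched with Map.remove(key, value), which removes the entry only if the locator still maps to the given sink. FinishedQueueSketch and its methods are hypothetical; unlike this sketch, the method documented here returns void.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of moving a sink to the finished queue under a lock. */
final class FinishedQueueSketch<L, S> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<L, S> sinks = new HashMap<>();                  // guarded by lock
    private final Queue<S> finishedSubtaskQueue = new ArrayDeque<>(); // guarded by lock

    void putSink(L locator, S sink) {
        lock.lock();
        try {
            sinks.put(locator, sink);
        } finally {
            lock.unlock();
        }
    }

    /** Moves sink to the finished queue iff it is still the sink mapped to locator. */
    boolean moveSinkToFinishedQueueAtomically(L locator, S sink) {
        lock.lock();
        try {
            if (!sinks.remove(locator, sink)) {
                return false; // another sink now owns this locator
            }
            finishedSubtaskQueue.add(sink);
            return true;
        } finally {
            lock.unlock();
        }
    }

    S pollFinished() {
        lock.lock();
        try {
            return finishedSubtaskQueue.poll();
        } finally {
            lock.unlock();
        }
    }

    /** Both collections must be observed empty under the same lock for atomic termination. */
    boolean terminationConditionHolds() {
        lock.lock();
        try {
            return sinks.isEmpty() && finishedSubtaskQueue.isEmpty();
        } finally {
            lock.unlock();
        }
    }
}
```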

addToOutputBuffer

protected void addToOutputBuffer(L locator,
                                 E[] a,
                                 int fromIndex,
                                 int toIndex,
                                 boolean reopen)
                          throws InterruptedException
Resolves the output buffer onto which the split must be written and adds the data to that output buffer.

Parameters:
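locator - The locator identifying the output buffer (subtask) onto which the tuples must be written.
a - The array of tuples. Only those tuples addressed by fromIndex and toIndex will be written onto the output buffer.
fromIndex - The index in a of the first tuple to be written.
toIndex - The end of the range of tuples in a to be written.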
reopen - true IFF a closed buffer should be re-opened (in fact, this causes a new buffer to be created and the new buffer will be drained by a new AbstractSubtask).
Throws:
InterruptedException - if the thread is interrupted.


Copyright © 2006-2009 SYSTAP, LLC. All Rights Reserved.