com.bigdata.service.jini.master
Class AggregatorTask<T extends IKeyArrayIndexProcedure,O,R,A>

java.lang.Object
  extended by com.bigdata.service.FederationCallable<Void>
      extended by com.bigdata.service.jini.master.AggregatorTask<T,O,R,A>
Type Parameters:
T - The generic type of the procedure used to write on the index.
O - The generic type for unserialized value objects.
R - The type of the result from applying the index procedure to a single Split of data.
A - The type of the aggregated result.
All Implemented Interfaces:
IFederationCallable, IAsynchronousWriteBufferFactory, Serializable, Callable<Void>

public class AggregatorTask<T extends IKeyArrayIndexProcedure,O,R,A>
extends FederationCallable<Void>
implements IAsynchronousWriteBufferFactory

A task which aggregates writes destined for a specific scale-out index. An instance of this class may be submitted to any IRemoteExecutor and used to concentrate the writes from multiple clients. The need for an aggregation task grows in proportion to the number of index partitions.

The task creates a BlockingBuffer (the input queue) and an IndexWriteTask which will write on the specified scale-out index. It then exports a proxy for the input queue. Each client which attaches to this AggregatorTask must allocate a local BlockingBuffer and assign a worker task to drain chunks from that buffer and transfer them to this task via the exposed proxy.
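The fan-in described above can be illustrated in a single JVM. This is a hypothetical sketch, not the bigdata implementation: FanInSketch is an invented name, a java.util.concurrent queue stands in for the aggregator's input BlockingBuffer (which real clients reach through an exported proxy), and a counting task stands in for the IndexWriteTask. Each client writes chunks on a local queue, and a per-client worker drains that queue into the shared one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical in-process sketch of the fan-in pattern. The shared queue
 * stands in for the aggregator's input buffer (reached via a proxy in the
 * real system); each client writes on a local queue that a worker drains.
 */
final class FanInSketch {

    /** End-of-stream marker, compared by identity. */
    static final List<String> EOF = new ArrayList<>();

    /** Returns the number of keys that reached the single writer. */
    static int run(int clients, int chunksPerClient) throws Exception {
        BlockingQueue<List<String>> aggregatorInput = new ArrayBlockingQueue<>(16);
        ExecutorService pool = Executors.newFixedThreadPool(clients + 1);
        try {
            // Stand-in for the IndexWriteTask: drains the shared queue until every client sent EOF.
            Future<Integer> writer = pool.submit(() -> {
                int written = 0;
                int finished = 0;
                while (finished < clients) {
                    List<String> chunk = aggregatorInput.take();
                    if (chunk == EOF) {
                        finished++;
                    } else {
                        written += chunk.size();
                    }
                }
                return written;
            });

            for (int c = 0; c < clients; c++) {
                BlockingQueue<List<String>> local = new ArrayBlockingQueue<>(4);
                // Worker: transfers chunks from the client's local buffer to the shared queue.
                pool.submit(() -> {
                    List<String> chunk;
                    do {
                        chunk = local.take();
                        aggregatorInput.put(chunk);
                    } while (chunk != EOF);
                    return null;
                });
                // Client: writes its chunks (two keys each) on the local buffer, then EOF.
                for (int i = 0; i < chunksPerClient; i++) {
                    local.put(List.of("c" + c + "-k" + i + "a", "c" + c + "-k" + i + "b"));
                }
                local.put(EOF);
            }
            return writer.get();
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, FanInSketch.run(3, 10) routes 3 clients x 10 chunks x 2 keys, so 60 keys reach the one writer. The bounded local queues give each client back-pressure independently of the shared queue.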

Version:
$Id$
Author:
Bryan Thompson
See Also:
Serialized Form
TODO:
The aggregator's input buffer should be closed by the TaskMaster for a given (name,timestamp) when the task is done.

It should be transparent to use aggregators in combination with the asynchronous write API. This may require the introduction of another level of indirection.

For example, the RDF parser task directly creates the asynchronous write buffers today. However, it should accept a factory which it then applies to obtain those buffers. When using an aggregator, the factory needs to return the proxy for the write buffer of the corresponding aggregator. If this is to happen dynamically, then the TaskMaster ServiceMap must be dynamic as well (which could introduce load balancing for the IRemoteExecutors). When an (RDF parser) task needs an asynchronous write buffer, it must atomically start an aggregator if none exists for the given (name,timestamp). The responsibility for starting that aggregator must be either centralized or decentralized with global synchronous locking.
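Within a single JVM, the "start an aggregator if none exists" step can be sketched with ConcurrentHashMap.computeIfAbsent, which applies its mapping function at most once per key. This is only an illustration under stated assumptions: AggregatorRegistry, ViewKey and Aggregator are hypothetical names, and a real federation would still need the centralized or globally locked coordination described above.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical registry: at most one aggregator per (name, timestamp), single JVM only. */
final class AggregatorRegistry {

    /** Identifies an index view by index name and timestamp. */
    static final class ViewKey {
        final String name;
        final long timestamp;

        ViewKey(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ViewKey)) return false;
            ViewKey k = (ViewKey) o;
            return name.equals(k.name) && timestamp == k.timestamp;
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, timestamp);
        }
    }

    /** Placeholder for a started aggregator; a real one would own a write buffer. */
    static final class Aggregator {
        final ViewKey key;

        Aggregator(ViewKey key) {
            this.key = key;
        }
    }

    private final ConcurrentHashMap<ViewKey, Aggregator> running = new ConcurrentHashMap<>();

    /** Counts how many aggregators were actually started. */
    final AtomicInteger starts = new AtomicInteger();

    /** Returns the aggregator for the view, starting it atomically if none exists. */
    Aggregator getOrStart(String name, long timestamp) {
        return running.computeIfAbsent(new ViewKey(name, timestamp), k -> {
            starts.incrementAndGet(); // applied at most once per key by ConcurrentHashMap
            return new Aggregator(k);
        });
    }
}
```

Concurrent callers asking for the same (name,timestamp) all receive the same Aggregator instance, and a different name or timestamp starts a separate one.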

This all sounds far too complex. If the task declares the indices on which it will write, then we can do this statically when the TaskMaster starts. The clients receive the UUIDs of the services on which the aggregator(s) are running. They can then look up the appropriate aggregator on each such service in parallel, collecting the set of aggregators for each index on which they will write. This is similar to the session mechanism of the DataService which is used to support distributed joins. See JoinMasterTask, JoinTaskFactoryTask and Session.

For scale-out indices with an exceedingly large number of index partitions, it may be necessary to take additional steps to limit the RAM burden of the aggregator. For example, either the aggregator can buffer the writes on local disk or the clients could split their writes across a population of aggregators, each of which takes on a key-range of the index (the key-ranges would be coarser than those of the index partitions themselves).
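The key-range alternative could look roughly like the following sketch. It assumes String keys for readability (the real index keys are byte arrays) and a hypothetical KeyRangeRouter class: each aggregator owns a half-open key range beginning at its separator key, TreeMap.floorEntry finds the owning range, and a client chunk is split into one sub-chunk per aggregator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical router: n separator keys define n+1 coarse, half-open key
 * ranges, each owned by one aggregator. Range 0 starts at the empty key.
 */
final class KeyRangeRouter {
    private final TreeMap<String, Integer> leftSeparators = new TreeMap<>();

    KeyRangeRouter(String... separators) {
        leftSeparators.put("", 0); // range 0 covers every key below the first separator
        for (int i = 0; i < separators.length; i++) {
            String s = separators[i];
            if (s.isEmpty() || (i > 0 && s.compareTo(separators[i - 1]) <= 0)) {
                throw new IllegalArgumentException("separators must be non-empty, sorted and distinct");
            }
            leftSeparators.put(s, i + 1);
        }
    }

    /** Index of the aggregator whose key range contains the key. */
    int route(String key) {
        // Never null: "" sorts at or before every String.
        return leftSeparators.floorEntry(key).getValue();
    }

    int aggregatorCount() {
        return leftSeparators.size();
    }

    /** Splits one client chunk into per-aggregator sub-chunks, preserving key order within each. */
    Map<Integer, List<String>> split(List<String> chunk) {
        Map<Integer, List<String>> out = new TreeMap<>();
        for (String key : chunk) {
            out.computeIfAbsent(route(key), i -> new ArrayList<>()).add(key);
        }
        return out;
    }
}
```

With separators "g" and "p", there are three aggregators: keys below "g" go to 0, keys from "g" up to (but excluding) "p" go to 1, and the rest go to 2.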


Constructor Summary
AggregatorTask(String name, long timestamp, IResultHandler<R,A> resultHandler, IDuplicateRemover<O> duplicateRemover, AbstractKeyArrayIndexProcedureConstructor<T> ctor)
           
 
Method Summary
 Void call()
          Starts an IndexWriteTask and makes a proxy for the input buffer for that task available via newWriteBuffer(IResultHandler, IDuplicateRemover, AbstractKeyArrayIndexProcedureConstructor).
 JiniFederation getFederation()
          The federation object used by the IRemoteExecutor on which this task is executing.
<T extends IKeyArrayIndexProcedure,O,R,A>
IRunnableBuffer<KVO<O>[]>
newWriteBuffer(IResultHandler<R,A> resultHandler, IDuplicateRemover<O> duplicateRemover, AbstractKeyArrayIndexProcedureConstructor<T> ctor)
          Note: This ignores its arguments (it uses those passed to the AggregatorTask constructor instead) and returns the proxy for the pre-existing write buffer.
 
Methods inherited from class com.bigdata.service.FederationCallable
setFederation
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

AggregatorTask

public AggregatorTask(String name,
                      long timestamp,
                      IResultHandler<R,A> resultHandler,
                      IDuplicateRemover<O> duplicateRemover,
                      AbstractKeyArrayIndexProcedureConstructor<T> ctor)
Parameters:
name - The name of the index to which the writes will be directed.
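timestamp - The timestamp associated with the index view.
resultHandler - Used to aggregate results.
duplicateRemover - Used to filter out duplicates in an application specified manner (optional).
ctor - Used to create instances of the procedure that will execute a write on an individual index partition.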
Method Detail

getFederation

public JiniFederation getFederation()
The federation object used by the IRemoteExecutor on which this task is executing.

Specified by:
getFederation in interface IFederationCallable
Overrides:
getFederation in class FederationCallable<Void>
Returns:
The federation (never null).

call

public Void call()
          throws Exception
Starts an IndexWriteTask and makes a proxy for the input buffer for that task available via newWriteBuffer(IResultHandler, IDuplicateRemover, AbstractKeyArrayIndexProcedureConstructor). The task will run until the input buffer is closed. It may be closed via its proxy or by an internal error. If this task is interrupted, the interrupt is propagated to the inner IndexWriteTask, which will also be canceled.
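The interrupt propagation described for call() follows a common java.util.concurrent idiom: submit the inner task, block on its Future, and cancel that Future with mayInterruptIfRunning set if the outer thread is interrupted. The sketch below is a hypothetical illustration of that idiom (PropagatingTask is an invented name; the real task runs an IndexWriteTask rather than an arbitrary Callable).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
 * Hypothetical sketch: the outer task runs an inner task until it finishes,
 * and an interrupt of the outer task cancels (and interrupts) the inner one.
 */
final class PropagatingTask implements Callable<Void> {
    private final ExecutorService executor;
    private final Callable<Void> inner;

    PropagatingTask(ExecutorService executor, Callable<Void> inner) {
        this.executor = executor;
        this.inner = inner;
    }

    @Override
    public Void call() throws Exception {
        Future<Void> future = executor.submit(inner);
        try {
            // Blocks until the inner task ends, e.g. because its input was closed.
            // An exception thrown by the inner task surfaces as ExecutionException.
            return future.get();
        } catch (InterruptedException e) {
            future.cancel(true); // cancel the inner task and interrupt its thread
            throw e;
        }
    }
}
```

Rethrowing the InterruptedException after cancelling lets the caller of call() see that the task ended because it was interrupted, rather than because its input was exhausted.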

Specified by:
call in interface Callable<Void>
Throws:
Exception

newWriteBuffer

public <T extends IKeyArrayIndexProcedure,O,R,A> IRunnableBuffer<KVO<O>[]> newWriteBuffer(IResultHandler<R,A> resultHandler,
                                                                                          IDuplicateRemover<O> duplicateRemover,
                                                                                          AbstractKeyArrayIndexProcedureConstructor<T> ctor)
Note: This ignores its arguments (it uses those passed to the AggregatorTask constructor instead) and returns the proxy for the pre-existing write buffer. For a given AggregatorTask, all callers will obtain a proxy for the same buffer. This provides the desired aggregation semantics.

Specified by:
newWriteBuffer in interface IAsynchronousWriteBufferFactory
Type Parameters:
T - The generic type of the procedure used to write on the index.
O - The generic type for unserialized value objects.
R - The type of the result from applying the index procedure to a single Split of data.
A - The type of the aggregated result.
Parameters:
resultHandler - Used to aggregate results.
duplicateRemover - Used to filter out duplicates in an application specified manner (optional).
ctor - Used to create instances of the procedure that will execute a write on an individual index partition (this implies that insert and remove operations as well as custom index write operations must use separate buffers).
Returns:
A buffer on which the producer may write its data.
Throws:
IllegalStateException - if the proxy for the buffer does not exist (because the task is not running).
See Also:
IndexMetadata.getAsynchronousIndexWriteConfiguration(), AbstractFederation.getIndexCounters(String)
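The contract of newWriteBuffer (arguments ignored, one shared buffer for every caller, IllegalStateException when the task is not running) can be sketched as follows. SharedBufferFactory and its start/stop methods are hypothetical, and a plain BlockingQueue stands in for the proxied IRunnableBuffer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical factory: all callers share one buffer, which exists only while the task runs. */
final class SharedBufferFactory<E> {
    private final AtomicReference<BlockingQueue<E>> buffer = new AtomicReference<>();

    /** Called when the task starts: creates the single shared buffer. */
    void start() {
        if (!buffer.compareAndSet(null, new LinkedBlockingQueue<>())) {
            throw new IllegalStateException("already running");
        }
    }

    /** Called when the task ends: later callers can no longer obtain the buffer. */
    void stop() {
        buffer.set(null);
    }

    /** Ignores its argument and returns the same pre-existing buffer to every caller. */
    BlockingQueue<E> newWriteBuffer(Object ignoredConfiguration) {
        BlockingQueue<E> b = buffer.get();
        if (b == null) {
            throw new IllegalStateException("task is not running");
        }
        return b;
    }
}
```

Two callers passing different configurations still get the same buffer, so a chunk written by one is drained by the same consumer as chunks written by the other.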


Copyright © 2006-2009 SYSTAP, LLC. All Rights Reserved.