com.bigdata.relation.rule.eval.pipeline
Class JoinTask

java.lang.Object
  extended by com.bigdata.relation.rule.eval.pipeline.JoinTask
All Implemented Interfaces:
Callable<Void>
Direct Known Subclasses:
DistributedJoinTask, LocalJoinTask

public abstract class JoinTask
extends Object
implements Callable<Void>

Consumes IBindingSet chunks from the previous join dimension.

Note: Instances of this class MUST be created on the IDataService that hosts the index partition on which the task will read, and they MUST run inside of an AbstractTask on the ConcurrencyManager in order to have access to the local index object for the index partition.

This class is NOT serializable.

For a rule with 2 predicates, there will be two JoinTasks. The orderIndex is ZERO (0) for the first JoinTask and ONE (1) for the second JoinTask. The first JoinTask will have a single initialBinding from the JoinMasterTask and will read on the IAccessPath for the first IPredicate in the evaluation order. The second JoinTask will read chunks of IBindingSets containing partial solutions from the first JoinTask and will obtain and read on an IAccessPath for the second predicate in the evaluation order for every partial solution. Since there are only two IPredicates in the IRule, the second and last JoinTask will write on the ISolution buffer obtained from JoinMasterTask.getSolutionBuffer(). Each JoinTask will report its JoinStats to the master, which aggregates those statistics.
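The two-predicate walk-through above can be modelled, very loosely, as a chain of join dimensions in which each dimension reads the binding sets produced by the previous one. The sketch below is hypothetical and is not the bigdata API: binding sets are plain Map<String, String> objects, a predicate is a triple pattern whose terms starting with '?' are variables, each "access path" is a full scan of an in-memory list, and the dimensions run one after another on whole lists rather than as concurrent tasks exchanging chunks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a join pipeline (not the bigdata API).
final class PipelineSketch {

    // Extend one binding set with one triple; null if the triple conflicts with it.
    static Map<String, String> bind(String[] pattern, String[] triple, Map<String, String> in) {
        Map<String, String> out = new HashMap<>(in);
        for (int i = 0; i < 3; i++) {
            String term = pattern[i];
            if (term.startsWith("?")) {
                String bound = out.get(term);
                if (bound == null) out.put(term, triple[i]);          // bind a new variable
                else if (!bound.equals(triple[i])) return null;       // conflicting binding
            } else if (!term.equals(triple[i])) {
                return null;                                          // constant mismatch
            }
        }
        return out;
    }

    // One join dimension: for each source binding set, read the access path
    // (here a full scan of the data) and emit every consistent extension.
    static List<Map<String, String>> joinDimension(List<Map<String, String>> sources,
            String[] pattern, List<String[]> data) {
        List<Map<String, String>> sink = new ArrayList<>();
        for (Map<String, String> bs : sources) {
            for (String[] triple : data) {
                Map<String, String> ext = bind(pattern, triple, bs);
                if (ext != null) sink.add(ext);
            }
        }
        return sink;
    }

    // order[orderIndex] selects the tail predicate evaluated at that step.
    static List<Map<String, String>> evaluate(List<String[]> patterns, int[] order, List<String[]> data) {
        List<Map<String, String>> chunk = List.of(new HashMap<>()); // single initial binding set
        for (int orderIndex = 0; orderIndex < order.length; orderIndex++) {
            chunk = joinDimension(chunk, patterns.get(order[orderIndex]), data);
        }
        return chunk; // output of the last join dimension plays the role of the solutions
    }
}
```

With the patterns `?x knows ?y` and `?y knows ?z`, the first dimension starts from the single empty binding set and the second dimension re-reads the data once per partial solution, mirroring the description above.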

Note: ITx.UNISOLATED requests will deadlock if the same query uses the same access path for two predicates! This is because the first such join dimension in the evaluation order will obtain an exclusive lock on an index partition making it impossible for another JoinTask to obtain an exclusive lock on the same index partition. This is not a problem if you are using read-consistent timestamps!
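The deadlock described above can be illustrated with a deliberately simplified model. In this hypothetical sketch a single ReentrantLock stands in for the exclusive lock on one index partition: the first join dimension holds it while a second join dimension, running in another thread on the same partition, tries to acquire it. A bounded tryLock is used so the demonstration returns instead of hanging; with a read-consistent view no exclusive lock is taken at all.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model: one exclusive lock per index partition for unisolated access.
final class UnisolatedLockSketch {
    static final ReentrantLock partitionLock = new ReentrantLock();

    /** True iff the 2nd join dimension could read the partition while the 1st is still running. */
    static boolean secondDimensionCanProceed(boolean unisolated) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        if (unisolated) partitionLock.lock();      // 1st join dimension holds the partition
        try {
            // 2nd join dimension runs in another thread and reads the same partition.
            Future<Boolean> second = exec.submit(() -> {
                if (!unisolated) return true;      // read-consistent timestamp: no exclusive lock
                boolean got = partitionLock.tryLock(100, TimeUnit.MILLISECONDS);
                if (got) partitionLock.unlock();
                return got;                        // false: a real pipeline would wait forever
            });
            return second.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            if (unisolated) partitionLock.unlock();
            exec.shutdown();
        }
    }
}
```

In the real pipeline the first dimension cannot finish (and release the lock) until its sinks have consumed its output, which is what turns the failed acquisition into a deadlock.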

Version:
$Id: JoinTask.java 3454 2010-08-20 19:00:43Z thompsonbry $
Author:
Bryan Thompson
TODO:
Allow the access paths to be consumed in parallel. This would let us use more threads for join dimensions that had to test more source binding sets.

Parallel JoinTask.AccessPathTask processing is useful when each JoinTask.AccessPathTask consumes only a small chunk and there is a large #of source binding sets to be processed. In this case, parallelism reduces the overall latency by allowing threads to progress as soon as the data can be materialized from the index. JoinTask.AccessPathTask parallelism is realized by submitting each JoinTask.AccessPathTask to a service that imposes a parallelism limit on the shared IIndexStore.getExecutorService(). Since the JoinTask.AccessPathTasks are concurrent, each one requires its own UnsynchronizedOutputBuffer on which it will place any accepted IBindingSets. Once a JoinTask.AccessPathTask completes, its buffer may be reused by the next JoinTask.AccessPathTask assigned to that worker thread (this reduces heap churn and allows us to assemble full chunks when each IAccessPath realizes only a few accepted IBindingSets). For an ExecutorService with a parallelism limit of N, there are therefore N UnsynchronizedOutputBuffers. Those buffers must be flushed when the JoinTask exhausts its source(s). If the same set of threads is not known to be reused for each JoinTask.AccessPathTask, then the actual #of buffers will be the #of distinct threads used. To reduce the potential memory demand, striped locks could be used to protect a pool of UnsynchronizedArrayBuffers, but this could lead to deadlock if the buffer reference were exposed to the task (as opposed to adding the object to the buffer within a private method, which hides that reference), since more than one thread could demand access to the same buffer.

Parallel JoinTask.ChunkTask processing may be useful when a JoinTask.AccessPathTask will consume a large #of chunks. Since the IAccessPath.iterator() is NOT thread-safe, reads on the IAccessPath must be sequential, but the chunks read from the IAccessPath can be placed onto a queue and parallel JoinTask.ChunkTasks can drain that queue, consuming the chunks. This can help by reducing the latency to materialize any given chunk.

The required change is to have a per-thread UnsynchronizedArrayBuffer feeding a thread-safe UnsyncDistributedOutputBuffer (potentially via a queue) which maps each generated binding set across the index partition(s) for the sink JoinTasks.
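The per-thread buffering proposed in this TODO can be sketched with a ThreadLocal. This is a hypothetical illustration, not the bigdata classes: Integers stand in for IBindingSets, an unsynchronized ArrayList per worker thread stands in for the UnsynchronizedArrayBuffer, and a LinkedBlockingQueue stands in for the thread-safe downstream buffer. Every per-thread buffer is registered so that all of them can be flushed once the pool has terminated, which corresponds to flushing when the JoinTask exhausts its sources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: per-thread unsynchronized buffers feeding one thread-safe buffer.
final class PerThreadBufferSketch {
    private final int chunkCapacity;
    // Thread-safe downstream buffer that receives full (or final partial) chunks.
    final BlockingQueue<List<Integer>> shared = new LinkedBlockingQueue<>();
    // Registry of every per-thread buffer, so that all of them can be flushed at the end.
    private final Queue<List<Integer>> registry = new ConcurrentLinkedQueue<>();
    // Each worker thread lazily gets its own unsynchronized buffer.
    private final ThreadLocal<List<Integer>> local = ThreadLocal.withInitial(() -> {
        List<Integer> buf = new ArrayList<>();
        registry.add(buf);
        return buf;
    });

    PerThreadBufferSketch(int chunkCapacity) { this.chunkCapacity = chunkCapacity; }

    // Called by a worker for each accepted binding set; no locking on the fast path.
    void accept(Integer bindingSet) {
        List<Integer> buf = local.get();
        buf.add(bindingSet);
        if (buf.size() >= chunkCapacity) {   // full chunk: hand a copy downstream
            shared.add(new ArrayList<>(buf));
            buf.clear();
        }
    }

    // Only safe once every worker has finished (here: after awaitTermination).
    void flushAll() {
        for (List<Integer> buf : registry) {
            if (!buf.isEmpty()) {
                shared.add(new ArrayList<>(buf));
                buf.clear();
            }
        }
    }

    // Runs nTasks one-element "access path tasks" on nThreads; returns #binding sets delivered.
    static int run(int nTasks, int nThreads, int chunkCapacity) {
        PerThreadBufferSketch s = new PerThreadBufferSketch(chunkCapacity);
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nTasks; i++) {
            final int id = i;
            pool.execute(() -> s.accept(id));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        s.flushAll();
        int total = 0;
        for (List<Integer> chunk : s.shared) total += chunk.size();
        return total;
    }
}
```

The number of buffers equals the number of distinct worker threads, which matches the observation above that a fixed pool of N threads yields N buffers.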


Nested Class Summary
protected  class JoinTask.AccessPathTask
          Accepts an asBound IPredicate and a (non-empty) collection of IBindingSets each of which licenses the same asBound predicate for the current join dimension.
protected  class JoinTask.BindingSetConsumerTask
          Class consumes chunks from the source(s) until canceled, interrupted, or all source(s) are exhausted.
protected  class JoinTask.ChunkTask
          Task processes a chunk of elements read from the IAccessPath for a join dimension.
 class JoinTask.ThreadLocalFactory<T extends IBuffer<E>,E>
          A factory pattern for per-thread objects whose life cycle is tied to some container.
 
Field Summary
protected static boolean DEBUG
          True iff the log level is DEBUG or less.
protected  AtomicReference<Throwable> firstCause
          Set by JoinTask.BindingSetConsumerTask, JoinTask.AccessPathTask, and JoinTask.ChunkTask if they throw an error.
protected  boolean halt
          Volatile flag is set true if the JoinTask (including any tasks executing on its behalf) should halt.
protected static boolean INFO
          True iff the log level is INFO or less.
protected  IJoinNexus joinNexus
          The IJoinNexus for the local IIndexManager, which will be the live IJournal.
protected  boolean lastJoin
          true iff this is the last join dimension in the evaluation order.
protected static org.apache.log4j.Logger log
           
protected  IJoinMaster masterProxy
          A proxy for the remote JoinMasterTask.
protected  UUID masterUUID
           
protected  int orderIndex
          The index into the evaluation order for the predicate on which we are reading for this join dimension.
protected  int partitionId
          The index partition on which this JoinTask is reading -or- -1 if the deployment does not support key-range partitioned indices.
protected  IPredicate<?> predicate
          The IPredicate on which we are reading for this join dimension.
protected  IRelation<?> relation
          The IRelation view on which we are reading for this join dimension.
protected  IVariable<?>[][] requiredVars
          A list of variables required for each tail, by tailIndex.
protected  IRule<?> rule
          The rule that is being evaluated.
protected  int tailCount
          The #of predicates in the tail of that rule.
protected  int tailIndex
          The tail index in the rule for the predicate on which we are reading for this join dimension.
protected static boolean WARN
          True iff the log level is WARN or less.
 
Constructor Summary
JoinTask(IRule rule, IJoinNexus joinNexus, int[] order, int orderIndex, int partitionId, IJoinMaster masterProxy, UUID masterUUID, IVariable[][] requiredVars)
          Instances of this class MUST be created in the appropriate execution context of the target DataService so that the federation and the joinNexus references are both correct and so that it has access to the local index object for the specified index partition.
 
Method Summary
 Void call()
          Runs the JoinTask.
protected abstract  void cancelSinks()
          Cancel sink JoinTask(s).
protected  void consumeSources()
          Consume IBindingSet chunks from source(s).
protected abstract  void flushAndCloseBuffersAndAwaitSinks()
          Flush and close all output buffers and await sink JoinTask(s).
protected abstract  IBuffer<ISolution[]> getSolutionBuffer()
          The buffer on which the last predicate in the evaluation order will write its ISolutions.
protected  int getTailIndex(int orderIndex)
          Return the index of the tail predicate to be evaluated at the given index in the evaluation order.
protected  void halt(Throwable cause)
          Indicate that join processing should halt.
protected  void logCallError(Throwable t)
          Method is used to log the primary exception thrown by call().
protected abstract  AbstractUnsynchronizedArrayBuffer<IBindingSet> newUnsyncOutputBuffer()
          A method used by the threadLocalBufferFactory to create new output buffer as required.
protected abstract  IBindingSet[] nextChunk()
          Return a chunk of IBindingSets from the IAsynchronousIterators.
protected  void reportOnce()
          Method reports JoinStats to the JoinMasterTask, but only if they have not already been reported.
 String toString()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

log

protected static final org.apache.log4j.Logger log

WARN

protected static final boolean WARN
True iff the log level is WARN or less.


INFO

protected static final boolean INFO
True iff the log level is INFO or less.


DEBUG

protected static final boolean DEBUG
True iff the log level is DEBUG or less.


rule

protected final IRule<?> rule
The rule that is being evaluated.


tailCount

protected final int tailCount
The #of predicates in the tail of that rule.


partitionId

protected final int partitionId
The index partition on which this JoinTask is reading -or- -1 if the deployment does not support key-range partitioned indices.


tailIndex

protected final int tailIndex
The tail index in the rule for the predicate on which we are reading for this join dimension.


predicate

protected final IPredicate<?> predicate
The IPredicate on which we are reading for this join dimension.


relation

protected final IRelation<?> relation
The IRelation view on which we are reading for this join dimension.


orderIndex

protected final int orderIndex
The index into the evaluation order for the predicate on which we are reading for this join dimension.


lastJoin

protected final boolean lastJoin
true iff this is the last join dimension in the evaluation order.


masterProxy

protected final IJoinMaster masterProxy
A proxy for the remote JoinMasterTask.


masterUUID

protected final UUID masterUUID

requiredVars

protected final IVariable<?>[][] requiredVars
A list of variables required for each tail, by tailIndex. Used to filter downstream variable binding sets.


joinNexus

protected IJoinNexus joinNexus
The IJoinNexus for the local IIndexManager, which will be the live IJournal. This IJoinNexus MUST have access to the local index objects, which means that this class MUST be run inside of the ConcurrencyManager. The joinNexus is created from the joinNexusFactory once the task begins to execute.


halt

protected volatile boolean halt
Volatile flag is set true if the JoinTask (including any tasks executing on its behalf) should halt. This flag is monitored by the JoinTask.BindingSetConsumerTask, the JoinTask.AccessPathTask, and the JoinTask.ChunkTask. It is set by any of those tasks if they are interrupted or error out.

TODO:
review handling of this flag. Should an exception always be thrown if the flag is set wrapping the firstCause? Are there any cases where the behavior should be different? If not, then replace tests with halt() and encapsulate the logic in that method.

firstCause

protected final AtomicReference<Throwable> firstCause
Set by JoinTask.BindingSetConsumerTask, JoinTask.AccessPathTask, and JoinTask.ChunkTask if they throw an error. Tasks are required to use an AtomicReference.compareAndSet(Object, Object) and must specify null as the expected value. This ensures that only the first cause is recorded by this field.
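The halt flag and the first-cause rule described above can be sketched as follows. The class is hypothetical and only mirrors the documented contract: a volatile flag monitored by the worker tasks and an AtomicReference updated with compareAndSet(null, cause), so that only the first recorded cause survives. The cause is recorded before the flag is raised, so a thread that observes the flag also sees the cause.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the halt/firstCause contract (not the bigdata implementation).
final class HaltSketch {
    // Monitored by all worker tasks; volatile so that one thread's write is seen by the others.
    volatile boolean halt;
    // Later compareAndSet(null, ...) calls fail, so only the first cause is kept.
    final AtomicReference<Throwable> firstCause = new AtomicReference<>();

    /** Record the cause if it is the first one, then raise the flag. Never throws. */
    void halt(Throwable cause) {
        firstCause.compareAndSet(null, cause);
        halt = true;
    }

    /** Let n workers fail at (nearly) the same time; exactly one cause is retained. */
    static Throwable race(int nWorkers) {
        HaltSketch task = new HaltSketch();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(nWorkers);
        for (int i = 0; i < nWorkers; i++) {
            final int id = i;
            pool.execute(() -> {
                try {
                    start.await();             // release all workers together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                task.halt(new RuntimeException("worker " + id));
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return task.firstCause.get();
    }
}
```

Which worker wins the race is nondeterministic; the guarantee is only that the field is written once.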

Constructor Detail

JoinTask

public JoinTask(IRule rule,
                IJoinNexus joinNexus,
                int[] order,
                int orderIndex,
                int partitionId,
                IJoinMaster masterProxy,
                UUID masterUUID,
                IVariable[][] requiredVars)
Instances of this class MUST be created in the appropriate execution context of the target DataService so that the federation and the joinNexus references are both correct and so that it has access to the local index object for the specified index partition.

Parameters:
rule -
joinNexus -
order -
orderIndex -
partitionId - The index partition identifier or -1 if the deployment does not support key-range partitioned indices.
masterProxy -
masterUUID -
requiredVars -
See Also:
JoinTaskFactoryTask
Method Detail

halt

protected void halt(Throwable cause)
Indicate that join processing should halt. This method is written defensively and will not throw anything.

Parameters:
cause - The cause.

newUnsyncOutputBuffer

protected abstract AbstractUnsynchronizedArrayBuffer<IBindingSet> newUnsyncOutputBuffer()
A method used by the threadLocalBufferFactory to create new output buffer as required. The output buffer will be used to aggregate IBindingSets generated by this JoinTask.

Note: A different implementation class must be used depending on whether or not this is the last join dimension for the query (when it is, then we write on the solution buffer) and whether or not the target join index is key-range partitioned (when it is, each binding set is mapped across the sink JoinTask(s)).
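When the target index is key-range partitioned, each binding set has to be routed to the sink JoinTask for the partition that covers its key. The sketch below shows only that routing step and is hypothetical: partitions are described by a sorted array of left separator keys (partition i covers [separators[i], separators[i+1])), and each binding set is represented directly by its String key. How that key is derived from the next predicate's as-bound form is outside the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: map a chunk of keys across key-range partitions.
final class KeyRangeRouterSketch {
    // Left separator keys, sorted ascending; separators[0] is "" so every key has a partition.
    private final String[] separators;

    KeyRangeRouterSketch(String... separators) { this.separators = separators.clone(); }

    int partitionFor(String key) {
        int pos = Arrays.binarySearch(separators, key);
        // Hit: key is exactly the left separator of partition pos.
        // Miss: pos = -(insertionPoint + 1); the owning partition is insertionPoint - 1.
        return pos >= 0 ? pos : -(pos + 1) - 1;
    }

    // Split one output chunk into per-partition chunks, one per sink JoinTask.
    Map<Integer, List<String>> route(List<String> chunk) {
        Map<Integer, List<String>> out = new TreeMap<>();
        for (String key : chunk) {
            out.computeIfAbsent(partitionFor(key), p -> new ArrayList<>()).add(key);
        }
        return out;
    }
}
```

For the last join dimension, or for an unpartitioned index, this routing step would not apply, which is why the note calls for different output buffer implementations.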


getSolutionBuffer

protected abstract IBuffer<ISolution[]> getSolutionBuffer()
The buffer on which the last predicate in the evaluation order will write its ISolutions.

Returns:
The buffer.
Throws:
IllegalStateException - unless lastJoin is true.

getTailIndex

protected final int getTailIndex(int orderIndex)
Return the index of the tail predicate to be evaluated at the given index in the evaluation order.

Parameters:
orderIndex - The evaluation order index.
Returns:
The tail index to be evaluated at that index in the evaluation order.
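Given the int[] order passed to the constructor, a natural reading is that order[orderIndex] is the tail index evaluated at that step. The sketch below assumes that encoding (it is not confirmed by this page) and also shows how lastJoin follows from orderIndex. The permutation check is an added safeguard, not documented behavior.

```java
import java.util.Arrays;

// Hypothetical sketch, assuming order[orderIndex] == tailIndex.
final class EvalOrderSketch {
    private final int[] order;

    EvalOrderSketch(int[] order) {
        // A valid evaluation order visits every tail predicate exactly once.
        boolean[] seen = new boolean[order.length];
        for (int tailIndex : order) {
            if (tailIndex < 0 || tailIndex >= order.length || seen[tailIndex]) {
                throw new IllegalArgumentException("not a permutation: " + Arrays.toString(order));
            }
            seen[tailIndex] = true;
        }
        this.order = order.clone();
    }

    int getTailIndex(int orderIndex) {
        if (orderIndex < 0 || orderIndex >= order.length) {
            throw new IndexOutOfBoundsException("orderIndex=" + orderIndex);
        }
        return order[orderIndex];
    }

    // Corresponds to the lastJoin field: true iff this is the last join dimension.
    boolean isLastJoin(int orderIndex) {
        return orderIndex == order.length - 1;
    }
}
```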

toString

public String toString()
Overrides:
toString in class Object

call

public Void call()
          throws Exception
Runs the JoinTask.

Specified by:
call in interface Callable<Void>
Returns:
null.
Throws:
Exception

logCallError

protected void logCallError(Throwable t)
Method is used to log the primary exception thrown by call(). The default implementation does nothing and the exception will be logged by the JoinMasterTask. However, this method is overridden by DistributedJoinTask so that the exception can be logged on the host and DataService where it originates. This appears to be necessary in order to trace back the cause of an exception which can otherwise be obscured (or even lost?) in a deeply nested RMI stack trace.

Parameters:
t - The primary exception thrown by call().

reportOnce

protected void reportOnce()
Method reports JoinStats to the JoinMasterTask, but only if they have not already been reported. This "report once" constraint is used to make it safe to invoke during error handling before actions which could cause the source JoinTasks (and hence the JoinMasterTask) to terminate.
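The "report once" constraint can be enforced with a single AtomicBoolean, so that both the error-handling path and the normal exit path may call the method without double-counting. In this hypothetical sketch a Consumer stands in for the remote IJoinMaster proxy and the statistics object is generic.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch of an idempotent report (not the bigdata implementation).
final class ReportOnceSketch<S> {
    private final AtomicBoolean reported = new AtomicBoolean(false);
    private final Consumer<S> master; // stand-in for the master proxy

    ReportOnceSketch(Consumer<S> master) { this.master = master; }

    /** Sends the stats at most once, however many code paths call this method. */
    void reportOnce(S stats) {
        if (reported.compareAndSet(false, true)) {
            master.accept(stats);
        }
    }
}
```

A task might call reportOnce in its error handler before cancelling its sinks and again in a finally block; only the first call reaches the master.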


consumeSources

protected void consumeSources()
                       throws Exception
Consume IBindingSet chunks from source(s). The first join dimension always has a single source - the initialBindingSet established by the JoinMasterTask. Downstream join dimensions read from IAsynchronousIterator(s) from the upstream join dimension. When the IIndexManager allows key-range partitions, then the fan-in for the sources may be larger than one as there will be one JoinTask for each index partition touched by each join dimension.

Throws:
Exception
BufferClosedException - if there is an attempt to output a chunk of IBindingSets or ISolutions and the output buffer is an IBlockingBuffer (true for all join dimensions except the lastJoin and also true for query on the lastJoin) and that IBlockingBuffer has been closed.
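The consumption loop described above reduces to draining chunks until the source reports exhaustion. In this hypothetical sketch a Supplier plays the role of nextChunk() (returning null iff all sources are exhausted), a BooleanSupplier plays the role of the halt flag, and a Consumer receives each binding set; none of these are the actual bigdata types.

```java
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of a consumeSources-style drain loop.
final class ConsumeSourcesSketch {
    /** Pulls chunks until nextChunk returns null or halt is raised; returns #binding sets consumed. */
    static <T> int consumeSources(Supplier<List<T>> nextChunk,
                                  BooleanSupplier halted,
                                  Consumer<T> accept) {
        int n = 0;
        List<T> chunk;
        // Check the halt flag before each read so a failed sibling task stops this one promptly.
        while (!halted.getAsBoolean() && (chunk = nextChunk.get()) != null) {
            for (T bindingSet : chunk) {
                accept.accept(bindingSet);
                n++;
            }
        }
        return n;
    }
}
```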

flushAndCloseBuffersAndAwaitSinks

protected abstract void flushAndCloseBuffersAndAwaitSinks()
                                                   throws InterruptedException,
                                                          ExecutionException
Flush and close all output buffers and await sink JoinTask(s).

Note: You MUST close the BlockingBuffer from which each sink reads before invoking this method in order for those sinks to terminate. Otherwise the source IAsynchronousIterator(s) on which the sink is reading will remain open and the sink will never decide that it has exhausted its source(s).

Throws:
InterruptedException
ExecutionException
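The note about closing the buffer before awaiting the sinks can be demonstrated with a BlockingQueue and an end-of-stream marker. In this hypothetical sketch, putting the END marker models closing the BlockingBuffer; the sink only decides its source is exhausted when it sees that marker. When the marker is never sent, the bounded wait times out and the method returns -1, which stands in for a sink that would otherwise never terminate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: a sink terminates only after its source buffer is closed.
final class CloseThenAwaitSketch {
    // Identity marker meaning "buffer closed"; compared with ==, never by equals().
    private static final List<Integer> END = new ArrayList<>();

    /** Returns #elements the sink consumed, or -1 if it did not terminate in time. */
    static int run(List<List<Integer>> chunks, boolean closeSource) {
        BlockingQueue<List<Integer>> buffer = new LinkedBlockingQueue<>();
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<Integer> sink = exec.submit(() -> {
            int n = 0;
            while (true) {
                List<Integer> chunk = buffer.take();   // blocks while the source is open
                if (chunk == END) return n;            // source closed and drained
                n += chunk.size();
            }
        });
        try {
            for (List<Integer> c : chunks) buffer.put(c);   // flush
            if (closeSource) {
                buffer.put(END);                            // close, then await the sink
                return sink.get();
            }
            return sink.get(200, TimeUnit.MILLISECONDS);    // never closed: sink keeps waiting
        } catch (TimeoutException e) {
            return -1;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdownNow();                             // interrupts a sink still blocked in take()
        }
    }
}
```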

cancelSinks

protected abstract void cancelSinks()
Cancel sink JoinTask(s).


nextChunk

protected abstract IBindingSet[] nextChunk()
                                    throws InterruptedException
Return a chunk of IBindingSets from the IAsynchronousIterators. The 1st join dimension is always fed by the JoinMasterTask. The (n+1)th join dimension is always fed by the nth JoinTask(s).

Returns:
The next available chunk of IBindingSets -or- null IFF all known source(s) are exhausted.
Throws:
InterruptedException
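With key-range partitioned indices a JoinTask may have several upstream sources, and nextChunk() must return null only once all of them are exhausted. This hypothetical sketch shows one possible fan-in policy, round-robin over the sources, using ordinary synchronous Iterators in place of IAsynchronousIterators; the real iterators block while waiting for data, which is not modelled here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical fan-in sketch: round-robin over several chunk sources.
final class FanInSketch<T> {
    private final Deque<Iterator<List<T>>> sources;

    FanInSketch(List<Iterator<List<T>>> sources) { this.sources = new ArrayDeque<>(sources); }

    /** Next chunk from any source, or null iff every source is exhausted. */
    List<T> nextChunk() {
        while (!sources.isEmpty()) {
            Iterator<List<T>> src = sources.pollFirst();
            if (src.hasNext()) {
                List<T> chunk = src.next();
                sources.addLast(src);   // give the other sources a turn next time
                return chunk;
            }
            // Exhausted source: drop it and try the next one.
        }
        return null;
    }
}
```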


Copyright © 2006-2011 SYSTAP, LLC. All Rights Reserved.