java.lang.Object
  extended by com.bigdata.relation.rule.eval.pipeline.JoinTask
public abstract class JoinTask
Consumes IBindingSet chunks from the previous join dimension.
Note: Instances of this class MUST be created on the IDataService
that hosts the index partition on which the task will read, and they MUST run
inside of an AbstractTask on the ConcurrencyManager in order
to have access to the local index object for the index partition.
This class is NOT serializable.
For a rule with 2 predicates, there will be two JoinTasks. The
orderIndex is ZERO (0) for the first JoinTask and ONE (1)
for the second JoinTask. The first JoinTask will have a
single initialBinding from the JoinMasterTask and will read on the
IAccessPath for the first IPredicate in the evaluation
order. The second JoinTask will read chunks of
IBindingSets containing partial solutions from the first
JoinTask and will obtain and read on an IAccessPath for the
second predicate in the evaluation order for every partial solution. Since
there are only two IPredicates in the IRule, the second and
last JoinTask will write on the ISolution buffer obtained
from JoinMasterTask.getSolutionBuffer(). Each JoinTask will
report its JoinStats to the master, which aggregates those
statistics.
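The flow described above can be modeled, very loosely, in plain Java. This is a minimal sequential sketch, not the bigdata API: it assumes a binding set is a Map from variable name to value, a predicate is an {s, p, o} pattern whose terms starting with '?' are variables, and an access path is a scan over an in-memory list of triples. Each join dimension is fully materialized before the next one runs, whereas real JoinTasks stream chunks through a pipeline; PipelineJoinSketch and its methods are hypothetical names.

```java
import java.util.*;

class PipelineJoinSketch {

    /** One join dimension: extends each source binding set via one predicate. */
    static List<Map<String, String>> joinDimension(
            List<Map<String, String>> source, String[] pred, List<String[]> triples) {
        List<Map<String, String>> accepted = new ArrayList<>();
        for (Map<String, String> bindingSet : source) {
            // Stand-in for reading on the access path of the asBound predicate.
            for (String[] triple : triples) {
                Map<String, String> extended = unify(bindingSet, pred, triple);
                if (extended != null) {
                    accepted.add(extended);
                }
            }
        }
        return accepted;
    }

    /** Returns the binding set extended by the triple, or null if they conflict. */
    static Map<String, String> unify(Map<String, String> bindingSet, String[] pred, String[] triple) {
        Map<String, String> extended = new HashMap<>(bindingSet);
        for (int i = 0; i < 3; i++) {
            String term = pred[i];
            if (term.startsWith("?")) {
                String bound = extended.putIfAbsent(term, triple[i]);
                if (bound != null && !bound.equals(triple[i])) {
                    return null;
                }
            } else if (!term.equals(triple[i])) {
                return null;
            }
        }
        return extended;
    }

    /** Runs the tails in evaluation order, starting from one empty initial binding set. */
    static List<Map<String, String>> evaluate(List<String[]> tails, int[] order, List<String[]> triples) {
        List<Map<String, String>> chunk = new ArrayList<>();
        chunk.add(new HashMap<>()); // the single initial binding from the "master"
        for (int orderIndex = 0; orderIndex < order.length; orderIndex++) {
            chunk = joinDimension(chunk, tails.get(order[orderIndex]), triples);
        }
        return chunk; // output of the last dimension = the solutions
    }

    public static void main(String[] args) {
        List<String[]> triples = List.of(
                new String[] {"alice", "knows", "bob"},
                new String[] {"bob", "knows", "carol"},
                new String[] {"bob", "age", "42"});
        List<String[]> tails = List.of(
                new String[] {"?x", "knows", "?y"},
                new String[] {"?y", "age", "?a"});
        System.out.println(evaluate(tails, new int[] {0, 1}, triples));
    }
}
```

With two tails, the first dimension produces the partial solutions {?x=alice, ?y=bob} and {?x=bob, ?y=carol}; only the first survives the second dimension.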
Note: ITx.UNISOLATED requests will deadlock if the same query uses
the same access path for two predicates! This is because the first such join
dimension in the evaluation order will obtain an exclusive lock on an index
partition making it impossible for another JoinTask to obtain an
exclusive lock on the same index partition. This is not a problem if you are
using read-consistent timestamps!
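The lock conflict can be reproduced with a plain ReentrantLock standing in for the exclusive lock an unisolated task holds on an index partition. This is an illustration under assumptions (one lock per partition, held for the lifetime of the first join dimension), not how the ConcurrencyManager is implemented, and UnisolatedLockSketch is a hypothetical name. The second dimension uses tryLock with a timeout so the conflict shows up as false instead of a hang; with a blocking lock(), and a first dimension that only finishes after its sink has consumed its output, the two tasks would wait on each other forever.

```java
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

class UnisolatedLockSketch {

    /** Returns true iff the second join dimension managed to lock the shared partition. */
    static boolean secondDimensionCanLock() {
        // Hypothetical stand-in for the exclusive lock on one index partition.
        ReentrantLock partitionLock = new ReentrantLock();
        CountDownLatch firstHoldsLock = new CountDownLatch(1);
        CountDownLatch firstMayFinish = new CountDownLatch(1);
        ExecutorService exec = Executors.newFixedThreadPool(2);
        try {
            // First join dimension: holds the lock until it is allowed to finish
            // (in the pipeline, that would be after its sink drained its output).
            exec.submit(() -> {
                partitionLock.lock();
                try {
                    firstHoldsLock.countDown();
                    firstMayFinish.await();
                } finally {
                    partitionLock.unlock();
                }
                return null;
            });
            firstHoldsLock.await();

            // Second join dimension reading on the same index partition.
            Future<Boolean> second = exec.submit(() -> {
                boolean acquired = partitionLock.tryLock(200, TimeUnit.MILLISECONDS);
                if (acquired) {
                    partitionLock.unlock();
                }
                return acquired;
            });
            return second.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            firstMayFinish.countDown(); // let the first dimension release its lock
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("second dimension acquired lock: " + secondDimensionCanLock());
    }
}
```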
Parallel JoinTask.AccessPathTask processing is useful when each
JoinTask.AccessPathTask consumes only a small chunk and there are a
large #of source binding sets to be processed. In this case,
parallelism reduces the overall latency by allowing threads to progress
as soon as the data can be materialized from the index.
JoinTask.AccessPathTask parallelism is realized by submitting each
JoinTask.AccessPathTask to a service imposing a parallelism limit on the
shared IIndexStore.getExecutorService(). Since the
JoinTask.AccessPathTasks are concurrent, each one requires its own
UnsynchronizedOutputBuffer on which it will place any accepted
IBindingSets. Once a JoinTask.AccessPathTask completes, its
buffer may be reused by the next JoinTask.AccessPathTask assigned to a
worker thread (this reduces heap churn and allows us to assemble full
chunks when each IAccessPath realizes only a few accepted
IBindingSets). For an ExecutorService with a
parallelism limit of N, there are therefore N
UnsynchronizedOutputBuffers. Those buffers must be flushed when
the JoinTask exhausts its source(s). If the same set of threads
is not known to be reused for each JoinTask.AccessPathTask then the
actual #of buffers will be the #of distinct threads used. To reduce the
potential memory demand, striped locks could be used to protect a pool
of UnsynchronizedArrayBuffers, but this could lead to deadlock if
the buffer reference were exposed to the task (as opposed to adding the
object to the buffer within a private method, which hides that
reference), since there would be more than one thread demanding access to the
same buffer.
Parallel JoinTask.ChunkTask processing may be useful when a
JoinTask.AccessPathTask will consume a large #of chunks. Since the
IAccessPath.iterator() is NOT thread-safe, reads on the
IAccessPath must be sequential, but the chunks read from the
IAccessPath can be placed onto a queue and parallel
JoinTask.ChunkTasks can drain that queue, consuming the chunks. This can
help by reducing the latency to materialize any given chunk.
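The ChunkTask idea (sequential reads, parallel consumption) maps onto a standard producer/consumer arrangement. In this minimal sketch, which is assumed rather than taken from the source, a chunk is a List<Integer>, the processing done by each consumer is just summing the chunk, and a sentinel object marks end-of-stream; ChunkQueueSketch and sumInParallel are hypothetical names. Only the calling thread touches the iterator, which respects the restriction that IAccessPath.iterator() is not thread-safe.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

class ChunkQueueSketch {
    // End-of-stream sentinel, compared by identity; one copy is queued per consumer.
    private static final List<Integer> EOS = new ArrayList<>();

    /**
     * Reads chunks sequentially from a non-thread-safe iterator and lets
     * nConsumers threads process them concurrently (here: sum the values).
     */
    static long sumInParallel(Iterator<List<Integer>> source, int nConsumers) {
        BlockingQueue<List<Integer>> queue = new ArrayBlockingQueue<>(16);
        AtomicLong total = new AtomicLong();
        ExecutorService exec = Executors.newFixedThreadPool(nConsumers);
        List<Future<?>> consumers = new ArrayList<>();
        for (int i = 0; i < nConsumers; i++) {
            consumers.add(exec.submit(() -> {
                while (true) {
                    List<Integer> chunk = queue.take();
                    if (chunk == EOS) {
                        return null;
                    }
                    long sum = 0;
                    for (int v : chunk) {
                        sum += v;
                    }
                    total.addAndGet(sum);
                }
            }));
        }
        try {
            // Only this thread touches the iterator, so reads stay sequential.
            while (source.hasNext()) {
                queue.put(source.next());
            }
            for (int i = 0; i < nConsumers; i++) {
                queue.put(EOS);
            }
            for (Future<?> f : consumers) {
                f.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdownNow();
        }
        return total.get();
    }

    public static void main(String[] args) {
        List<List<Integer>> chunks = List.of(List.of(1, 2, 3), List.of(4, 5), List.of(6));
        System.out.println(sumInParallel(chunks.iterator(), 3));
    }
}
```

The bounded queue gives back-pressure: the reader blocks when consumers fall behind. A production version would also stop the reader if a consumer fails, which this sketch does not do.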
The required change is to have a per-thread
UnsynchronizedArrayBuffer feeding a thread-safe
UnsyncDistributedOutputBuffer (potentially via a queue) which
maps each generated binding set across the index partition(s) for the
sink JoinTasks.
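The per-thread buffer arrangement from the paragraphs above can be sketched as follows: each worker thread gets its own unsynchronized buffer through a ThreadLocal, full chunks are handed to a thread-safe sink, and once the source is exhausted every buffer is flushed. UnsyncBuffer, run and the "*" marking of accepted binding sets are hypothetical; a binding set is reduced to a String, and the per-partition mapping done by the distributed output buffer is omitted.

```java
import java.util.*;
import java.util.concurrent.*;

class PerThreadBufferSketch {

    /** Hypothetical unsynchronized buffer; only ever touched by its owning worker thread. */
    static final class UnsyncBuffer {
        private final List<String> items = new ArrayList<>();
        private final int capacity;
        private final BlockingQueue<List<String>> sink;

        UnsyncBuffer(int capacity, BlockingQueue<List<String>> sink) {
            this.capacity = capacity;
            this.sink = sink;
        }

        void add(String bindingSet) {
            items.add(bindingSet);
            if (items.size() >= capacity) {
                flush(); // hand a full chunk to the shared, thread-safe sink
            }
        }

        void flush() {
            if (!items.isEmpty()) {
                sink.add(new ArrayList<>(items));
                items.clear();
            }
        }
    }

    /** Runs one task per source binding set and returns the chunks that reached the sink. */
    static List<List<String>> run(List<String> source, int nThreads, int chunkCapacity) {
        BlockingQueue<List<String>> sink = new LinkedBlockingQueue<>();
        Queue<UnsyncBuffer> allBuffers = new ConcurrentLinkedQueue<>();
        // One buffer per worker thread, registered so that it can be flushed at the end.
        ThreadLocal<UnsyncBuffer> localBuffer = ThreadLocal.withInitial(() -> {
            UnsyncBuffer buffer = new UnsyncBuffer(chunkCapacity, sink);
            allBuffers.add(buffer);
            return buffer;
        });
        ExecutorService exec = Executors.newFixedThreadPool(nThreads);
        try {
            List<Future<?>> tasks = new ArrayList<>();
            for (String bindingSet : source) {
                // Each task plays the role of an AccessPathTask that accepts one binding set.
                tasks.add(exec.submit(() -> localBuffer.get().add(bindingSet + "*")));
            }
            for (Future<?> task : tasks) {
                task.get(); // also makes the task's buffer writes visible here
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }
        // Source exhausted and no task running: flush the partially filled buffers.
        for (UnsyncBuffer buffer : allBuffers) {
            buffer.flush();
        }
        List<List<String>> chunks = new ArrayList<>();
        sink.drainTo(chunks);
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c", "d", "e"), 2, 2));
    }
}
```

With a pool of N threads there are at most N buffers, matching the "N UnsynchronizedOutputBuffers" argument; the final flush is what keeps the last partial chunks from being lost.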
| Nested Class Summary | |
|---|---|
| protected class | JoinTask.AccessPathTask: Accepts an asBound IPredicate and a (non-empty) collection of IBindingSets each of which licenses the same asBound predicate for the current join dimension. |
| protected class | JoinTask.BindingSetConsumerTask: Class consumes chunks from the source(s) until canceled, interrupted, or all source(s) are exhausted. |
| protected class | JoinTask.ChunkTask: Task processes a chunk of elements read from the IAccessPath for a join dimension. |
| class | JoinTask.ThreadLocalFactory<T extends IBuffer<E>,E>: A factory pattern for per-thread objects whose life cycle is tied to some container. |
| Field Summary | |
|---|---|
| protected static boolean | DEBUG: True iff the log level is DEBUG or less. |
| protected AtomicReference<Throwable> | firstCause: Set by JoinTask.BindingSetConsumerTask, JoinTask.AccessPathTask, and JoinTask.ChunkTask if they throw an error. |
| protected boolean | halt: Volatile flag that is set to true if the JoinTask (including any tasks executing on its behalf) should halt. |
| protected static boolean | INFO: True iff the log level is INFO or less. |
| protected IJoinNexus | joinNexus: The IJoinNexus for the local IIndexManager, which will be the live IJournal. |
| protected boolean | lastJoin: true iff this is the last join dimension in the evaluation order. |
| protected static org.apache.log4j.Logger | log |
| protected IJoinMaster | masterProxy: A proxy for the remote JoinMasterTask. |
| protected UUID | masterUUID |
| protected int | orderIndex: The index into the evaluation order for the predicate on which we are reading for this join dimension. |
| protected int | partitionId: The index partition on which this JoinTask is reading -or- -1 if the deployment does not support key-range partitioned indices. |
| protected IPredicate<?> | predicate: The IPredicate on which we are reading for this join dimension. |
| protected IRelation<?> | relation: The IRelation view on which we are reading for this join dimension. |
| protected IVariable<?>[][] | requiredVars: A list of variables required for each tail, by tailIndex. |
| protected IRule<?> | rule: The rule that is being evaluated. |
| protected int | tailCount: The #of predicates in the tail of that rule. |
| protected int | tailIndex: The tail index in the rule for the predicate on which we are reading for this join dimension. |
| protected static boolean | WARN: True iff the log level is WARN or less. |
| Constructor Summary | |
|---|---|
| | JoinTask(IRule rule, IJoinNexus joinNexus, int[] order, int orderIndex, int partitionId, IJoinMaster masterProxy, UUID masterUUID, IVariable[][] requiredVars): Instances of this class MUST be created in the appropriate execution context of the target DataService so that the federation and the joinNexus references are both correct and so that it has access to the local index object for the specified index partition. |
| Method Summary | |
|---|---|
| Void | call(): Runs the JoinTask. |
| protected abstract void | cancelSinks(): Cancel sink JoinTask(s). |
| protected void | consumeSources(): Consume IBindingSet chunks from source(s). |
| protected abstract void | flushAndCloseBuffersAndAwaitSinks(): Flush and close all output buffers and await sink JoinTask(s). |
| protected abstract IBuffer<ISolution[]> | getSolutionBuffer(): The buffer on which the last predicate in the evaluation order will write its ISolutions. |
| protected int | getTailIndex(int orderIndex): Return the index of the tail predicate to be evaluated at the given index in the evaluation order. |
| protected void | halt(Throwable cause): Indicate that join processing should halt. |
| protected void | logCallError(Throwable t): Method is used to log the primary exception thrown by call(). |
| protected abstract AbstractUnsynchronizedArrayBuffer<IBindingSet> | newUnsyncOutputBuffer(): A method used by the threadLocalBufferFactory to create a new output buffer as required. |
| protected abstract IBindingSet[] | nextChunk(): Return a chunk of IBindingSets from the IAsynchronousIterators. |
| protected void | reportOnce(): Method reports JoinStats to the JoinMasterTask, but only if they have not already been reported. |
| String | toString() |
| Methods inherited from class java.lang.Object |
|---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait |
| Field Detail |
|---|
protected static final org.apache.log4j.Logger log
protected static final boolean WARN
True iff the log level is WARN or less.
protected static final boolean INFO
True iff the log level is INFO or less.
protected static final boolean DEBUG
True iff the log level is DEBUG or less.
protected final IRule<?> rule
The rule that is being evaluated.
protected final int tailCount
The #of predicates in the tail of that rule.
protected final int partitionId
The index partition on which this JoinTask is reading -or-
-1 if the deployment does not support key-range
partitioned indices.
protected final int tailIndex
The tail index in the rule for the predicate on which we are reading for this join dimension.
protected final IPredicate<?> predicate
The IPredicate on which we are reading for this join
dimension.
protected final IRelation<?> relation
The IRelation view on which we are reading for this join
dimension.
protected final int orderIndex
The index into the evaluation order for the predicate on
which we are reading for this join dimension.
protected final boolean lastJoin
true iff this is the last join dimension in the
evaluation order.
protected final IJoinMaster masterProxy
A proxy for the remote JoinMasterTask.
protected final UUID masterUUID
protected final IVariable<?>[][] requiredVars
A list of variables required for each tail, by tailIndex.
protected IJoinNexus joinNexus
The IJoinNexus for the local IIndexManager, which
will be the live IJournal. This IJoinNexus MUST have
access to the local index objects, which means that this class MUST be run
inside of the ConcurrencyManager. The joinNexus is
created from the joinNexusFactory once the task begins to
execute.
protected volatile boolean halt
Volatile flag that is set to true if the JoinTask
(including any tasks executing on its behalf) should halt. This flag
is monitored by the JoinTask.BindingSetConsumerTask, the
JoinTask.AccessPathTask, and the JoinTask.ChunkTask. It is set by any
of those tasks if they are interrupted or error out.
firstCause?
Are there any cases where the behavior should be different?
If not, then replace tests with halt() and encapsulate the
logic in that method.
protected final AtomicReference<Throwable> firstCause
Set by JoinTask.BindingSetConsumerTask, JoinTask.AccessPathTask, and
JoinTask.ChunkTask if they throw an error. Tasks are required to use
AtomicReference.compareAndSet(Object, Object) and must
specify null as the expected value. This ensures that
only the first cause is recorded by this field.
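The halt flag and the compareAndSet convention for firstCause can be sketched as follows. HaltSketch and its work loop are hypothetical; only the field names, the volatile flag, and the compareAndSet(null, cause) rule come from the descriptions above. Because compareAndSet succeeds only while the reference is still null, a later error from another task cannot overwrite the first one.

```java
import java.util.concurrent.atomic.AtomicReference;

class HaltSketch {
    /** Checked by every task working on behalf of the join. */
    volatile boolean halt;

    /** Holds the first error only. */
    final AtomicReference<Throwable> firstCause = new AtomicReference<>();

    /** Sets the halt flag and records the cause iff no cause was recorded before. */
    void halt(Throwable cause) {
        halt = true;
        firstCause.compareAndSet(null, cause);
    }

    /** Hypothetical worker: processes items until halted, reporting its own failure via halt(). */
    void work(int items, int failAt) {
        for (int i = 0; i < items && !halt; i++) {
            try {
                if (i == failAt) {
                    throw new IllegalStateException("failed at item " + i);
                }
            } catch (RuntimeException e) {
                halt(e);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        HaltSketch joinTask = new HaltSketch();
        Thread t1 = new Thread(() -> joinTask.work(1_000, 10));
        Thread t2 = new Thread(() -> joinTask.work(1_000, 20));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // At most one cause is kept: whichever halt() call came first.
        System.out.println(joinTask.halt + " " + joinTask.firstCause.get().getMessage());
    }
}
```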
| Constructor Detail |
|---|
public JoinTask(IRule rule,
IJoinNexus joinNexus,
int[] order,
int orderIndex,
int partitionId,
IJoinMaster masterProxy,
UUID masterUUID,
IVariable[][] requiredVars)
Instances of this class MUST be created in the appropriate execution context of the target DataService so that the federation and
the joinNexus references are both correct and so that it has access
to the local index object for the specified index partition.
Parameters:
concurrencyManager -
indexName -
rule -
joinNexus -
order -
orderIndex -
partitionId - The index partition identifier and -1 if
the deployment does not support key-range partitioned
indices.
masterProxy -
See Also:
JoinTaskFactoryTask
| Method Detail |
|---|
protected void halt(Throwable cause)
Indicate that join processing should halt.
Parameters:
cause - The cause.
protected abstract AbstractUnsynchronizedArrayBuffer<IBindingSet> newUnsyncOutputBuffer()
A method used by the threadLocalBufferFactory to create a new
output buffer as required. The output buffer will be used to
aggregate IBindingSets generated by this JoinTask.
Note: A different implementation class must be used depending on
whether or not this is the last join dimension for the query (when it
is, then we write on the solution buffer) and whether or not the
target join index is key-range partitioned (when it is, each binding
set is mapped across the sink JoinTask(s)).
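When the target index is key-range partitioned, each binding set is mapped across the sink JoinTask(s). A minimal sketch of that routing step, assuming a partition map keyed by each partition's left separator key and assuming the routing key is simply the value of one variable in the binding set (in the real code it would presumably come from the asBound predicate of the next join dimension). A TreeMap floorEntry lookup finds the partition whose key range covers the key; the case where a partially bound key spans several partitions is ignored. KeyRangeRoutingSketch, partitionFor and route are hypothetical names.

```java
import java.util.*;

class KeyRangeRoutingSketch {
    // Left separator key of each partition -> partitionId.
    private final TreeMap<String, Integer> partitions = new TreeMap<>();

    /** Separators must be sorted ascending and start with the empty key. */
    KeyRangeRoutingSketch(List<String> leftSeparators) {
        if (leftSeparators.isEmpty() || !leftSeparators.get(0).isEmpty()) {
            throw new IllegalArgumentException("first separator must be the empty key");
        }
        for (int i = 0; i < leftSeparators.size(); i++) {
            partitions.put(leftSeparators.get(i), i);
        }
    }

    /** The partition whose half-open range [left separator, next separator) covers the key. */
    int partitionFor(String key) {
        return partitions.floorEntry(key).getValue();
    }

    /** Groups binding sets by the sink partition for the value bound to keyVar. */
    Map<Integer, List<Map<String, String>>> route(List<Map<String, String>> bindingSets, String keyVar) {
        Map<Integer, List<Map<String, String>>> bySink = new TreeMap<>();
        for (Map<String, String> bs : bindingSets) {
            int pid = partitionFor(bs.get(keyVar));
            bySink.computeIfAbsent(pid, p -> new ArrayList<>()).add(bs);
        }
        return bySink;
    }

    public static void main(String[] args) {
        KeyRangeRoutingSketch router = new KeyRangeRoutingSketch(List.of("", "g", "p"));
        List<Map<String, String>> chunk = List.of(Map.of("?y", "bob"), Map.of("?y", "hank"), Map.of("?y", "zed"));
        System.out.println(router.route(chunk, "?y"));
    }
}
```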
protected abstract IBuffer<ISolution[]> getSolutionBuffer()
The buffer on which the last predicate in the evaluation order will write its ISolutions.
Throws:
IllegalStateException - unless lastJoin is true.
protected final int getTailIndex(int orderIndex)
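Return the index of the tail predicate to be evaluated at the given index in the evaluation order.
Parameters:
orderIndex - The evaluation order index.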
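A plausible reading of getTailIndex(int), given that the constructor receives an int[] order together with orderIndex, is a simple array lookup. That is an assumption about the implementation, not something this page states; EvaluationOrderSketch and its isLastJoin helper are hypothetical, the latter inferred from the lastJoin field description.

```java
class EvaluationOrderSketch {
    // Assumed: order[orderIndex] is the tail index evaluated at that position.
    private final int[] order;

    EvaluationOrderSketch(int[] order) {
        this.order = order.clone();
    }

    int getTailIndex(int orderIndex) {
        if (orderIndex < 0 || orderIndex >= order.length) {
            throw new IndexOutOfBoundsException("orderIndex=" + orderIndex);
        }
        return order[orderIndex];
    }

    /** true iff orderIndex names the last join dimension in the evaluation order. */
    boolean isLastJoin(int orderIndex) {
        return orderIndex == order.length - 1;
    }

    public static void main(String[] args) {
        // A three-predicate rule evaluated as tail 2, then tail 0, then tail 1.
        EvaluationOrderSketch e = new EvaluationOrderSketch(new int[] {2, 0, 1});
        for (int i = 0; i < 3; i++) {
            System.out.println(i + " -> tail " + e.getTailIndex(i) + (e.isLastJoin(i) ? " (last join)" : ""));
        }
    }
}
```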
public String toString()
Overrides:
toString in class Object
public Void call()
throws Exception
Runs the JoinTask.
Specified by:
call in interface Callable<Void>
Returns:
null.
Throws:
Exception
protected void logCallError(Throwable t)
Method is used to log the primary exception thrown by call().
The default implementation does nothing and the exception will be logged
by the JoinMasterTask. However, this method is overridden by
DistributedJoinTask so that the exception can be logged on the
host and DataService where it originates. This appears to be
necessary in order to trace back the cause of an exception which can
otherwise be obscured (or even lost?) in a deeply nested RMI stack trace.
Parameters:
o -
t -
protected void reportOnce()
Method reports JoinStats to the JoinMasterTask, but
only if they have not already been reported. This "report once"
constraint is used to make it safe to invoke during error handling
before actions which could cause the source JoinTasks (and
hence the JoinMasterTask) to terminate.
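The "report once" guarantee amounts to an atomic test-and-set in front of the report. In this sketch, which is assumed rather than taken from the source, an AtomicBoolean guards the call and a Consumer<String> stands in for the master proxy; ReportOnceSketch and its String parameter are hypothetical (the documented reportOnce() takes no arguments). Error handling can then call reportOnce() unconditionally without double-counting statistics.

```java
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class ReportOnceSketch {
    private final AtomicBoolean reported = new AtomicBoolean(false);
    private final Consumer<String> master; // hypothetical stand-in for the master proxy

    ReportOnceSketch(Consumer<String> master) {
        this.master = master;
    }

    /** Sends the statistics on the first call only; later calls, from any thread, are no-ops. */
    void reportOnce(String stats) {
        if (reported.compareAndSet(false, true)) {
            master.accept(stats);
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        ReportOnceSketch task = new ReportOnceSketch(received::add);
        task.reportOnce("normal completion");
        task.reportOnce("error path"); // ignored: already reported
        System.out.println(received); // prints [normal completion]
    }
}
```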
protected void consumeSources()
throws Exception
Consume IBindingSet chunks from source(s). The first join
dimension always has a single source - the initialBindingSet
established by the JoinMasterTask. Downstream join
dimensions read from IAsynchronousIterator(s) from the
upstream join dimension. When the IIndexManager allows
key-range partitions, then the fan-in for the sources may be larger
than one as there will be one JoinTask for each index
partition touched by each join dimension.
Throws:
Exception
BufferClosedException - if there is an attempt to output a chunk of
IBindingSets or ISolutions and the
output buffer is an IBlockingBuffer (true for all
join dimensions except the lastJoin and also true for
query on the lastJoin) and that IBlockingBuffer
has been closed.
protected abstract void flushAndCloseBuffersAndAwaitSinks()
throws InterruptedException,
ExecutionException
Flush and close all output buffers and await sink JoinTask(s).
Note: You MUST close the BlockingBuffer from which each sink
reads before invoking this method in order for those sinks to
terminate. Otherwise the source IAsynchronousIterator(s) on which
the sink is reading will remain open and the sink will never decide that
it has exhausted its source(s).
Throws:
InterruptedException
ExecutionException
protected abstract void cancelSinks()
Cancel sink JoinTask(s).
protected abstract IBindingSet[] nextChunk()
throws InterruptedException
Return a chunk of IBindingSets from the
IAsynchronousIterators. The 1st join dimension is always fed
by the JoinMasterTask. The (n+1)th join dimension is always
fed by the nth JoinTask(s).
Returns:
A chunk of IBindingSets -or-
null IFF all known source(s) are exhausted.
Throws:
InterruptedException
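The contracts of consumeSources(), nextChunk() and flushAndCloseBuffersAndAwaitSinks() fit together as follows: a sink keeps reading until every source has been closed, and nextChunk() returns null only at that point. A minimal sketch under assumptions: each source is a BlockingQueue<String[]>, "closing" a source means putting a shared CLOSED marker on it, and consuming a chunk just counts its binding sets. FanInSketch and CLOSED are hypothetical names. If any source were never closed, nextChunk() would keep polling forever, which is the failure mode the note on flushAndCloseBuffersAndAwaitSinks() warns about.

```java
import java.util.*;
import java.util.concurrent.*;

class FanInSketch {
    /** Hypothetical end-of-stream marker; compared by identity. */
    static final String[] CLOSED = new String[0];

    private final List<BlockingQueue<String[]>> openSources;

    FanInSketch(List<BlockingQueue<String[]>> sources) {
        this.openSources = new ArrayList<>(sources);
    }

    /** Returns the next chunk, or null iff all known sources are closed and drained. */
    String[] nextChunk() throws InterruptedException {
        while (!openSources.isEmpty()) {
            Iterator<BlockingQueue<String[]>> it = openSources.iterator();
            while (it.hasNext()) {
                String[] chunk = it.next().poll(10, TimeUnit.MILLISECONDS);
                if (chunk == null) {
                    continue; // nothing available yet on this source
                }
                if (chunk == CLOSED) {
                    it.remove(); // this source is exhausted
                    continue;
                }
                return chunk;
            }
        }
        return null;
    }

    /** Consumes chunks until every source is exhausted; returns the #of binding sets seen. */
    int consumeSources() throws InterruptedException {
        int n = 0;
        String[] chunk;
        while ((chunk = nextChunk()) != null) {
            n += chunk.length;
        }
        return n;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String[]> a = new LinkedBlockingQueue<>();
        BlockingQueue<String[]> b = new LinkedBlockingQueue<>();
        FanInSketch sink = new FanInSketch(List.of(a, b));
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Callable<Integer> task = sink::consumeSources;
        Future<Integer> result = exec.submit(task);
        a.put(new String[] {"x", "y"});
        b.put(new String[] {"z"});
        a.put(CLOSED);
        b.put(CLOSED); // without this line the sink would never terminate
        System.out.println(result.get()); // prints 3
        exec.shutdown();
    }
}
```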