java.lang.Object
  com.bigdata.relation.rule.eval.NestedSubqueryWithJoinThreadsTask
public class NestedSubqueryWithJoinThreadsTask
Evaluation of an IRule using nested subquery (one or more JOINs plus
any IElementFilters specified for the predicates in the tail or
IConstraints on the IRule itself). The subqueries are formed
into tasks and submitted to an ExecutorService. The effective
parallelism is limited by the #of elements visited in a chunk for the first
join dimension, as only those subqueries will be parallelized. Subqueries for
2nd+ join dimensions are run in the caller's thread to ensure liveness.
Note: This join strategy is not very efficient for scale-out joins. If
executed on an AbstractScaleOutFederation, this task will wind up
using ClientIndexViews rather than directly using the local index
objects. This means that all work (other than the iterator scan) will be
performed on the client running the join.
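The strategy described above can be illustrated with a minimal sketch. This is not the bigdata implementation: the class name `NestedSubquerySketch`, the modelling of each tail as a list of `int[]{from, to}` edges, and the chain-join condition are all assumptions made for illustration. Only the first join dimension is handed to an `ExecutorService`; the 2nd+ dimensions recurse in whichever thread runs the subquery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class NestedSubquerySketch {

    /**
     * Hypothetical chain join: an edge of tail i joins with the edge bound for
     * tail i-1 when edge[0] == previous[1]. Each solution is rendered as a path.
     */
    static List<String> join(List<List<int[]>> tails, int maxParallel) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, maxParallel));
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            // First join dimension: one subquery task per visited element.
            for (int[] e : tails.get(0)) {
                futures.add(pool.submit(() -> {
                    List<String> out = new ArrayList<>();      // private to this task
                    List<int[]> bindings = new ArrayList<>();  // private to this task
                    bindings.add(e);
                    apply(tails, 1, bindings, out);
                    return out;
                }));
            }
            List<String> solutions = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                solutions.addAll(f.get()); // collected in submission order
            }
            return solutions;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            pool.shutdown();
        }
    }

    /** 2nd+ join dimensions: plain recursion in the calling thread. */
    private static void apply(List<List<int[]>> tails, int orderIndex,
                              List<int[]> bindings, List<String> out) {
        if (orderIndex == tails.size()) {
            out.add(path(bindings)); // every tail is bound: emit a solution
            return;
        }
        int[] prev = bindings.get(bindings.size() - 1);
        for (int[] e : tails.get(orderIndex)) {
            if (e[0] == prev[1]) {
                bindings.add(e);
                apply(tails, orderIndex + 1, bindings, out);
                bindings.remove(bindings.size() - 1);
            }
        }
    }

    private static String path(List<int[]> bindings) {
        StringBuilder sb = new StringBuilder().append(bindings.get(0)[0]);
        for (int[] e : bindings) {
            sb.append("->").append(e[1]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<List<int[]>> tails = List.of(
                List.of(new int[] {1, 2}, new int[] {5, 6}),
                List.of(new int[] {2, 3}, new int[] {6, 7}));
        System.out.println(join(tails, 2));
    }
}
```

In this sketch the achievable parallelism is bounded by the number of elements in the first tail, which mirrors the limitation noted in the class description.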
See also JoinMasterTask, which is designed for scale-out federations.
IAccessPath., support foreign key joins.

| Nested Class Summary | |
|---|---|
| protected class | NestedSubqueryWithJoinThreadsTask.SubqueryTask<E>: This class is used when we want to evaluate the subqueries in parallel. |
| Field Summary | |
|---|---|
| protected IBuffer<ISolution[]> | buffer |
| protected static boolean | DEBUG: True iff the log level is DEBUG or less. |
| protected static boolean | INFO: True iff the log level is INFO or less. |
| protected ExecutionHelper<Void> | joinHelper: The object that helps us to execute the queries on that service. |
| protected IJoinNexus | joinNexus |
| protected long | last: The index of the last solutionCount that we will generate (OFFSET + LIMIT). |
| protected static org.apache.log4j.Logger | log |
| protected int | maxParallelSubqueries: The maximum #of subqueries for the first join dimension that will be issued in parallel. |
| protected long | offset: The offset of the first computed solution that will be inserted into the buffer. |
| protected int[] | order: The evaluation order. |
| protected IRule | rule |
| protected IRuleState | ruleState |
| protected RuleStats | ruleStats |
| protected int | tailCount |
| Constructor Summary | |
|---|---|
| NestedSubqueryWithJoinThreadsTask(IRule rule, IJoinNexus joinNexus, IBuffer<ISolution[]> buffer) | |
| Method Summary | |
|---|---|
| protected void | apply(int orderIndex, IBindingSet bindingSet): Evaluate a join dimension. |
| protected void | apply(int orderIndex, IBindingSet bindingSet, IBuffer<ISolution> buffer): Evaluate a join dimension. |
| protected void | applyOptional(int orderIndex, IBindingSet bindingSet, IBuffer<ISolution> buffer): Method to be invoked IFF there were no solutions in the data that satisfied the constraints on the rule. |
| RuleStats | call(): Recursively evaluate the subqueries. |
| protected void | emitSolutions(int orderIndex, Object[] chunk, IBindingSet bindingSet, IBuffer<ISolution> buffer): Consider each element in the chunk in turn. |
| protected IAccessPath | getAccessPath(int orderIndex, IBindingSet bindingSet): Return the IAccessPath for the tail predicate to be evaluated at the given index in the evaluation order. |
| protected int | getTailIndex(int orderIndex): Return the index of the tail predicate to be evaluated at the given index in the evaluation order. |
| protected void | runSubQueries(int orderIndex, Object[] chunk, IBindingSet bindingSet, IBuffer<ISolution> buffer): Evaluate the right-hand side (aka the subquery) of the join for each element in the chunk. |
| protected void | runSubQueriesInCallersThread(int orderIndex, Object[] chunk, IBindingSet bindingSet, IBuffer<ISolution> buffer): Runs the subquery in the caller's thread (this was the original behavior). |
| protected void | runSubQueriesOnThreadPool(int orderIndex, Object[] chunk, IBindingSet bindingSet): Variant that creates a NestedSubqueryWithJoinThreadsTask.SubqueryTask for each element of the chunk and submits those tasks to the #joinService. |
| Methods inherited from class java.lang.Object |
|---|
| clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
| Field Detail |
|---|
protected static final org.apache.log4j.Logger log
protected static final boolean INFO
True iff the log level is INFO or less.
protected static final boolean DEBUG
True iff the log level is DEBUG or less.
protected final IRule rule
protected final IJoinNexus joinNexus
protected final IBuffer<ISolution[]> buffer
protected final IRuleState ruleState
protected final RuleStats ruleStats
protected final int tailCount
protected final int[] order
protected final long offset
The offset of the first computed solution that will be inserted into the buffer.
Note: We handle an ISlice directly in order to avoid having the
caller close an IAsynchronousIterator reading from a
BlockingBuffer. That causes the Future writing on the
BlockingBuffer to be interrupted, and if that interrupt is
noticed during an IO the FileChannel backing the store will be
asynchronously closed. While it will be automatically re-opened, it is
best to avoid the latency associated with that process (which requires
re-obtaining any necessary locks, etc).
The logic to handle an ISlice is scattered across several places
in this class and in the NestedSubqueryWithJoinThreadsTask.SubqueryTask. This is necessary in
order for processing to halt eagerly once limit ISolutions have been
written onto the buffer.
When a complex IProgramTask is being executed, the ISlice
is defined in terms of the total #of solutions generated. Since the
solutionCount is tracked on a per-rule execution basis,
you MUST serialize the IRules in the IProgram by
specifying IQueryOptions.isStable() or simply marking the
IProgram as NOT IProgram.isParallel().
protected final long last
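The index of the last solutionCount that we will generate (OFFSET + LIMIT). If OFFSET + LIMIT would be greater than Long.MAX_VALUE, then use Long.MAX_VALUE instead.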
Whether or not an ISolution is part of the slice is based on the
post-increment value of the AtomicLong
RuleStats.solutionCount. If the post-increment value is GT
last then we have exhausted the slice. Else, if the
post-increment value is GT offset then we add the
ISolution to the IBuffer. Else the ISolution is
before the offset of the slice and it will be ignored.
Note: AtomicLong.incrementAndGet() is used to make an atomic
decision concerning which solutions are allowed into the buffer. This
works even in the face of parallel-subquery, even though we in fact turn
off parallel subquery when evaluating a slice in order to have the
results be stable with respect to a given commit point. The fence posts
for the tests are different than you might expect because we must
consider the post-increment state of the
RuleStats.solutionCount.
Note: Each IRule executes with its own RuleStats
instance. This means that concurrent execution of rules would not see the
same RuleStats.solutionCount field and hence that a slice would
not be computed correctly when executing IRules in parallel for
some program. However, as pointed out above, we always disable
parallelism for a slice in order to have a stable result set for a given
commit point. The stable property is required for a client to be able to
page through the solutions using a series of slices.
Consider (OFFSET=0, LIMIT=1, LAST=1). We would emit a solution for the post-increment solutionCount value ONE (1) (GT OFFSET) but NOT emit a solution for the post-increment solutionCount value TWO (2) (GT LAST).
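The fence posts described above can be sketched as follows. This is an illustrative sketch only: `SliceSketch`, `offer` and the `String` solution type are hypothetical stand-ins for the real `RuleStats.solutionCount` and `IBuffer` machinery, and the sketch assumes non-negative OFFSET and LIMIT.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class SliceSketch {

    final long offset;
    final long last;
    private final AtomicLong solutionCount = new AtomicLong();
    private final List<String> buffer =
            Collections.synchronizedList(new ArrayList<>());

    SliceSketch(long offset, long limit) {
        this.offset = offset;
        // OFFSET + LIMIT, clamped to Long.MAX_VALUE when the sum would overflow.
        this.last = (limit > Long.MAX_VALUE - offset) ? Long.MAX_VALUE : offset + limit;
    }

    /** Returns false once the slice is exhausted, true otherwise. */
    boolean offer(String solution) {
        long n = solutionCount.incrementAndGet(); // post-increment value
        if (n > last) {
            return false;            // GT last: the slice is exhausted
        }
        if (n > offset) {
            buffer.add(solution);    // GT offset: the solution is in the slice
        }
        return true;                 // otherwise it precedes the offset and is ignored
    }

    List<String> solutions() {
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        SliceSketch slice = new SliceSketch(0, 1); // OFFSET=0, LIMIT=1, LAST=1
        slice.offer("first");
        slice.offer("second");
        System.out.println(slice.solutions());
    }
}
```

Because the decision is made on the value returned by a single `incrementAndGet()` call, two threads can never both claim the same position in the slice.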
RuleStats.solutionCount
protected final int maxParallelSubqueries
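The maximum #of subqueries for the first join dimension that will be issued in parallel. Use ZERO (0) to avoid submitting tasks to the
#joinService entirely and ONE (1) to submit a single task at a
time to the #joinService.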
protected final ExecutionHelper<Void> joinHelper
| Constructor Detail |
|---|
public NestedSubqueryWithJoinThreadsTask(IRule rule,
IJoinNexus joinNexus,
IBuffer<ISolution[]> buffer)
rule - The rule to be executed.
joinNexus - The IJoinNexus.
buffer - A thread-safe buffer onto which chunks of ISolution will be flushed during rule execution.

| Method Detail |
|---|
public final RuleStats call()
call in interface IStepTask
call in interface Callable<RuleStats>
protected final int getTailIndex(int orderIndex)
orderIndex - The evaluation order index.
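As a hedged illustration of the relationship between the evaluation order and the tail, the sketch below assumes that `order[]` simply maps a position in the evaluation order to the index of a tail predicate. The class name `EvaluationOrderSketch` is hypothetical, and the real method may do more than a plain array lookup.

```java
final class EvaluationOrderSketch {

    /** order[i] is assumed to be the tail index evaluated at position i. */
    private final int[] order;

    EvaluationOrderSketch(int[] order) {
        this.order = order.clone(); // defensive copy
    }

    int getTailIndex(int orderIndex) {
        return order[orderIndex];
    }

    public static void main(String[] args) {
        // Evaluate tail predicate 2 first, then 0, then 1.
        EvaluationOrderSketch sketch = new EvaluationOrderSketch(new int[] {2, 0, 1});
        for (int i = 0; i < 3; i++) {
            System.out.println("orderIndex " + i + " -> tail " + sketch.getTailIndex(i));
        }
    }
}
```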
protected void apply(int orderIndex,
IBindingSet bindingSet)
throws InterruptedException
Evaluate a join dimension. An IBuffer will be allocated and used to buffer a chunk of results. The
buffer will be flushed when it overflows and, regardless, when
apply(int, IBindingSet) is done. When flushed, it will emit a
single chunk onto the thread-safe buffer.
orderIndex - The current index in the evaluation order[] that is being scanned.
bindingSet - The bindings from the prior join(s) (if any).
InterruptedException - This exception will be thrown during query if the
BlockingBuffer on which the query ISolutions
are being written is closed, e.g., because someone closed a
high-level iterator reading solutions from the
BlockingBuffer. Note that closing the
BlockingBuffer causes the Future that is
writing on the BlockingBuffer to be interrupted in
order to eagerly terminate processing. When that interrupt is
detected, we handle it by no longer evaluating the JOIN(s)
and the caller will get whatever ISolutions are
already in the BlockingBuffer.
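The interrupt behaviour described above can be reproduced with plain `java.util.concurrent` classes. The sketch below is an analogy, not the bigdata code: an `ArrayBlockingQueue` stands in for the BlockingBuffer, `Future.cancel(true)` stands in for closing it, and `InterruptOnCloseSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

final class InterruptOnCloseSketch {

    /** Runs the demo and returns what a reader would still find in the buffer. */
    static List<Integer> run(AtomicBoolean writerSawInterrupt) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2);
        CountDownLatch writerDone = new CountDownLatch(1);
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Future<?> writer = service.submit(() -> {
                try {
                    for (int i = 0; ; i++) {
                        buffer.put(i); // blocks while the buffer is full
                    }
                } catch (InterruptedException e) {
                    writerSawInterrupt.set(true); // stop evaluating the join
                } finally {
                    writerDone.countDown();
                }
            });
            while (buffer.size() < 2) {
                Thread.sleep(1); // wait until the writer has filled the buffer
            }
            writer.cancel(true);  // interrupts the thread running the writer
            writerDone.await();   // the writer has stopped
            return new ArrayList<>(buffer); // already-buffered solutions survive
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        AtomicBoolean saw = new AtomicBoolean();
        System.out.println(run(saw) + " interrupted=" + saw.get());
    }
}
```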
protected final void apply(int orderIndex,
IBindingSet bindingSet,
IBuffer<ISolution> buffer)
throws InterruptedException
orderIndex - The current index in the evaluation order[] that is being
scanned.bindingSet - The bindings from the prior join(s) (if any).buffer - The buffer onto which ISolutions will be written.
Note: For performance reasons, this buffer is NOT thread-safe. Therefore parallel subquery MUST use distinct buffer instances each of which flushes a chunk at a time onto the thread-safe buffer specified to the ctor.
InterruptedException
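The note about distinct buffer instances can be sketched as follows. The names `ChunkBufferSketch`, `LocalBuffer` and `fill` are hypothetical, and a `LinkedBlockingQueue` of `String[]` chunks stands in for the thread-safe buffer given to the constructor. Each task owns one unsynchronized `LocalBuffer` and only whole chunks ever reach the shared queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class ChunkBufferSketch {

    /** Unsynchronized buffer owned by exactly one task. */
    static final class LocalBuffer {
        private final List<String> pending = new ArrayList<>();
        private final int capacity;
        private final BlockingQueue<String[]> shared;

        LocalBuffer(int capacity, BlockingQueue<String[]> shared) {
            this.capacity = capacity;
            this.shared = shared;
        }

        void add(String solution) {
            pending.add(solution);
            if (pending.size() >= capacity) {
                flush(); // overflow: emit one chunk
            }
        }

        void flush() {
            if (pending.isEmpty()) {
                return;
            }
            shared.add(pending.toArray(new String[0])); // thread-safe hand-off
            pending.clear();
        }
    }

    /** One task: writes n solutions through its own buffer, then flushes. */
    static void fill(BlockingQueue<String[]> shared, String prefix, int n, int capacity) {
        LocalBuffer local = new LocalBuffer(capacity, shared);
        try {
            for (int i = 0; i < n; i++) {
                local.add(prefix + i);
            }
        } finally {
            local.flush(); // flush regardless when the task is done
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String[]> shared = new LinkedBlockingQueue<>();
        Thread t1 = new Thread(() -> fill(shared, "a", 5, 2));
        Thread t2 = new Thread(() -> fill(shared, "b", 5, 2));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(shared.size() + " chunks");
    }
}
```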
protected void applyOptional(int orderIndex,
IBindingSet bindingSet,
IBuffer<ISolution> buffer)
throws InterruptedException
orderIndex - The index into the evaluation order.
bindingSet - The bindings from the prior join(s) (if any).
buffer - A buffer onto which ISolutions will be written - this object is NOT thread-safe.
InterruptedException
protected IAccessPath getAccessPath(int orderIndex,
IBindingSet bindingSet)
Return the IAccessPath for the tail predicate to be evaluated at
the given index in the evaluation order.
orderIndex - The index into the evaluation order.
bindingSet - The bindings from the prior join(s) (if any).
The IAccessPath.
protected void runSubQueries(int orderIndex,
Object[] chunk,
IBindingSet bindingSet,
IBuffer<ISolution> buffer)
throws InterruptedException
orderIndex - The current index in the evaluation order.
chunk - A chunk of elements from the left-hand side of the join.
bindingSet - The bindings from the prior joins (if any).
buffer - A buffer onto which ISolutions will be written - this object is NOT thread-safe.
InterruptedException
protected void runSubQueriesInCallersThread(int orderIndex,
Object[] chunk,
IBindingSet bindingSet,
IBuffer<ISolution> buffer)
throws InterruptedException
orderIndex - The current index in the evaluation order.
chunk - A chunk of elements from the left-hand side of the join.
bindingSet - The bindings from the prior joins (if any).
buffer - A buffer onto which ISolutions will be written - this object is NOT thread-safe.
InterruptedException
protected void runSubQueriesOnThreadPool(int orderIndex,
Object[] chunk,
IBindingSet bindingSet)
throws InterruptedException
Variant that creates a NestedSubqueryWithJoinThreadsTask.SubqueryTask for each element of the chunk
and submits those tasks to the #joinService. This method will
not return until all subqueries for the chunk have been evaluated.
The #joinService will decide for each task whether to allocate a
new thread, to run it on an existing thread, to leave it on the work
queue for a while, or to execute it in the caller's thread (the latter is
selected via a rejected execution handler option).
Note: This requires that we clone the IBindingSet so that each
parallel task will have its own state.
Note: The tasks should be executed (more or less) in order so as to maximize the effect of ordered reads on the next join dimension.
orderIndex - The current index in the evaluation order.
chunk - A chunk of elements from the left-hand side of the join.
bindingSet - The bindings from the prior joins (if any).
InterruptedException
ExecutorService such that
it maintains at least N threads from a producer and no more than M
threads overall. We need that pattern here for better sustained
throughput and in the map/reduce system as well (it exists there
but needs to be refactored and aligned with the simpler thread pool
exposed by the IIndexManager). The pattern is similar to
one in the RDF concurrent data loader and in the map/reduce
package. It should be encapsulated by the ExecutionHelper
along with the submitOne() and submitSequence() methods.
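A minimal sketch of the submission pattern described for this method, using only standard `java.util.concurrent` classes. It is not the bigdata implementation: `ThreadPoolSubquerySketch` and `runOnPool` are hypothetical names, a `Map<String, Object>` stands in for the IBindingSet, the variable name `"x"` is invented, and the pool configuration (a `SynchronousQueue` with `CallerRunsPolicy`) is just one way to let the service choose between a pooled thread and the caller's thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ThreadPoolSubquerySketch {

    /** Binds each chunk element in its own copy of the bindings, in parallel. */
    static List<Map<String, Object>> runOnPool(Object[] chunk,
                                               Map<String, Object> bindingSet,
                                               int maxThreads) {
        ThreadPoolExecutor service = new ThreadPoolExecutor(
                0, Math.max(1, maxThreads), 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                // When no pooled thread is free, run the task in the caller's thread.
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            List<Callable<Map<String, Object>>> tasks = new ArrayList<>();
            for (Object element : chunk) {
                // Clone the bindings so that each parallel task has its own state.
                Map<String, Object> copy = new HashMap<>(bindingSet);
                tasks.add(() -> {
                    copy.put("x", element); // hypothetical variable name
                    return copy;
                });
            }
            List<Map<String, Object>> results = new ArrayList<>();
            // invokeAll does not return until every task has completed, and it
            // returns the futures in the same order as the submitted tasks.
            for (Future<Map<String, Object>> f : service.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, Object> bindings = new HashMap<>();
        bindings.put("y", "k");
        System.out.println(runOnPool(new Object[] {1, 2, 3}, bindings, 2));
    }
}
```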
protected void emitSolutions(int orderIndex,
Object[] chunk,
IBindingSet bindingSet,
IBuffer<ISolution> buffer)
throws InterruptedException
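Consider each element in the chunk in turn. If the element satisfies the JOIN criteria, then emit an ISolution for the IRule.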
orderIndex - The index in the evaluation order.
chunk - A chunk of elements from the right-most join dimension.
bindingSet - The bindings from the prior joins.
buffer - A buffer onto which the solutions will be written - this object is NOT thread-safe.
InterruptedException