com.bigdata.relation.rule.eval
Class NestedSubqueryWithJoinThreadsTask

java.lang.Object
  extended by com.bigdata.relation.rule.eval.NestedSubqueryWithJoinThreadsTask
All Implemented Interfaces:
IStepTask, Callable<RuleStats>

public class NestedSubqueryWithJoinThreadsTask
extends Object
implements IStepTask

Evaluation of an IRule using nested subquery (one or more JOINs plus any IElementFilters specified for the predicates in the tail or IConstraints on the IRule itself). The subqueries are formed into tasks and submitted to an ExecutorService. The effective parallelism is limited by the #of elements visited in a chunk for the first join dimension, as only those subqueries will be parallelized. Subqueries for 2nd+ join dimensions are run in the caller's thread to ensure liveness.
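The recursion described above can be illustrated with a minimal, single-threaded sketch. It is not the bigdata implementation: the relation is assumed to be an in-memory list of string triples, a binding set is modeled as a plain Map, and the names NestedSubquerySketch, evaluate and bind are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: a tail predicate is a triple pattern whose
// terms are either constants or variables (strings starting with '?').
final class NestedSubquerySketch {

    /** Recursively evaluates the tail patterns in the given evaluation order. */
    static void evaluate(List<String[]> data, String[][] tail, int[] order,
                         int orderIndex, Map<String, String> bindings,
                         List<Map<String, String>> solutions) {
        if (orderIndex == order.length) {
            // Every join dimension has been satisfied: emit a solution.
            solutions.add(new HashMap<>(bindings));
            return;
        }
        final String[] pattern = tail[order[orderIndex]];
        for (String[] element : data) {
            // Copy the bindings so that each subquery works on its own state.
            final Map<String, String> next = new HashMap<>(bindings);
            if (bind(pattern, element, next)) {
                // The subquery for the next join dimension.
                evaluate(data, tail, order, orderIndex + 1, next, solutions);
            }
        }
    }

    /** Tries to match the element against the pattern, extending the bindings. */
    static boolean bind(String[] pattern, String[] element,
                        Map<String, String> bindings) {
        for (int i = 0; i < pattern.length; i++) {
            final String term = pattern[i];
            if (term.startsWith("?")) {
                // Bind the variable, or check consistency with a prior binding.
                final String bound = bindings.putIfAbsent(term, element[i]);
                if (bound != null && !bound.equals(element[i])) {
                    return false;
                }
            } else if (!term.equals(element[i])) {
                return false;
            }
        }
        return true;
    }
}
```

In this sketch every subquery runs in the caller's thread. The class documented here differs in that the subqueries for the first join dimension may be submitted to an ExecutorService.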

Note: This join strategy is not very efficient for scale-out joins. If executed on an AbstractScaleOutFederation, this task will wind up using ClientIndexViews rather than directly using the local index objects. This means that all work (other than the iterator scan) will be performed on the client running the join.

Version:
$Id: NestedSubqueryWithJoinThreadsTask.java 2265 2009-10-26 12:51:06Z thompsonbry $
Author:
Bryan Thompson
See Also:
JoinMasterTask, which is designed for scale-out federations.
TODO:
Modify the access path to allow us to select the specific fields of the relation to be returned to the join, since not all will be used; we only need those that will be bound. This will require more generalization of the binding set and its serialization, and a mix up of that with the iterators on the IAccessPath. Support foreign key joins.

Nested Class Summary
protected  class NestedSubqueryWithJoinThreadsTask.SubqueryTask<E>
          This class is used when we want to evaluate the subqueries in parallel.
 
Field Summary
protected  IBuffer<ISolution[]> buffer
           
protected static boolean DEBUG
          True iff the log level is DEBUG or less.
protected static boolean INFO
          True iff the log level is INFO or less.
protected  ExecutionHelper<Void> joinHelper
          The object that helps us to execute the subqueries on the #joinService.
protected  IJoinNexus joinNexus
           
protected  long last
          The index of the last solutionCount that we will generate (OFFSET + LIMIT).
protected static org.apache.log4j.Logger log
           
protected  int maxParallelSubqueries
          The maximum #of subqueries for the first join dimension that will be issued in parallel.
protected  long offset
          The offset of the first computed solution that will be inserted into the buffer.
protected  int[] order
          The evaluation order.
protected  IRule rule
           
protected  IRuleState ruleState
           
protected  RuleStats ruleStats
           
protected  int tailCount
           
 
Constructor Summary
NestedSubqueryWithJoinThreadsTask(IRule rule, IJoinNexus joinNexus, IBuffer<ISolution[]> buffer)
           
 
Method Summary
protected  void apply(int orderIndex, IBindingSet bindingSet)
          Evaluate a join dimension.
protected  void apply(int orderIndex, IBindingSet bindingSet, IBuffer<ISolution> buffer)
          Evaluate a join dimension.
protected  void applyOptional(int orderIndex, IBindingSet bindingSet, IBuffer<ISolution> buffer)
          Method to be invoked IFF there were no solutions in the data that satisfied the constraints on the rule.
 RuleStats call()
          Recursively evaluate the subqueries.
protected  void emitSolutions(int orderIndex, Object[] chunk, IBindingSet bindingSet, IBuffer<ISolution> buffer)
          Consider each element in the chunk in turn.
protected  IAccessPath getAccessPath(int orderIndex, IBindingSet bindingSet)
          Return the IAccessPath for the tail predicate to be evaluated at the given index in the evaluation order.
protected  int getTailIndex(int orderIndex)
          Return the index of the tail predicate to be evaluated at the given index in the evaluation order.
protected  void runSubQueries(int orderIndex, Object[] chunk, IBindingSet bindingSet, IBuffer<ISolution> buffer)
          Evaluate the right-hand side (aka the subquery) of the join for each element in the chunk.
protected  void runSubQueriesInCallersThread(int orderIndex, Object[] chunk, IBindingSet bindingSet, IBuffer<ISolution> buffer)
          Runs the subquery in the caller's thread (this was the original behavior).
protected  void runSubQueriesOnThreadPool(int orderIndex, Object[] chunk, IBindingSet bindingSet)
          Variant that creates a NestedSubqueryWithJoinThreadsTask.SubqueryTask for each element of the chunk and submits those tasks to the #joinService.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

log

protected static final org.apache.log4j.Logger log

INFO

protected static final boolean INFO
True iff the log level is INFO or less.


DEBUG

protected static final boolean DEBUG
True iff the log level is DEBUG or less.


rule

protected final IRule rule

joinNexus

protected final IJoinNexus joinNexus

buffer

protected final IBuffer<ISolution[]> buffer

ruleState

protected final IRuleState ruleState

ruleStats

protected final RuleStats ruleStats

tailCount

protected final int tailCount

order

protected final int[] order
The evaluation order.


offset

protected final long offset
The offset of the first computed solution that will be inserted into the buffer.

Note: We handle an ISlice directly in order to avoid having the caller close an IAsynchronousIterator reading from a BlockingBuffer. That causes the Future writing on the BlockingBuffer to be interrupted, and if that interrupt is noticed during an IO the FileChannel backing the store will be asynchronously closed. While it will be automatically re-opened, it is best to avoid the latency associated with that process (which requires re-obtaining any necessary locks, etc).

The logic to handle an ISlice is scattered across several places in this class and in the NestedSubqueryWithJoinThreadsTask.SubqueryTask. This is necessary in order for processing to halt eagerly once limit ISolutions have been written onto the buffer.

When a complex IProgramTask is being executed, the ISlice is defined in terms of the total #of solutions generated. Since the solutionCount is tracked on a per-rule execution basis, you MUST serialize the IRules in the IProgram by specifying IQueryOptions.isStable() or simply marking the IProgram as NOT IProgram.isParallel().


last

protected final long last
The index of the last solutionCount that we will generate (OFFSET + LIMIT). The value of this property MUST be computed such that overflow is not possible. If OFFSET+LIMIT would be greater than Long.MAX_VALUE, then use Long.MAX_VALUE instead.
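The overflow rule above can be sketched as follows. The class and method names (SliceBounds, computeLast) are hypothetical, and non-negative arguments are assumed.

```java
// Hypothetical helper illustrating the overflow-safe computation of "last".
final class SliceBounds {

    /** Returns offset + limit, saturating at Long.MAX_VALUE instead of overflowing. */
    static long computeLast(long offset, long limit) {
        if (offset < 0 || limit < 0) {
            throw new IllegalArgumentException("offset and limit must be non-negative");
        }
        // Compare against the remaining headroom rather than adding first,
        // since offset + limit itself could wrap around to a negative value.
        return (Long.MAX_VALUE - offset < limit) ? Long.MAX_VALUE : offset + limit;
    }
}
```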

Whether or not an ISolution is part of the slice is based on the post-increment value of the AtomicLong RuleStats.solutionCount. If the post-increment value is GT last then we have exhausted the slice. Else, if the post-increment value is GT offset then we add the ISolution to the IBuffer. Else the ISolution is before the offset of the slice and it will be ignored.

Note: AtomicLong.incrementAndGet() is used to make an atomic decision concerning which solutions are allowed into the buffer. This works even in the face of parallel-subquery, even though we in fact turn off parallel subquery when evaluating a slice in order to have the results be stable with respect to a given commit point. The fence posts for the tests are different than you might expect because we must consider the post-increment state of the RuleStats.solutionCount.

Note: Each IRule executes with its own RuleStats instance. This means that concurrent execution of rules would not see the same RuleStats.solutionCount field and hence that a slice would not be computed correctly when executing IRules in parallel for some program. However, as pointed out above, we always disable parallelism for a slice in order to have a stable result set for a given commit point. The stable property is required for a client to be able to page through the solutions using a series of slices.

Consider (OFFSET=0, LIMIT=1, LAST=1). We would emit a solution for the post-increment solutionCount value ONE (1) (GT OFFSET) but NOT emit a solution for the post-increment solutionCount value TWO (2) (GT LAST).
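The fence posts described above can be sketched with a plain AtomicLong standing in for RuleStats.solutionCount. The class, enum and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of the slice test on the post-increment counter.
final class SliceDecisionSketch {

    enum Decision { BEFORE_OFFSET, ACCEPT, EXHAUSTED }

    /** Decides the fate of one solution based on the post-increment count. */
    static Decision decide(AtomicLong solutionCount, long offset, long last) {
        // Atomic, so concurrent callers each observe a distinct count.
        final long n = solutionCount.incrementAndGet();
        if (n > last) {
            return Decision.EXHAUSTED;      // the slice has been satisfied
        }
        if (n > offset) {
            return Decision.ACCEPT;         // the solution goes into the buffer
        }
        return Decision.BEFORE_OFFSET;      // the solution is skipped
    }
}
```

With OFFSET=0 and LAST=1, the first call yields ACCEPT (count 1 is GT OFFSET) and the second yields EXHAUSTED (count 2 is GT LAST), matching the example above.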

See Also:
RuleStats.solutionCount

maxParallelSubqueries

protected final int maxParallelSubqueries
The maximum #of subqueries for the first join dimension that will be issued in parallel. Use ZERO(0) to avoid the use of the #joinService entirely and ONE (1) to submit a single task at a time to the #joinService.
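One plausible reading of how this setting combines with the rule that only the first join dimension is parallelized is sketched below. The dispatch logic and all names are assumptions made for illustration; the actual decision is made inside this class and may consider more state (for example, whether a slice is being evaluated).

```java
// Hypothetical dispatch rule; not taken from the bigdata source.
final class SubqueryDispatchSketch {

    enum Mode { CALLERS_THREAD, THREAD_POOL }

    /** Chooses where the subqueries for a join dimension would be run. */
    static Mode choose(int orderIndex, int maxParallelSubqueries) {
        if (maxParallelSubqueries <= 0) {
            // ZERO (0) avoids the join service entirely.
            return Mode.CALLERS_THREAD;
        }
        // Only the first join dimension is parallelized; later dimensions
        // stay in the caller's thread to ensure liveness.
        return orderIndex == 0 ? Mode.THREAD_POOL : Mode.CALLERS_THREAD;
    }
}
```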


joinHelper

protected final ExecutionHelper<Void> joinHelper
The object that helps us to execute the subqueries on the #joinService.

Constructor Detail

NestedSubqueryWithJoinThreadsTask

public NestedSubqueryWithJoinThreadsTask(IRule rule,
                                         IJoinNexus joinNexus,
                                         IBuffer<ISolution[]> buffer)
Parameters:
rule - The rule to be executed.
joinNexus - The IJoinNexus.
buffer - A thread-safe buffer onto which chunks of ISolution will be flushed during rule execution.
Method Detail

call

public final RuleStats call()
Recursively evaluate the subqueries.

Specified by:
call in interface IStepTask
Specified by:
call in interface Callable<RuleStats>

getTailIndex

protected final int getTailIndex(int orderIndex)
Return the index of the tail predicate to be evaluated at the given index in the evaluation order.

Parameters:
orderIndex - The evaluation order index.
Returns:
The tail index to be evaluated at that index in the evaluation order.
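As an illustration, assuming the evaluation order is simply an array of tail indices (consistent with the order field), the lookup could look like this. The class name is hypothetical.

```java
// Hypothetical stand-in showing how an evaluation order maps to tail indices.
final class EvaluationOrderSketch {

    private final int[] order;

    EvaluationOrderSketch(int[] order) {
        this.order = order.clone(); // defensive copy
    }

    /** Returns the tail predicate index evaluated at the given position. */
    int getTailIndex(int orderIndex) {
        return order[orderIndex];
    }
}
```

For an order of {2, 0, 1}, the tail at index 2 is evaluated first, then the tail at index 0, then the tail at index 1.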

apply

protected void apply(int orderIndex,
                     IBindingSet bindingSet)
              throws InterruptedException
Evaluate a join dimension. A private non-thread-safe IBuffer will be allocated and used to buffer a chunk of results. The buffer will be flushed when it overflows and, regardless, when apply(int, IBindingSet) is done. When flushed, it will emit a single chunk onto the thread-safe buffer.
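The buffering pattern described above can be sketched with standard collections. This is an assumption-laden stand-in, not the IBuffer API: ChunkBufferSketch is a hypothetical name, and a java.util.Queue plays the role of the thread-safe buffer passed to the constructor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical private buffer: unsynchronized, owned by a single thread,
// and flushed one chunk at a time onto a shared thread-safe queue.
final class ChunkBufferSketch<E> {

    private final int capacity;
    private final Queue<List<E>> target; // must be a thread-safe queue
    private List<E> chunk = new ArrayList<>();

    ChunkBufferSketch(int capacity, Queue<List<E>> target) {
        this.capacity = capacity;
        this.target = target;
    }

    /** Buffers one element, flushing when the private chunk is full. */
    void add(E element) {
        chunk.add(element);
        if (chunk.size() >= capacity) {
            flush();
        }
    }

    /** Emits the buffered elements (if any) as a single chunk. */
    void flush() {
        if (chunk.isEmpty()) {
            return;
        }
        target.offer(chunk);
        chunk = new ArrayList<>(); // never reuse a chunk that was handed off
    }
}
```

A caller would invoke flush() once more when the join dimension has been fully evaluated, so that a partially filled chunk is not lost.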

Parameters:
orderIndex - The current index in the evaluation order[] that is being scanned.
bindingSet - The bindings from the prior join(s) (if any).
Throws:
InterruptedException - This exception will be thrown during query if the BlockingBuffer on which the query ISolutions are being written is closed, e.g., because someone closed a high-level iterator reading solutions from the BlockingBuffer. Note that closing the BlockingBuffer causes the Future that is writing on the BlockingBuffer to be interrupted in order to eagerly terminate processing. When that interrupt is detected, we handle it by no longer evaluating the JOIN(s) and the caller will get whatever ISolutions are already in the BlockingBuffer.

apply

protected final void apply(int orderIndex,
                           IBindingSet bindingSet,
                           IBuffer<ISolution> buffer)
                    throws InterruptedException
Evaluate a join dimension.

Parameters:
orderIndex - The current index in the evaluation order[] that is being scanned.
bindingSet - The bindings from the prior join(s) (if any).
buffer - The buffer onto which ISolutions will be written.

Note: For performance reasons, this buffer is NOT thread-safe. Therefore parallel subquery MUST use distinct buffer instances each of which flushes a chunk at a time onto the thread-safe buffer specified to the ctor.

Throws:
InterruptedException

applyOptional

protected void applyOptional(int orderIndex,
                             IBindingSet bindingSet,
                             IBuffer<ISolution> buffer)
                      throws InterruptedException
Method to be invoked IFF there were no solutions in the data that satisfied the constraints on the rule. If the tail is optional, then subquery evaluation will simply skip the tail and proceed with the successor of the tail in the evaluation order. If the tail is the last tail in the evaluation order, then a solution will be emitted for the binding set.
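The two cases above can be summarized in a small decision sketch. The names are hypothetical, and the NO_SOLUTION branch for a non-optional tail is an assumption added for completeness.

```java
// Hypothetical summary of what happens when a tail matched nothing.
final class OptionalTailSketch {

    enum Action { NO_SOLUTION, SKIP_TO_NEXT_TAIL, EMIT_SOLUTION }

    /** Chooses the action for a tail that produced no solutions. */
    static Action onNoSolutions(boolean optional, int orderIndex, int tailCount) {
        if (!optional) {
            // Assumption: a required tail with no matches yields nothing.
            return Action.NO_SOLUTION;
        }
        if (orderIndex + 1 == tailCount) {
            // Last tail in the evaluation order: emit the current bindings.
            return Action.EMIT_SOLUTION;
        }
        // Otherwise skip this tail and continue with its successor.
        return Action.SKIP_TO_NEXT_TAIL;
    }
}
```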

Parameters:
orderIndex - The index into the evaluation order.
bindingSet - The bindings from the prior join(s) (if any).
buffer - A buffer onto which ISolutions will be written - this object is NOT thread-safe.
Throws:
InterruptedException

getAccessPath

protected IAccessPath getAccessPath(int orderIndex,
                                    IBindingSet bindingSet)
Return the IAccessPath for the tail predicate to be evaluated at the given index in the evaluation order.

Parameters:
orderIndex - The index into the evaluation order.
bindingSet - The bindings from the prior join(s) (if any).
Returns:
The IAccessPath.

runSubQueries

protected void runSubQueries(int orderIndex,
                             Object[] chunk,
                             IBindingSet bindingSet,
                             IBuffer<ISolution> buffer)
                      throws InterruptedException
Evaluate the right-hand side (aka the subquery) of the join for each element in the chunk. This method will not return until all subqueries for the chunk have been evaluated.

Parameters:
orderIndex - The current index in the evaluation order.
chunk - A chunk of elements from the left-hand side of the join.
bindingSet - The bindings from the prior joins (if any).
buffer - A buffer onto which ISolutions will be written - this object is NOT thread-safe.
Throws:
InterruptedException

runSubQueriesInCallersThread

protected void runSubQueriesInCallersThread(int orderIndex,
                                            Object[] chunk,
                                            IBindingSet bindingSet,
                                            IBuffer<ISolution> buffer)
                                     throws InterruptedException
Runs the subquery in the caller's thread (this was the original behavior).

Parameters:
orderIndex - The current index in the evaluation order.
chunk - A chunk of elements from the left-hand side of the join.
bindingSet - The bindings from the prior joins (if any).
buffer - A buffer onto which ISolutions will be written - this object is NOT thread-safe.
Throws:
InterruptedException

runSubQueriesOnThreadPool

protected void runSubQueriesOnThreadPool(int orderIndex,
                                         Object[] chunk,
                                         IBindingSet bindingSet)
                                  throws InterruptedException
Variant that creates a NestedSubqueryWithJoinThreadsTask.SubqueryTask for each element of the chunk and submits those tasks to the #joinService. This method will not return until all subqueries for the chunk have been evaluated.

The #joinService will decide for each task whether to allocate a new thread, to run it on an existing thread, to leave it on the work queue for a while, or to execute it in the caller's thread (the latter is selected via a rejected execution handler option).

Note: This requires that we clone the IBindingSet so that each parallel task will have its own state.

Note: The tasks should be executed (more or less) in order so as to maximize the effect of ordered reads on the next join dimension.
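The behavior described above can be approximated with the standard java.util.concurrent classes. This is a sketch under assumptions, not the #joinService configuration: the pool settings, the Map-based binding set, the "?element" variable and the class name are all hypothetical. It shows a caller-runs rejected execution handler and one cloned binding set per task.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one task per chunk element, each with its own bindings.
final class ThreadPoolSubquerySketch {

    static List<Map<String, Object>> runOnPool(Object[] chunk,
            Map<String, Object> bindingSet, int maxThreads) {
        // When all threads are busy, CallerRunsPolicy executes the rejected
        // task in the submitting (caller's) thread instead of failing.
        final ExecutorService service = new ThreadPoolExecutor(0, maxThreads,
                60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            final List<Callable<Map<String, Object>>> tasks = new ArrayList<>();
            for (final Object element : chunk) {
                // Clone the bindings so that parallel tasks never share state.
                final Map<String, Object> copy = new HashMap<>(bindingSet);
                tasks.add(() -> {
                    copy.put("?element", element);
                    return copy;
                });
            }
            final List<Map<String, Object>> results = new ArrayList<>();
            // invokeAll() does not return until every task has completed and
            // returns the futures in the same order as the submitted tasks.
            for (Future<Map<String, Object>> f : service.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            service.shutdown();
        }
    }
}
```

Submitting the tasks in chunk order keeps execution roughly ordered, although a thread pool gives no strict ordering guarantee for when each task actually runs.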

Parameters:
orderIndex - The current index in the evaluation order.
chunk - A chunk of elements from the left-hand side of the join.
bindingSet - The bindings from the prior joins (if any).
Throws:
InterruptedException
TODO:
Develop a pattern for feeding the ExecutorService such that it maintains at least N threads from a producer and no more than M threads overall. We need that pattern here for better sustained throughput and in the map/reduce system as well (it exists there but needs to be refactored and aligned with the simpler thread pool exposed by the IIndexManager). The pattern is similar to one in the RDF concurrent data loader and in the map/reduce package. It should be encapsulated by the ExecutionHelper along with the submitOne() and submitSequence() methods.

emitSolutions

protected void emitSolutions(int orderIndex,
                             Object[] chunk,
                             IBindingSet bindingSet,
                             IBuffer<ISolution> buffer)
                      throws InterruptedException
Consider each element in the chunk in turn. If the element satisfies the JOIN criteria, then emit an ISolution for the IRule.

Parameters:
orderIndex - The index in the evaluation order.
chunk - A chunk of elements from the right-most join dimension.
bindingSet - The bindings from the prior joins.
buffer - A buffer onto which the solutions will be written - this object is NOT thread-safe.
Throws:
InterruptedException


Copyright © 2006-2009 SYSTAP, LLC. All Rights Reserved.