com.bigdata.relation.rule.eval.pipeline
Class JoinMasterTask

java.lang.Object
  extended by com.bigdata.relation.rule.eval.pipeline.JoinMasterTask
All Implemented Interfaces:
IStepTask, IJoinMaster, Remote, Callable<RuleStats>
Direct Known Subclasses:
DistributedJoinMasterTask, LocalJoinMasterTask

public abstract class JoinMasterTask
extends Object
implements IStepTask, IJoinMaster

Master providing efficient distributed evaluation of IRules. For query, this task should be run by the client that wishes to materialize the query results. For mutation, this task may be run by any client or service since the data does not flow through the master for mutation.

For the first join dimension, the JoinMasterTask creates a JoinTask per index partition that will be spanned by the IAccessPath for the first IPredicate in the evaluation order, and feeds each JoinTask in the first join dimension with an IAsynchronousIterator reading on a buffer containing a single empty IBindingSet.
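
The seeding step above can be sketched as follows. This is a minimal illustration, not the bigdata API: it assumes a binding set can be modeled as a `Map<String, Object>` and a JoinTask source as a `BlockingQueue` of chunks. `SeedSketch`, `seedFirstDimension` and the integer partition ids are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: binding sets are modeled as maps, sources as queues of chunks.
final class SeedSketch {

    /**
     * Creates one source per index partition spanned by the first predicate in
     * the evaluation order. Each source holds one chunk with one empty binding
     * set, so the first JoinTask starts evaluation with no variables bound.
     */
    static Map<Integer, BlockingQueue<List<Map<String, Object>>>> seedFirstDimension(
            List<Integer> partitionIds) {
        Map<Integer, BlockingQueue<List<Map<String, Object>>>> sources = new HashMap<>();
        for (int partitionId : partitionIds) {
            List<Map<String, Object>> chunk = new ArrayList<>();
            chunk.add(new HashMap<>()); // the single, empty binding set
            BlockingQueue<List<Map<String, Object>>> source = new ArrayBlockingQueue<>(1);
            source.add(chunk);
            sources.put(partitionId, source);
        }
        return sources;
    }
}
```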

Each JoinTask consumes IBindingSet chunks read from the previous join dimension. For each IBindingSet chunk read, a new IAccessPath is obtained. Elements are then read from that IAccessPath in chunks. Given the IBindingSet used to obtain the IAccessPath, a new IBindingSet is created for each element in each chunk read from the IAccessPath. If the new IBindingSet satisfies the constraint(s) on the IRule, then it is output to the next join dimension. An IBindingSet is output by placing it onto the UnsynchronizedArrayBuffer for the join dimension. Periodically that UnsynchronizedArrayBuffer overflows, and a chunk of IBindingSets is placed onto the IBlockingBuffer from which the next join dimension reads its IBindingSet chunks.
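
A single join dimension, as described above, might look roughly like this sketch. It assumes binding sets are maps, models the IAccessPath as a function from a binding set to the matching elements, and models the IBlockingBuffer as a `BlockingQueue`. For simplicity it obtains an access path per binding set. `JoinStepSketch`, `consume` and `flush` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of one join dimension; not the bigdata JoinTask.
final class JoinStepSketch {
    private final Function<Map<String, Object>, List<Object>> accessPath;
    private final String variable; // the variable bound by this join dimension
    private final Predicate<Map<String, Object>> constraint;
    private final BlockingQueue<List<Map<String, Object>>> sink; // next dimension's source
    private final int chunkCapacity;
    // Plays the role of the UnsynchronizedArrayBuffer: only this task's thread touches it.
    private List<Map<String, Object>> buffer = new ArrayList<>();

    JoinStepSketch(Function<Map<String, Object>, List<Object>> accessPath, String variable,
                   Predicate<Map<String, Object>> constraint,
                   BlockingQueue<List<Map<String, Object>>> sink, int chunkCapacity) {
        this.accessPath = accessPath;
        this.variable = variable;
        this.constraint = constraint;
        this.sink = sink;
        this.chunkCapacity = chunkCapacity;
    }

    /** Joins one chunk read from the previous join dimension. */
    void consume(List<Map<String, Object>> chunk) throws InterruptedException {
        for (Map<String, Object> left : chunk) {
            // Access path obtained using the bindings already present in `left`.
            for (Object element : accessPath.apply(left)) {
                Map<String, Object> joined = new HashMap<>(left);
                joined.put(variable, element);
                if (constraint.test(joined)) {
                    buffer.add(joined);
                    if (buffer.size() >= chunkCapacity) {
                        flush(); // the local buffer "overflows" onto the shared sink
                    }
                }
            }
        }
    }

    /** Moves any buffered binding sets onto the sink as a single chunk. */
    void flush() throws InterruptedException {
        if (!buffer.isEmpty()) {
            sink.put(buffer);
            buffer = new ArrayList<>();
        }
    }
}
```

Keeping the per-task buffer unsynchronized and handing off whole chunks means the only synchronized operation is the occasional `put` onto the shared queue.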

The last join dimension is slightly different. Its UnsynchronizedArrayBuffer writes onto the IJoinNexus.newQueryBuffer(), IJoinNexus.newInsertBuffer(com.bigdata.relation.IMutableRelation), or IJoinNexus.newDeleteBuffer(com.bigdata.relation.IMutableRelation) depending on the ActionEnum.

For each JoinTask, once its source iterator(s) have been exhausted and the IAccessPath reading from the last source IBindingSet has been exhausted, the JoinTask for that join dimension is done: it flushes its UnsynchronizedArrayBuffer, closes its output IBuffer, and waits for the downstream JoinTasks to report their RuleStats. Those RuleStats are aggregated and passed back, in turn, to its caller.
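
The completion step can be sketched as below, assuming RuleStats is reduced to a single solution count and the downstream JoinTasks are represented only by their `Future`s. `StatsSketch`, `CompletionSketch` and `awaitDownstream` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical stand-in for RuleStats, reduced to a solution count. */
final class StatsSketch {
    final long solutionCount;

    StatsSketch(long solutionCount) {
        this.solutionCount = solutionCount;
    }
}

final class CompletionSketch {
    /**
     * Called once this task has flushed its local buffer and closed its output
     * buffer: waits for each downstream JoinTask and folds its statistics into
     * this task's own before they are passed back to the caller.
     */
    static StatsSketch awaitDownstream(StatsSketch own,
                                       List<? extends Future<StatsSketch>> downstream)
            throws InterruptedException, ExecutionException {
        long total = own.solutionCount;
        for (Future<StatsSketch> f : downstream) {
            total += f.get().solutionCount; // blocks until that sink task is done
        }
        return new StatsSketch(total);
    }
}
```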

Each join dimension is single-threaded. Coordination of resources is achieved using the output buffer for each join dimension. This allows a source join dimension to read ahead and forces the sink join dimension to process chunks of IBindingSets at a time.
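
The coordination described above is the classic bounded producer/consumer pattern. The sketch below assumes two single-threaded stages joined by an `ArrayBlockingQueue`: the queue capacity bounds how far the producer may read ahead, and the consumer always receives whole chunks. An identity-compared end-of-stream marker stands in for closing the output buffer. `PipelineSketch` and `EOS` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PipelineSketch {
    /** Marks the end of a source, i.e. the producer closed its output buffer. */
    static final List<Integer> EOS = new ArrayList<>();

    /** Runs a two-stage pipeline and returns the #of elements the consumer saw. */
    static long run(int chunks, int chunkSize, int capacity) throws Exception {
        BlockingQueue<List<Integer>> queue = new ArrayBlockingQueue<>(capacity);
        ExecutorService pool = Executors.newFixedThreadPool(2); // one thread per stage
        try {
            Future<?> producer = pool.submit(() -> {
                for (int c = 0; c < chunks; c++) {
                    List<Integer> chunk = new ArrayList<>();
                    for (int i = 0; i < chunkSize; i++) {
                        chunk.add(i);
                    }
                    queue.put(chunk); // blocks once it is `capacity` chunks ahead
                }
                queue.put(EOS);
                return null;
            });
            Future<Long> consumer = pool.submit(() -> {
                long n = 0;
                // Reference comparison on purpose: only the marker ends the loop.
                for (List<Integer> chunk = queue.take(); chunk != EOS; chunk = queue.take()) {
                    n += chunk.size();
                }
                return n;
            });
            producer.get();
            return consumer.get();
        } finally {
            pool.shutdownNow();
        }
    }
}
```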

The JoinMasterTask is responsible for the JoinTasks for the first join dimension. Each JoinTask is responsible for the downstream JoinTasks. If the JoinMasterTask is interrupted or cancelled, then it interrupts or cancels the JoinTasks for the first join dimension. If a JoinTask is interrupted or cancelled, then it must cancel any JoinTasks which it has created for the next join dimension.
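
The cancellation chain can be sketched as follows, assuming each task keeps the `Future`s of the sink tasks it created. Since every task applies the same rule, cancelling the first join dimension eventually reaches the whole pipeline. `CancelSketch`, `Body`, `addSink` and `cancelSinks` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;

final class CancelSketch {
    /** Hypothetical unit of join work that may block and be interrupted. */
    interface Body {
        void run() throws InterruptedException;
    }

    // Futures of the tasks this task created for the next join dimension.
    private final List<Future<?>> sinks = new CopyOnWriteArrayList<>();

    void addSink(Future<?> sink) {
        sinks.add(sink);
    }

    /** Cancels every sink; each sink in turn does the same for its own sinks. */
    void cancelSinks() {
        for (Future<?> sink : sinks) {
            sink.cancel(true); // true: interrupt the sink if it is running
        }
    }

    /** Runs this task's body; if interrupted, cancels the downstream tasks too. */
    void run(Body body) {
        try {
            body.run();
        } catch (InterruptedException e) {
            cancelSinks();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}
```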

Choosing the view

Rules SHOULD be evaluated against a read-historical state.

This is a hard requirement when computing the fixed point closure of a rule (set). Each round of closure MUST be evaluated against the commit time reported by IBigdataFederation.getLastCommitTime(), and that commit time is used for all rules in that round. This allows unisolated tasks to write the generated solutions onto the indices. The requirement is strong because the JoinTasks would otherwise wind up holding an exclusive lock on the ITx.UNISOLATED index partitions, which would cause a deadlock when attempting to write the generated solutions onto those same index partitions. At the start of the next round of closure, simply update the read-historical timestamp to the then-current value of IBigdataFederation.getLastCommitTime().
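
The round structure can be sketched as a loop, assuming hypothetical interfaces `FederationSketch` (exposing only the last commit time) and `RuleSketch` (reads at a given timestamp, writes its solutions unisolated, and returns a mutation count). The loop stops at the first round in which no rule adds anything.

```java
import java.util.List;

/** Hypothetical stand-in exposing only the last commit time. */
interface FederationSketch {
    long getLastCommitTime();
}

/** Hypothetical rule: reads at readTimestamp, writes unisolated, returns #of new elements. */
interface RuleSketch {
    long apply(long readTimestamp);
}

final class ClosureSketch {
    /** Runs rounds until a fixed point is reached; returns the #of rounds. */
    static int computeClosure(FederationSketch fed, List<RuleSketch> rules) {
        int rounds = 0;
        while (true) {
            // Every rule in this round reads the same read-historical commit point,
            // so no rule holds a lock on the unisolated indices it writes onto.
            long readTime = fed.getLastCommitTime();
            long mutationCount = 0;
            for (RuleSketch rule : rules) {
                mutationCount += rule.apply(readTime);
            }
            rounds++;
            if (mutationCount == 0) {
                return rounds; // nothing new: closure reached
            }
            // The next round re-reads the then-current last commit time.
        }
    }
}
```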

Queries that use ITx.READ_COMMITTED or ITx.UNISOLATED will not generate deadlocks, but they are subject to abort from the split/join/move of index partition(s) during query evaluation. This problem WILL NOT arise if you instead read from the commit time reported by IBigdataFederation.getLastCommitTime().

Key-range partitioned joins

In order to scale out efficiently, the JoinMasterTask must distribute the JoinTasks such that they run inside the ConcurrencyManager on the DataServices where the index partitions read by the IAccessPaths reside. This allows each IAccessPath to read on the local index object and reduces the message traffic to pulling chunks of IBindingSets from the source JoinTasks.

For the JoinMasterTask and for each JoinTask, the fan out of JoinTasks is determined by the #of index partitions spanned by the IAccessPaths required to evaluate the IBindingSets for the next join dimension. The source join dimension does not use the IAccessPath to read on the index, only to discover the index partitions to which the generated IBindingSets must be assigned. The index partition(s) spanned by a given IBindingSet are determined by generating an as-bound IPredicate for the next join dimension, instantiating (on the source join dimension) the IAccessPath that the target join dimension will use, and then running a locator scan over the fromKey and toKey of that IAccessPath. When the IPredicate is fully bound, the IAccessPath is restricted to a single index partition, but we still need to know which one.
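
A locator scan over a key range can be sketched with a sorted map of partition separator keys, assuming (hypothetically) that each partition is identified by its left separator key, that keys are `String`s, and that the first partition's separator is the empty string. `LocatorSketch` and `locate` are not part of bigdata.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class LocatorSketch {
    // Left separator key -> partition id; partition i covers [sep_i, sep_i+1).
    // The first partition must use "" so that every key falls into some partition.
    private final NavigableMap<String, Integer> partitions = new TreeMap<>();

    void addPartition(String leftSeparator, int partitionId) {
        partitions.put(leftSeparator, partitionId);
    }

    /**
     * Returns the partitions spanned by [fromKey, toKey), assuming fromKey < toKey;
     * a null toKey means the range is unbounded above.
     */
    List<Integer> locate(String fromKey, String toKey) {
        String first = partitions.floorKey(fromKey); // partition holding fromKey
        NavigableMap<String, Integer> spanned = (toKey == null)
                ? partitions.tailMap(first, true)
                : partitions.subMap(first, true, toKey, false); // separators below toKey
        return new ArrayList<>(spanned.values());
    }
}
```

For a fully bound predicate the range is a single key and its successor, so `locate` returns exactly one partition, which is still the information the source join dimension needs.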

The IBindingSet is written on an UnsynchronizedArrayBuffer corresponding to the target index partition. That UnsynchronizedArrayBuffer (together with the output IBuffer for the IBindingSet chunks and the Future for the JoinTask for that index partition) is held in an LRU. If it falls off the end of the LRU, then the UnsynchronizedArrayBuffer is flushed and the output IBuffer is closed. The downstream JoinTask will eventually exhaust the corresponding IAsynchronousIterator source.
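
An LRU with an eviction hook can be sketched with an access-ordered `LinkedHashMap`. This assumes one buffer per integer partition id and a caller-supplied callback standing in for "flush the buffer and close the output IBuffer"; `SinkLruSketch`, `add` and `closeAll` are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

final class SinkLruSketch<T> {
    private final BiConsumer<Integer, List<T>> onEvict; // flush buffer + close output
    private final Map<Integer, List<T>> lru;

    SinkLruSketch(int capacity, BiConsumer<Integer, List<T>> onEvict) {
        this.onEvict = onEvict;
        // accessOrder = true: iteration runs from least to most recently used.
        this.lru = new LinkedHashMap<Integer, List<T>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, List<T>> eldest) {
                if (size() > capacity) {
                    onEvict.accept(eldest.getKey(), eldest.getValue());
                    return true; // drop the least recently used partition
                }
                return false;
            }
        };
    }

    /** Routes one binding set to the buffer for its target index partition. */
    void add(int partitionId, T bindingSet) {
        List<T> buffer = lru.get(partitionId); // get() also marks it most recently used
        if (buffer == null) {
            buffer = new ArrayList<>();
            lru.put(partitionId, buffer); // may evict the eldest partition
        }
        buffer.add(bindingSet);
    }

    /** At the end of the input, flushes and closes every buffer still cached. */
    void closeAll() {
        for (Map.Entry<Integer, List<T>> e : lru.entrySet()) {
            onEvict.accept(e.getKey(), e.getValue());
        }
        lru.clear();
    }
}
```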

When the source join dimension and the sink join dimension have the same IKeyOrder, there will be an orderly progression through the indices and each sink JoinTask can be safely closed once a JoinTask is created on the DataService for the next index partition. However, the IKeyOrders often differ, which can lead to a more scattered assignment of output IBindingSets to index partitions. The LRU helps to manage this fan out.

Fan out means that there may be N>1 JoinTasks for each join dimension. For this reason, a QUERY ISlice must be applied by the client reading on the IAsynchronousIterator returned by the JoinMasterTask.
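
A client-side slice over the merged output can be sketched as below, assuming the solutions arrive through a plain `Iterator`. Because N JoinTasks write concurrently, the order is not stable across runs unless evaluation is forced to be stable. `SliceSketch` and `slice` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class SliceSketch {
    /** Skips `offset` solutions, then returns at most `limit` of the rest. */
    static <T> List<T> slice(Iterator<T> solutions, long offset, long limit) {
        List<T> out = new ArrayList<>();
        long skipped = 0;
        // Stops reading as soon as the limit is reached.
        while (solutions.hasNext() && out.size() < limit) {
            T s = solutions.next();
            if (skipped < offset) {
                skipped++;
                continue;
            }
            out.add(s);
        }
        return out;
    }
}
```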

Fan out also implies a requirement for fan-in in order to reduce the scatter of JoinTasks. Fan-in must aggregate the source JoinTasks such that they target the same sink JoinTask instance for the same rule execution instance, the same orderIndex (hence the same IPredicate), and the same index partition. This means that a factory mechanism must be used to either create a new JoinTask or return the existing JoinTask on the DataService based on those identifying properties. This must be done in a thread-safe manner, but contention should be restricted to the case where the identifying properties are the same. The factory must be given the IAsynchronousIterator reading IBindingSet chunks from the source join dimension, and the JoinTask must not close (unless interrupted or cancelled) until all of its source IAsynchronousIterators have been exhausted.
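
The fan-in factory can be sketched with `ConcurrentHashMap.computeIfAbsent`, which serializes only callers whose keys land in the same hash bin, so contention is roughly limited to identical keys. The sketch assumes the rule execution instance is identified by the master's UUID (as `getUUID()` is used below) and reduces a sink to a count of attached sources. `SinkKey`, `FanInSink` and `FanInFactory` are hypothetical.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical identity of a sink JoinTask. */
final class SinkKey {
    final UUID masterUUID;
    final int orderIndex;
    final int partitionId;

    SinkKey(UUID masterUUID, int orderIndex, int partitionId) {
        this.masterUUID = masterUUID;
        this.orderIndex = orderIndex;
        this.partitionId = partitionId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SinkKey)) return false;
        SinkKey k = (SinkKey) o;
        return masterUUID.equals(k.masterUUID) && orderIndex == k.orderIndex
                && partitionId == k.partitionId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(masterUUID, orderIndex, partitionId);
    }
}

/** Hypothetical sink that stays open until every attached source is exhausted. */
final class FanInSink {
    private int openSources; // guarded by this
    private boolean done;    // guarded by this

    synchronized boolean tryAttach() {
        if (done) return false; // too late: this sink has already finished
        openSources++;
        return true;
    }

    /** Returns true once the last source is exhausted and the sink may close. */
    synchronized boolean sourceExhausted() {
        if (--openSources == 0) done = true;
        return done;
    }
}

final class FanInFactory {
    private final ConcurrentMap<SinkKey, FanInSink> sinks = new ConcurrentHashMap<>();

    /** Returns the live sink for the key, creating it if needed, with the caller attached. */
    FanInSink attach(SinkKey key) {
        while (true) {
            FanInSink sink = sinks.computeIfAbsent(key, k -> new FanInSink());
            if (sink.tryAttach()) return sink;
            sinks.remove(key, sink); // discard the finished sink, then retry
        }
    }
}
```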

Version:
$Id: JoinMasterTask.java 3509 2010-09-05 18:16:01Z thompsonbry $
Author:
Bryan Thompson
TODO:
fold these comments into the javadoc.

The goal is to have no more than one JoinTask per index partition per rule execution. If the #of index partitions is very large then we may have to use an LRU cache in an attempt to release JoinTasks that are not being written on by a given source JoinTask.

There is a strong requirement for closure to get back the mutationCount. That would require us to keep a source JoinTask alive until all downstream JoinTasks complete.

The pipeline join needs to be modified to force stable (single-threaded) evaluation and to observe the ISlice constraints and the IQueryOptions.isStable() constraint.

In order to avoid a hot spot on RuleStats.solutionCount, that field should only be updated as chunks of solutions are produced. When evaluating a SLICE, we will constrain the evaluation to be single threaded so once again there will not be a hot spot on that field.
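
The chunk-level update described above can be sketched as follows, assuming an `AtomicLong` stands in for the shared solution counter and a `Consumer` for the output buffer. `SolutionCounterSketch` and `emitChunk` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

final class SolutionCounterSketch {
    // Shared by every task that writes solutions: a potential hot spot.
    final AtomicLong solutionCount = new AtomicLong();

    /** Hands a chunk to the sink and updates the shared counter once per chunk. */
    <T> void emitChunk(List<T> chunk, Consumer<List<T>> sink) {
        sink.accept(chunk);
        solutionCount.addAndGet(chunk.size()); // one contended update per chunk
    }
}
```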

All of this should be done when we refactor the pipeline join to use a fixed thread pool for join processing. In the meantime, query hints can be used to specify the nested subquery join if you need stable evaluation. In the long run, SPARQL-aware query caching is the right way to handle stable query and SLICE for scale-out, since a stable evaluation order can otherwise impose far too much constraint on parallelism.

We are not seeing the totals when a SLICE is used. I believe that the test harness is simply exiting once it gets its N results and the daemon threads for the workers are not keeping the JVM alive. Ideally, either the JoinMasterTask or the last JoinTask would notice that the solution buffer was closed and would use that information to halt the ongoing JoinTasks.


Field Summary
protected static boolean DEBUG
          True iff the log level is DEBUG or less.
protected  IJoinNexus joinNexus
           
protected  IJoinNexusFactory joinNexusFactory
           
protected  JoinStats[] joinStats
          Statistics on JoinTask behavior for each IPredicate in the tail of the rule.
protected static org.apache.log4j.Logger log
           
protected  UUID masterUUID
          The unique identifier for this JoinMasterTask instance.
protected  int[] order
          The evaluation order.
protected  IRule rule
           
protected  IRuleState ruleState
           
protected  RuleStats ruleStats
           
protected  IBuffer<ISolution[]> solutionBuffer
          From the ctor.
protected  int tailCount
           
 
Constructor Summary
protected JoinMasterTask(IRule rule, IJoinNexus joinNexus, IBuffer<ISolution[]> solutionBuffer)
           
 
Method Summary
protected  void awaitAll(List<Future<Void>> futures, long timeout, TimeUnit unit)
          Make sure that each JoinTask completed successfully.
 RuleStats call()
          Evaluate the rule.
protected  void combineJoinStats()
          Aggregates statistics from each JoinTask onto ruleStats.
 IBuffer<ISolution[]> getSolutionBuffer()
          Returns the buffer specified to the ctor (overridden for distributed joins).
 UUID getUUID()
          Return a unique identifier for the JoinMasterTask instance.
protected  ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator(IBindingSet bindingSet)
          Return an IAsynchronousIterator that will read a single IBindingSet.
 void report(JoinStats joinStats)
          Aggregates the statistics for some join dimension.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

log

protected static final org.apache.log4j.Logger log

DEBUG

protected static final boolean DEBUG
True iff the log level is DEBUG or less.


rule

protected final IRule rule

joinNexus

protected final IJoinNexus joinNexus

joinNexusFactory

protected final IJoinNexusFactory joinNexusFactory

solutionBuffer

protected final IBuffer<ISolution[]> solutionBuffer
From the ctor. This will be the IBuffer on which the last join dimension writes the computed ISolutions.

Note: LocalJoinMasterTask always passes this along to the last LocalJoinTask.

Note: For a DistributedJoinMasterTask running a Query this gets proxied and the DistributedJoinTasks all write on the proxy. However, the DistributedJoinMasterTask DOES NOT proxy this for mutation in order to keep all data from flowing through the master.


tailCount

protected final int tailCount

ruleState

protected final IRuleState ruleState

order

protected final int[] order
The evaluation order.


ruleStats

protected final RuleStats ruleStats

joinStats

protected final JoinStats[] joinStats
Statistics on JoinTask behavior for each IPredicate in the tail of the rule. These statistics are reported by each JoinTask and then aggregated for each join dimension.

Note: The index into this array is the evaluation order of the predicate.


masterUUID

protected final UUID masterUUID
The unique identifier for this JoinMasterTask instance.

Constructor Detail

JoinMasterTask

protected JoinMasterTask(IRule rule,
                         IJoinNexus joinNexus,
                         IBuffer<ISolution[]> solutionBuffer)
Parameters:
rule - The rule to be executed.
joinNexus - The IJoinNexus.
solutionBuffer - The ISolution buffer.
Method Detail

getUUID

public final UUID getUUID()
Description copied from interface: IJoinMaster
Return a unique identifier for the JoinMasterTask instance. This is used to concentrate all DistributedJoinTasks that target the same tail predicate and index partition onto the same DistributedJoinTask sink.

Specified by:
getUUID in interface IJoinMaster
Returns:
The unique identifier.

call

public RuleStats call()
               throws Exception
Evaluate the rule.

Specified by:
call in interface IStepTask
Specified by:
call in interface Callable<RuleStats>
Throws:
Exception

awaitAll

protected void awaitAll(List<Future<Void>> futures,
                        long timeout,
                        TimeUnit unit)
                 throws ExecutionExceptions,
                        InterruptedException,
                        TimeoutException
Make sure that each JoinTask completed successfully.

Note: This waits until all JoinTasks complete, regardless of their outcome (or until the timeout expires), so that all JoinTasks have the opportunity to report their JoinStats to the JoinMasterTask.

Parameters:
futures - The Future for each JoinTask that was created by the JoinMasterTask.
timeout - The timeout for awaiting those futures.
unit - The unit for that timeout.
Throws:
ExecutionExceptions - if one or more JoinTasks fail.
InterruptedException - if the JoinMasterTask itself was interrupted while awaiting its JoinTasks.
TimeoutException - if the timeout expires first.
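
The "wait for everyone, even after a failure" behavior can be sketched as below. Unlike the documented method, which throws ExecutionExceptions, this sketch returns the collected causes, and it shares one timeout budget across all futures. `AwaitAllSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitAllSketch {
    /**
     * Waits for every future, continuing past failures so that each task can
     * report its statistics. Returns the failures; empty means all succeeded.
     */
    static List<Throwable> awaitAll(List<? extends Future<?>> futures, long timeout,
                                    TimeUnit unit)
            throws InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        List<Throwable> failures = new ArrayList<>();
        for (Future<?> f : futures) {
            long remaining = deadline - System.nanoTime(); // one budget for all futures
            try {
                f.get(Math.max(0L, remaining), TimeUnit.NANOSECONDS);
            } catch (ExecutionException e) {
                failures.add(e.getCause()); // record it and keep waiting on the rest
            } catch (CancellationException e) {
                failures.add(e);
            }
        }
        return failures;
    }
}
```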

newBindingSetIterator

protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator(IBindingSet bindingSet)
Return an IAsynchronousIterator that will read a single IBindingSet.

Parameters:
bindingSet - the binding set.

combineJoinStats

protected void combineJoinStats()
Aggregates statistics from each JoinTask onto ruleStats. There are N JoinTasks per IPredicate in the tail of the rule, where N is the #of index partitions on which we must read to evaluate the IRule for a given IPredicate in the tail (N is determined separately for each IPredicate and may differ between them).
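
Per-dimension aggregation can be sketched with an `AtomicLongArray` indexed by the evaluation order of the predicate, so that the N JoinTasks of a dimension may report concurrently. The two counters are hypothetical simplifications of JoinStats, and `JoinStatsSketch` is not the bigdata class.

```java
import java.util.concurrent.atomic.AtomicLongArray;

final class JoinStatsSketch {
    // Index = evaluation order of the predicate (orderIndex).
    private final AtomicLongArray bindingSetsIn;
    private final AtomicLongArray bindingSetsOut;

    JoinStatsSketch(int tailCount) {
        bindingSetsIn = new AtomicLongArray(tailCount);
        bindingSetsOut = new AtomicLongArray(tailCount);
    }

    /** Called by each of the N JoinTasks of a join dimension, possibly concurrently. */
    void report(int orderIndex, long in, long out) {
        bindingSetsIn.addAndGet(orderIndex, in);
        bindingSetsOut.addAndGet(orderIndex, out);
    }

    long bindingSetsIn(int orderIndex) {
        return bindingSetsIn.get(orderIndex);
    }

    long bindingSetsOut(int orderIndex) {
        return bindingSetsOut.get(orderIndex);
    }
}
```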


report

public void report(JoinStats joinStats)
Aggregates the statistics for some join dimension.

Specified by:
report in interface IJoinMaster
Parameters:
joinStats - Statistics for an index partition of some join dimension.

getSolutionBuffer

public IBuffer<ISolution[]> getSolutionBuffer()
                                       throws IOException
Returns the buffer specified to the ctor (overridden for distributed joins).

Specified by:
getSolutionBuffer in interface IJoinMaster
Throws:
IOException


Copyright © 2006-2011 SYSTAP, LLC. All Rights Reserved.