com.bigdata.relation.rule.eval
Class AbstractJoinNexus

java.lang.Object
  extended by com.bigdata.relation.rule.eval.AbstractJoinNexus
All Implemented Interfaces:
IJoinNexus
Direct Known Subclasses:
RDFJoinNexus

public abstract class AbstractJoinNexus
extends Object
implements IJoinNexus

Base implementation for IJoinNexus

Version:
$Id: AbstractJoinNexus.java 6130 2012-03-15 10:31:25Z thompsonbry $
Author:
Bryan Thompson

Field Summary
protected  int chunkCapacity
           
protected  int chunkOfChunksCapacity
           
protected  IElementFilter<?> filter
           
protected  IIndexManager indexManager
           
protected  IEvaluationPlanFactory planFactory
          The factory for rule evaluation plans.
protected  long readTimestamp
           
protected  IResourceLocator<?> resourceLocator
          Note: cached.
protected  int solutionFlags
           
 
Fields inherited from interface com.bigdata.relation.rule.eval.IJoinNexus
ALL, BINDINGS, ELEMENT, RULE
 
Constructor Summary
protected AbstractJoinNexus(IJoinNexusFactory joinNexusFactory, IIndexManager indexManager)
           
 
Method Summary
 boolean bind(IPredicate<?> pred, IConstraint[] constraints, Object e, IBindingSet bindings)
          Binds variables from a visited element.
 boolean bind(IRule rule, int index, Object e, IBindingSet bindings)
          Binds variables from a visited element.
 boolean forceSerialExecution()
          When true, rule-level parallelism is disabled and the ISolution buffers are flushed after every IStep.
 ActionEnum getAction()
          The kind of operation that is being executed (Query, Insert, or Delete).
 IStreamSerializer<IBindingSet[]> getBindingSetSerializer()
          FIXME Custom serialization for binding sets, especially since there tends to be a lot of redundancy in the data arising from how bindings are propagated during JOINs.
 int getChunkCapacity()
          The #of elements in a chunk for query or mutation.
 int getChunkOfChunksCapacity()
          The #of chunks that can be held by an IBuffer that is the target of one or more UnsynchronizedArrayBuffers.
 int getFullyBufferedReadThreshold()
          The #of elements that will be materialized in a fully buffered read by an IAccessPath.
 IRelation getHeadRelationView(IPredicate pred)
          The head relation is what we write on for mutation operations and is also responsible for minting new elements from computed ISolutions.
 IIndexManager getIndexManager()
          Used to locate indices, relations and relation containers.
 IJoinNexusFactory getJoinNexusFactory()
          The factory object is used to materialize appropriate IJoinNexus instances when the rule execution crosses an RMI boundary.
 int getMaxParallelSubqueries()
          The maximum #of subqueries for the first join dimension that will be issued in parallel.
 IEvaluationPlanFactory getPlanFactory()
          Return the factory for IEvaluationPlans.
 String getProperty(String name, String defaultValue)
          Resolve the property value using the IIndexManager, the namespace of the resource, and the Properties instance to be tested as hidden parameters.
<T> T
getProperty(String name, String defaultValue, IValidator<T> validator)
          Resolves, parses, and validates the property value.
 IRangeCountFactory getRangeCountFactory()
          The factory object for range counts used by IEvaluationPlans.
 long getReadTimestamp()
          Equivalent to IJoinNexusFactory.getReadTimestamp().
 IResourceLocator getRelationLocator()
           
 IRuleStatisticsFactory getRuleStatisticsFactory()
          The factory for rule statistics objects.
 IRuleTaskFactory getRuleTaskFactory(boolean parallel, IRule rule)
          Return the effective IRuleTaskFactory for the rule.
 IElementFilter<ISolution> getSolutionFilter()
          Return the IElementFilter that will be used to reject solutions based on the bindings for the head of the rule -or- null if no filter will be imposed.
 IStreamSerializer<ISolution[]> getSolutionSerializer()
          FIXME Custom serialization for solution sets, especially since there tends to be a lot of redundancy in the data arising from how bindings are propagated during JOINs.
 IAccessPath getTailAccessPath(IRelation relation, IPredicate predicate)
          Obtain an access path reading from relation for the specified predicate (from the tail of some rule).
 IRelation getTailRelationView(IPredicate pred)
          Locate and return the view of the relation(s) identified by the IPredicate.
 long getWriteTimestamp()
          Equivalent to IJoinNexusFactory.getWriteTimestamp().
protected  boolean isEmptyProgram(IStep step)
          Return true iff the step is an empty IProgram.
 Iterator<PartitionLocator> locatorScan(AbstractScaleOutFederation<?> fed, IPredicate<?> predicate)
          Return an iterator visiting the PartitionLocator for the index partitions from which an IAccessPath must read in order to materialize all elements which would be visited for that predicate.
 IBindingSet newBindingSet(IRule rule)
          Factory for IBindingSet implementations.
 IBuffer<ISolution[]> newDeleteBuffer(IMutableRelation relation)
          Return a thread-safe buffer onto which chunks of computed ISolutions will be written.
 IBuffer<ISolution[]> newInsertBuffer(IMutableRelation relation)
          Return a thread-safe buffer onto which chunks of computed ISolutions will be written.
 IBlockingBuffer<ISolution[]> newQueryBuffer()
          Note: ISolutions (not relation elements) are written on the buffer concurrently by different rules, so there is no natural order for the elements in the buffer.
 ISolution newSolution(IRule rule, IBindingSet bindingSet)
          Create a new ISolution.
protected abstract  ISortKeyBuilder<?> newSortKeyBuilder(IPredicate<?> head)
          Return the ISortKeyBuilder used to impose DISTINCT on the solutions generated by a query.
 IBuffer<ISolution> newUnsynchronizedBuffer(IBuffer<ISolution[]> targetBuffer, int chunkCapacity)
          Return a buffer suitable for a single-threaded writer that flushes onto the specified targetBuffer.
protected  Object runDistributedProgram(IBigdataFederation<?> fed, ActionEnum action, IStep step)
          Runs a distributed IProgram (key-range partitioned indices, RMI, and multi-machine).
protected  Object runLocalProgram(ActionEnum action, IStep step)
          This variant handles local indices on either a TemporaryStore or a Journal WITHOUT concurrency controls (fast).
 long runMutation(IStep step)
          Run as a mutation operation (it will write any solutions onto the relations named in the head of the various IRules).
protected  Object runProgram(ActionEnum action, IStep step)
          Core impl.
 IChunkedOrderedIterator<ISolution> runQuery(IStep step)
          Run as a query.
 int solutionFlags()
          The flags that affect the behavior of IJoinNexus.newSolution(IRule, IBindingSet).
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface com.bigdata.relation.rule.eval.IJoinNexus
fakeBinding, newBindingSetSortKeyBuilder
 

Field Detail

indexManager

protected final IIndexManager indexManager

resourceLocator

protected final IResourceLocator<?> resourceLocator
Note: cached.


readTimestamp

protected final long readTimestamp

chunkCapacity

protected final int chunkCapacity

chunkOfChunksCapacity

protected final int chunkOfChunksCapacity

solutionFlags

protected final int solutionFlags

filter

protected final IElementFilter<?> filter

planFactory

protected final IEvaluationPlanFactory planFactory
The factory for rule evaluation plans.

Constructor Detail

AbstractJoinNexus

protected AbstractJoinNexus(IJoinNexusFactory joinNexusFactory,
                            IIndexManager indexManager)
Parameters:
joinNexusFactory - The object used to create this instance and which can be used to create other instances as necessary for distributed rule execution.
indexManager - The object used to resolve indices, relations, etc.
Method Detail

getChunkOfChunksCapacity

public final int getChunkOfChunksCapacity()
Description copied from interface: IJoinNexus
The #of chunks that can be held by an IBuffer that is the target of one or more UnsynchronizedArrayBuffers. This is generally a small value on the order of the #of parallel producers that might be writing on the IBuffer, since the capacity of the UnsynchronizedArrayBuffers is already quite large (10k or more elements, defining a single "chunk" from a single producer).

Specified by:
getChunkOfChunksCapacity in interface IJoinNexus
See Also:
IJoinNexus.getMaxParallelSubqueries(), IJoinNexus.getChunkCapacity()

getChunkCapacity

public final int getChunkCapacity()
Description copied from interface: IJoinNexus
The #of elements in a chunk for query or mutation. This is normally a relatively large value on the order of 10,000 or better.

Specified by:
getChunkCapacity in interface IJoinNexus

getFullyBufferedReadThreshold

public final int getFullyBufferedReadThreshold()
Description copied from interface: IJoinNexus
The #of elements that will be materialized in a fully buffered read by an IAccessPath. When this threshold is exceeded the IAccessPath will use an IAsynchronousIterator instead. This value should be on the order of IJoinNexus.getChunkCapacity().

Specified by:
getFullyBufferedReadThreshold in interface IJoinNexus
See Also:
IAccessPath.iterator(int, int), AbstractResource.Options#FULLY_BUFFERED_READ_THRESHOLD
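
The threshold rule above can be sketched as a simple comparison. This is a minimal illustration, not the bigdata implementation: the class ReadStrategySketch, the Strategy enum, and the idea of comparing against an estimated element count are all assumptions made for the example.

```java
/** Hypothetical sketch of the fully-buffered vs. asynchronous read decision. */
final class ReadStrategySketch {

    /** Illustrative names for the two read strategies described above. */
    enum Strategy { FULLY_BUFFERED, ASYNCHRONOUS }

    /**
     * Chooses the asynchronous strategy only when the estimated number of
     * elements exceeds the threshold (both parameters are assumptions).
     */
    static Strategy choose(long estimatedElements, int fullyBufferedReadThreshold) {
        return estimatedElements > fullyBufferedReadThreshold
                ? Strategy.ASYNCHRONOUS
                : Strategy.FULLY_BUFFERED;
    }

    public static void main(String[] args) {
        int threshold = 10_000; // assumed to be on the order of the chunk capacity
        System.out.println(choose(500, threshold));
        System.out.println(choose(250_000, threshold));
    }
}
```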

getProperty

public final String getProperty(String name,
                                String defaultValue)
Description copied from interface: IJoinNexus
Resolve the property value using the IIndexManager, the namespace of the resource, and the Properties instance to be tested as hidden parameters.

Specified by:
getProperty in interface IJoinNexus
Parameters:
name - The property name.
defaultValue - The default.
Returns:
The resolved property value.
See Also:
Configuration

getProperty

public final <T> T getProperty(String name,
                               String defaultValue,
                               IValidator<T> validator)
Description copied from interface: IJoinNexus
Resolves, parses, and validates the property value.

Specified by:
getProperty in interface IJoinNexus
Parameters:
name - The property name.
defaultValue - The default value.
validator - The validator applied to the resolved value.
Returns:
The parsed and validated property value.
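
The resolve, parse, and validate sequence might look roughly like the sketch below. It is only an illustration: it resolves against a plain java.util.Properties object rather than the IIndexManager and resource namespace, the nested Validator interface is a hypothetical stand-in for IValidator (whose actual method signature is not shown on this page), and the property name is invented.

```java
import java.util.Properties;

/** Hypothetical sketch of "resolve, parse, validate" for a property value. */
final class PropertySketch {

    /** Stand-in for IValidator<T>; the real interface may differ. */
    interface Validator<T> {
        T parse(String name, String value);
    }

    /** Resolves the raw string (or the default), then parses and validates it. */
    static <T> T getProperty(Properties props, String name,
                             String defaultValue, Validator<T> validator) {
        String value = props.getProperty(name, defaultValue);
        if (value == null) {
            return null; // nothing configured and no default supplied
        }
        return validator.parse(name, value);
    }

    /** Example validator: parses an int and rejects non-positive values. */
    static final Validator<Integer> POSITIVE_INT = (name, value) -> {
        int n = Integer.parseInt(value.trim());
        if (n <= 0) {
            throw new IllegalArgumentException(name + " must be positive: " + value);
        }
        return n;
    };

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("example.chunkCapacity", "20000"); // invented name
        System.out.println(getProperty(props, "example.chunkCapacity", "10000", POSITIVE_INT));
    }
}
```

Keeping parsing and validation in one object means the caller receives either a typed value or an exception, never an unchecked string.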

getSolutionFilter

public IElementFilter<ISolution> getSolutionFilter()
Description copied from interface: IJoinNexus
Return the IElementFilter that will be used to reject solutions based on the bindings for the head of the rule -or- null if no filter will be imposed. This may be used for query or mutation.

Specified by:
getSolutionFilter in interface IJoinNexus
Returns:
The optional filter.

getRuleStatisticsFactory

public IRuleStatisticsFactory getRuleStatisticsFactory()
Description copied from interface: IJoinNexus
The factory for rule statistics objects.

Specified by:
getRuleStatisticsFactory in interface IJoinNexus

getJoinNexusFactory

public IJoinNexusFactory getJoinNexusFactory()
Description copied from interface: IJoinNexus
The factory object is used to materialize appropriate IJoinNexus instances when the rule execution crosses an RMI boundary.

Specified by:
getJoinNexusFactory in interface IJoinNexus

getRangeCountFactory

public IRangeCountFactory getRangeCountFactory()
Description copied from interface: IJoinNexus
The factory object for range counts used by IEvaluationPlans.

Specified by:
getRangeCountFactory in interface IJoinNexus

forceSerialExecution

public final boolean forceSerialExecution()
Description copied from interface: IJoinNexus
When true, rule-level parallelism is disabled and the ISolution buffers are flushed after every IStep. This can be enabled if you are exploring apparent concurrency problems with the rules. It should normally be false for better performance.

Specified by:
forceSerialExecution in interface IJoinNexus

getMaxParallelSubqueries

public final int getMaxParallelSubqueries()
Description copied from interface: IJoinNexus
The maximum #of subqueries for the first join dimension that will be issued in parallel. Use ZERO (0) to avoid submitting tasks to the ExecutorService entirely and ONE (1) to submit a single task at a time to the ExecutorService.

Specified by:
getMaxParallelSubqueries in interface IJoinNexus
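
The difference between ZERO, ONE, and larger values can be illustrated with plain java.util.concurrent. The sketch below is an assumption-laden simplification: the class and method names are invented, and batching with invokeAll is merely one easy way to cap concurrency, not necessarily how the join implementation schedules its subqueries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch of a "max parallel subqueries" limit. */
final class ParallelSubquerySketch {

    /**
     * Runs the subqueries and returns their results in submission order.
     * maxParallel == 0 runs everything in the caller's thread and never
     * touches the service; otherwise at most maxParallel tasks are
     * submitted at a time.
     */
    static <T> List<T> runAll(List<Callable<T>> subqueries, int maxParallel,
                              ExecutorService service) {
        List<T> results = new ArrayList<>();
        try {
            if (maxParallel == 0) {
                for (Callable<T> q : subqueries) {
                    results.add(q.call());
                }
                return results;
            }
            for (int i = 0; i < subqueries.size(); i += maxParallel) {
                int end = Math.min(i + maxParallel, subqueries.size());
                // invokeAll blocks until every task in the batch is done.
                for (Future<T> f : service.invokeAll(subqueries.subList(i, end))) {
                    results.add(f.get());
                }
            }
            return results;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        List<Callable<Integer>> subqueries = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            final int n = i;
            subqueries.add(() -> n * n);
        }
        ExecutorService service = Executors.newFixedThreadPool(4);
        try {
            System.out.println(runAll(subqueries, 0, service));
            System.out.println(runAll(subqueries, 2, service));
        } finally {
            service.shutdown();
        }
    }
}
```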

getAction

public final ActionEnum getAction()
Description copied from interface: IJoinNexus
The kind of operation that is being executed (Query, Insert, or Delete).

Specified by:
getAction in interface IJoinNexus

getWriteTimestamp

public final long getWriteTimestamp()
Description copied from interface: IJoinNexus
Equivalent to IJoinNexusFactory.getWriteTimestamp().

Specified by:
getWriteTimestamp in interface IJoinNexus

getReadTimestamp

public final long getReadTimestamp()
Description copied from interface: IJoinNexus
Equivalent to IJoinNexusFactory.getReadTimestamp().

Specified by:
getReadTimestamp in interface IJoinNexus

getHeadRelationView

public IRelation getHeadRelationView(IPredicate pred)
The head relation is what we write on for mutation operations and is also responsible for minting new elements from computed ISolutions. This method depends solely on the name of the head relation and the timestamp of interest for the view.

Specified by:
getHeadRelationView in interface IJoinNexus
Parameters:
pred - The IPredicate, which MUST be the head of some IRule.
Returns:
The IRelation, which will never be a fused view and which will accept writes iff the rules are being executed as a mutation operation.

getTailRelationView

public IRelation getTailRelationView(IPredicate pred)
Description copied from interface: IJoinNexus
Locate and return the view of the relation(s) identified by the IPredicate.

Note: This method is responsible for returning a fused view when more than one relation name was specified for the IPredicate. It SHOULD be used whenever the IRelation is selected based on a predicate in the tail of an IRule and could therefore be a fused view of more than one relation instance. (The head of the IRule must be a simple IRelation and not a view.)

Note: The implementation should choose the read timestamp for each relation in the view using #getReadTimestamp(String).

Specified by:
getTailRelationView in interface IJoinNexus
Parameters:
pred - The IPredicate, which MUST be a tail from some IRule.
Returns:
The IRelation, which might be a RelationFusedView.

getTailAccessPath

public IAccessPath getTailAccessPath(IRelation relation,
                                     IPredicate predicate)
Description copied from interface: IJoinNexus
Obtain an access path reading from relation for the specified predicate (from the tail of some rule).

Note that passing in the IRelation is important since it must otherwise be discovered using the IResourceLocator. Requiring the caller to resolve it beforehand and pass it into this method reduces the contention and demand on the IResourceLocator cache.

Specified by:
getTailAccessPath in interface IJoinNexus
Parameters:
relation - The relation.
predicate - The predicate. When IPredicate.getPartitionId() is set, the returned IAccessPath MUST read on the identified local index partition (directly, not via RMI).
Returns:
The access path.

locatorScan

public Iterator<PartitionLocator> locatorScan(AbstractScaleOutFederation<?> fed,
                                              IPredicate<?> predicate)
Description copied from interface: IJoinNexus
Return an iterator visiting the PartitionLocator for the index partitions from which an IAccessPath must read in order to materialize all elements which would be visited for that predicate.

Note: You can use an IDataServiceCallable to obtain the reference of the IDataService and pass that into your AbstractTask in order to have the federation reference available when running under the ConcurrencyManager.

Specified by:
locatorScan in interface IJoinNexus
Parameters:
fed - The federation, which is required in order to access the IMetadataIndex for a scale-out index.
predicate - The predicate, with whatever bindings have already been applied.
Returns:
The iterator.

getIndexManager

public final IIndexManager getIndexManager()
Description copied from interface: IJoinNexus
Used to locate indices, relations and relation containers.

Specified by:
getIndexManager in interface IJoinNexus

bind

public final boolean bind(IRule rule,
                          int index,
                          Object e,
                          IBindingSet bindings)
Description copied from interface: IJoinNexus
Binds variables from a visited element.

Note: The bindings are propagated before the constraints are verified so this method will have a side-effect on the bindings even if the constraints were not satisfied. Therefore you should clone the bindings before calling this method.

Specified by:
bind in interface IJoinNexus
Parameters:
rule - The rule.
index - The index of the IPredicate in the body of the Rule.
e - An element materialized by the IAccessPath for that IPredicate.
Returns:
true unless the new bindings would violate any of the IConstraints declared for the Rule.
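
The side effect described in the note can be shown with a small stand-alone sketch. Everything here is hypothetical: a Map stands in for IBindingSet, a Predicate stands in for the rule's constraints, and tryBind is an invented helper that shows the "clone first" pattern the note recommends.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical sketch of "bind first, check constraints afterwards". */
final class BindSketch {

    /**
     * Copies the element's values into the bindings and only then tests the
     * constraint, so the bindings are modified even when false is returned.
     */
    static boolean bind(Map<String, Object> element,
                        Map<String, Object> bindings,
                        Predicate<Map<String, Object>> constraint) {
        bindings.putAll(element);         // side effect happens first
        return constraint.test(bindings); // constraints verified afterwards
    }

    /** Binds against a copy; returns the new bindings or null on violation. */
    static Map<String, Object> tryBind(Map<String, Object> element,
                                       Map<String, Object> bindings,
                                       Predicate<Map<String, Object>> constraint) {
        Map<String, Object> copy = new HashMap<>(bindings); // clone before bind
        return bind(element, copy, constraint) ? copy : null;
    }

    public static void main(String[] args) {
        Map<String, Object> bindings = new HashMap<>();
        bindings.put("x", 1);
        Map<String, Object> element = new HashMap<>();
        element.put("y", 2);
        Predicate<Map<String, Object>> yIsNotTwo =
                b -> !Integer.valueOf(2).equals(b.get("y"));

        System.out.println(tryBind(element, bindings, yIsNotTwo)); // constraint fails
        System.out.println(bindings.containsKey("y"));             // original untouched
    }
}
```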

bind

public final boolean bind(IPredicate<?> pred,
                          IConstraint[] constraints,
                          Object e,
                          IBindingSet bindings)
Description copied from interface: IJoinNexus
Binds variables from a visited element.

Note: The bindings are propagated before the constraints are verified so this method will have a side-effect on the bindings even if the constraints were not satisfied. Therefore you should clone the bindings before calling this method.

Specified by:
bind in interface IJoinNexus
Parameters:
pred - The IPredicate from which the element was read.
constraints - An array of constraints which must be satisfied (optional).
e - An element materialized by the IAccessPath for that IPredicate.
Returns:
true unless the new bindings would violate any of the optional IConstraints.

newSolution

public final ISolution newSolution(IRule rule,
                                   IBindingSet bindingSet)
Description copied from interface: IJoinNexus
Create a new ISolution. The behavior of this method generally depends on bit flags specified when the IJoinNexus was created.

Note: For many purposes, it is only the computed IJoinNexus.ELEMENTs that are of interest. For high-level query, you will generally specify only the IJoinNexus.BINDINGS. The IJoinNexus.BINDINGS are also useful for some truth maintenance applications. The IJoinNexus.RULE is generally only of interest for inspecting the behavior of some rule set.

Specified by:
newSolution in interface IJoinNexus
Parameters:
rule - The rule.
bindingSet - The bindings (the implementation MUST clone the bindings if they will be saved with the ISolution).
Returns:
The new ISolution.
See Also:
IJoinNexus.ELEMENT, IJoinNexus.BINDINGS, IJoinNexus.RULE, IJoinNexus.solutionFlags(), Solution
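
Bit flags of this kind are usually combined with bitwise OR and tested with bitwise AND. The sketch below illustrates that pattern only: the numeric values are invented (the actual constants are declared by IJoinNexus and are not listed on this page), and treating ALL as the union of the other three is an assumption.

```java
/** Hypothetical sketch of how solution flags might be combined and tested. */
final class SolutionFlagsSketch {

    // Invented bit values; the real IJoinNexus constants may differ.
    static final int ELEMENT = 1 << 0;
    static final int BINDINGS = 1 << 1;
    static final int RULE = 1 << 2;
    static final int ALL = ELEMENT | BINDINGS | RULE; // assumed to be the union

    /** Lists which parts of a solution would be materialized for the flags. */
    static String describe(int flags) {
        StringBuilder sb = new StringBuilder();
        if ((flags & ELEMENT) != 0) sb.append("element ");
        if ((flags & BINDINGS) != 0) sb.append("bindings ");
        if ((flags & RULE) != 0) sb.append("rule ");
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(describe(BINDINGS));       // high-level query
        System.out.println(describe(ELEMENT | RULE)); // inspecting a rule set
        System.out.println(describe(ALL));
    }
}
```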

solutionFlags

public final int solutionFlags()
Description copied from interface: IJoinNexus
The flags that affect the behavior of IJoinNexus.newSolution(IRule, IBindingSet).

Specified by:
solutionFlags in interface IJoinNexus

getSolutionSerializer

public IStreamSerializer<ISolution[]> getSolutionSerializer()
FIXME Custom serialization for solution sets, especially since there tends to be a lot of redundancy in the data arising from how bindings are propagated during JOINs.

Specified by:
getSolutionSerializer in interface IJoinNexus
See Also:
(needs to be written).
TODO:
We can sort the ISolutions much like we already do for DISTINCT or intend to do for SORT and use the equivalent of leading key compression to reduce IO costs (or when they are SORTed we could leverage that to produce a more compact serialization).

getBindingSetSerializer

public IStreamSerializer<IBindingSet[]> getBindingSetSerializer()
FIXME Custom serialization for binding sets, especially since there tends to be a lot of redundancy in the data arising from how bindings are propagated during JOINs.

Specified by:
getBindingSetSerializer in interface IJoinNexus
See Also:
SPOBindingSetSerializer, which has not been finished.
TODO:
We can sort the ISolutions much like we already do for DISTINCT or intend to do for SORT and use the equivalent of leading key compression to reduce IO costs (or when they are SORTed we could leverage that to produce a more compact serialization).

newBindingSet

public final IBindingSet newBindingSet(IRule rule)
Description copied from interface: IJoinNexus
Factory for IBindingSet implementations.

Note: The factory MUST apply any bound constants for the IRule before returning the IBindingSet.

Specified by:
newBindingSet in interface IJoinNexus
Parameters:
rule - The rule whose bindings will be stored in the binding set.
Returns:
A new binding set suitable for that rule.

getRuleTaskFactory

public final IRuleTaskFactory getRuleTaskFactory(boolean parallel,
                                                 IRule rule)
Description copied from interface: IJoinNexus
Return the effective IRuleTaskFactory for the rule. When the rule is a step of a sequential program writing on one or more IMutableRelations, then the returned IStepTask must automatically flush the buffer after the rule executes in order to ensure that the state of the IMutableRelation(s) is updated before the next IRule is executed.

Specified by:
getRuleTaskFactory in interface IJoinNexus
Parameters:
parallel - true unless the rule is a step in a sequential IProgram. Note that a sequential step MUST flush its buffer since steps are run in sequence precisely because they have a dependency!
rule - A rule that is a step in some program. If the program is just a rule then the value of parallel does not matter. The buffer is cleared when it is flushed, so a re-flush is always a NOP.
Returns:
The IStepTask to execute for that rule.
See Also:
RunRuleAndFlushBufferTaskFactory, RunRuleAndFlushBufferTask
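
Why a sequential step has to flush can be seen in a toy model. The sketch below is hypothetical throughout: a List of strings stands in for an IMutableRelation, Buffer for the solution buffer, and the Rule interface for an IRule. It only illustrates that the second rule sees the first rule's output because the buffer is flushed in between, and that flushing an empty buffer does nothing.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of flushing a buffer after each sequential step. */
final class FlushAfterStepSketch {

    /** Collects pending solutions and writes them to the relation on flush. */
    static final class Buffer {
        private final List<String> pending = new ArrayList<>();
        private final List<String> relation;

        Buffer(List<String> relation) {
            this.relation = relation;
        }

        void add(String solution) {
            pending.add(solution);
        }

        /** Writes and clears the pending solutions; returns how many were written. */
        long flush() {
            int n = pending.size();
            relation.addAll(pending);
            pending.clear(); // cleared on flush, so a re-flush is a no-op
            return n;
        }
    }

    /** A rule reads the current relation state and writes onto the buffer. */
    interface Rule {
        void run(List<String> relation, Buffer out);
    }

    /** Runs the rules in order, flushing after each one. */
    static void runSequential(List<Rule> rules, List<String> relation, Buffer buffer) {
        for (Rule rule : rules) {
            rule.run(relation, buffer);
            buffer.flush(); // make this step's writes visible to the next step
        }
    }

    public static void main(String[] args) {
        List<String> relation = new ArrayList<>();
        Buffer buffer = new Buffer(relation);
        List<Rule> rules = new ArrayList<>();
        rules.add((rel, out) -> out.add("a"));
        rules.add((rel, out) -> { if (rel.contains("a")) out.add("b"); });
        runSequential(rules, relation, buffer);
        System.out.println(relation);
    }
}
```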

getPlanFactory

public final IEvaluationPlanFactory getPlanFactory()
Description copied from interface: IJoinNexus
Return the factory for IEvaluationPlans.

Specified by:
getPlanFactory in interface IJoinNexus
Returns:
The factory.

getRelationLocator

public final IResourceLocator getRelationLocator()

newUnsynchronizedBuffer

public final IBuffer<ISolution> newUnsynchronizedBuffer(IBuffer<ISolution[]> targetBuffer,
                                                        int chunkCapacity)
Description copied from interface: IJoinNexus
Return a buffer suitable for a single-threaded writer that flushes onto the specified targetBuffer.

The returned buffer MUST apply the optional filter value returned by IJoinNexus.getSolutionFilter() in order to keep individual ISolutions out of the buffer. Filtering is done at this level since the targetBuffer contains chunks of solutions.

Specified by:
newUnsynchronizedBuffer in interface IJoinNexus
Parameters:
targetBuffer - A thread-safe buffer for chunks of ISolutions that was allocated with IJoinNexus.newQueryBuffer(), IJoinNexus.newInsertBuffer(IMutableRelation), or IJoinNexus.newDeleteBuffer(IMutableRelation).
chunkCapacity - The capacity of the new buffer. This should be the maximum chunk size that will be produced, or IJoinNexus.getChunkCapacity() if you do not have better information.
Returns:
A non-thread-safe buffer.
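
The relationship between the single-threaded buffer, the optional filter, and the thread-safe target buffer of chunks can be sketched with standard collections. This is an illustration under stated assumptions, not the UnsynchronizedArrayBuffer implementation: the class name is invented, a BlockingQueue of lists stands in for the target IBuffer, and a Predicate that returns true for solutions to keep stands in for the IElementFilter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Predicate;

/** Hypothetical sketch of a single-writer buffer that flushes whole chunks. */
final class UnsyncBufferSketch<E> {

    private final BlockingQueue<List<E>> target; // thread-safe, holds chunks
    private final int chunkCapacity;
    private final Predicate<E> accept;           // optional; null keeps everything
    private List<E> chunk = new ArrayList<>();   // touched by one thread only

    UnsyncBufferSketch(BlockingQueue<List<E>> target, int chunkCapacity,
                       Predicate<E> accept) {
        this.target = target;
        this.chunkCapacity = chunkCapacity;
        this.accept = accept;
    }

    /** Filters individual solutions here, before they are grouped into chunks. */
    boolean add(E solution) {
        if (accept != null && !accept.test(solution)) {
            return false;
        }
        chunk.add(solution);
        if (chunk.size() >= chunkCapacity) {
            flush();
        }
        return true;
    }

    /** Hands the current chunk to the target; blocks while the target is full. */
    void flush() {
        if (chunk.isEmpty()) {
            return;
        }
        try {
            target.put(chunk);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        chunk = new ArrayList<>();
    }

    public static void main(String[] args) {
        // The queue capacity plays the role of the chunk-of-chunks capacity.
        BlockingQueue<List<Integer>> target = new ArrayBlockingQueue<>(4);
        UnsyncBufferSketch<Integer> buffer =
                new UnsyncBufferSketch<>(target, 3, n -> n % 2 == 0);
        for (int i = 0; i < 10; i++) {
            buffer.add(i);
        }
        buffer.flush();
        System.out.println(target);
    }
}
```

Filtering in the per-writer buffer keeps the shared target simple: it only ever moves whole chunks and never inspects individual solutions.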

newQueryBuffer

public final IBlockingBuffer<ISolution[]> newQueryBuffer()
Note: ISolutions (not relation elements) are written on the buffer concurrently by different rules, so there is no natural order for the elements in the buffer.

Specified by:
newQueryBuffer in interface IJoinNexus

newInsertBuffer

public IBuffer<ISolution[]> newInsertBuffer(IMutableRelation relation)
Return a thread-safe buffer onto which chunks of computed ISolutions will be written. When the buffer is flushed the chunked ISolutions will be inserted into the IMutableRelation.

Note: getSolutionFilter() is applied by newUnsynchronizedBuffer(IBuffer, int) and NOT by the buffer returned by this method.

Specified by:
newInsertBuffer in interface IJoinNexus
Parameters:
relation - The relation.
Returns:
The buffer.

newDeleteBuffer

public IBuffer<ISolution[]> newDeleteBuffer(IMutableRelation relation)
Return a thread-safe buffer onto which chunks of computed ISolutions will be written. When the buffer is flushed the chunks of ISolutions will be deleted from the IMutableRelation.

Note: getSolutionFilter() is applied by newUnsynchronizedBuffer(IBuffer, int) and NOT by the buffer returned by this method.

Specified by:
newDeleteBuffer in interface IJoinNexus
Parameters:
relation - The relation.
Returns:
The buffer.

newSortKeyBuilder

protected abstract ISortKeyBuilder<?> newSortKeyBuilder(IPredicate<?> head)
Return the ISortKeyBuilder used to impose DISTINCT on the solutions generated by a query.

Parameters:
head - The head of the rule.
Returns:
The ISortKeyBuilder.
TODO:
This should be based on bop annotations and a hash table for distinct unless it is very high volume and you can wait for the first result, in which case a SORT should be selected. For high volume with low latency to the first result, use a persistent hash table on a temporary store.

runQuery

public IChunkedOrderedIterator<ISolution> runQuery(IStep step)
                                            throws Exception
Description copied from interface: IJoinNexus
Run as a query.

Specified by:
runQuery in interface IJoinNexus
Parameters:
step - The IRule or IProgram.
Returns:
An iterator from which you can read the solutions.
Throws:
IllegalStateException - unless this is an ActionEnum.Query.
IllegalArgumentException - if the argument is null.
Exception

runMutation

public final long runMutation(IStep step)
                       throws Exception
Description copied from interface: IJoinNexus
Run as a mutation operation (it will write any solutions onto the relations named in the head of the various IRules).

Specified by:
runMutation in interface IJoinNexus
Parameters:
step - The IRule or IProgram.
Returns:
The mutation count (#of distinct elements modified in the relation(s)).
Throws:
IllegalArgumentException - unless ActionEnum.isMutation() is true.
Exception

isEmptyProgram

protected final boolean isEmptyProgram(IStep step)
Return true iff the step is an empty IProgram.

Parameters:
step - The step.

runProgram

protected final Object runProgram(ActionEnum action,
                                  IStep step)
                           throws Exception
Core implementation. This handles the logic required to execute the program either on a target DataService (highly efficient) or within the client, using the IClientIndex to submit operations to the appropriate DataService(s) (not very efficient, even without RMI).

Returns:
Either an IChunkedOrderedIterator (query) or Long (mutation count).
Throws:
Exception
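
Because the core method returns Object, the public entry points have to check the action and cast the result. The sketch below shows one way that could be arranged. It is hypothetical: the Action enum is a local stand-in for ActionEnum, strings stand in for ISolutions, a plain Iterator stands in for IChunkedOrderedIterator, and the exception types simply mirror those documented for runQuery and runMutation on this page.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of dispatching on the action around a core run method. */
final class RunProgramSketch {

    /** Local stand-in for ActionEnum. */
    enum Action {
        Query, Insert, Delete;

        boolean isMutation() {
            return this != Query;
        }
    }

    private final Action action;
    private final List<String> solutions;

    RunProgramSketch(Action action, List<String> solutions) {
        this.action = action;
        this.solutions = solutions;
    }

    /** Returns an Iterator for a query or a Long mutation count otherwise. */
    Object runProgram() {
        if (action == Action.Query) {
            return solutions.iterator();
        }
        return Long.valueOf(solutions.size());
    }

    @SuppressWarnings("unchecked")
    Iterator<String> runQuery() {
        if (action != Action.Query) {
            throw new IllegalStateException("not a query: " + action);
        }
        return (Iterator<String>) runProgram();
    }

    long runMutation() {
        if (!action.isMutation()) {
            throw new IllegalArgumentException("not a mutation: " + action);
        }
        return (Long) runProgram();
    }

    public static void main(String[] args) {
        List<String> solutions = Arrays.asList("s1", "s2");
        System.out.println(new RunProgramSketch(Action.Insert, solutions).runMutation());
        System.out.println(new RunProgramSketch(Action.Query, solutions).runQuery().next());
    }
}
```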

runLocalProgram

protected final Object runLocalProgram(ActionEnum action,
                                       IStep step)
                                throws Exception
This variant handles local indices on either a TemporaryStore or a Journal WITHOUT concurrency controls (fast).

Throws:
Exception

runDistributedProgram

protected final Object runDistributedProgram(IBigdataFederation<?> fed,
                                             ActionEnum action,
                                             IStep step)
                                      throws Exception
Runs a distributed IProgram (key-range partitioned indices, RMI, and multi-machine).

Throws:
Exception


Copyright © 2006-2012 SYSTAP, LLC. All Rights Reserved.