com.bigdata.relation.rule.eval
Class ProgramTask

java.lang.Object
  extended by com.bigdata.service.FederationCallable<T>
      extended by com.bigdata.service.DataServiceCallable<Object>
          extended by com.bigdata.relation.rule.eval.ProgramTask
All Implemented Interfaces:
IProgramTask, IDataServiceCallable, IFederationCallable, Serializable, Callable<Object>

public class ProgramTask
extends DataServiceCallable<Object>
implements IProgramTask

Task for executing a program when all of the indices for the relation are co-located on the same DataService.

Version:
$Id: ProgramTask.java 4069 2011-01-09 20:58:02Z thompsonbry $
Author:
Bryan Thompson
See Also:
Serialized Form
TODO:
Named result sets. This would provide a means to run an IRuleTask and cache the output for further evaluation as a named result set. The backing store should be a temporary resource. For scale-out it needs to be visible to the federation (since the rule executing against that data may be distributed across the federation based on the access path for the SPORelation), so it would have to be registered on some data service (any) in the federation and dropped in a finally {} clause.

When the sets are large they may need a backing store, e.g., BigdataSet (specialized so that it does not store anything under the key, since we can decode the Long from the key). Provide utility versions such as BigdataLongSet(); the same code can serve float, double, and int as well. Avoid override for duplicate keys to reduce IO.

It should be possible to have a different action associated with each rule in the program, and to have a different target relation for the head of each rule on which we will write (mutation). Different query or mutation count results could be handled by an extension with "nextResultSet" style semantics. However, for now, all rules MUST write on the same buffer. Query results will therefore be multiplexed, as will mutation counts.

Foreign key joins: it should be possible to handle different relation classes in the same rules, e.g., RDF and non-RDF relations, or even the SPO and lexicon relations for the RDF DB. The latter will be useful for materializing externalized statements efficiently.

Could make the return type a generic for AbstractStepTask and make this class a concrete implementation of that one.


Field Summary
protected static org.apache.log4j.Logger log
           
 
Constructor Summary
ProgramTask(ActionEnum action, IStep step, IJoinNexusFactory joinNexusFactory)
          Variant when the task will be submitted using IDataService.submit(Callable) (efficient since all indices will be local, but the indices must not be partitioned and must all exist on the target DataService).
ProgramTask(ActionEnum action, IStep step, IJoinNexusFactory joinNexusFactory, IIndexManager indexManager)
          Variant when the task will be executed directly by the caller.
 
Method Summary
 Object call()
          Execute the program.
protected  RuleStats executeClosure(IProgram program)
          Computes the closure of a set of IRules until the relation(s) on which they are writing reach a "fixed point".
protected  RuleStats executeMutation(IStep step)
          Run a mutation IStep.
protected  RuleStats executeProgramWithEmbeddedClosure(IProgram program)
          Execute an IProgram containing one or more sub-IProgram that are closure operations.
protected  IAsynchronousIterator<ISolution[]> executeQuery(IStep step)
          Execute the IStep as a query.
 void setDataService(DataService dataService)
          Sets the DataService reference and the IBigdataFederation reference (if not already set).
 
Methods inherited from class com.bigdata.service.DataServiceCallable
getDataService, isDataService
 
Methods inherited from class com.bigdata.service.FederationCallable
getFederation, setFederation
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface com.bigdata.service.IFederationCallable
getFederation, setFederation
 

Field Detail

log

protected static final transient org.apache.log4j.Logger log
Constructor Detail

ProgramTask

public ProgramTask(ActionEnum action,
                   IStep step,
                   IJoinNexusFactory joinNexusFactory)
Variant when the task will be submitted using IDataService.submit(Callable) (efficient since all indices will be local, but the indices must not be partitioned and must all exist on the target DataService).

Note: the caller MUST submit the ProgramTask using DataService.submit(Callable), in which case the DataServiceCallable.dataService field will be set (after the constructor has run) by the DataService itself. The DataService will be used to identify an ExecutorService and the IJoinNexusFactory will be used to establish access to indices, relations, etc. in the context of the AbstractTask - see AbstractStepTask.submit().
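
The injection pattern described in the note above can be sketched as follows. This is a minimal illustration only: MiniService, ServiceAwareCallable, and LocalTask are hypothetical stand-ins, not bigdata classes, and submit() runs the task synchronously here for brevity rather than returning a Future.

```java
import java.util.concurrent.Callable;

public class SubmitInjectionSketch {

    /** Hypothetical stand-in for a callable that needs a service reference. */
    interface ServiceAwareCallable<T> extends Callable<T> {
        void setService(MiniService service);
    }

    /** Hypothetical stand-in for the data service. */
    static class MiniService {
        final String name;

        MiniService(String name) {
            this.name = name;
        }

        /** Injects this service into the task (after its constructor ran), then runs it. */
        <T> T submit(Callable<T> task) throws Exception {
            if (task instanceof ServiceAwareCallable) {
                ((ServiceAwareCallable<?>) task).setService(this);
            }
            return task.call();
        }
    }

    /** A task that can only run once a service has injected itself. */
    static class LocalTask implements ServiceAwareCallable<String> {
        private MiniService service; // null until the service sets it

        public void setService(MiniService service) {
            this.service = service;
        }

        public String call() {
            if (service == null) {
                throw new IllegalStateException("task was not submitted to a service");
            }
            return "ran on " + service.name;
        }
    }

    static String runOn(String serviceName) throws Exception {
        return new MiniService(serviceName).submit(new LocalTask());
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOn("ds-1"));
    }
}
```

Calling the task directly, without going through submit(), fails in this sketch because the service reference was never set. That mirrors why the three-argument constructor requires submission through the service.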

Parameters:
action -
step -
joinNexusFactory -

ProgramTask

public ProgramTask(ActionEnum action,
                   IStep step,
                   IJoinNexusFactory joinNexusFactory,
                   IIndexManager indexManager)
Variant when the task will be executed directly by the caller.

Parameters:
action -
step -
joinNexusFactory -
indexManager -
Throws:
IllegalArgumentException - if any parameter is null.
Method Detail

setDataService

public void setDataService(DataService dataService)
Description copied from class: DataServiceCallable
Sets the DataService reference and the IBigdataFederation reference (if not already set).

Specified by:
setDataService in interface IDataServiceCallable
Overrides:
setDataService in class DataServiceCallable<Object>
Parameters:
dataService - The data service.

call

public Object call()
            throws Exception
Execute the program.

Note: There is no natural order for a high-level query. Also, unless stable evaluation is requested, the results can be produced by parallel threads, so the order of the materialized solutions is not even stable from run to run. The only way to obtain a defined order is to impose a sort on the ISolutions.
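
The ordering caveat can be illustrated with plain java.util.concurrent code. This is a sketch under stated assumptions: integers stand in for ISolutions, and the class and method names are hypothetical. Values produced by several threads arrive in an order that may differ between runs, and only an explicit sort gives a defined order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SolutionOrderSketch {

    /** Produces the values 0..n-1 from several threads; arrival order is not deterministic. */
    static List<Integer> produceInParallel(int n, int threads) throws InterruptedException {
        List<Integer> out = Collections.synchronizedList(new ArrayList<Integer>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < n; i++) {
            final int value = i;
            pool.execute(() -> out.add(value));
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("producers did not finish in time");
        }
        return new ArrayList<Integer>(out);
    }

    /** Imposes an order on the materialized results by sorting them. */
    static List<Integer> produceSorted(int n, int threads) throws InterruptedException {
        List<Integer> solutions = produceInParallel(n, threads);
        Collections.sort(solutions);
        return solutions;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("arrival order: " + produceInParallel(10, 4));
        System.out.println("sorted order:  " + produceSorted(10, 4));
    }
}
```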

Specified by:
call in interface IProgramTask
Specified by:
call in interface Callable<Object>
Throws:
Exception

executeQuery

protected IAsynchronousIterator<ISolution[]> executeQuery(IStep step)
Execute the IStep as a query.

Parameters:
step - The IStep.
Returns:
The IAsynchronousIterator that will drain the ISolutions generated by the IStep. Execution will be cancelled if the iterator is closed. If execution results in an error, then the iterator will throw a RuntimeException whose cause is the error.
Throws:
RuntimeException
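
A caller would typically drain such an iterator chunk by chunk and close it in a finally block, so that the producer is cancelled even when the consumer stops early. The sketch below assumes a hypothetical ChunkIterator interface and an in-memory ArrayChunks implementation; neither is the bigdata API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class DrainSketch {

    /** Hypothetical stand-in for a closeable iterator that visits chunks of elements. */
    interface ChunkIterator<E> extends Iterator<E[]> {
        void close();
    }

    /** In-memory chunk source used only to make the sketch runnable. */
    static class ArrayChunks implements ChunkIterator<String> {
        private final String[][] chunks;
        private int next = 0;
        boolean closed = false;

        ArrayChunks(String[][] chunks) {
            this.chunks = chunks;
        }

        public boolean hasNext() {
            return !closed && next < chunks.length;
        }

        public String[] next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return chunks[next++];
        }

        public void close() {
            closed = true; // a real producer would be cancelled here
        }
    }

    /** Collects every element of every chunk and always closes the iterator. */
    static <E> List<E> drain(ChunkIterator<E> itr) {
        List<E> all = new ArrayList<E>();
        try {
            while (itr.hasNext()) {
                for (E e : itr.next()) {
                    all.add(e);
                }
            }
        } finally {
            itr.close();
        }
        return all;
    }

    public static void main(String[] args) {
        String[][] data = { { "a", "b" }, { "c" } };
        System.out.println(drain(new ArrayChunks(data)));
    }
}
```

With the real iterator, a failure during evaluation surfaces as a RuntimeException from the iterator, and the underlying error is available through getCause().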

executeMutation

protected RuleStats executeMutation(IStep step)
                             throws InterruptedException,
                                    ExecutionException
Run a mutation IStep. The IStep may consist of many sub-ISteps.

Note: If you specify ITx.READ_COMMITTED for mutation operations when using a federation then concurrent split/join/move can cause the operation to fail. It is safer to use read-consistent semantics by specifying IIndexStore.getLastCommitTime() instead.

Parameters:
step - The IStep.
Returns:
Metadata about the program execution, including the required RuleStats.mutationCount.
Throws:
InterruptedException
ExecutionException

executeClosure

protected RuleStats executeClosure(IProgram program)
                            throws InterruptedException,
                                   ExecutionException
Computes the closure of a set of IRules until the relation(s) on which they are writing reach a "fixed point".

The general approach is a series of rounds in which each rule is applied in turn (either sequentially or in parallel, depending on the program). The solutions computed for each rule in each round are written onto the relation for the head of that rule. The process halts when no new solutions are computed in a given round.

Note: When we are running the program on a ConcurrencyManager, each round of the closure is submitted as a single AbstractTask. This allows us to do historical reads during the round and to update the read-behind timestamp before each round. During the round, the program will write on buffers that are flushed at the end of the round. Those buffers will use unisolated writes onto the appropriate relations.

mutation counts

In order to detect the fixed point we MUST know whether or not any mutations were made to the relation during the round. The design does NOT rely on the relation count before and after the round since it would have to use an _exact_ range count for the relation (otherwise it is possible that a deleted tuple would be overwritten by a computed entailment but that the count would not change). However, the exact range count is relatively expensive which is why the design insists on getting back the #of elements actually written on the index from each rule in each round. If no rules in a given round caused an element to be written, then we are at the fixed point.

Note: This assumes that you are following the IMutableRelation contract -- you MUST NOT overwrite tuples with the same key and value, or at least you must not report such "do nothing" overwrites in the mutation count!!!
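
The round structure and the role of the mutation count can be sketched with an in-memory relation. This is an illustration under stated assumptions, not the ProgramTask implementation: the relation is a Set of (from, to) pairs, the single rule is (a,b),(b,c) => (a,c), and all names are hypothetical. Each round reads from a snapshot, buffers its solutions, and counts only the tuples that were actually added when the buffer is flushed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FixedPointSketch {

    /**
     * Applies the rule (a,b),(b,c) => (a,c) in rounds until a round writes
     * nothing new. Returns the number of rounds that were executed.
     */
    static int closure(Set<List<Integer>> relation) {
        int rounds = 0;
        while (true) {
            rounds++;
            // Read from a snapshot taken before the round; buffer the writes.
            List<List<Integer>> snapshot = new ArrayList<List<Integer>>(relation);
            List<List<Integer>> buffer = new ArrayList<List<Integer>>();
            for (List<Integer> x : snapshot) {
                for (List<Integer> y : snapshot) {
                    if (x.get(1).equals(y.get(0))) {
                        buffer.add(Arrays.asList(x.get(0), y.get(1)));
                    }
                }
            }
            // Flush the buffer. Set.add returns false for a tuple that is already
            // present, so "do nothing" overwrites are not counted.
            long mutationCount = 0;
            for (List<Integer> tuple : buffer) {
                if (relation.add(tuple)) {
                    mutationCount++;
                }
            }
            if (mutationCount == 0) {
                return rounds; // fixed point: nothing was written in this round
            }
        }
    }

    public static void main(String[] args) {
        Set<List<Integer>> edges = new HashSet<List<Integer>>();
        edges.add(Arrays.asList(1, 2));
        edges.add(Arrays.asList(2, 3));
        edges.add(Arrays.asList(3, 4));
        int rounds = closure(edges);
        System.out.println(rounds + " rounds, " + edges.size() + " tuples");
    }
}
```

Because the loop tests the count of tuples actually written, it never needs an exact range count of the relation to decide when to stop.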

Parameters:
action - The action (must be a mutation operation).
program - The program to be executed.
Throws:
ExecutionException
InterruptedException

executeProgramWithEmbeddedClosure

protected RuleStats executeProgramWithEmbeddedClosure(IProgram program)
                                               throws InterruptedException,
                                                      ExecutionException
Execute an IProgram containing one or more sub-IProgram that are closure operations. The top-level program must not be a closure operation. All steps above the closure operations will be run in a sequence. The closure operations themselves will be executed using executeClosure(IProgram).

Note: Any program that embeds a closure operation must be sequential (this is enforced by the Program class).

Note: Programs that use closure operations are constrained to either (a) a fixed point of a (normally parallel) program consisting solely of IRules; or (b) a sequential program containing some steps that are the fixed point of a (normally parallel) program consisting solely of IRules.
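
The shape of such a program can be sketched as a sequential list of steps in which one step is the fixed point of a set of rules. The Step interface, closureOf, and runSequential below are hypothetical stand-ins chosen for illustration; they are not the bigdata IStep or IProgram types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class EmbeddedClosureSketch {

    /** Hypothetical stand-in for a step: writes on the relation, returns tuples actually written. */
    interface Step {
        long apply(Set<Integer> relation);
    }

    /** Wraps a set of rules so that they are repeated until a round writes nothing. */
    static Step closureOf(List<Step> rules) {
        return relation -> {
            long total = 0;
            long written;
            do {
                written = 0;
                for (Step rule : rules) {
                    written += rule.apply(relation);
                }
                total += written;
            } while (written > 0);
            return total;
        };
    }

    /** Runs the top-level steps strictly in sequence and sums their mutation counts. */
    static long runSequential(List<Step> steps, Set<Integer> relation) {
        long total = 0;
        for (Step step : steps) {
            total += step.apply(relation);
        }
        return total;
    }

    /** Seed step, then an embedded closure, then a final plain step. */
    static long runDemo(Set<Integer> relation) {
        Step seed = rel -> rel.add(1) ? 1 : 0;
        Step doubling = rel -> {
            long n = 0;
            for (int x : new ArrayList<Integer>(rel)) { // iterate over a snapshot
                if (x < 8 && rel.add(2 * x)) {
                    n++;
                }
            }
            return n;
        };
        Step last = rel -> rel.add(0) ? 1 : 0;
        List<Step> program = Arrays.asList(seed, closureOf(Arrays.asList(doubling)), last);
        return runSequential(program, relation);
    }

    public static void main(String[] args) {
        Set<Integer> relation = new TreeSet<Integer>();
        long mutationCount = runDemo(relation);
        System.out.println(relation + " mutationCount=" + mutationCount);
    }
}
```

The sketch handles closure only at the level directly below the top-level sequence, which matches the limitation noted in the TODO for this method.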

Throws:
ExecutionException
InterruptedException
IllegalArgumentException - if program is null
IllegalArgumentException - if program is itself a closure operation.
IllegalStateException - unless the ActionEnum is a mutation operation.
TODO:
This will not correctly handle programs that use closure in a sub-sub-program.


Copyright © 2006-2011 SYSTAP, LLC. All Rights Reserved.