com.bigdata.bop.engine
Class QueryEngine

java.lang.Object
  extended by com.bigdata.bop.engine.QueryEngine
All Implemented Interfaces:
IQueryClient, IQueryPeer, ICounterSetAccess, Remote
Direct Known Subclasses:
FederatedQueryEngine

public class QueryEngine
extends Object
implements IQueryPeer, IQueryClient, ICounterSetAccess

A class managing execution of concurrent queries against a local IIndexManager.

Design notes

Much of the complexity of the current approach owes itself to having to run a separate task for each join for each shard in order to have the appropriate lock when running against the unisolated shard view. This also means that the join task is running inside of the concurrency manager and hence has the local view of the shard.

The main, and perhaps the only, reason why we run unisolated rules is during closure, when we query against the unisolated indices and then write the entailments back on the unisolated indices.

Supporting closure has always been complicated. This complexity is mostly handled by ProgramTask#executeMutation() and AbstractTripleStore#newJoinNexusFactory(), which play games with the timestamps used to read and write on the database, with commit points designed to create visibility for tuples written by a mutation rule, and with the automated advance of the read timestamp for the query in each closure pass in order to make newly committed tuples visible to subsequent rounds of closure.

For scale-out, we do shard-wise auto commits so we always have a commit point which makes each write visible. The read timestamp is actually a read-only transaction, which prevents the historical data we need during a closure round from being released as we drive updates onto the federation.

For the RWStore, we ran into a similar problem (in the HA branch, since that is where we are working on the RWStore): historically allocated records were being released as writes drove updates on the indices. Again, we "solved" the problem for the RWStore using a commit point followed by a read-only transaction reading on that commit point to hold onto the view on which the next closure round needs to read. This uncovered a problem with the RWStore and transaction service interaction, which Martyn is currently working to resolve through a combination of shadow allocators and deferred deletes which are processed once the release time is advanced by the transaction service.

The WORM does not have some of these problems with closure because we never delete history, so we do not need to create a commit point and a read-behind transaction. However, the WORM would have problems with concurrent access to the unisolated indices if we did not work around them through the transparent use of the UnisolatedReadWriteIndex, which allows multiple threads to access the same unisolated index view using a read/write lock pattern (concurrent readers are allowed, but there is only one writer and it has exclusive access when it is running). This works out because we never run closure operations against the WORM through the concurrency manager. If we did, we would have to create a commit point after each mutation and use a read-behind transaction to prevent concurrent access to the unisolated index.
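The read/write lock pattern mentioned above can be sketched in a few lines. This is a minimal illustration only, not the UnisolatedReadWriteIndex itself: the class name ReadWriteLockedIndexSketch, its methods, and the TreeMap standing in for an index are all assumptions made for the example.

```java
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Hypothetical stand-in for an index guarded by a read/write lock.
 * It shows only the locking discipline, not any bigdata API.
 */
final class ReadWriteLockedIndexSketch {

    // Assumption: a TreeMap plays the role of the unisolated index view.
    private final TreeMap<String, String> index = new TreeMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Readers share the read lock, so any number of them may run concurrently. */
    String lookup(String key) {
        lock.readLock().lock();
        try {
            return index.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** The writer takes the write lock and has exclusive access while it runs. */
    void insert(String key, String value) {
        lock.writeLock().lock();
        try {
            index.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Size is a read operation, so it also runs under the shared read lock. */
    int size() {
        lock.readLock().lock();
        try {
            return index.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Callers never see the lock: they call lookup() and insert() as if the index were thread-safe, which is the sense in which the wrapper is "transparent".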

The main advantage that I can see of the current complexity is that it allows us to do load+closure as a single operation on the WORM, resulting in a single commit point. This makes that operation ACID without having to use full read/write transactions. This is how we gain the ACID contract for the standalone Journal in the SAIL for the WORM. Of course, the SAIL does not have that contract for the RWStore because we have to do the commit and read-behind transaction in order to have visibility and avoid concurrent access to the unisolated index (by reading behind on the last commit point).

I think that the reality is even one step more complicated. When doing truth maintenance (incremental closure), we bring the temporary graph to a fixed point (the rules write on the temp store) and then apply the delta in a single write to the database. That suggests that incremental truth maintenance would continue to be ACID, but that database-at-once-closure would be round-wise ACID.

So, I would like to suggest that we break ACID for database-at-once-closure and always follow the pattern of (1) do a commit before each round of closure; and (2) create a read-behind transaction to prevent the release of that commit point as we drive writes onto the indices. If we follow this pattern then we can write on the unisolated indices without conflict and read on the historical views without conflict. Since there will be a commit point before each mutation rule runs (which corresponds to a closure round), database-at-once-closure will be atomic within a round, but will not be a single atomic operation. Per above, I think that we would retain the ACID property for incremental truth maintenance against a WORM or RW mode Journal.
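The proposed pattern can be illustrated with a toy fixed-point computation. The sketch below is an assumption-laden model, not bigdata code: RoundWiseClosureSketch, commit(), and the "a>b" edge encoding are hypothetical, an immutable copy stands in for a commit point held open by a read-behind transaction, and a single transitivity rule stands in for the mutation rules.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Toy model of round-wise closure: read a committed view, write the live state. */
final class RoundWiseClosureSketch {

    /** Hypothetical "commit": publish an immutable copy of the live state. */
    static Set<String> commit(Set<String> live) {
        return Collections.unmodifiableSet(new HashSet<>(live));
    }

    /** Computes the transitive closure of edges written as "a>b". */
    static Set<String> close(Set<String> initialEdges) {
        // Stands in for the unisolated indices, which receive all writes.
        Set<String> live = new HashSet<>(initialEdges);
        while (true) {
            // (1) Commit before the round; (2) keep that commit point as the read view.
            Set<String> readBehind = commit(live);
            int before = live.size();
            // The rule reads only the historical view and writes only the live state,
            // so readers and the writer never touch the same structure.
            for (String e1 : readBehind) {
                String[] ab = e1.split(">");
                for (String e2 : readBehind) {
                    String[] cd = e2.split(">");
                    if (ab[1].equals(cd[0])) {
                        live.add(ab[0] + ">" + cd[1]);
                    }
                }
            }
            if (live.size() == before) {
                return commit(live); // fixed point: the round produced nothing new
            }
        }
    }
}
```

Each pass through the loop is atomic with respect to its own commit point, but the closure as a whole spans several commit points, which mirrors the "atomic within a round" behavior described above.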

----

The advantage of this proposal (commit before each mutation rule and run query against a read-behind transaction) is that this could enormously simplify how we execute joins.

Right now, we use a factory pattern to create a join task on each node for each shard for which that node receives binding sets for a query. The main reason for doing this is to gain the appropriate lock for the unisolated index. If we never run a query against the unisolated index then we can go around the concurrency manager and run a single "query manager" task for all joins for all shards for all queries. This has some great benefits which I will go into below.

That "query manager" task would be responsible for accepting buffers containing elements or binding sets from other nodes and scheduling consumption of those data based on various criteria (order of arrival, priority, buffer resource requirements, timeout, etc.). This manager task could use a fork/join pool to execute lightweight operations (NIO, formulation of access paths from binding sets, mapping of binding sets onto shards, joining a chunk already read from an access path against a binding set, etc.). Operations which touch the disk need to run in their own thread (until we get Java 7 async file IO, which is already available in a preview library). We could handle that by queuing those operations against a fixed size thread pool for reads.
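The split between a fixed-size pool for disk reads and a fork/join pool for lightweight work might look roughly like the following. Everything here is a hypothetical sketch: QueryManagerSketch, readAccessPath(), and join() are invented for illustration, integers stand in for elements and binding sets, and CompletableFuture is used only as a convenient way to chain the two stages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

/** Hypothetical single manager that schedules reads and joins on separate pools. */
final class QueryManagerSketch implements AutoCloseable {

    // Lightweight, CPU-only steps run on a fork/join pool.
    private final ForkJoinPool lightPool = new ForkJoinPool();
    // Operations that touch the disk are queued against a fixed number of threads.
    private final ExecutorService readPool;

    QueryManagerSketch(int nReadThreads) {
        this.readPool = Executors.newFixedThreadPool(nReadThreads);
    }

    /** Hypothetical blocking read of a chunk from an access path on one shard. */
    private static List<Integer> readAccessPath(int shard) {
        List<Integer> chunk = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            chunk.add(shard * 10 + i);
        }
        return chunk;
    }

    /** Hypothetical join step: keep the elements whose parity matches the binding. */
    private static List<Integer> join(List<Integer> chunk, int binding) {
        List<Integer> out = new ArrayList<>();
        for (int v : chunk) {
            if (v % 2 == binding) {
                out.add(v);
            }
        }
        return out;
    }

    /** Reads on the read pool, then joins the chunk on the fork/join pool. */
    CompletableFuture<List<Integer>> submit(int shard, int binding) {
        return CompletableFuture
                .supplyAsync(() -> readAccessPath(shard), readPool)
                .thenApplyAsync(chunk -> join(chunk, binding), lightPool);
    }

    @Override
    public void close() {
        readPool.shutdown();
        lightPool.shutdown();
    }
}
```

With this shape the number of threads blocked on IO is bounded by the constructor argument regardless of how many shards, joins, or queries are in flight.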

This is a radical change in how we handle distributed query execution, but I think that it could have a huge payoff. It would reduce the complexity of the join logic, make it significantly easier to execute different kinds of join operations, reduce the overhead for acquiring locks for the unisolated index views, and reduce the number of threads consumed by joins (from one per shard per join per query to a fixed pool of N threads for reads). It would also centralize the management of resources on each node and make it possible for us to handle things like join termination by simply purging data from the query manager task for the terminated join.

Version:
$Id: QueryEngine.java 6235 2012-03-31 11:38:02Z thompsonbry $
Author:
Bryan Thompson
TODO:
Expander patterns will continue to exist until we handle the standalone backchainers in a different manner for scale-out, so add support for those for now.
There is going to be a relationship to recycling of intermediates (for individual BOps or BOp tree fragments) and a distributed query cache which handles invalidation (for updates) and BOp aware reuse of result sets available in the cache. This sort of thing will have to be coordinated among the cache nodes.

Nested Class Summary
static interface QueryEngine.Annotations
          Annotations understood by the QueryEngine.
 
Field Summary
protected  QueryEngineCounters counters
          Counters at the global level.
protected static String ERR_QUERY_NOT_RUNNING
          Error message used if a query is not running.
 
Constructor Summary
QueryEngine(IIndexManager localIndexManager)
           
 
Method Summary
protected  boolean acceptChunk(IChunkMessage<IBindingSet> msg)
          Add a chunk of intermediate results for consumption by some query.
protected  void assertRunning()
          Returns normally if the query engine is running; otherwise throws IllegalStateException.
 void bufferReady(IChunkMessage<IBindingSet> msg)
          Notify a service that a buffer having data for some BOp in some running query is available.
 void cancelQuery(UUID queryId, Throwable cause)
          Notify a service that the query has been terminated.
 void declareQuery(IQueryDecl queryDecl)
          Deprecated. 
protected  void didShutdown()
          Hook is notified by shutdown() when all running queries have terminated.
 AbstractRunningQuery eval(BOp op)
          Evaluate a query.
 AbstractRunningQuery eval(BOp op, IBindingSet bset)
          Evaluate a query.
 AbstractRunningQuery eval(BOp op, IBindingSet[] bsets)
          Evaluate a query.
 AbstractRunningQuery eval(UUID queryId, BOp op, IBindingSet bset)
           
 AbstractRunningQuery eval(UUID queryId, BOp op, IBindingSet[] bset)
           
 AbstractRunningQuery eval(UUID queryId, BOp op, IBindingSet[][] bset)
           
 AbstractRunningQuery eval(UUID queryId, PipelineOp query, IChunkMessage<IBindingSet> msg)
          Evaluate a query.
protected  void execute(Runnable r)
          Executes the Runnable on the local IIndexManager's ExecutorService.
protected  void finalize()
          QueryEngines are used with a singleton pattern managed by the QueryEngineFactory.
 ClientConnectionManager getClientConnectionManager()
          Return the ClientConnectionManager used to make remote SERVICE call requests.
 CounterSet getCounters()
          Return a CounterSet which reports various statistics for the QueryEngine.
 IBigdataFederation<?> getFederation()
          The IBigdataFederation iff running in scale-out.
 IIndexManager getIndexManager()
          The local index manager, which provides direct access to local BTree and IndexSegment objects.
 IQueryClient getProxy()
          The RMI proxy for this QueryEngine when used as a query controller.
 PipelineOp getQuery(UUID queryId)
          Return the query.
protected  QueryEngineCounters getQueryEngineCounters()
          The QueryEngineCounters object for this QueryEngine.
 UUID[] getRunningQueries()
          Return the set of queries which are running as of the moment when the request was processed.
 AbstractRunningQuery getRunningQuery(UUID queryId)
          Return the AbstractRunningQuery associated with that query identifier.
 UUID getServiceUUID()
          The UUID of the service in which this QueryEngine is running.
protected  void halt(AbstractRunningQuery q)
          The query is no longer running.
 void haltOp(IHaltOpMessage msg)
          Notify the client that execution has halted for some query, operator, node, shard, and source binding set chunk(s).
 void init()
          Initialize the QueryEngine.
protected  boolean isRunning()
           
 boolean isScaleOut()
          Return true iff running against an IBigdataFederation.
protected  QueryEngineCounters newCounters()
          Extension hook for new QueryEngineCounters instances.
protected  AbstractRunningQuery newRunningQuery(UUID queryId, boolean controller, IQueryClient clientProxy, UUID queryControllerId, PipelineOp query, IChunkMessage<IBindingSet> realSource)
          Factory for IRunningQuerys.
protected  AbstractRunningQuery putIfAbsent(UUID queryId, AbstractRunningQuery runningQuery)
          Places the AbstractRunningQuery object into the internal map.
 void shutdown()
          Shutdown the QueryEngine (blocking).
 void shutdownNow()
          Do not accept new queries and halt any running binding set chunk tasks.
 void startOp(IStartOpMessage msg)
          Notify the client that execution has started for some query, operator, node, and index partition.
 
Methods inherited from class java.lang.Object
clone, equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

ERR_QUERY_NOT_RUNNING

protected static final transient String ERR_QUERY_NOT_RUNNING
Error message used if a query is not running.

See Also:
Constant Field Values

counters

protected final QueryEngineCounters counters
Counters at the global level.

Constructor Detail

QueryEngine

public QueryEngine(IIndexManager localIndexManager)
Parameters:
localIndexManager - The local index manager.
Method Detail

getCounters

public CounterSet getCounters()
Return a CounterSet which reports various statistics for the QueryEngine.

Specified by:
getCounters in interface ICounterSetAccess

newCounters

protected QueryEngineCounters newCounters()
Extension hook for new QueryEngineCounters instances.


getQueryEngineCounters

protected QueryEngineCounters getQueryEngineCounters()
The QueryEngineCounters object for this QueryEngine.


getServiceUUID

public UUID getServiceUUID()
The UUID of the service in which this QueryEngine is running.

Specified by:
getServiceUUID in interface IQueryPeer
Returns:
The UUID of the service in which this QueryEngine is running -or- a unique and distinct UUID if the QueryEngine is not running against an IBigdataFederation.
See Also:
IService.getServiceUUID()

getFederation

public IBigdataFederation<?> getFederation()
The IBigdataFederation iff running in scale-out.

Note: The IBigdataFederation is required in scale-out in order to perform shard locator scans when mapping binding sets across the next join in a query plan.


getIndexManager

public IIndexManager getIndexManager()
The local index manager, which provides direct access to local BTree and IndexSegment objects. In scale-out, this is the IndexManager inside the IDataService and provides direct access to FusedViews (aka shards).

Note: You MUST NOT use unisolated indices without obtaining the necessary locks. The QueryEngine is intended to run only against committed index views for which no locks are required.


getProxy

public IQueryClient getProxy()
The RMI proxy for this QueryEngine when used as a query controller. The default implementation returns this.


getClientConnectionManager

public ClientConnectionManager getClientConnectionManager()
Return the ClientConnectionManager used to make remote SERVICE call requests.


isScaleOut

public boolean isScaleOut()
Return true iff running against an IBigdataFederation.


init

public void init()
Initialize the QueryEngine. It will accept binding set chunks and run them against running queries until it is shut down.


finalize

protected void finalize()
                 throws Throwable
QueryEngines are used with a singleton pattern managed by the QueryEngineFactory. They are torn down automatically once they are no longer reachable. This behavior depends on not having any hard references back to the QueryEngine.

Overrides:
finalize in class Object
Throws:
Throwable

assertRunning

protected void assertRunning()
Returns normally if the query engine is running.

Throws:
IllegalStateException - if the query engine is shutting down.

isRunning

protected boolean isRunning()

execute

protected final void execute(Runnable r)
Executes the Runnable on the local IIndexManager's ExecutorService.

Parameters:
r - The Runnable.

acceptChunk

protected boolean acceptChunk(IChunkMessage<IBindingSet> msg)
Add a chunk of intermediate results for consumption by some query. The chunk will be attached to the query and the query will be scheduled for execution.

Parameters:
msg - A chunk of intermediate results.
Returns:
true if the chunk was accepted; false if the query is done (including cancelled) or the query engine has been shut down. The IChunkMessage will have been released if it was not accepted.
Throws:
IllegalArgumentException - if the chunk is null.
IllegalStateException - if the chunk is not materialized.
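The contract described for acceptChunk can be summarized in a small model. This is a sketch under stated assumptions rather than the actual implementation: AcceptChunkSketch and its nested Chunk type are hypothetical, and the Chunk fields stand in for whatever state the real IChunkMessage exposes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical model of the accept/reject contract for incoming chunks. */
final class AcceptChunkSketch {

    /** Assumption: a minimal message type with just the state this sketch needs. */
    static final class Chunk {
        final boolean materialized;
        boolean released;

        Chunk(boolean materialized) {
            this.materialized = materialized;
        }

        void release() {
            released = true;
        }
    }

    private final Queue<Chunk> pending = new ArrayDeque<>();
    private boolean shutdown;
    private boolean queryDone;

    synchronized boolean acceptChunk(Chunk msg) {
        if (msg == null) {
            throw new IllegalArgumentException("chunk is null");
        }
        if (!msg.materialized) {
            throw new IllegalStateException("chunk is not materialized");
        }
        if (shutdown || queryDone) {
            // A rejected chunk is released before false is returned.
            msg.release();
            return false;
        }
        // Accepted: attach the chunk so the query can be scheduled to consume it.
        pending.add(msg);
        return true;
    }

    synchronized void markQueryDone() {
        queryDone = true;
    }

    synchronized void shutdown() {
        shutdown = true;
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

The point of releasing on rejection is that the caller can treat a false return as final and does not need to clean up the message itself.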

shutdown

public void shutdown()
Shut down the QueryEngine (blocking). The QueryEngine will not accept new queries, but existing queries will run to completion.


didShutdown

protected void didShutdown()
Hook is notified by shutdown() when all running queries have terminated.


shutdownNow

public void shutdownNow()
Do not accept new queries and halt any running binding set chunk tasks.


declareQuery

@Deprecated
public void declareQuery(IQueryDecl queryDecl)
                  throws RemoteException
Deprecated. 

Description copied from interface: IQueryPeer
Declare a query to a peer. This message is sent to the peer before any other message for that query and declares the query and the query controller with which the peer must communicate during query evaluation.

Specified by:
declareQuery in interface IQueryPeer
Parameters:
queryDecl - The query declaration.
Throws:
RemoteException

bufferReady

public void bufferReady(IChunkMessage<IBindingSet> msg)
Description copied from interface: IQueryPeer
Notify a service that a buffer having data for some BOp in some running query is available. The receiver may request the data when they are ready. If the query is cancelled, then the sender will drop the buffer.

Specified by:
bufferReady in interface IQueryPeer
Parameters:
msg - The message.

cancelQuery

public void cancelQuery(UUID queryId,
                        Throwable cause)
Notify a service that the query has been terminated. The peer MUST NOT cancel the query synchronously as that can lead to a deadlock with the query controller. Instead, the peer should queue a task to cancel the query and then return.

The default implementation is a NOP.

Specified by:
cancelQuery in interface IQueryPeer
Parameters:
queryId - The query identifier.
cause - The cause. When null, this is presumed to be normal query termination.
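The "queue a task and return" rule for peers can be sketched as follows. This is a hypothetical illustration, not the FederatedQueryEngine code: AsyncCancelSketch, register(), and the single-thread cancel service are assumptions, and the method returns a CompletableFuture only so that a caller in a demo or test can observe completion (the documented method returns void).

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical peer that cancels queries asynchronously. */
final class AsyncCancelSketch {

    // Assumption: running queries are tracked as Futures keyed by query id.
    private final ConcurrentMap<UUID, Future<?>> running = new ConcurrentHashMap<>();
    private final ExecutorService cancelService = Executors.newSingleThreadExecutor();

    void register(UUID queryId, Future<?> queryFuture) {
        running.put(queryId, queryFuture);
    }

    /**
     * Queues the cancellation and returns at once, so the caller (the query
     * controller) is never blocked waiting on this peer. A null cause means
     * normal termination; this sketch does not record the cause.
     */
    CompletableFuture<Void> cancelQuery(UUID queryId, Throwable cause) {
        return CompletableFuture.runAsync(() -> {
            Future<?> f = running.remove(queryId);
            if (f != null) {
                f.cancel(true);
            }
        }, cancelService);
    }

    boolean isRunning(UUID queryId) {
        return running.containsKey(queryId);
    }

    void shutdown() {
        cancelService.shutdown();
    }
}
```

Because the cancellation runs on its own thread, the calling thread holds no peer-side resources while the query is torn down, which is what avoids the deadlock with the controller.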

getQuery

public PipelineOp getQuery(UUID queryId)
Description copied from interface: IQueryClient
Return the query.

Specified by:
getQuery in interface IQueryClient
Parameters:
queryId - The query identifier.
Returns:
The query.

startOp

public void startOp(IStartOpMessage msg)
             throws RemoteException
Description copied from interface: IQueryClient
Notify the client that execution has started for some query, operator, node, and index partition.

Specified by:
startOp in interface IQueryClient
Throws:
RemoteException

haltOp

public void haltOp(IHaltOpMessage msg)
            throws RemoteException
Description copied from interface: IQueryClient
Notify the client that execution has halted for some query, operator, node, shard, and source binding set chunk(s). If execution halted abnormally, then the cause is sent as well.

Specified by:
haltOp in interface IQueryClient
Throws:
RemoteException

eval

public AbstractRunningQuery eval(BOp op)
                          throws Exception
Evaluate a query. This node will serve as the controller for the query.

Parameters:
op - The query to evaluate.
Returns:
The IRunningQuery.
Throws:
IllegalStateException - if the QueryEngine has been shutdown().
Exception

eval

public AbstractRunningQuery eval(BOp op,
                                 IBindingSet bset)
                          throws Exception
Evaluate a query. This node will serve as the controller for the query.

Parameters:
op - The query to evaluate.
bset - The initial binding set to present.
Returns:
The IRunningQuery.
Throws:
IllegalStateException - if the QueryEngine has been shutdown().
Exception

eval

public AbstractRunningQuery eval(UUID queryId,
                                 BOp op,
                                 IBindingSet bset)
                          throws Exception
Throws:
Exception

eval

public AbstractRunningQuery eval(UUID queryId,
                                 BOp op,
                                 IBindingSet[] bset)
                          throws Exception
Throws:
Exception

eval

public AbstractRunningQuery eval(UUID queryId,
                                 BOp op,
                                 IBindingSet[][] bset)
                          throws Exception
Throws:
Exception

eval

public AbstractRunningQuery eval(BOp op,
                                 IBindingSet[] bsets)
                          throws Exception
Evaluate a query. This node will serve as the controller for the query.

Parameters:
op - The query to evaluate.
bsets - The initial binding sets to present.
Returns:
The IRunningQuery.
Throws:
IllegalStateException - if the QueryEngine has been shutdown().
Exception

eval

public AbstractRunningQuery eval(UUID queryId,
                                 PipelineOp query,
                                 IChunkMessage<IBindingSet> msg)
                          throws Exception
Evaluate a query. This node will serve as the controller for the query. The IBindingSets made available by the IChunkMessage will be pushed into the query.

Parameters:
queryId - The unique identifier for the query.
query - The query to evaluate.
msg - A message providing access to the initial binding set(s) used to begin query evaluation.
Returns:
The IRunningQuery.
Throws:
IllegalStateException - if the QueryEngine has been shutdown().
Exception

putIfAbsent

protected AbstractRunningQuery putIfAbsent(UUID queryId,
                                           AbstractRunningQuery runningQuery)
Places the AbstractRunningQuery object into the internal map.

Parameters:
queryId - The query identifier.
runningQuery - The AbstractRunningQuery.
Returns:
The given AbstractRunningQuery -or- the existing AbstractRunningQuery iff one is already registered under the same UUID.
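The return convention described here differs from java.util.concurrent.ConcurrentMap.putIfAbsent, which returns null when no mapping existed. A minimal sketch of the difference, assuming the internal map is a ConcurrentHashMap (the source does not say which map type is used) and using the hypothetical class RunningQueryMapSketch with a generic Q in place of AbstractRunningQuery:

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical registry showing the "return the winner" putIfAbsent convention. */
final class RunningQueryMapSketch<Q> {

    // Assumption: a ConcurrentHashMap keyed by query id.
    private final ConcurrentMap<UUID, Q> runningQueries = new ConcurrentHashMap<>();

    /** Returns the given query if it was inserted, else the one already registered. */
    Q putIfAbsent(UUID queryId, Q runningQuery) {
        if (queryId == null || runningQuery == null) {
            throw new IllegalArgumentException();
        }
        // ConcurrentMap.putIfAbsent returns null when there was no prior mapping.
        Q existing = runningQueries.putIfAbsent(queryId, runningQuery);
        return existing != null ? existing : runningQuery;
    }

    /** Returns the registered query, or null if none is registered under that id. */
    Q get(UUID queryId) {
        return runningQueries.get(queryId);
    }
}
```

Returning the winner either way lets a caller write `q = putIfAbsent(id, q)` and continue with whichever instance is actually registered, without a separate null check.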

getRunningQuery

public AbstractRunningQuery getRunningQuery(UUID queryId)
Return the AbstractRunningQuery associated with that query identifier.

Parameters:
queryId - The query identifier.
Returns:
The AbstractRunningQuery -or- null if there is no query associated with that query identifier.
Throws:
RuntimeException - if the query halted with an error (if the query halted normally this will wrap an InterruptedException).

halt

protected void halt(AbstractRunningQuery q)
The query is no longer running. Resources associated with the query should be released.


newRunningQuery

protected AbstractRunningQuery newRunningQuery(UUID queryId,
                                               boolean controller,
                                               IQueryClient clientProxy,
                                               UUID queryControllerId,
                                               PipelineOp query,
                                               IChunkMessage<IBindingSet> realSource)
Factory for IRunningQuerys.

See Also:
QueryEngine.Annotations.RUNNING_QUERY_CLASS

getRunningQueries

public UUID[] getRunningQueries()
Description copied from interface: IQueryClient
Return the set of queries which are running as of the moment when the request was processed. Queries reported in the returned array may terminate at any time.

Specified by:
getRunningQueries in interface IQueryClient


Copyright © 2006-2012 SYSTAP, LLC. All Rights Reserved.