java.lang.Object
  com.bigdata.bop.engine.QueryEngine
public class QueryEngine
A class managing execution of concurrent queries against a local
IIndexManager.
Much of the complexity of the current approach owes itself to having to run a separate task for each join for each shard in order to have the appropriate lock when running against the unisolated shard view. This also means that the join task is running inside of the concurrency manager and hence has the local view of the shard.
The main, and perhaps the only, reason why we run unisolated rules is during closure, when we query against the unisolated indices and then write the entailments back on the unisolated indices.
Supporting closure has always been complicated. This complexity is mostly handled by ProgramTask#executeMutation() and AbstractTripleStore#newJoinNexusFactory(), which play games with the timestamps used to read and write on the database, with commit points designed to create visibility for tuples written by a mutation rule, and with the automated advance of the read timestamp for the query in each closure pass in order to make newly committed tuples visible to subsequent rounds of closure. For scale-out, we do shard-wise auto commits, so we always have a commit point which makes each write visible, and the read timestamp is actually a read-only transaction which prevents the historical data we need during a closure round from being released as we are driving updates onto the federation. For the RWStore, we are having a similar problem (in the HA branch, since that is where we are working on the RWStore) where historically allocated records were being released as writes drove updates on the indices. Again, we "solved" the problem for the RWStore using a commit point followed by a read-only transaction reading on that commit point to hold onto the view on which the next closure round needs to read. This uncovered a problem with the RWStore and transaction service interaction, which Martyn is currently working to resolve through a combination of shadow allocators and deferred deletes which are processed once the release time is advanced by the transaction service.
The WORM does not have some of these problems with closure because we never delete history, so we do not need to create a commit point and a read-behind transaction. However, the WORM would have problems with concurrent access to the unisolated indices except that we hack that problem through the transparent use of the UnisolatedReadWriteIndex, which allows multiple threads to access the same unisolated index view using a read/write lock pattern (concurrent readers are allowed, but there is only one writer and it has exclusive access when it is running). This works out because we never run closure operations against the WORM through the concurrency manager. If we did, we would have to create a commit point after each mutation and use a read-behind transaction to prevent concurrent access to the unisolated index.
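The read/write lock pattern described above can be sketched with the JDK's ReentrantReadWriteLock. This is only an illustration, not the UnisolatedReadWriteIndex implementation: the GuardedIndex class and its methods are hypothetical, and a TreeMap stands in for the unisolated index.

```java
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical wrapper (not a bigdata class): concurrent readers are
// allowed, but a writer has exclusive access while it is running.
class GuardedIndex {
    private final TreeMap<String, String> index = new TreeMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Readers share the read lock, so lookups may proceed in parallel.
    public String lookup(String key) {
        lock.readLock().lock();
        try {
            return index.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    // The write lock is exclusive: it waits for readers to drain and
    // blocks new readers until the mutation is done.
    public void insert(String key, String value) {
        lock.writeLock().lock();
        try {
            index.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        GuardedIndex ndx = new GuardedIndex();
        ndx.insert("s1", "o1");
        System.out.println(ndx.lookup("s1"));
    }
}
```

The wrapper keeps the locking transparent to callers, which is the property the paragraph above relies on: threads use the index as usual and never touch the lock directly.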
The main advantage that I can see of the current complexity is that it allows us to do load+closure as a single operation on the WORM, resulting in a single commit point. This makes that operation ACID without having to use full read/write transactions. This is how we gain the ACID contract for the standalone Journal in the SAIL for the WORM. Of course, the SAIL does not have that contract for the RWStore because we have to do the commit and read-behind transaction in order to have visibility and avoid concurrent access to the unisolated index (by reading behind on the last commit point).
I think that the reality is even one step more complicated. When doing truth maintenance (incremental closure), we bring the temporary graph to a fixed point (the rules write on the temp store) and then apply the delta in a single write to the database. That suggests that incremental truth maintenance would continue to be ACID, but that database-at-once-closure would be round-wise ACID.
So, I would like to suggest that we break ACID for database-at-once-closure and always follow the pattern of (1) do a commit before each round of closure; and (2) create a read-behind transaction to prevent the release of that commit point as we drive writes onto the indices. If we follow this pattern then we can write on the unisolated indices without conflict and read on the historical views without conflict. Since there will be a commit point before each mutation rule runs (which corresponds to a closure round), database-at-once-closure will be atomic within a round, but will not be a single atomic operation. Per above, I think that we would retain the ACID property for incremental truth maintenance against a WORM or RW mode Journal.
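The proposed pattern (commit before each round, then read behind that commit point while writing on the live indices) can be sketched in miniature as follows. Everything here is an assumption made for illustration: the RoundWiseClosure class is hypothetical, an immutable copy of a set stands in for the read-behind transaction, and a simple transitive rule stands in for the closure rules.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of round-wise closure. Each round reads a frozen
// snapshot (the "read-behind" view of the last commit point) and writes
// entailments onto the live ("unisolated") set.
class RoundWiseClosure {

    // Runs the transitive rule (a,b),(b,c) => (a,c) to a fixed point.
    // Returns the number of closure rounds executed.
    static int close(Set<List<String>> live) {
        int rounds = 0;
        boolean changed = true;
        while (changed) {
            rounds++;
            // (1) "Commit point": freeze the current state.
            // (2) "Read-behind view": the rule reads only this snapshot.
            Set<List<String>> snapshot =
                Collections.unmodifiableSet(new HashSet<>(live));
            changed = false;
            for (List<String> x : snapshot) {
                for (List<String> y : snapshot) {
                    if (x.get(1).equals(y.get(0))) {
                        // Writes go to the live set, never to the snapshot.
                        changed |= live.add(Arrays.asList(x.get(0), y.get(1)));
                    }
                }
            }
        }
        return rounds;
    }

    public static void main(String[] args) {
        Set<List<String>> edges = new HashSet<>();
        edges.add(Arrays.asList("a", "b"));
        edges.add(Arrays.asList("b", "c"));
        edges.add(Arrays.asList("c", "d"));
        int rounds = close(edges);
        System.out.println(rounds + " rounds, " + edges.size() + " edges");
    }
}
```

Because readers only ever see the snapshot, the reads and the writes never contend. The cost is the one named above: each round is atomic, but the closure as a whole is not.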
----
The advantage of this proposal (commit before each mutation rule and run query against a read-behind transaction) is that this could enormously simplify how we execute joins. Right now, we use a factory pattern to create a join task on each node for each shard for which that node receives binding sets for a query. The main reason for doing this is to gain the appropriate lock for the unisolated index. If we never run a query against the unisolated index then we can go around the concurrency manager and run a single "query manager" task for all joins for all shards for all queries. This has some great benefits which I will go into below.
That "query manager" task would be responsible for accepting buffers containing elements or binding sets from other nodes and scheduling consumption of those data based on various criteria (order of arrival, priority, buffer resource requirements, timeout, etc.). This manager task could use a fork/join pool to execute lightweight operations (NIO, formulation of access paths from binding sets, mapping of binding sets onto shards, joining a chunk already read from an access path against a binding set, etc.). Operations which touch the disk need to run in their own thread (until we get Java 7 async file IO, which is already available in a preview library). We could handle that by queuing those operations against a fixed-size thread pool for reads.
This is a radical change in how we handle distributed query execution, but I think that it could have a huge payoff by reducing the complexity of the join logic, making it significantly easier to execute different kinds of join operations, reducing the overhead for acquiring locks for the unisolated index views, reducing the #of threads consumed by joins (from one per shard per join per query to a fixed pool of N threads for reads), etc. It would centralize the management of resources on each node and make it possible for us to handle things like join termination by simply purging data from the query manager task for the terminated join.
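A minimal sketch of such a "query manager", assuming a fixed-size pool for disk reads and a fork/join pool for the lightweight join step, might look like the following. The QueryManagerSketch class and its methods are hypothetical, int arrays stand in for chunks of binding sets, and the Supplier stands in for reading a chunk from an access path. It is not the QueryEngine implementation.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

// Hypothetical sketch: one manager per node for all joins of all queries.
class QueryManagerSketch {
    // Lightweight, CPU-bound work (e.g. joining in-memory chunks).
    private final ForkJoinPool lightPool = new ForkJoinPool();
    // Fixed pool of N threads for operations which touch the disk.
    private final ExecutorService readPool;
    // Buffered chunks awaiting consumption, in arrival order per query.
    private final Map<UUID, Queue<int[]>> pending = new ConcurrentHashMap<>();

    QueryManagerSketch(int nReadThreads) {
        this.readPool = Executors.newFixedThreadPool(nReadThreads);
    }

    // Accept a chunk sent by another node for the given query.
    void accept(UUID queryId, int[] chunk) {
        pending.computeIfAbsent(queryId, k -> new ConcurrentLinkedQueue<>())
               .add(chunk);
    }

    // Take the next buffered chunk, do the disk read on the read pool,
    // then join on the fork/join pool. Yields the number of matches.
    CompletableFuture<Long> joinNext(UUID queryId, Supplier<int[]> diskRead) {
        Queue<int[]> q = pending.get(queryId);
        int[] chunk = (q == null) ? null : q.poll();
        if (chunk == null) {
            return CompletableFuture.completedFuture(0L);
        }
        return CompletableFuture
            .supplyAsync(diskRead, readPool)
            .thenApplyAsync(read -> Arrays.stream(chunk)
                .filter(v -> Arrays.stream(read).anyMatch(r -> r == v))
                .count(), lightPool);
    }

    // Terminate a join by purging its buffered data.
    // Returns the number of chunks which were dropped.
    int cancel(UUID queryId) {
        Queue<int[]> q = pending.remove(queryId);
        return (q == null) ? 0 : q.size();
    }

    void shutdown() {
        readPool.shutdown();
        lightPool.shutdown();
    }
}
```

Note how the thread count is bounded by the two pools regardless of how many shards, joins, or queries are active, and how termination reduces to removing an entry from a map.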
BOps or BOp tree fragments) and a
distributed query cache which handles invalidation (for updates) and
BOp aware reuse of result sets available in the cache. This
sort of thing will have to be coordinated among the cache nodes.
| Nested Class Summary | | |
|---|---|---|
| static interface | QueryEngine.Annotations | Annotations understood by the QueryEngine. |
| Field Summary | | |
|---|---|---|
| protected QueryEngineCounters | counters | Counters at the global level. |
| protected static String | ERR_QUERY_NOT_RUNNING | Error message used if a query is not running. |
| Constructor Summary | |
|---|---|
| QueryEngine(IIndexManager localIndexManager) | |
| Method Summary | | |
|---|---|---|
| protected boolean | acceptChunk(IChunkMessage<IBindingSet> msg) | Add a chunk of intermediate results for consumption by some query. |
| protected void | assertRunning() | Return if the query engine is running. |
| void | bufferReady(IChunkMessage<IBindingSet> msg) | Notify a service that a buffer having data for some BOp in some running query is available. |
| void | cancelQuery(UUID queryId, Throwable cause) | Notify a service that the query has been terminated. |
| void | declareQuery(IQueryDecl queryDecl) | Deprecated. |
| protected void | didShutdown() | Hook is notified by shutdown() when all running queries have terminated. |
| AbstractRunningQuery | eval(BOp op) | Evaluate a query. |
| AbstractRunningQuery | eval(BOp op, IBindingSet bset) | Evaluate a query. |
| AbstractRunningQuery | eval(BOp op, IBindingSet[] bsets) | Evaluate a query. |
| AbstractRunningQuery | eval(UUID queryId, BOp op, IBindingSet bset) | |
| AbstractRunningQuery | eval(UUID queryId, BOp op, IBindingSet[] bset) | |
| AbstractRunningQuery | eval(UUID queryId, BOp op, IBindingSet[][] bset) | |
| AbstractRunningQuery | eval(UUID queryId, PipelineOp query, IChunkMessage<IBindingSet> msg) | Evaluate a query. |
| protected void | execute(Runnable r) | Executes the Runnable on the local IIndexManager's ExecutorService. |
| protected void | finalize() | QueryEngines are used with a singleton pattern managed by the QueryEngineFactory. |
| ClientConnectionManager | getClientConnectionManager() | Return the ClientConnectionManager used to make remote SERVICE call requests. |
| CounterSet | getCounters() | Return a CounterSet which reports various statistics for the QueryEngine. |
| IBigdataFederation<?> | getFederation() | The IBigdataFederation iff running in scale-out. |
| IIndexManager | getIndexManager() | The local index manager, which provides direct access to local BTree and IndexSegment objects. |
| IQueryClient | getProxy() | The RMI proxy for this QueryEngine when used as a query controller. |
| PipelineOp | getQuery(UUID queryId) | Return the query. |
| protected QueryEngineCounters | getQueryEngineCounters() | The QueryEngineCounters object for this QueryEngine. |
| UUID[] | getRunningQueries() | Return the set of queries which are running as of the moment when the request was processed. |
| AbstractRunningQuery | getRunningQuery(UUID queryId) | Return the AbstractRunningQuery associated with that query identifier. |
| UUID | getServiceUUID() | The UUID of the service in which this QueryEngine is running. |
| protected void | halt(AbstractRunningQuery q) | The query is no longer running. |
| void | haltOp(IHaltOpMessage msg) | Notify the client that execution has halted for some query, operator, node, shard, and source binding set chunk(s). |
| void | init() | Initialize the QueryEngine. |
| protected boolean | isRunning() | |
| boolean | isScaleOut() | Return true iff running against an IBigdataFederation. |
| protected QueryEngineCounters | newCounters() | Extension hook for new QueryEngineCounters instances. |
| protected AbstractRunningQuery | newRunningQuery(UUID queryId, boolean controller, IQueryClient clientProxy, UUID queryControllerId, PipelineOp query, IChunkMessage<IBindingSet> realSource) | Factory for IRunningQuerys. |
| protected AbstractRunningQuery | putIfAbsent(UUID queryId, AbstractRunningQuery runningQuery) | Places the AbstractRunningQuery object into the internal map. |
| void | shutdown() | Shutdown the QueryEngine (blocking). |
| void | shutdownNow() | Do not accept new queries and halt any running binding set chunk tasks. |
| void | startOp(IStartOpMessage msg) | Notify the client that execution has started for some query, operator, node, and index partition. |
| Methods inherited from class java.lang.Object |
|---|
| clone, equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
| Field Detail |
|---|
protected static final transient String ERR_QUERY_NOT_RUNNING
protected final QueryEngineCounters counters
| Constructor Detail |
|---|
public QueryEngine(IIndexManager localIndexManager)
Parameters:
localIndexManager - The local index manager.
| Method Detail |
|---|
public CounterSet getCounters()
Return a CounterSet which reports various statistics for the
QueryEngine.
Specified by:
getCounters in interface ICounterSetAccess
protected QueryEngineCounters newCounters()
Extension hook for new QueryEngineCounters instances.
protected QueryEngineCounters getQueryEngineCounters()
The QueryEngineCounters object for this QueryEngine.
public UUID getServiceUUID()
The UUID of the service in which this QueryEngine is
running.
Specified by:
getServiceUUID in interface IQueryPeer
Returns:
The UUID of the service in which this QueryEngine
is running -or- a unique and distinct UUID if the
QueryEngine is not running against an
IBigdataFederation.
See Also:
IService.getServiceUUID()
public IBigdataFederation<?> getFederation()
The IBigdataFederation iff running in scale-out.
Note: The IBigdataFederation is required in scale-out in order to
perform shard locator scans when mapping binding sets across the next
join in a query plan.
public IIndexManager getIndexManager()
The local index manager, which provides direct access to local
BTree and IndexSegment objects. In scale-out, this is the
IndexManager inside the IDataService and provides direct
access to FusedViews (aka shards).
Note: You MUST NOT use unisolated indices without obtaining the necessary
locks. The QueryEngine is intended to run only against committed
index views for which no locks are required.
public IQueryClient getProxy()
The RMI proxy for this QueryEngine when used as a query controller.
The default implementation returns this.
public ClientConnectionManager getClientConnectionManager()
Return the ClientConnectionManager used to make remote SERVICE
call requests.
public boolean isScaleOut()
Return true iff running against an
IBigdataFederation.
public void init()
Initialize the QueryEngine. It will accept binding set chunks and
run them against running queries until it is shut down.
protected void finalize()
throws Throwable
QueryEngines are used with a singleton pattern managed by the
QueryEngineFactory. They are torn down automatically once they
are no longer reachable. This behavior depends on not having any hard
references back to the QueryEngine.
Overrides:
finalize in class Object
Throws:
Throwable
protected void assertRunning()
Return if the query engine is running.
Throws:
IllegalStateException - if the query engine is shutting down.
protected boolean isRunning()
protected final void execute(Runnable r)
Executes the Runnable on the local IIndexManager's
ExecutorService.
Parameters:
r - The Runnable.
protected boolean acceptChunk(IChunkMessage<IBindingSet> msg)
Add a chunk of intermediate results for consumption by some query.
Parameters:
msg - A chunk of intermediate results.
Returns:
true if the chunk was accepted. This will return
false if the query is done (including cancelled) or
the query engine is shut down. The IChunkMessage will have
been released if it was not accepted.
Throws:
IllegalArgumentException - if the chunk is null.
IllegalStateException - if the chunk is not materialized.
public void shutdown()
Shutdown the QueryEngine (blocking). The QueryEngine will
not accept new queries, but existing queries will run to completion.
protected void didShutdown()
Hook is notified by shutdown() when all running queries have
terminated.
public void shutdownNow()
Do not accept new queries and halt any running binding set chunk tasks.
@Deprecated
public void declareQuery(IQueryDecl queryDecl)
throws RemoteException
Description copied from interface: IQueryPeer
Specified by:
declareQuery in interface IQueryPeer
Parameters:
queryDecl - The query declaration.
Throws:
RemoteException
public void bufferReady(IChunkMessage<IBindingSet> msg)
Description copied from interface: IQueryPeer
Notify a service that a buffer having data for some BOp in some
running query is available. The receiver may request the data when they
are ready. If the query is cancelled, then the sender will drop the
buffer.
Specified by:
bufferReady in interface IQueryPeer
Parameters:
msg - The message.
public void cancelQuery(UUID queryId,
Throwable cause)
Notify a service that the query has been terminated.
The default implementation is a NOP.
Specified by:
cancelQuery in interface IQueryPeer
Parameters:
queryId - The query identifier.
cause - The cause. When null, this is presumed to be
normal query termination.
public PipelineOp getQuery(UUID queryId)
Description copied from interface: IQueryClient
Return the query.
Specified by:
getQuery in interface IQueryClient
Parameters:
queryId - The query identifier.
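public void startOp(IStartOpMessage msg)
throws RemoteException
Description copied from interface: IQueryClient
Notify the client that execution has started for some query, operator, node, and index partition.
Specified by:
startOp in interface IQueryClient
Throws:
RemoteException
public void haltOp(IHaltOpMessage msg)
throws RemoteException
Description copied from interface: IQueryClient
Notify the client that execution has halted for some query, operator, node, shard, and source binding set chunk(s).
Specified by:
haltOp in interface IQueryClient
Throws:
RemoteException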
public AbstractRunningQuery eval(BOp op)
throws Exception
Evaluate a query.
Parameters:
op - The query to evaluate.
Returns:
The IRunningQuery.
Throws:
IllegalStateException - if the QueryEngine has been shutdown().
Exception
public AbstractRunningQuery eval(BOp op,
IBindingSet bset)
throws Exception
Evaluate a query.
Parameters:
op - The query to evaluate.
bset - The initial binding set to present.
Returns:
The IRunningQuery.
Throws:
IllegalStateException - if the QueryEngine has been shutdown().
Exception
public AbstractRunningQuery eval(UUID queryId,
BOp op,
IBindingSet bset)
throws Exception
Throws:
Exception
public AbstractRunningQuery eval(UUID queryId,
BOp op,
IBindingSet[] bset)
throws Exception
Throws:
Exception
public AbstractRunningQuery eval(UUID queryId,
BOp op,
IBindingSet[][] bset)
throws Exception
Throws:
Exception
public AbstractRunningQuery eval(BOp op,
IBindingSet[] bsets)
throws Exception
Evaluate a query.
Parameters:
op - The query to evaluate.
bsets - The initial binding sets to present.
Returns:
The IRunningQuery.
Throws:
IllegalStateException - if the QueryEngine has been shutdown().
Exception
public AbstractRunningQuery eval(UUID queryId,
PipelineOp query,
IChunkMessage<IBindingSet> msg)
throws Exception
Evaluate a query. The IBindingSets made available by the IChunkMessage will
be pushed into the query.
Parameters:
queryId - The unique identifier for the query.
query - The query to evaluate.
msg - A message providing access to the initial binding set(s) used to begin query evaluation.
Returns:
The IRunningQuery.
Throws:
IllegalStateException - if the QueryEngine has been shutdown().
Exception
protected AbstractRunningQuery putIfAbsent(UUID queryId,
AbstractRunningQuery runningQuery)
Places the AbstractRunningQuery object into the internal map.
Parameters:
queryId - The query identifier.
runningQuery - The AbstractRunningQuery.
Returns:
The AbstractRunningQuery -or- another
AbstractRunningQuery iff one exists with the same
UUID.
public AbstractRunningQuery getRunningQuery(UUID queryId)
Return the AbstractRunningQuery associated with that query
identifier.
Parameters:
queryId - The query identifier.
Returns:
The AbstractRunningQuery -or- null if there
is no query associated with that query identifier.
Throws:
RuntimeException - if the query halted with an error (if the query halted
normally this will wrap an InterruptedException).
protected void halt(AbstractRunningQuery q)
The query is no longer running.
protected AbstractRunningQuery newRunningQuery(UUID queryId,
boolean controller,
IQueryClient clientProxy,
UUID queryControllerId,
PipelineOp query,
IChunkMessage<IBindingSet> realSource)
Factory for IRunningQuerys.
See Also:
QueryEngine.Annotations.RUNNING_QUERY_CLASS
public UUID[] getRunningQueries()
Description copied from interface: IQueryClient
Return the set of queries which are running as of the moment when the request was processed.
Specified by:
getRunningQueries in interface IQueryClient