com.bigdata.service
Class DataService

java.lang.Object
  extended by com.bigdata.service.AbstractService
      extended by com.bigdata.service.DataService
All Implemented Interfaces:
IDataService, IRemoteExecutor, IService, IServiceShutdown, ISession, ITxCommitProtocol, Remote
Direct Known Subclasses:
AbstractEmbeddedDataService, DataServer.AdministrableDataService, MetadataService

public abstract class DataService
extends AbstractService
implements IDataService, IServiceShutdown, ISession

An implementation of a network-capable IDataService. The service is started using the DataServer class. Operations are submitted using IConcurrencyManager.submit(AbstractTask) and will run with the appropriate concurrency controls as imposed by that method.

Version:
$Id: DataService.java 2265 2009-10-26 12:51:06Z thompsonbry $
Author:
Bryan Thompson
See Also:
DataServer, which is used to start this service.
TODO:
Startup should be broken into two aspects: local startup and service connect and disconnect events. For example, on the tx service connect event the store manager should notify the tx service of the last commit time on the live journal. On disconnect, the data service needs to go offline. The metadata service is required only for overflow processing, but if it remains down then we will eventually need to bring the data service offline when the buffered writes would cause the live journal to no longer be fully buffered, as the overflow processing time will be increased if we need to read through to the disk during overflow.
Write benchmark test to measure interhost transfer rates. Should be 100 Mbits/sec (~12 MB/sec) on a 100BaseT switched network. With full duplex in the network and the protocol, that rate should be bidirectional. Can that rate be sustained with a fully connected bi-directional transfer? FIXME Probably ALL of the methods on IDataService should be subsumed under submit(Callable) or submit(long, String, IIndexProcedure) so they do not block on the DataService and thereby absorb a thread.
Review JERI options to support secure RMI protocols. For example, using SSL or an SSH tunnel. For most purposes I expect bigdata to operate on a private network, but replication across gateways is also a common use case. Do we have to handle it specially?

Nested Class Summary
static class DataService.DataServiceFederationDelegate
          Delegate handles custom counters for the ResourceManager, local AbstractTransactionService and the ConcurrencyManager, dynamic re-attachment of counters, etc.
 class DataService.DataServiceTransactionManager
          Concrete implementation manages the local state of transactions executing on a DataService.
static class DataService.GetIndexMetadataTask
          Retrieves the IndexMetadata for the named index as of the specified timestamp.
static interface DataService.IDataServiceCounters
          Interface defines and documents the counters and counter namespaces reported by the DataService and the various services which it uses.
static interface DataService.Options
          Options understood by the DataService.
protected static class DataService.RangeIteratorTask
          Task for running a rangeIterator operation.
protected static class DataService.ReadBlockCounters
           
 
Field Summary
protected static org.apache.log4j.Logger log
           
 
Constructor Summary
protected DataService(Properties properties)
          Core constructor - you MUST start() the DataService before it can be used.
 
Method Summary
 void abort(long tx)
          Request abort of the transaction by the data service.
 void destroy()
          Destroy the service.
 void dropIndex(String name)
          Drops the named index.
 void forceOverflow(boolean immediate, boolean compactingMerge)
          Method sets a flag that will force overflow processing during the next group commit and optionally forces a group commit.
 long getAsynchronousOverflowCounter()
          The #of asynchronous overflows that have taken place on this data service (the counter is not restart safe).
 ConcurrencyManager getConcurrencyManager()
          The object used to control access to the local resources.
protected  File getHTTPDURLFile()
          The file on which the URL of the embedded httpd service is written.
 IndexMetadata getIndexMetadata(String name, long timestamp)
          Return the metadata for the named index.
static String getIndexPartitionName(String name, int partitionId)
          Forms the name of the index corresponding to a partition of a named scale-out index as name#partitionId.
 ILocalTransactionManager getLocalTransactionManager()
          The object used to coordinate transactions executing against local resources.
 Properties getProperties()
          An object wrapping the properties specified to the ctor.
 ResourceManager getResourceManager()
          The object used to manage the local resources.
 Class getServiceIface()
          Returns either IDataService or IMetadataService as appropriate.
 Session getSession()
          A transient and dynamic property set (aka session).
 boolean isOpen()
          Note: "open" is judged by the ConcurrencyManager.isOpen() but the DataService is not usable until StoreManager.isStarting() returns false (there is asynchronous processing involved in reading the existing store files or creating the first store file and you can not use the DataService until that processing has been completed).
 boolean isOverflowActive()
          Return true iff the data service is currently engaged in overflow processing.
protected  IResourceManager newResourceManager(Properties properties)
          Returns the IResourceManager.
 void prepare(long tx, long revisionTime)
          Request that the IDataService participate in a 3-phase commit.
 boolean purgeOldResources(long timeout, boolean truncateJournal)
          This attempts to pause the service accepting ITx.UNISOLATED writes and then purges any resources that are no longer required based on the StoreManager.Options#MIN_RELEASE_AGE.
 ResultSet rangeIterator(long tx, String name, byte[] fromKey, byte[] toKey, int capacity, int flags, IFilterConstructor filter)
           Streaming traversal of keys and/or values in a key range.
 IBlock readBlock(IResourceMetadata resource, long addr)
          Read a low-level record from the IRawStore described by the IResourceMetadata.
 void registerIndex(String name, IndexMetadata metadata)
          Register a named mutable index on the DataService.
 void setReleaseTime(long releaseTime)
          Notify a data service that it MAY release data required to support views for up to the specified releaseTime.
 void shutdown()
          Polite shutdown does not accept new requests and will shutdown once the existing requests have been processed.
 void shutdownNow()
          Shutdown attempts to abort in-progress requests and shutdown as soon as possible.
 long singlePhaseCommit(long tx)
          Note: This is basically identical to the standalone journal case.
 DataService start()
          Starts the DataService.
 Future<? extends Object> submit(Callable<? extends Object> task)
          Note: When the DataService is accessed via RMI the Future MUST be a proxy.
 Future submit(long tx, String name, IIndexProcedure proc)
          Note: This chooses ITx.READ_COMMITTED if the index has ITx.UNISOLATED isolation and the IIndexProcedure is a read-only operation.
protected  Future wrapFuture(Future future)
          Encapsulate the Future within a proxy that may be marshalled by RMI and sent to a remote client.
 
Methods inherited from class com.bigdata.service.AbstractService
clearLoggingContext, getFederation, getHostname, getServiceName, getServiceUUID, setServiceUUID, setupLoggingContext
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface com.bigdata.service.IService
getHostname, getServiceName, getServiceUUID
 

Field Detail

log

protected static final org.apache.log4j.Logger log
Constructor Detail

DataService

protected DataService(Properties properties)
Core constructor - you MUST start() the DataService before it can be used.

Parameters:
properties - The configuration properties.
See Also:
DataService.Options, start()
Method Detail

getResourceManager

public ResourceManager getResourceManager()
The object used to manage the local resources.


getConcurrencyManager

public ConcurrencyManager getConcurrencyManager()
The object used to control access to the local resources.


getLocalTransactionManager

public ILocalTransactionManager getLocalTransactionManager()
The object used to coordinate transactions executing against local resources.


newResourceManager

protected IResourceManager newResourceManager(Properties properties)
Returns the IResourceManager.

Parameters:
properties - Properties to configure that object.
Returns:
The IResourceManager.

getProperties

public Properties getProperties()
An object wrapping the properties specified to the ctor.


getSession

public Session getSession()
Description copied from interface: ISession
A transient and dynamic property set (aka session).

Specified by:
getSession in interface ISession

isOpen

public boolean isOpen()
Note: "open" is judged by the ConcurrencyManager.isOpen() but the DataService is not usable until StoreManager.isStarting() returns false (there is asynchronous processing involved in reading the existing store files or creating the first store file and you can not use the DataService until that processing has been completed). The ConcurrencyManager will block for a while waiting for the StoreManager startup to complete and will reject tasks if startup processing does not complete within a timeout.

Specified by:
isOpen in interface IServiceShutdown

start

public DataService start()
Starts the DataService.

Specified by:
start in class AbstractService
Returns:
this (the return type should be strengthened by the concrete implementation to return the actual type).
TODO:
it would be nice if start() could restart after shutdown() but that is hardly necessary.

shutdown

public void shutdown()
Polite shutdown does not accept new requests and will shutdown once the existing requests have been processed.

Specified by:
shutdown in interface IServiceShutdown
Overrides:
shutdown in class AbstractService

shutdownNow

public void shutdownNow()
Shutdown attempts to abort in-progress requests and shutdown as soon as possible.

Specified by:
shutdownNow in interface IServiceShutdown
Overrides:
shutdownNow in class AbstractService

destroy

public void destroy()
Description copied from interface: IService
Destroy the service. If the service is running, it is shutdown immediately and then destroyed. This method has the same signature as DestroyAdmin.destroy().

Specified by:
destroy in interface IService
Overrides:
destroy in class AbstractService

getHTTPDURLFile

protected File getHTTPDURLFile()
The file on which the URL of the embedded httpd service is written.


setReleaseTime

public void setReleaseTime(long releaseTime)
Description copied from interface: ITxCommitProtocol
Notify a data service that it MAY release data required to support views for up to the specified releaseTime. This is the mechanism by which read locks are released. In effect, a read lock is a requirement that the releaseTime not be advanced as far as the start time of the transaction holding that read lock. Periodically and as transactions complete, the transaction manager will advance the releaseTime, thereby releasing read locks.

Specified by:
setReleaseTime in interface ITxCommitProtocol
Parameters:
releaseTime - The new release time (strictly advanced by the transaction manager).
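
The read-lock rule above can be illustrated with a small model. This is only a sketch under stated assumptions, not the bigdata implementation: the class ReleaseTimeSketch and its methods are hypothetical, start times are assumed to be unique, and the "as far as permitted" policy is one plausible reading of the description.

```java
import java.util.TreeSet;

// Hypothetical model of the read-lock rule: the release time must stay
// strictly below the start time of every active transaction.
final class ReleaseTimeSketch {

    // Start times of the active transactions (assumed unique).
    private final TreeSet<Long> activeStartTimes = new TreeSet<>();

    private long releaseTime = 0L;

    synchronized void begin(final long startTime) {
        activeStartTimes.add(startTime);
    }

    synchronized void complete(final long startTime) {
        activeStartTimes.remove(startTime);
    }

    /**
     * Advances the release time as far as the active transactions permit,
     * never beyond 'now' and never backwards, and returns the new value.
     */
    synchronized long advance(final long now) {
        long candidate = now;
        if (!activeStartTimes.isEmpty()) {
            // Stay strictly below the earliest active start time.
            candidate = Math.min(candidate, activeStartTimes.first() - 1);
        }
        if (candidate > releaseTime) {
            releaseTime = candidate;
        }
        return releaseTime;
    }
}
```

In this model, completing the oldest transaction is what allows the next call to advance() to move the release time forward, which corresponds to releasing that transaction's read lock.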

singlePhaseCommit

public long singlePhaseCommit(long tx)
                       throws ExecutionException,
                              InterruptedException,
                              IOException
Note: This is basically identical to the standalone journal case.

Specified by:
singlePhaseCommit in interface ITxCommitProtocol
Parameters:
tx - The transaction identifier.
Returns:
The commit time assigned to that transaction.
Throws:
ExecutionException - This will wrap a ValidationError if validation fails.
InterruptedException - if interrupted.
IOException - if there is an RMI problem.
See Also:
JournalTransactionService#commitImpl(long)

prepare

public void prepare(long tx,
                    long revisionTime)
             throws ExecutionException,
                    InterruptedException,
                    IOException
Description copied from interface: ITxCommitProtocol
Request that the IDataService participate in a 3-phase commit.

When the IDataService is sent the ITxCommitProtocol.prepare(long, long) message it executes a task which will handle commit processing for the transaction. That task MUST hold exclusive locks for the unisolated indices to which the transaction write sets will be applied. While holding those locks, the task must first validate the transaction's write set and then merge down the write set onto the corresponding unisolated indices using the specified revisionTime and checkpoint the indices in order to reduce all possible sources of latency. Note that each IDataService is able to independently prepare exactly those parts of the transaction's write set which are mapped onto index partitions hosted by a given IDataService.

Once validation is complete and all possible steps have been taken to reduce sources of latency (e.g., checkpointing the indices and pre-extending the store if necessary), the task notifies the ITransactionService that it has prepared using ITransactionService#prepared(long). The ITransactionService will wait until all tasks have prepared. If a task CAN NOT prepare the transaction, then it MUST throw an exception out of its ITxCommitProtocol.prepare(long, long) method.

Once all tasks have sent an ITransactionService#prepared(long) message to the ITransactionService, it will assign a commitTime to the transaction and permit those methods to return that commitTime to the IDataServices. Once the task receives the assigned commit time, it must obtain an exclusive write lock for the live journal (this is a higher requirement than just an exclusive lock on the necessary indices and will lock out all other write requests for the journal), register the checkpointed indices on the commit list and then request a commit of the journal using the specified commitTime. The task then notifies the transaction service that it has completed its commit using ITransactionService#committed(long) and awaits a response. If the ITransactionService indicates that the commit was not successful, the task rolls back the live journal to the prior commit point and throws an exception out of ITxCommitProtocol.prepare(long, long).

A sample flow for a successful distributed transaction commit is shown below. This example shows two IDataServices on which the client has written. (If the client only writes on a single data service then we use a single-phase commit protocol).

 client -------+----txService----+--dataService1--+--dataService2--+...           
   | [1]                    
   | commit(tx) -------- + [2]
   |                     | prepare(tx,rev) +
   |                     | [3]             |
   |                     | prepare(tx,rev) ------------------+
   |                     |                 |                 |
   |                     | <--prepared(tx) +                 |
   |                     |                                   |
   |                     | <------------------- prepared(tx) +
   |                     |  
   |       "prepared" barrier [4]
   |                     |
   |                     | -- (commitTime) +  
   |                     | -------------------- (commitTime) +
   |                     | [5]             |                 |
   |                     | <--committed(tx)------------------+  
   |                     | [6]             |
   |                     | <--committed(tx)+
   |                     | 
   |       "committed" barrier [7]
   |                     | [8]
   |                     | ------ (success)+  
   |                     | [9]             |
   |                     | (void)----------+
   |                     |                 halt
   |                     | [10]             
   |                     | ------------------------ (success)+
   |                     | [11]                              |
   |                     | (void)----------------------------+
   |                [12] |                                   halt
   | (commitTime)--------+  
   |                   
 
There are many points in the protocol where commit processing can fail. However, there are two primary failure classifications that are of interest for error handling. Up until the first barrier is satisfied, there is no side-effect on the persistent state so error handling need only halt processing on the IDataServices, discard any local state associated with the transaction, and throw an exception out of ITxCommitProtocol.prepare(long, long). Once the first barrier has been satisfied, persistent side-effects MAY occur. Error handling in this case must roll back the state of the live journal for each of the participating IDataServices. If error handling was performed in response to a local error, then the IDataService must throw that error out of ITxCommitProtocol.prepare(long, long). However, if error handling was initiated because ITransactionService#committed(long) returned false then it should return normally (after rolling back the journal).
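
The two barriers can be modelled with standard java.util.concurrent primitives. The following is a simplified sketch under stated assumptions, not the bigdata implementation: Coordinator and Participant are hypothetical stand-ins for the transaction service and a data service, the journal is reduced to a single lastCommitTime field, committed(boolean) takes a success flag purely for illustration, and a failure before the first barrier (which would leave the others waiting) is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

final class TwoBarrierCommitSketch {

    /** Hypothetical stand-in for the transaction service side of the protocol. */
    static final class Coordinator {
        private final CountDownLatch preparedBarrier;
        private final CountDownLatch committedBarrier;
        private final AtomicBoolean anyFailure = new AtomicBoolean(false);
        private final long commitTime;

        Coordinator(final int participants, final long commitTime) {
            this.preparedBarrier = new CountDownLatch(participants);
            this.committedBarrier = new CountDownLatch(participants);
            this.commitTime = commitTime;
        }

        /** Blocks until every participant has prepared, then hands out the commit time. */
        long prepared() throws InterruptedException {
            preparedBarrier.countDown();
            preparedBarrier.await();
            return commitTime;
        }

        /** Blocks until every participant has reported; true iff all local commits succeeded. */
        boolean committed(final boolean localCommitOk) throws InterruptedException {
            if (!localCommitOk) {
                anyFailure.set(true);
            }
            committedBarrier.countDown();
            committedBarrier.await();
            return !anyFailure.get();
        }
    }

    /** Hypothetical stand-in for one data service and its live journal. */
    static final class Participant {
        private final boolean localCommitOk;
        private long lastCommitTime = 0L;

        Participant(final boolean localCommitOk) {
            this.localCommitOk = localCommitOk;
        }

        void prepare(final Coordinator svc) throws InterruptedException {
            // Validation, merge down and checkpoint would happen here;
            // nothing persistent has changed before the first barrier.
            final long priorCommitTime = lastCommitTime;
            final long commitTime = svc.prepared();
            if (localCommitOk) {
                lastCommitTime = commitTime; // commit the journal
            }
            if (!svc.committed(localCommitOk)) {
                lastCommitTime = priorCommitTime; // roll back to the prior commit point
            }
        }

        long lastCommitTime() {
            return lastCommitTime;
        }
    }

    /** Runs one commit across the participants and returns each one's last commit time. */
    static long[] run(final long commitTime, final boolean... localCommitOk) {
        final Coordinator svc = new Coordinator(localCommitOk.length, commitTime);
        // One thread per participant so that all of them can reach the barriers.
        final ExecutorService pool = Executors.newFixedThreadPool(localCommitOk.length);
        try {
            final List<Participant> participants = new ArrayList<>();
            final List<Future<Void>> futures = new ArrayList<>();
            for (final boolean ok : localCommitOk) {
                final Participant p = new Participant(ok);
                participants.add(p);
                final Callable<Void> task = () -> {
                    p.prepare(svc);
                    return null;
                };
                futures.add(pool.submit(task));
            }
            final long[] result = new long[participants.size()];
            for (int i = 0; i < result.length; i++) {
                futures.get(i).get(); // wait for this participant to finish
                result[i] = participants.get(i).lastCommitTime();
            }
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Calling run(42L, true, true) models the successful flow in the diagram: both participants end with the assigned commit time. Calling run(42L, true, false) models a failure after the first barrier: the participant that did commit rolls back once the coordinator reports that the commit was not successful.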

Specified by:
prepare in interface ITxCommitProtocol
Parameters:
tx - The transaction identifier.
revisionTime - The timestamp that will be written into the ITuples when the write set of the validated transaction is merged down onto the unisolated indices.
Throws:
IOException - if there is an RMI problem.
ExecutionException
InterruptedException

abort

public void abort(long tx)
           throws IOException
Description copied from interface: ITxCommitProtocol
Request abort of the transaction by the data service. This message is sent in response to ITransactionService.abort(long) to each IDataService on which the transaction has written. It is NOT sent for read-only transactions since they have no local state on the IDataServices.

Specified by:
abort in interface ITxCommitProtocol
Parameters:
tx - The transaction identifier.
Throws:
IOException - if there is an RMI problem.

getIndexPartitionName

public static final String getIndexPartitionName(String name,
                                                 int partitionId)
Forms the name of the index corresponding to a partition of a named scale-out index as name#partitionId.

Another advantage of this naming scheme is that index partitions are just named indices and all of the mechanisms for operating on named indices and for concurrency control for named indices apply automatically. Among other things, this means that different tasks can write concurrently on different partitions of the same named index on a given DataService.

Returns:
The name of the index partition.
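
As a minimal sketch of the name#partitionId convention described above (not the actual source of this method), the hypothetical helper below joins the scale-out index name and the partition identifier with a '#'. The class name and the null check are assumptions made for illustration.

```java
// Illustrative stand-in for DataService.getIndexPartitionName(String, int).
// The class name PartitionNameSketch is hypothetical.
final class PartitionNameSketch {

    private PartitionNameSketch() {
    }

    /** Forms the index partition name as name#partitionId. */
    static String indexPartitionName(final String name, final int partitionId) {
        if (name == null) {
            // Assumption: a missing name is rejected rather than rendered as "null#n".
            throw new IllegalArgumentException("name is required");
        }
        return name + "#" + partitionId;
    }
}
```

Because the result is an ordinary index name, it can be passed wherever a named index is expected, for example when registering or dropping a single partition.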

getServiceIface

public Class getServiceIface()
Returns either IDataService or IMetadataService as appropriate.

Specified by:
getServiceIface in interface IService
Specified by:
getServiceIface in class AbstractService

registerIndex

public void registerIndex(String name,
                          IndexMetadata metadata)
                   throws IOException,
                          InterruptedException,
                          ExecutionException
Description copied from interface: IDataService
Register a named mutable index on the DataService.

Note: In order to register an index partition the partition metadata property MUST be set. The resources property will then be overridden when the index is actually registered so as to reflect the IResourceMetadata description of the journal on which the index actually resides.

Specified by:
registerIndex in interface IDataService
Parameters:
name - The name that can be used to recover the index. In order to create a partition of an index you must form the name of the index partition using getIndexPartitionName(String, int) (this operation is generally performed by the IMetadataService which manages scale-out indices).
metadata - The metadata describing the index.

The LocalPartitionMetadata.getResources() property on the IndexMetadata.getPartitionMetadata() SHOULD NOT be set. The correct IResourceMetadata[] will be assigned when the index is registered on the IDataService.

Throws:
IOException
InterruptedException
ExecutionException

dropIndex

public void dropIndex(String name)
               throws IOException,
                      InterruptedException,
                      ExecutionException
Description copied from interface: IDataService
Drops the named index.

Note: In order to drop a partition of an index you must form the name of the index partition using getIndexPartitionName(String, int) (this operation is generally performed by the IMetadataService which manages scale-out indices).

Specified by:
dropIndex in interface IDataService
Parameters:
name - The index name.
Throws:
IOException
InterruptedException
ExecutionException

getIndexMetadata

public IndexMetadata getIndexMetadata(String name,
                                      long timestamp)
                               throws IOException,
                                      InterruptedException,
                                      ExecutionException
Description copied from interface: IDataService
Return the metadata for the named index.

Specified by:
getIndexMetadata in interface IDataService
Parameters:
name - The index name.
timestamp - A transaction identifier, ITx.UNISOLATED for the unisolated index view, ITx.READ_COMMITTED, or timestamp for a historical view no later than the specified timestamp.
Returns:
The metadata for the named index.
Throws:
IOException
InterruptedException
ExecutionException

submit

public Future submit(long tx,
                     String name,
                     IIndexProcedure proc)
Note: This chooses ITx.READ_COMMITTED if the index has ITx.UNISOLATED isolation and the IIndexProcedure is a read-only operation. This provides better concurrency on the DataService by moving read-only operations off of the WriteExecutorService.

Note: When the DataService is accessed via RMI the Future MUST be a proxy. This gets handled by the concrete server implementation.

Specified by:
submit in interface IDataService
Parameters:
tx - The transaction identifier, ITx.UNISOLATED for an ACID operation NOT isolated by a transaction, ITx.READ_COMMITTED for a read-committed operation not protected by a transaction (no global read lock), or any valid commit time for a read-historical operation not protected by a transaction (no global read lock).
name - The name of the index partition.
proc - The procedure to be executed.
Returns:
The Future from which the outcome of the procedure may be obtained.
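
The timestamp selection described in the first note can be sketched as a single decision. This is an illustration only: the class TimestampChoiceSketch is hypothetical, and the numeric values given to UNISOLATED and READ_COMMITTED are placeholders rather than values taken from ITx.

```java
// Hypothetical sketch of the rule: an unisolated request that is read-only
// is downgraded to read-committed so it does not occupy the write service.
final class TimestampChoiceSketch {

    // Placeholder values standing in for ITx.UNISOLATED and ITx.READ_COMMITTED.
    static final long UNISOLATED = 0L;
    static final long READ_COMMITTED = -1L;

    private TimestampChoiceSketch() {
    }

    /** Returns the timestamp under which the procedure would actually run. */
    static long effectiveTimestamp(final long tx, final boolean readOnly) {
        if (tx == UNISOLATED && readOnly) {
            return READ_COMMITTED;
        }
        return tx; // writers, transactions and historical reads are unchanged
    }
}
```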

submit

public Future<? extends Object> submit(Callable<? extends Object> task)
Note: When the DataService is accessed via RMI the Future MUST be a proxy. This gets handled by the concrete server implementation.

Specified by:
submit in interface IDataService
Specified by:
submit in interface IRemoteExecutor
Returns:
The Future for that task.
See Also:
AbstractDistributedFederation.getProxy(Future)
TODO:
We should probably put the federation object in a sandbox in order to prevent tasks running in the DataService using the IDataServiceCallable interface from performing various operations through their access to the DataService's federation. For example, if they use AbstractFederation.shutdownNow() then the DataService itself would be shut down.

wrapFuture

protected Future wrapFuture(Future future)
Encapsulate the Future within a proxy that may be marshalled by RMI and sent to a remote client. The client will interact with the unmarshalled Future, which in turn will use RMI to control the original Future within the DataService.

The default implementation simply returns the future and MUST be overridden when remote clients will use RMI to execute methods on the DataService.

Parameters:
future - The future.
Returns:
The encapsulated future.

rangeIterator

public ResultSet rangeIterator(long tx,
                               String name,
                               byte[] fromKey,
                               byte[] toKey,
                               int capacity,
                               int flags,
                               IFilterConstructor filter)
                        throws InterruptedException,
                               ExecutionException
Description copied from interface: IDataService

Streaming traversal of keys and/or values in a key range.

Note: In order to visit all keys in a range, clients are expected to issue repeated calls in which the fromKey is incremented to the successor of the last key visited until either an empty ResultSet is returned or the ResultSet#isLast() flag is set, indicating that all keys up to (but not including) the toKey have been visited. See ClientIndexView (scale-out indices) and DataServiceTupleIterator (unpartitioned indices), both of which encapsulate this method.

Note: If the iterator can be determined to be read-only and it is submitted as ITx.UNISOLATED then it will be run as ITx.READ_COMMITTED to improve concurrency.

Specified by:
rangeIterator in interface IDataService
Parameters:
tx - The transaction identifier -or- ITx.UNISOLATED IFF the operation is NOT isolated by a transaction -or- -tx to read from the most recent commit point not later than the absolute value of tx (a fully isolated read-only transaction using a historical start time).
name - The index name (required).
fromKey - The starting key for the scan (or null iff there is no lower bound).
toKey - The first key that will not be visited (or null iff there is no upper bound).
capacity - When non-zero, this is the maximum #of entries to process.
flags - One or more flags formed by bitwise OR of zero or more of the constants defined by IRangeQuery.
filter - An optional object that may be used to layer additional semantics onto the iterator. The filter will be constructed on the server and in the execution context for the iterator, so it will execute directly against the index for the maximum efficiency.
Throws:
InterruptedException - if the operation was interrupted.
ExecutionException - If the operation caused an error. See Throwable.getCause() for the underlying error.
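
The repeated-call pattern described in the first note can be sketched against an in-memory sorted map. This is a hedged illustration, not the ClientIndexView or DataServiceTupleIterator code: RangeScanSketch and Chunk are hypothetical, String keys stand in for the byte[] keys of the real API, and the successor of a key is formed by appending the smallest character.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

final class RangeScanSketch {

    /** One chunk of results, loosely modelled on ResultSet (hypothetical type). */
    static final class Chunk {
        final List<String> keys;
        final boolean last;

        Chunk(final List<String> keys, final boolean last) {
            this.keys = keys;
            this.last = last;
        }
    }

    private RangeScanSketch() {
    }

    /** "Server" side: visits at most capacity keys in [fromKey, toKey); capacity 0 means no limit. */
    static Chunk rangeIterator(final NavigableMap<String, String> index,
            final String fromKey, final String toKey, final int capacity) {
        NavigableMap<String, String> view = index;
        if (fromKey != null) {
            view = view.tailMap(fromKey, true); // inclusive lower bound
        }
        if (toKey != null) {
            view = view.headMap(toKey, false); // exclusive upper bound
        }
        final List<String> keys = new ArrayList<>();
        boolean last = true;
        for (final String key : view.keySet()) {
            if (capacity > 0 && keys.size() == capacity) {
                last = false; // more keys remain beyond this chunk
                break;
            }
            keys.add(key);
        }
        return new Chunk(keys, last);
    }

    /** "Client" side: repeats the call until a chunk is empty or flagged as last. */
    static List<String> scanAll(final NavigableMap<String, String> index,
            final String fromKey, final String toKey, final int capacity) {
        final List<String> all = new ArrayList<>();
        String from = fromKey;
        while (true) {
            final Chunk chunk = rangeIterator(index, from, toKey, capacity);
            all.addAll(chunk.keys);
            if (chunk.keys.isEmpty() || chunk.last) {
                return all;
            }
            // Restart from the successor of the last key visited.
            from = chunk.keys.get(chunk.keys.size() - 1) + "\u0000";
        }
    }
}
```

Restarting from the successor rather than from the last key itself keeps the inclusive lower bound from returning that key a second time.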

readBlock

public IBlock readBlock(IResourceMetadata resource,
                        long addr)
Description copied from interface: IDataService
Read a low-level record from the IRawStore described by the IResourceMetadata.

Specified by:
readBlock in interface IDataService
Parameters:
resource - The description of the resource containing that block.
addr - The address of the block in that resource.
Returns:
An object that may be used to read the block from the data service.
TODO:
This operation should be able to abort a read that takes too long or if there is a need to delete the resource.
This should be run on the read service.
Coordinate close out of stores.
Efficient (stream-based) read from the journal (IBlockStore API). This is a fully buffered read and will cause heap churn.

forceOverflow

public void forceOverflow(boolean immediate,
                          boolean compactingMerge)
                   throws IOException,
                          InterruptedException,
                          ExecutionException
Description copied from interface: IDataService
Method sets a flag that will force overflow processing during the next group commit and optionally forces a group commit. Normally there is no reason to invoke this method directly. Overflow processing is triggered automatically on a bottom-up basis when the extent of the live journal nears the Options.MAXIMUM_EXTENT.

Specified by:
forceOverflow in interface IDataService
Parameters:
immediate - The purpose of this argument is to permit the caller to trigger an overflow event even though there are no writes being made against the data service. When true the method will write a token record on the live journal in order to provoke a group commit. In this case synchronous overflow processing will have occurred by the time the method returns. When false a flag is set and overflow processing will occur on the next commit.
compactingMerge - The purpose of this flag is to permit the caller to indicate that a compacting merge should be performed for all indices on the data service (at least, all indices whose data are not simply copied onto the new journal) during the next synchronous overflow. Note that compacting merges of indices are performed automatically from time to time so this flag exists mainly for people who want to force a compacting merge for some reason.
Throws:
IOException
InterruptedException - may be thrown if immediate is true.
ExecutionException - may be thrown if immediate is true.
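
The interaction between the flag and the immediate argument can be modelled as follows. This is a toy sketch under stated assumptions, not the bigdata implementation: ForceOverflowSketch, groupCommit() and the counter are hypothetical, and writing the token record is reduced to invoking a commit directly.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model: forceOverflow() raises a flag that the next group
// commit consumes; immediate == true also provokes that commit.
final class ForceOverflowSketch {

    private final AtomicBoolean forceOverflow = new AtomicBoolean(false);

    private int overflowCount = 0;

    void forceOverflow(final boolean immediate) {
        forceOverflow.set(true);
        if (immediate) {
            // Stands in for writing a token record to provoke a group commit.
            groupCommit();
        }
    }

    /** Called on every group commit; runs overflow processing if the flag is set. */
    void groupCommit() {
        if (forceOverflow.compareAndSet(true, false)) {
            overflowCount++; // stands in for synchronous overflow processing
        }
    }

    int overflowCount() {
        return overflowCount;
    }
}
```

With immediate set to false nothing happens until some later write triggers a commit, which is why the immediate option exists for a data service that is receiving no writes.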

purgeOldResources

public boolean purgeOldResources(long timeout,
                                 boolean truncateJournal)
                          throws InterruptedException
Description copied from interface: IDataService
This attempts to pause the service accepting ITx.UNISOLATED writes and then purges any resources that are no longer required based on the StoreManager.Options#MIN_RELEASE_AGE.

Note: Resources are normally purged during synchronous overflow handling. However, asynchronous overflow handling can cause resources to no longer be needed as new index partition views are defined. This method MAY be used to trigger a release before the next overflow event.

Specified by:
purgeOldResources in interface IDataService
Parameters:
timeout - The timeout (in milliseconds) that the method will await the pause of the write service.
truncateJournal - When true, the live journal will be truncated to its minimum extent (all writes will be preserved but there will be no free space left in the journal). This may be used to force the DataService to its minimum possible footprint for the configured history retention policy.
Returns:
true if successful and false if the write service could not be paused after the specified timeout.
Throws:
InterruptedException

getAsynchronousOverflowCounter

public long getAsynchronousOverflowCounter()
                                    throws IOException
Description copied from interface: IDataService
The #of asynchronous overflows that have taken place on this data service (the counter is not restart safe).

Specified by:
getAsynchronousOverflowCounter in interface IDataService
Throws:
IOException

isOverflowActive

public boolean isOverflowActive()
                         throws IOException
Description copied from interface: IDataService
Return true iff the data service is currently engaged in overflow processing.

Specified by:
isOverflowActive in interface IDataService
Throws:
IOException


Copyright © 2006-2009 SYSTAP, LLC. All Rights Reserved.