com.bigdata.rdf.rio
Class AsynchronousStatementBufferFactory<S extends BigdataStatement,R>

java.lang.Object
  extended by com.bigdata.rdf.rio.AsynchronousStatementBufferFactory<S,R>
Type Parameters:
S - The generic type of the statement objects.
R - The generic type of the resource identifier (File, URL, etc).
All Implemented Interfaces:
IStatementBufferFactory<S>, IAsynchronousWriteStatementBufferFactory<S>

public class AsynchronousStatementBufferFactory<S extends BigdataStatement,R>
extends Object
implements IAsynchronousWriteStatementBufferFactory<S>

Factory object for high-volume RDF data load.

The asynchronous statement buffer without SIDs is much simpler than the one with SIDs. If we require that the document is fully buffered in memory, then we can simplify this to just:

 
 Given:
 
 value[] - RDF Values observed in the S,P,O, or C positions.
 
 statement[] - RDF Statements reported by the parser.
 
 Do:
 
 value[] => TERM2ID (Sync RPC, assigning TIDs)
 
 value[] => ID2TERM (Async)
 
 value[] => Text (Async, iff enabled)
 
 statement[] => (SPO,POS,OSP) (Async)
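The Given/Do workflow above can be sketched with plain JDK concurrency. This is a minimal, single-JVM sketch, not the bigdata implementation: the class LoadPipelineSketch and its methods are hypothetical, in-memory maps stand in for the TERM2ID, ID2TERM and statement indices, and the optional full-text step is omitted. What it illustrates is the ordering constraint: TID assignment is synchronous, and every other write depends only on the TIDs, so those writes can proceed asynchronously.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory stand-in for the non-SID load workflow (not the bigdata API).
class LoadPipelineSketch {
    // Stand-ins for the TERM2ID and ID2TERM indices.
    final ConcurrentMap<String, Long> term2id = new ConcurrentHashMap<>();
    final ConcurrentMap<Long, String> id2term = new ConcurrentHashMap<>();
    // Stand-ins for the SPO, POS and OSP indices: each key is the id triple in that index's order.
    final Set<List<Long>> spo = ConcurrentHashMap.newKeySet();
    final Set<List<Long>> pos = ConcurrentHashMap.newKeySet();
    final Set<List<Long>> osp = ConcurrentHashMap.newKeySet();
    private final AtomicLong nextTid = new AtomicLong(1);
    private final ExecutorService writers = Executors.newFixedThreadPool(2);

    // value[] => TERM2ID: synchronous, because every later step needs the TIDs.
    Map<String, Long> assignTids(Collection<String> values) {
        Map<String, Long> tids = new HashMap<>();
        for (String v : values) {
            tids.put(v, term2id.computeIfAbsent(v, k -> nextTid.getAndIncrement()));
        }
        return tids;
    }

    // Loads one fully buffered document: the distinct values plus {s, p, o} statements.
    void load(Set<String> values, List<String[]> statements)
            throws InterruptedException, ExecutionException {
        Map<String, Long> tids = assignTids(values);
        // value[] => ID2TERM: asynchronous, it only needs the TIDs.
        Future<?> reverse = writers.submit(() -> tids.forEach((v, id) -> id2term.put(id, v)));
        // statement[] => (SPO, POS, OSP): asynchronous, it also only needs the TIDs.
        Future<?> stmts = writers.submit(() -> {
            for (String[] t : statements) {
                long s = tids.get(t[0]), p = tids.get(t[1]), o = tids.get(t[2]);
                spo.add(List.of(s, p, o));
                pos.add(List.of(p, o, s));
                osp.add(List.of(o, s, p));
            }
        });
        // A real loader would overlap these writes with other documents; here we just wait.
        reverse.get();
        stmts.get();
    }

    void shutdown() { writers.shutdown(); }
}
```

Waiting on both Futures inside load() keeps the sketch simple; the factory instead hands the buffered writes to long-lived asynchronous index write buffers so that many documents are in flight at once.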
 
Note: This DOES NOT support truth maintenance. Truth maintenance requires that the term identifiers are resolved against the database's lexicon while the statements are written onto a local (and temporary) triple store. There is no (or at least less) reason to use asynchronous writes against a local store. However, TM could use this class to copy the data from the temporary triple store to the database. This should be plugged in transparently via the copyStatements() API of the triple store.

Note: This DOES NOT support SIDs.

Version:
$Id: AsynchronousStatementBufferFactory.java 2265 2009-10-26 12:51:06Z thompsonbry $
FIXME Modify to support SIDs. We basically need to loop in the workflowLatch_bufferTerm2Id workflow state until all SIDs have been assigned. However, the termination conditions will be a little more complex. During termination, if we have the TIDs but not yet the SIDs, then we need to flush the SID requests rather than allowing them to time out. Since SID processing is cyclic, we may have to do this one or more times.
 AsynchronousStatementBufferWithSids:
 
 When SIDs are enabled, we must identify the minimum set of statements
 whose SIDs are referenced by blank nodes in the S, P, O positions of
 other statements.  Since we can not make that determination until we
 reach the end of the document, all statements which use blank nodes
 are placed into the deferredStatements container.
 
 Further, and unlike the synchronous StatementBuffer, we must defer
 writes of grounded statements until we know whether or not their SID
 is referenced via a blank node by another statement.  We MUST use
 synchronous RPC to obtain the SIDs for those statements.  This means
 that the entire document MUST be parsed into memory.  Since we must
 buffer the entire document in memory when SIDs are enabled (when using
 asynchronous writes), distinct implementations of the asynchronous
 statement buffer are used depending on whether or not SIDs are
 enabled. [Actually, we fully buffer anyway so we can use the same
 implementation class.]
 
 Once the end of the document has been reached, we iteratively divide
 the parsed statements into three collections.  This process ends once
 all three collections are empty.
 
    1. groundedStatements : These are statements which are not
       referenced by other statements using their SID and which do not
       contain references to the SIDs of other statements. The
       groundedStatements are written asynchronously since there is no
       dependency on their SIDs.
 
    2. referencedStatements : These are statements whose SID has not
       been assigned yet and which do not reference other statements
       but which are themselves referenced by other statements using a
       blank node. These statements are written using synchronous RPC
       so that we may obtain their SIDs and thereby convert one or more
       deferredStatements to either groundedStatements or
       referencedStatements.
 
    3. deferredStatements : These are statements using a blank node to
       reference another statement whose SID has not been assigned yet.
       These statements MAY also be referenced by other deferred
       statements.  However, those references MAY NOT form a cycle.
       Deferred statements are moved to either the groundedStatements
       or the referencedStatements collection once their blank node
       references have been assigned SIDs.
 
 Given:
 
 value[] - RDF Values observed in the S, P, O, and C positions.
 
 unresolvedRefs[] - RDF blank nodes observed in the C position are
            entered into this collection.  They are removed
            from the collection as they are resolved.
 
 statement[] - RDF Statements reported by the parser.
 
 Do:
 
 // remove blank nodes serving as SIDs from the value[].
 value[] := value[] - unresolvedRefs[];
 
 value[] => TERM2ID (Sync RPC, assigning TIDs)
 
 value[] => ID2TERM (Async)
 
 value[] => Text (Async, iff enabled)
 
 // initially, all statements are deferred.
 deferredStatements := statements;
 
 while(!groundedStatements.isEmpty() || !referencedStatements.isEmpty()
    || !deferredStatements.isEmpty()) {
 
   groundedStatements[] => TERM2ID (async)
 
   groundedStatements := []; // empty.
 
   referencedStatements[] => TERM2ID (Sync RPC, assigning SIDs)
 
   foreach spo : referencedStatements {
 
     unresolvedRefs.remove( spo.c );
 
   }
 
   referencedStatements := []; // empty.
 
   foreach spo : deferredStatements {
 
      // true iff S.tid, P.tid, and O.tid are bound, implying that
      // this statement does not have any unresolved references to
      // other statements.
      if(spo.isGrounded) {
 
         deferredStatements.remove( spo );
 
         if(unresolvedRefs.contains(spo.c)) {
 
            // will be written synchronously.
            referencedStatements.add( spo );
 
         } else {
 
            // will be written asynchronously.
            groundedStatements.add( spo );
 
         }
 
      }
 
   }
 
 }
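The iterative classification can be made concrete as follows. This is a hypothetical, in-memory sketch: Quad and SidScheduler are illustrative names, a counter stands in for the synchronous TERM2ID RPC that assigns SIDs, and "writing" a statement just appends it to a list. The loop keeps running while any of the three collections is non-empty, which is what "this process ends once all three collections are empty" requires. The sketch also fails fast when no deferred statement can make progress, since references that form a cycle would otherwise never resolve.

```java
import java.util.*;

// Hypothetical parsed quad; c is the blank node naming this statement's SID, or null.
final class Quad {
    final String s, p, o, c;
    Quad(String s, String p, String o, String c) { this.s = s; this.p = p; this.o = o; this.c = c; }
}

// Hypothetical scheduler for grounded / referenced / deferred statements.
class SidScheduler {
    final List<Quad> written = new ArrayList<>();    // write order, for inspection
    final Map<String, Long> sids = new HashMap<>();  // blank node -> assigned SID
    private long nextSid = 1;

    void run(List<Quad> statements) {
        Set<String> unresolvedRefs = new HashSet<>();
        for (Quad q : statements) if (q.c != null) unresolvedRefs.add(q.c);
        List<Quad> deferred = new ArrayList<>(statements); // initially, all statements are deferred
        List<Quad> grounded = new ArrayList<>();
        List<Quad> referenced = new ArrayList<>();
        // Continue until ALL three collections are empty.
        while (!grounded.isEmpty() || !referenced.isEmpty() || !deferred.isEmpty()) {
            written.addAll(grounded); // asynchronous write in the real loader
            grounded.clear();
            for (Quad q : referenced) { // synchronous RPC assigning SIDs in the real loader
                sids.put(q.c, nextSid++);
                unresolvedRefs.remove(q.c);
                written.add(q);
            }
            referenced.clear();
            for (Iterator<Quad> it = deferred.iterator(); it.hasNext(); ) {
                Quad q = it.next();
                boolean isGrounded = !unresolvedRefs.contains(q.s)
                        && !unresolvedRefs.contains(q.p) && !unresolvedRefs.contains(q.o);
                if (!isGrounded) continue;
                it.remove(); // moved out of the deferred collection
                if (q.c != null && unresolvedRefs.contains(q.c)) referenced.add(q);
                else grounded.add(q);
            }
            // Nothing left to write and nothing moved: the remaining references form a cycle.
            if (grounded.isEmpty() && referenced.isEmpty() && !deferred.isEmpty())
                throw new IllegalStateException("cyclic statement references");
        }
    }
}
```

A statement whose SID is used by another statement is always written before that statement, because the dependent one stays deferred until the SID has been assigned.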
 
Author:
Bryan Thompson
TODO:
Evaluate this approach for writing on a local triple store. If there is a performance benefit, then refactor accordingly (this requires an asynchronous write API for BTree and friends).

Nested Class Summary
protected  class AsynchronousStatementBufferFactory.AsynchronousStatementBufferImpl
          Inner class provides the statement buffer.
protected  class AsynchronousStatementBufferFactory.DeleteTask
          Task deletes a resource from the local file system.
protected  class AsynchronousStatementBufferFactory.ParserTask
          Task either loads an RDF resource or verifies that the told triples found in that resource are present in the database.
 
Field Summary
protected static org.apache.log4j.Logger log
           
 
Constructor Summary
AsynchronousStatementBufferFactory(ScaleOutTripleStore tripleStore, int producerChunkSize, int valuesInitialCapacity, int bnodesInitialCapacity, RDFFormat defaultFormat, boolean verifyData, boolean deleteAfter, int parserPoolSize, int parserQueueCapacity, int term2IdWriterPoolSize, int otherWriterPoolSize, int notifyPoolSize, long pauseParsedPoolStatementThreshold)
           
 
Method Summary
 void awaitAll()
          Close buffers and then await their Futures.
 void cancelAll(boolean mayInterruptIfRunning)
          Cancel all Futures.
 void close()
          Awaits a signal that all documents which have queued writes are finished and then closes the remaining buffers.
protected  void deleteResource(R resource)
          Delete, from the local file system, a resource whose data have been made restart safe on the database.
protected  void documentDone(R resource)
          Invoked after a document has become restart safe.
protected  void documentError(R resource, Throwable t)
          Invoked after a document has failed.
 CounterSet getCounters()
          Return performance counters defined by this factory.
protected  RDFFormat getDefaultRDFFormat()
          The default RDF interchange format that will be used when the format can not be determined.
 long getDocumentDoneCount()
          The #of documents submitted to the factory which have been processed successfully.
 long getDocumentErrorCount()
          The #of documents submitted to the factory which could not be processed due to some error.
 long getElapsedMillis()
          The elapsed milliseconds, counting only the time between notifyStart() and notifyEnd().
protected  InputStream getInputStream(R resource)
          Open a buffered input stream reading from the resource.
 long getStatementCount()
          Return an estimate of the #of statements written on the indices.
 boolean isAnyDone()
          Return true if the Future for any of the asynchronous write buffers is done.
protected  boolean isDeleteAfter()
          Delete files after they have been successfully loaded when true.
protected  boolean isVerifyData()
          Validate the RDF interchange syntax when true.
protected  Runnable newFailureTask(R resource, Throwable cause)
          Return the optional task to be executed for a resource for which processing has failed.
protected  Callable<?> newParserTask(R resource)
          Return a task to parse the document.
 IStatementBuffer<S> newStatementBuffer()
          Note: do not invoke this directly.
protected  AsynchronousStatementBufferFactory.AsynchronousStatementBufferImpl newStatementBuffer(R resource)
           
protected  Runnable newSuccessTask(R resource)
          Return the optional task to be executed for a resource which has been successfully processed and whose assertions are now restart safe on the database.
protected  void notifyEnd()
          Notify that the factory is done running tasks (for now).
protected  void notifyStart()
          Notify that the factory will begin running tasks.
 int submitAll(File fileOrDir, FilenameFilter filter, long retryMillis)
          Submit all files in a directory for processing via submitOne(Object, long).
 void submitOne(R resource)
          Submit a resource for processing.
 void submitOne(R resource, long retryMillis)
          Submit a resource for processing.
 String toString()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

log

protected static final transient org.apache.log4j.Logger log
Constructor Detail

AsynchronousStatementBufferFactory

public AsynchronousStatementBufferFactory(ScaleOutTripleStore tripleStore,
                                          int producerChunkSize,
                                          int valuesInitialCapacity,
                                          int bnodesInitialCapacity,
                                          RDFFormat defaultFormat,
                                          boolean verifyData,
                                          boolean deleteAfter,
                                          int parserPoolSize,
                                          int parserQueueCapacity,
                                          int term2IdWriterPoolSize,
                                          int otherWriterPoolSize,
                                          int notifyPoolSize,
                                          long pauseParsedPoolStatementThreshold)
Parameters:
tripleStore -
producerChunkSize - The chunk size used when writing chunks onto the master for the asynchronous index write API. If this value is on the order of the #of terms or statements in the parsed documents, then all terms / statements will be written onto the master in one chunk. The master will split the chunk based on the separator keys for the index partitions and write splits onto the sink for each index partition. The master and sink configuration is specified via the IndexMetadata when the triple store indices are created.
valuesInitialCapacity - The initial capacity of the map of the distinct RDF Values parsed from a single document.
bnodesInitialCapacity - The initial capacity of the map of the distinct RDF BNodes parsed from a single document.
defaultFormat -
verifyData -
deleteAfter -
parserPoolSize - The #of worker threads in the thread pool for parsing RDF documents.
parserQueueCapacity - The capacity of the bounded work queue for the service running the parser tasks.
term2IdWriterPoolSize - The #of worker threads in the thread pool for buffering asynchronous writes on the TERM2ID index.
otherWriterPoolSize - The #of worker threads in the thread pool for buffering asynchronous index writes on the other indices.
notifyPoolSize - The #of worker threads in the thread pool for handling document success and document error notices.
pauseParsedPoolStatementThreshold - The maximum #of statements which can be parsed but not yet buffered before requests for new parser tasks are paused [0: Long.MAX_VALUE]. This allows you to place a constraint on the RAM of the parsers. The RAM demand of the asynchronous index write buffers is controlled by their master and sink queue capacity and chunk size.
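A minimal sketch of how a threshold like pauseParsedPoolStatementThreshold can throttle parsing, assuming a simple monitor-based counter of statements that have been parsed but not yet handed to the index write buffers. ParsedStatementGate and its methods are hypothetical; the factory's actual bookkeeping is not described on this page.

```java
// Hypothetical gate: pauses new parser tasks while too many parsed statements are unbuffered.
class ParsedStatementGate {
    private final long threshold;
    private long unbuffered; // parsed but not yet buffered for the index writers

    ParsedStatementGate(long threshold) {
        if (threshold < 0) throw new IllegalArgumentException("threshold must be >= 0");
        this.threshold = threshold;
    }

    // Parser side: record newly parsed statements.
    synchronized void parsed(long n) { unbuffered += n; }

    // Write-buffer side: statements were buffered, so wake any paused submitters.
    synchronized void buffered(long n) {
        unbuffered -= n;
        notifyAll();
    }

    // Submitter side: block before starting a new parser task while over the threshold.
    synchronized void awaitCapacity() throws InterruptedException {
        while (unbuffered > threshold) wait();
    }

    synchronized long unbuffered() { return unbuffered; }
}
```

Because the gate only delays new parser tasks, parsers that are already running may still push the count past the threshold, so the threshold bounds parser RAM approximately rather than exactly.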
TODO:
The ConcurrentDataLoader (CDL) is still used for validation by some unit tests. Write a variant of this class that issues read-only TERM2ID requests and then validates the indices, so that we can drop the ConcurrentDataLoader class.
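To illustrate how the master splits a chunk (of up to producerChunkSize keys) on the separator keys of the index partitions, here is a hypothetical sketch. It assumes String keys (real index keys are unsigned byte[]), assumes partition i covers [separators[i], separators[i+1]) with separators[0] the empty (minimum) key, and uses Arrays.binarySearch to find the owning partition. ChunkSplitter is not a bigdata class.

```java
import java.util.*;

// Hypothetical sketch: route a chunk of keys to the index partitions that own them.
class ChunkSplitter {
    // separators: sorted left separator keys, separators[0] == "" (the minimum key).
    // Returns partition index -> the keys (in sorted order) destined for that partition's sink.
    static SortedMap<Integer, List<String>> split(String[] separators, List<String> chunk) {
        List<String> sorted = new ArrayList<>(chunk);
        Collections.sort(sorted); // each split becomes a contiguous, ordered run
        SortedMap<Integer, List<String>> splits = new TreeMap<>();
        for (String key : sorted) {
            int pos = Arrays.binarySearch(separators, key);
            // Exact hit: key is the left separator of partition pos.
            // Miss: binarySearch returns -(insertionPoint) - 1, and the owner is insertionPoint - 1.
            int partition = pos >= 0 ? pos : -pos - 2;
            splits.computeIfAbsent(partition, k -> new ArrayList<>()).add(key);
        }
        return splits;
    }
}
```

With a large producerChunkSize a whole document's keys arrive in one chunk, so the master emits at most one split per partition; with a small chunk size the same keys are spread over more, smaller splits.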
Method Detail

isVerifyData

protected boolean isVerifyData()
Validate the RDF interchange syntax when true.


isDeleteAfter

protected boolean isDeleteAfter()
Delete files after they have been successfully loaded when true.


getDefaultRDFFormat

protected RDFFormat getDefaultRDFFormat()
The default RDF interchange format that will be used when the format can not be determined.


notifyStart

protected void notifyStart()
Notify that the factory will begin running tasks. This sets the startTime used by getElapsedMillis() to report the run time of the tasks.


notifyEnd

protected void notifyEnd()
Notify that the factory is done running tasks (for now). This places a cap on the time reported by getElapsedMillis().


getElapsedMillis

public long getElapsedMillis()
The elapsed milliseconds, counting only the time between notifyStart() and notifyEnd().
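The interplay of notifyStart(), notifyEnd() and getElapsedMillis() can be sketched as below. ElapsedClock is hypothetical, and two behaviors are assumptions rather than documented facts: while still running, the elapsed time is measured up to "now", and calling notifyStart() again starts a fresh measurement.

```java
// Hypothetical start/end bookkeeping behind getElapsedMillis().
class ElapsedClock {
    private long startTime; // 0 until notifyStart()
    private long endTime;   // 0 until notifyEnd()

    synchronized void notifyStart() {
        startTime = System.currentTimeMillis();
        endTime = 0L; // a new run is not capped yet
    }

    synchronized void notifyEnd() { endTime = System.currentTimeMillis(); }

    // Start to end once notifyEnd() was called; start to now while still running.
    synchronized long getElapsedMillis() {
        if (startTime == 0L) return 0L;
        long end = endTime == 0L ? System.currentTimeMillis() : endTime;
        return end - startTime;
    }
}
```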


getStatementCount

public long getStatementCount()
Return an estimate of the #of statements written on the indices.

This value is aggregated across any IStatementBuffer obtained from newStatementBuffer() for this instance.

This value actually reports the #of statements written on the SPO index for the database. Statements are written asynchronously in chunks and the writes MAY proceed at different rates for each of the statement indices. The counter value will be stable once awaitAll() returns normally.

See Also:
SPOIndexWriteProc

getDocumentErrorCount

public long getDocumentErrorCount()
The #of documents submitted to the factory which could not be processed due to some error.


getDocumentDoneCount

public long getDocumentDoneCount()
The #of documents submitted to the factory which have been processed successfully.


newStatementBuffer

public IStatementBuffer<S> newStatementBuffer()
Note: do not invoke this directly. It does not know how to set the resource identifier on the statement buffer impl.

Specified by:
newStatementBuffer in interface IStatementBufferFactory<S extends BigdataStatement>
Specified by:
newStatementBuffer in interface IAsynchronousWriteStatementBufferFactory<S extends BigdataStatement>

newStatementBuffer

protected AsynchronousStatementBufferFactory.AsynchronousStatementBufferImpl newStatementBuffer(R resource)

submitOne

public void submitOne(R resource)
               throws Exception
Submit a resource for processing.

Parameters:
resource - The resource (file or URL, but not a directory).
Throws:
Exception - if there is a problem creating the parser task.
RejectedExecutionException - if the work queue for the parser service is full.

submitOne

public void submitOne(R resource,
                      long retryMillis)
               throws InterruptedException,
                      Exception
Submit a resource for processing.

Parameters:
resource - The resource (file or URL, but not a directory).
retryMillis - The number of milliseconds to wait between retries when the parser service work queue is full. When ZERO (0L), a RejectedExecutionException will be thrown instead.
Throws:
Exception - if there is a problem creating the parser task.
RejectedExecutionException - if the service is shut down -or- the retryMillis is ZERO (0L).
InterruptedException
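The retryMillis contract can be sketched with a bounded ThreadPoolExecutor, whose default AbortPolicy throws RejectedExecutionException when its work queue is full. RetryingSubmitter is a hypothetical name, and modeling the parser service as a ThreadPoolExecutor backed by an ArrayBlockingQueue of parserQueueCapacity is an assumption, not the documented implementation.

```java
import java.util.concurrent.*;

// Hypothetical sketch of submitOne(resource, retryMillis) against a bounded parser service.
class RetryingSubmitter {
    final ThreadPoolExecutor parserService;

    RetryingSubmitter(int parserPoolSize, int parserQueueCapacity) {
        // Default AbortPolicy: execute() throws RejectedExecutionException when the queue is full.
        parserService = new ThreadPoolExecutor(parserPoolSize, parserPoolSize, 0L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(parserQueueCapacity));
    }

    void submitOne(Runnable parserTask, long retryMillis) throws InterruptedException {
        while (true) {
            try {
                parserService.execute(parserTask);
                return;
            } catch (RejectedExecutionException ex) {
                // Rethrow when retries are disabled or the service has been shut down.
                if (retryMillis == 0L || parserService.isShutdown()) throw ex;
                Thread.sleep(retryMillis); // give the workers time to drain the queue
            }
        }
    }
}
```

Sleeping and retrying keeps the caller (for example, a directory scan) from overrunning the bounded queue, while retryMillis == 0L lets a caller that prefers to handle backpressure itself see the rejection immediately.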

submitAll

public int submitAll(File fileOrDir,
                     FilenameFilter filter,
                     long retryMillis)
              throws Exception
Submit all files in a directory for processing via submitOne(Object, long).

Parameters:
fileOrDir - The file or directory.
filter - An optional filter. Only the files selected by the filter will be processed.
retryMillis - The number of milliseconds to wait between retries when the parser service work queue is full. When ZERO (0L), a RejectedExecutionException will be thrown instead.
Returns:
The #of files that were submitted for processing.
Throws:
Exception
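A minimal sketch of the submitAll traversal, assuming that directories are always descended into, that the filter is applied only to plain files, and that a Consumer stands in for the factory's submitOne(resource, retryMillis). DirectorySubmitter is hypothetical; the documentation above does not say whether sub-directories are visited.

```java
import java.io.*;
import java.util.*;
import java.util.function.Consumer;

// Hypothetical sketch of submitAll: walk a file or directory and submit each accepted file.
class DirectorySubmitter {
    // Returns the #of files submitted.
    static int submitAll(File fileOrDir, FilenameFilter filter, Consumer<File> submitOne) {
        if (fileOrDir.isDirectory()) {
            File[] children = fileOrDir.listFiles();
            if (children == null) return 0; // unreadable directory
            Arrays.sort(children);          // deterministic submission order
            int n = 0;
            for (File child : children) n += submitAll(child, filter, submitOne);
            return n;
        }
        // The filter is optional: null means every file is accepted.
        if (filter != null && !filter.accept(fileOrDir.getParentFile(), fileOrDir.getName())) return 0;
        submitOne.accept(fileOrDir);
        return 1;
    }
}
```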

getInputStream

protected InputStream getInputStream(R resource)
                              throws IOException
Open a buffered input stream reading from the resource. If the resource name ends with .gz or .zip, then the appropriate decompression is applied.

Parameters:
resource - The resource identifier.
Throws:
IOException
TODO:
This will only read the first entry from a ZIP file. Archives need to be recognized as such by the driver and expanded into a sequence of parser calls with the input stream. That will require a different entry point since we can't close the ZipInputStream until we have read all the entries in that file. The ZipInputStream is likely not thread safe so the same parser thread would have to consume each of the entries even though they must also be dealt with as distinct documents. Given all that, reading more than the first entry might not be worth it.
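A minimal sketch of the extension-based decompression described for getInputStream, assuming File resources (the factory is generic in R and may also accept URLs). As the TODO notes, only the first ZIP entry is exposed. ResourceStreams.open is a hypothetical helper built on the standard java.util.zip classes.

```java
import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipInputStream;

// Hypothetical sketch of getInputStream(resource) for File resources.
class ResourceStreams {
    static InputStream open(File resource) throws IOException {
        InputStream is = new BufferedInputStream(new FileInputStream(resource));
        String name = resource.getName();
        try {
            if (name.endsWith(".gz")) {
                // The GZIPInputStream constructor reads and checks the gzip header.
                return new BufferedInputStream(new GZIPInputStream(is));
            }
            if (name.endsWith(".zip")) {
                ZipInputStream zis = new ZipInputStream(is);
                // Position on the first entry only; reads return -1 at the end of that entry.
                if (zis.getNextEntry() == null) throw new IOException("empty zip: " + resource);
                return new BufferedInputStream(zis);
            }
            return is;
        } catch (IOException ex) {
            is.close(); // do not leak the file handle on a bad header
            throw ex;
        }
    }
}
```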

newParserTask

protected Callable<?> newParserTask(R resource)
                             throws Exception
Return a task to parse the document. The task should allocate an AsynchronousStatementBufferFactory.AsynchronousStatementBufferImpl for the document. When that buffer is flushed, the document will be queued for further processing.

Parameters:
resource - The resource to be parsed.
Returns:
The task to execute.
Throws:
Exception

toString

public String toString()
Overrides:
toString in class Object

isAnyDone

public boolean isAnyDone()
Description copied from interface: IAsynchronousWriteStatementBufferFactory
Return true if the Future for any of the asynchronous write buffers is done.

Note: This method should be invoked periodically to verify that no errors have been encountered by the asynchronous write buffers. If this method returns true, invoke IAsynchronousWriteStatementBufferFactory.awaitAll(), which will detect any error(s), cancel the other Futures, and throw an error back to you.

Specified by:
isAnyDone in interface IAsynchronousWriteStatementBufferFactory<S extends BigdataStatement>

cancelAll

public void cancelAll(boolean mayInterruptIfRunning)
Description copied from interface: IAsynchronousWriteStatementBufferFactory
Cancel all Futures. The buffers will be automatically closed when their Futures are canceled.

Specified by:
cancelAll in interface IAsynchronousWriteStatementBufferFactory<S extends BigdataStatement>

close

public void close()
Awaits a signal that all documents which have queued writes are finished and then closes the remaining buffers.

Specified by:
close in interface IAsynchronousWriteStatementBufferFactory<S extends BigdataStatement>

awaitAll

public void awaitAll()
              throws InterruptedException,
                     ExecutionException
Description copied from interface: IAsynchronousWriteStatementBufferFactory
Close buffers and then await their Futures. Once closed, the buffers will not accept further input and the consumers will eventually drain the buffers and report that they are exhausted. The Futures will become available once the iterators are exhausted.

Specified by:
awaitAll in interface IAsynchronousWriteStatementBufferFactory<S extends BigdataStatement>
Throws:
InterruptedException - if interrupted while awaiting any of the Futures.
ExecutionException - if any Future fails.
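The Future bookkeeping behind isAnyDone(), cancelAll() and awaitAll() can be sketched as follows. WriteBufferFutures is hypothetical, closing the buffers before awaiting is omitted, and awaitAll() polls rather than blocking on each Future in turn, so that a failure in any buffer is noticed even while an earlier one is still running.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical bookkeeping for the asynchronous write buffers' Futures.
class WriteBufferFutures {
    private final List<Future<?>> futures = new CopyOnWriteArrayList<>();

    void add(Future<?> f) { futures.add(f); }

    // True if any Future is done: normally, by failure, or by cancellation.
    boolean isAnyDone() {
        for (Future<?> f : futures) if (f.isDone()) return true;
        return false;
    }

    void cancelAll(boolean mayInterruptIfRunning) {
        for (Future<?> f : futures) f.cancel(mayInterruptIfRunning);
    }

    // Wait for every Future; on the first failure seen, cancel the rest and rethrow.
    void awaitAll() throws InterruptedException, ExecutionException {
        while (true) {
            boolean allDone = true;
            for (Future<?> f : futures) {
                if (!f.isDone()) { allDone = false; continue; }
                try {
                    f.get(); // returns at once because f is done
                } catch (ExecutionException | CancellationException ex) {
                    cancelAll(true);
                    throw ex;
                }
            }
            if (allDone) return;
            Thread.sleep(10); // poll so that a failure in any Future is seen promptly
        }
    }
}
```

This mirrors the usage pattern in the isAnyDone() note: poll isAnyDone() while loading, and once it returns true, call awaitAll() to surface any error and cancel the remaining buffers.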

documentDone

protected final void documentDone(R resource)
Invoked after a document has become restart safe. If newSuccessTask(Object) returns a Runnable then that will be executed on the notifyService.

Parameters:
resource - The document identifier.

documentError

protected final void documentError(R resource,
                                   Throwable t)
Invoked after a document has failed. If newFailureTask(Object, Throwable) returns a Runnable then that will be executed on the notifyService.

Parameters:
resource - The document identifier.
t - The exception.

newSuccessTask

protected Runnable newSuccessTask(R resource)
Return the optional task to be executed for a resource which has been successfully processed and whose assertions are now restart safe on the database. The task, if any, will be run on the notifyService.

The default implementation returns an AsynchronousStatementBufferFactory.DeleteTask IFF deleteAfter was specified as true to the ctor and otherwise returns null. The event is logged @ INFO.

Parameters:
resource - The resource.
Returns:
The task to run -or- null if no task should be run.

newFailureTask

protected Runnable newFailureTask(R resource,
                                  Throwable cause)
Return the optional task to be executed for a resource for which processing has failed. The task, if any, will be run on the notifyService.

The default implementation logs a message @ ERROR.

Parameters:
resource - The resource.
cause - The cause.
Returns:
The task to run -or- null if no task should be run.
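The division of labor between documentDone()/documentError() and the newSuccessTask()/newFailureTask() hooks can be sketched as below. NotifyingLoader is hypothetical: a single-thread executor stands in for the notifyService, stderr stands in for logging @ ERROR, and the assumption that these methods also bump the counters reported by getDocumentDoneCount() and getDocumentErrorCount() is not confirmed by this page.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the success/failure notification hooks; R is the resource type.
class NotifyingLoader<R> {
    final ExecutorService notifyService = Executors.newSingleThreadExecutor();
    final AtomicLong documentDoneCount = new AtomicLong();
    final AtomicLong documentErrorCount = new AtomicLong();
    private final boolean deleteAfter;

    NotifyingLoader(boolean deleteAfter) { this.deleteAfter = deleteAfter; }

    // Optional success task: by default, delete the resource iff deleteAfter was set.
    protected Runnable newSuccessTask(R resource) {
        return deleteAfter ? () -> deleteResource(resource) : null;
    }

    // Optional failure task: this sketch reports to stderr in place of a logger.
    protected Runnable newFailureTask(R resource, Throwable cause) {
        return () -> System.err.println("Failed: " + resource + " : " + cause);
    }

    // Placeholder: a File-based subclass would delete the file here.
    protected void deleteResource(R resource) { }

    protected final void documentDone(R resource) {
        documentDoneCount.incrementAndGet();
        Runnable task = newSuccessTask(resource);
        if (task != null) notifyService.execute(task); // never run on the caller's thread
    }

    protected final void documentError(R resource, Throwable t) {
        documentErrorCount.incrementAndGet();
        Runnable task = newFailureTask(resource, t);
        if (task != null) notifyService.execute(task);
    }
}
```

Keeping documentDone() and documentError() final while exposing the task factories as overridable hooks lets subclasses customize what happens per document without being able to skip the bookkeeping.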

deleteResource

protected void deleteResource(R resource)
Delete, from the local file system, a resource whose data have been made restart safe on the database.

Parameters:
resource - The resource.

getCounters

public CounterSet getCounters()
Description copied from interface: IAsynchronousWriteStatementBufferFactory
Return performance counters defined by this factory.

Specified by:
getCounters in interface IAsynchronousWriteStatementBufferFactory<S extends BigdataStatement>


Copyright © 2006-2009 SYSTAP, LLC. All Rights Reserved.