com.bigdata.rdf.rio
Class StatementBuffer<S extends Statement>

java.lang.Object
  extended by com.bigdata.rdf.rio.StatementBuffer<S>
All Implemented Interfaces:
IStatementBuffer<S>, IBuffer<S>
Direct Known Subclasses:
VerifyStatementBuffer

public class StatementBuffer<S extends Statement>
extends Object
implements IStatementBuffer<S>

A write buffer for absorbing the output of the RIO parser or other Statement source and writing that output onto an AbstractTripleStore using the batch API.

Note: there is a LOT of Value duplication in parsed RDF and we get a significant reward for reducing Values to only the distinct Values during processing. On the other hand, there is little Statement duplication. Hence we pay an unnecessary overhead if we try to make the statements distinct in the buffer.

Note: This also explains why neither this class nor writes of SPOs do better when "distinct" statements is turned on: in that case the "Value" objects are represented only by long integers, so duplication among their values burdens neither the heap nor the index writers. In contrast, the duplication of Values in the StatementBuffer imposes a burden on both the heap and the index writers.
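To make the duplication argument above concrete, the following minimal sketch counts value slots, distinct values, and distinct statements over a handful of triples. It is a hypothetical illustration, not bigdata code: terms are modeled as plain strings and statements as String[3] arrays, and ValueDuplicationDemo and its methods are invented names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration (not bigdata): terms are strings, statements are String[3] {s, p, o}. */
class ValueDuplicationDemo {

    /** Number of value slots across all statements (three per triple). */
    static int totalValueSlots(List<String[]> stmts) {
        return stmts.size() * 3;
    }

    /** Number of distinct values appearing in any position. */
    static int distinctValues(List<String[]> stmts) {
        Set<String> seen = new HashSet<>();
        for (String[] t : stmts) {
            seen.addAll(Arrays.asList(t));
        }
        return seen.size();
    }

    /** Number of distinct statements (Arrays.asList gives element-wise equals/hashCode). */
    static int distinctStatements(List<String[]> stmts) {
        Set<List<String>> seen = new HashSet<>();
        for (String[] t : stmts) {
            seen.add(Arrays.asList(t));
        }
        return seen.size();
    }

    public static void main(String[] args) {
        List<String[]> stmts = List.of(
            new String[] {"ex:alice", "rdf:type", "ex:Person"},
            new String[] {"ex:bob", "rdf:type", "ex:Person"},
            new String[] {"ex:alice", "ex:knows", "ex:bob"},
            new String[] {"ex:bob", "ex:knows", "ex:alice"});
        System.out.println(totalValueSlots(stmts));    // 12
        System.out.println(distinctValues(stmts));     // 5
        System.out.println(distinctStatements(stmts)); // 4
    }
}
```

On the four sample triples the sketch finds 12 value slots but only 5 distinct values, while all 4 statements are distinct: the asymmetry the note describes.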

Version:
$Id: StatementBuffer.java 6359 2012-06-25 20:09:40Z thompsonbry $
Author:
Bryan Thompson

Field Summary
protected  int capacity
          The maximum #of Statements, URIs, Literals, or BNodes that the buffer can hold.
protected  AbstractTripleStore database
          The database that will be used to resolve terms.
protected  boolean distinct
          When true only distinct terms are stored in the buffer (this is always true since this condition always outperforms the alternative).
protected  int numBNodes
           
protected  int numLiterals
           
protected  int numSIDs
          The #of blank nodes which appear in the context position and zero (0) if statement identifiers are not enabled.
protected  int numStmts
          #of valid entries in stmts.
protected  int numURIs
           
protected  int numValues
          #of valid entries in values.
protected  BigdataStatement[] stmts
          Buffer for parsed RDF Statements.
protected  BigdataValueFactory valueFactory
           
protected  BigdataValue[] values
          Buffer for parsed RDF Values.
 
Constructor Summary
StatementBuffer(AbstractTripleStore database, int capacity)
          Create a buffer that converts Sesame Value objects to SPOs and writes on the database when it is flush()ed.
StatementBuffer(TempTripleStore statementStore, AbstractTripleStore database, int capacity)
          Create a buffer that writes on a TempTripleStore when it is flush()ed.
 
Method Summary
protected  void _clear()
          Invoked by incrementalWrite() to clear terms and statements which have been written in preparation for buffering more writes.
 void add(Resource s, URI p, Value o)
          Add an "explicit" statement to the buffer (flushes on overflow, no context).
 void add(Resource s, URI p, Value o, Resource c)
          Add an "explicit" statement to the buffer (flushes on overflow).
 void add(Resource s, URI p, Value o, Resource c, StatementEnum type)
          Add a statement to the buffer (core impl, flushes on overflow).
 void add(Statement e)
          Add a statement to the buffer.
protected  long addStatements(BigdataStatement[] stmts, int numStmts)
          Adds the statements to each index (batch api, NO truth maintenance).
protected  void addTerms(BigdataValue[] terms, int numTerms)
           
 long flush()
          Signals the end of a source and causes all buffered statements to be written.
 AbstractTripleStore getDatabase()
          The database that will be used to resolve terms.
protected  BigdataValue getDistinctTerm(BigdataValue term)
          Canonicalizing mapping for a term.
 AbstractTripleStore getStatementStore()
          The optional store into which statements will be inserted when non-null.
protected  void handleStatement(Resource s, URI p, Value o, Resource c, StatementEnum type)
          Adds the values and the statement into the buffer.
protected  void incrementalWrite()
          Batch insert buffered data (terms and statements) into the store.
 boolean isEmpty()
          True iff there are no elements in the buffer.
 boolean nearCapacity()
          Returns true if the bufferQueue has fewer than three slots remaining for any of the value arrays (URIs, Literals, or BNodes) or if there are no slots remaining in the statements array.
protected  void processDeferredStatements()
          Processes the deferred statements.
 void reset()
          Clears all buffered data, including the canonicalizing mapping for blank nodes and deferred provenance statements.
 void setBNodeMap(Map<String,BigdataBNode> bnodes)
          Set the canonicalizing map for blank nodes based on their ID.
 void setChangeLog(IChangeLog changeLog)
           
 void setReadOnly()
          When invoked, the StatementBuffer will resolve terms against the lexicon, but not enter new terms into the lexicon.
 int size()
          The #of elements currently in the buffer.
protected  long writeSPOs(SPO[] stmts, int numStmts)
          Adds the statements to each index (batch api, NO truth maintenance).
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

values

protected final BigdataValue[] values
Buffer for parsed RDF Values.


stmts

protected final BigdataStatement[] stmts
Buffer for parsed RDF Statements.


numValues

protected int numValues
#of valid entries in values.


numStmts

protected int numStmts
#of valid entries in stmts.


numURIs

protected int numURIs
TODO:
consider tossing out these counters - they only add complexity to the code in handleStatement(Resource, URI, Value, Resource, StatementEnum).

numLiterals

protected int numLiterals
TODO:
consider tossing out these counters - they only add complexity to the code in handleStatement(Resource, URI, Value, Resource, StatementEnum).

numBNodes

protected int numBNodes
TODO:
consider tossing out these counters - they only add complexity to the code in handleStatement(Resource, URI, Value, Resource, StatementEnum).

numSIDs

protected int numSIDs
The #of blank nodes which appear in the context position and zero (0) if statement identifiers are not enabled.


database

protected final AbstractTripleStore database
The database that will be used to resolve terms. When statementStore is null, statements will be written into this store as well.


valueFactory

protected final BigdataValueFactory valueFactory

capacity

protected final int capacity
The maximum #of Statements, URIs, Literals, or BNodes that the buffer can hold. The minimum capacity is three (3) since that corresponds to a single triple where all terms are URIs.


distinct

protected final boolean distinct
When true only distinct terms are stored in the buffer (this is always true since this condition always outperforms the alternative).

See Also:
Constant Field Values

Constructor Detail

StatementBuffer

public StatementBuffer(AbstractTripleStore database,
                       int capacity)
Create a buffer that converts Sesame Value objects to SPOs and writes on the database when it is flush()ed. This may be used to perform efficient batch writes of Sesame Values or Statements onto the database. If you already have SPOs, then use IRawTripleStore.addStatements(IChunkedOrderedIterator, IElementFilter) and friends.

Parameters:
database - The database into which the terms and statements will be inserted.
capacity - The #of statements that the buffer can hold.

StatementBuffer

public StatementBuffer(TempTripleStore statementStore,
                       AbstractTripleStore database,
                       int capacity)
Create a buffer that writes on a TempTripleStore when it is flush()ed. This variant is used during truth maintenance since the terms are written on the database lexicon but the statements are asserted against the TempTripleStore.

Parameters:
statementStore - The store into which the statements will be inserted (optional). When null, both statements and terms will be inserted into the database. This optional argument provides the ability to load statements into a temporary store while the terms are resolved against the main database. This facility is used during incremental load+close operations.
database - The database. When statementStore is null, both terms and statements will be inserted into the database.
capacity - The #of statements that the buffer can hold.

Method Detail

getStatementStore

public final AbstractTripleStore getStatementStore()
The optional store into which statements will be inserted when non-null.

Specified by:
getStatementStore in interface IStatementBuffer<S extends Statement>

getDatabase

public final AbstractTripleStore getDatabase()
The database that will be used to resolve terms. When getStatementStore() is null, statements will be written into this store as well.

Specified by:
getDatabase in interface IStatementBuffer<S extends Statement>

isEmpty

public boolean isEmpty()
Description copied from interface: IBuffer
True iff there are no elements in the buffer.

Specified by:
isEmpty in interface IBuffer<S extends Statement>

size

public int size()
Description copied from interface: IBuffer
The #of elements currently in the buffer.

Specified by:
size in interface IBuffer<S extends Statement>

setReadOnly

public void setReadOnly()
When invoked, the StatementBuffer will resolve terms against the lexicon, but not enter new terms into the lexicon. This mode can be used to efficiently resolve terms to SPOs. FIXME REFACTOR

TODO:
Use an IBuffer pattern to make the statement buffer chunk-at-a-time. The buffer has a readOnly argument and will visit SPOs for the source statements. When readOnly, new terms will not be added to the database.
Once we have the SPOs we can just feed them into whatever consumer we like and do bulk completion, bulk filtering, write the SPOs onto the database, etc.
Must also support the focusStore patterns, which should not be too difficult.

setChangeLog

public void setChangeLog(IChangeLog changeLog)

flush

public long flush()
Signals the end of a source and causes all buffered statements to be written.

Note: The source limits the scope within which blank nodes are co-referenced by their IDs. Calling this method will flush the buffer, cause any deferred statements to be written, and cause the canonicalizing mapping for blank nodes to be discarded.

Specified by:
flush in interface IBuffer<S extends Statement>
Returns:
The #of elements written on the backing IRelation. See IMutableRelation
TODO:
this implementation always returns ZERO (0).
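The blank-node scoping described for flush() can be illustrated with a small model. This is a hypothetical sketch, assuming a plain HashMap from blank node ID to node object; BNodeScopeDemo, resolve() and the nested BNode holder are invented names, and the actual writing of buffered and deferred statements is elided.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of per-source blank-node scoping (not the bigdata API). */
class BNodeScopeDemo {

    /** Stand-in for a blank node; object identity matters, not the label. */
    static final class BNode {
        final String id;
        BNode(String id) { this.id = id; }
    }

    private final Map<String, BNode> bnodes = new HashMap<>();

    /** Canonicalizes a blank node ID within the current source. */
    BNode resolve(String id) {
        return bnodes.computeIfAbsent(id, BNode::new);
    }

    /** End of source: buffered and deferred writes are elided here; the mapping is discarded. */
    void flush() {
        bnodes.clear();
    }

    public static void main(String[] args) {
        BNodeScopeDemo buf = new BNodeScopeDemo();
        BNode a1 = buf.resolve("b1");
        BNode a2 = buf.resolve("b1");
        System.out.println(a1 == a2); // true: same source, same node
        buf.flush();
        BNode b1 = buf.resolve("b1");
        System.out.println(a1 == b1); // false: new source, new node
    }
}
```

The same ID resolves to one node until flush(), after which it denotes a fresh node, mirroring the rule that co-reference by ID does not cross source boundaries.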

processDeferredStatements

protected void processDeferredStatements()
Processes the deferred statements.

When statement identifiers are enabled the processing of statements using blank nodes in their subject or object position must be deferred until we know whether or not the blank node is being used as a statement identifier (blank nodes are not allowed in the predicate position by the RDF data model). If the blank node is being used as a statement identifier then its IV will be assigned based on the {s,p,o} triple. If it is being used as a blank node, then the IV is assigned using the blank node ID.

Deferred statements are processed as follows:

  1. Collect all deferred statements whose blank node bindings never show up in the context position of a statement (BigdataBNode#getStatementIdentifier() is false). Those blank nodes are NOT statement identifiers, so we insert them into the lexicon and then insert the collected statements as well.
  2. The remaining deferred statements are processed in "cliques". Each clique consists of all remaining deferred statements whose {s,p,o} have become fully defined by virtue of a blank node becoming bound as a statement identifier. A clique is collected by a full pass over the remaining deferred statements. This process repeats until no statements are identified (an empty clique or fixed point).
If there are remaining deferred statements then they contain cycles. This is an error and an exception is thrown.

TODO:
on each flush(), scan the deferred statements for those which are fully determined (bnodes are flagged as statement identifiers) to minimize the build up for long documents?
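The two-phase processing of deferred statements can be sketched as follows. This is a simplified, hypothetical model rather than the bigdata implementation: statements are string quads, any blank node (a label starting with "_:") in the context position is treated as a statement identifier, and a SID counts as bound once the statement naming it has been written. For simplicity, statements without blank nodes also pass through step 1. DeferredStatementsDemo, Stmt, ready() and process() are invented names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of SID-deferred statement processing (not the bigdata API). */
class DeferredStatementsDemo {

    /** A quad; c is a blank node label ("_:...") when used as a SID, otherwise it may be null. */
    static final class Stmt {
        final String s, p, o, c;
        Stmt(String s, String p, String o, String c) {
            this.s = s; this.p = p; this.o = o; this.c = c;
        }
        @Override public String toString() { return s + " " + p + " " + o; }
    }

    static boolean isBNode(String v) { return v != null && v.startsWith("_:"); }

    /** True if every SID blank node in s/o already has a written statement bound to it. */
    static boolean ready(Stmt t, Set<String> sids, Set<String> bound) {
        return (!sids.contains(t.s) || bound.contains(t.s))
            && (!sids.contains(t.o) || bound.contains(t.o));
    }

    /** Returns the statements in the order they would be written. */
    static List<Stmt> process(List<Stmt> stmts) {
        // Blank nodes that appear in the context position are statement identifiers.
        Set<String> sids = new HashSet<>();
        for (Stmt t : stmts) if (isBNode(t.c)) sids.add(t.c);

        List<Stmt> written = new ArrayList<>();
        List<Stmt> remaining = new ArrayList<>();
        Set<String> bound = new HashSet<>();

        // Step 1: statements that use no SID in s/o are written immediately.
        for (Stmt t : stmts) {
            if (ready(t, sids, Set.of())) {
                written.add(t);
                if (isBNode(t.c)) bound.add(t.c);
            } else {
                remaining.add(t);
            }
        }
        // Step 2: each full pass collects one clique; stop at an empty clique (fixed point).
        while (true) {
            List<Stmt> clique = new ArrayList<>();
            for (Stmt t : remaining) if (ready(t, sids, bound)) clique.add(t);
            if (clique.isEmpty()) break;
            remaining.removeAll(clique);
            for (Stmt t : clique) {
                written.add(t);
                if (isBNode(t.c)) bound.add(t.c);
            }
        }
        // Whatever is left depends, directly or indirectly, on itself.
        if (!remaining.isEmpty()) {
            throw new IllegalStateException("cycle among deferred statements: " + remaining);
        }
        return written;
    }
}
```

Fed the quads (ex:a ex:p ex:b _:s1), (_:s1 ex:source ex:doc _:s2) and (_:s2 ex:assertedBy ex:bob) in any order, process() writes them in that dependency order. Two statements that name each other as their SIDs never become ready and trigger the cycle exception.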

reset

public void reset()
Clears all buffered data, including the canonicalizing mapping for blank nodes and deferred provenance statements.

Specified by:
reset in interface IBuffer<S extends Statement>

setBNodeMap

public void setBNodeMap(Map<String,BigdataBNode> bnodes)
Description copied from interface: IStatementBuffer
Set the canonicalizing map for blank nodes based on their ID. This allows you to reuse the same map across multiple IStatementBuffer instances. For example, the BigdataSail does this so that the same bnode map is used throughout the life of a SailConnection. While RIO provides blank node correlation within a given source, it does NOT provide blank node correlation across sources. You need to use this method to do that.

Note: It is reasonable to expect that the bnodes map is used by concurrent threads. For this reason, the map SHOULD be thread-safe. This can be accomplished either using Collections.synchronizedMap(Map) or a ConcurrentHashMap. However, implementations MUST still be synchronized on the map reference across operations which conditionally insert into the map in order to make that update atomic and thread-safe. Otherwise a race condition exists for the conditional insert and different threads could get incoherent answers.

Specified by:
setBNodeMap in interface IStatementBuffer<S extends Statement>
Parameters:
bnodes - The blank nodes map.
TODO:
could be replaced with BigdataValueFactory
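The requirement that a conditional insert be made atomic by synchronizing on the map reference can be sketched like this. It is a hypothetical example, not the bigdata code: BNode is a stand-in class, and getOrCreate() and distinctNodesUnderContention() are invented names. The map is wrapped with Collections.synchronizedMap(Map), whose single-argument form uses the returned wrapper itself as its lock, so synchronizing on that same reference composes with the wrapper's own locking.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a shared, thread-safe blank-node map (not the bigdata API). */
class SharedBNodeMapDemo {

    /** Stand-in for a blank node; object identity is what must be shared. */
    static final class BNode {
        final String id;
        BNode(String id) { this.id = id; }
    }

    /** Check-then-insert made atomic by holding the map reference's monitor. */
    static BNode getOrCreate(Map<String, BNode> bnodes, String id) {
        synchronized (bnodes) {
            BNode b = bnodes.get(id);
            if (b == null) {
                b = new BNode(id);
                bnodes.put(id, b);
            }
            return b;
        }
    }

    /** Resolves one ID from many tasks and returns the number of distinct nodes observed. */
    static int distinctNodesUnderContention(int threads) {
        Map<String, BNode> shared = Collections.synchronizedMap(new HashMap<>());
        Set<BNode> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads * 100; i++) {
            pool.execute(() -> seen.add(getOrCreate(shared, "b1")));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size(); // 1 when the conditional insert is atomic
    }
}
```

Without the synchronized block, two threads could both observe a missing entry and create different BNode objects for the same ID, which is the race the note above warns about.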

_clear

protected void _clear()
Invoked by incrementalWrite() to clear terms and statements which have been written in preparation for buffering more writes. This discards NEITHER the canonicalizing mapping for blank nodes NOR any deferred statements.


incrementalWrite

protected void incrementalWrite()
Batch insert buffered data (terms and statements) into the store.


addTerms

protected void addTerms(BigdataValue[] terms,
                        int numTerms)

add

public void add(Resource s,
                URI p,
                Value o)
Add an "explicit" statement to the buffer (flushes on overflow, no context).

Specified by:
add in interface IStatementBuffer<S extends Statement>
Parameters:
s - The subject.
p - The predicate.
o - The object.

add

public void add(Resource s,
                URI p,
                Value o,
                Resource c)
Add an "explicit" statement to the buffer (flushes on overflow).

Specified by:
add in interface IStatementBuffer<S extends Statement>
Parameters:
s - The subject.
p - The predicate.
o - The object.
c - The context (optional).

add

public void add(Resource s,
                URI p,
                Value o,
                Resource c,
                StatementEnum type)
Add a statement to the buffer (core impl, flushes on overflow).

Specified by:
add in interface IStatementBuffer<S extends Statement>
Parameters:
s - The subject.
p - The predicate.
o - The object.
c - The context (optional).
type - The statement type.

add

public void add(Statement e)
Description copied from interface: IStatementBuffer
Add a statement to the buffer.

Specified by:
add in interface IStatementBuffer<S extends Statement>
Specified by:
add in interface IBuffer<S extends Statement>
Parameters:
e - The statement. If e implements BigdataStatement, then its StatementEnum will be used (this makes it possible to load axioms into the database as axioms), but the term identifiers on e's values will be ignored.

addStatements

protected final long addStatements(BigdataStatement[] stmts,
                                   int numStmts)
Adds the statements to each index (batch api, NO truth maintenance).

Pre-conditions: The {s,p,o} term identifiers for each BigdataStatement are defined.

Note: If statement identifiers are enabled and the context position is non-null, then it will be unified with the statement identifier assigned to that statement. It is an error if the context position is a URI (since it cannot be unified with the assigned statement identifier). It is an error if the context position is a blank node which is already bound to a term identifier whose value is different from the statement identifier assigned/reported by the database.

Parameters:
stmts - An array of statements in any order.
numStmts - The #of valid entries in stmts.
Returns:
The #of statements written on the database.
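The unification rules for the context position can be summarized in a short model. This is a hypothetical sketch: statement identifiers are modeled as long values, blank nodes as labels starting with "_:", and SidUnificationDemo and unify() are invented names rather than the bigdata value classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of unifying a context-position blank node with an assigned SID. */
class SidUnificationDemo {

    /** Blank node label -> statement identifier (modeled as a long). */
    final Map<String, Long> bindings = new HashMap<>();

    /**
     * @param context     the statement's context: null, a URI, or a blank node ("_:...")
     * @param assignedSid the statement identifier assigned/reported by the database
     */
    void unify(String context, long assignedSid) {
        if (context == null) {
            return; // no context position: nothing to unify
        }
        if (!context.startsWith("_:")) {
            // A URI can never be unified with the assigned statement identifier.
            throw new IllegalArgumentException("URI in context position: " + context);
        }
        // Bind the blank node if it is still unbound; otherwise the bindings must agree.
        Long existing = bindings.putIfAbsent(context, assignedSid);
        if (existing != null && existing.longValue() != assignedSid) {
            throw new IllegalStateException(context + " is bound to " + existing
                + " but the database assigned " + assignedSid);
        }
    }
}
```

Rebinding a blank node to the same identifier is accepted; a URI context or a conflicting identifier is rejected, matching the two error cases listed above.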

writeSPOs

protected long writeSPOs(SPO[] stmts,
                         int numStmts)
Adds the statements to each index (batch api, NO truth maintenance).

Parameters:
stmts - An array of SPOs.
numStmts - The #of valid entries in stmts.
Returns:
The #of statements written on the database.
See Also:
AbstractTripleStore.addStatements(AbstractTripleStore, boolean, IChunkedOrderedIterator, IElementFilter)

nearCapacity

public boolean nearCapacity()
Returns true if the bufferQueue has fewer than three slots remaining for any of the value arrays (URIs, Literals, or BNodes) or if there are no slots remaining in the statements array. Under those conditions adding another statement to the bufferQueue could cause an overflow.

Returns:
True if the bufferQueue might overflow if another statement were added.
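The overflow guard can be modeled with plain counters. This is a hypothetical sketch, assuming every statement is a triple of three new URIs (the worst case for the URI array); CapacityDemo, addUriTriple() and the writes counter are invented names, and incrementalWrite() here only resets counters instead of writing anything.

```java
/** Hypothetical model of the near-capacity guard (not the bigdata code). */
class CapacityDemo {
    final int capacity;
    int numURIs, numLiterals, numBNodes, numStmts;
    int writes; // number of incremental writes triggered so far

    CapacityDemo(int capacity) {
        // Assumption: reject capacities below the documented minimum of three.
        if (capacity < 3) throw new IllegalArgumentException("capacity must be >= 3");
        this.capacity = capacity;
    }

    /** Fewer than three free slots in any value array, or no free statement slot. */
    boolean nearCapacity() {
        return capacity - numURIs < 3
            || capacity - numLiterals < 3
            || capacity - numBNodes < 3
            || capacity - numStmts < 1;
    }

    /** Adds an all-URI triple, writing the buffer first if it might overflow. */
    void addUriTriple() {
        if (nearCapacity()) {
            incrementalWrite();
        }
        numURIs += 3; // worst case: three new distinct URIs
        numStmts += 1;
    }

    /** Stand-in for the batch write: just empties the counters. */
    void incrementalWrite() {
        writes++;
        numURIs = numLiterals = numBNodes = numStmts = 0;
    }
}
```

With a capacity of 9 the model accepts three triples before nearCapacity() returns true, and the fourth add triggers an incremental write first. With a capacity of 3 it holds exactly one all-URI triple, consistent with the documented minimum capacity.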

getDistinctTerm

protected BigdataValue getDistinctTerm(BigdataValue term)
Canonicalizing mapping for a term.

Note: Blank nodes are made canonical within the scope of the source from which the data are being read. See bnodes. All other kinds of terms are made canonical within the scope of the buffer's current contents in order to keep down the demand on the heap when reading either very large documents or a series of small documents.

Parameters:
term - A term.
Returns:
Either the term or the pre-existing term in the buffer with the same data.
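A canonicalizing mapping of this kind is commonly built on Map.putIfAbsent. The sketch below is hypothetical: terms are modeled as strings compared by value, DistinctTermDemo is an invented name, and the separate blank-node scope is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical canonicalizing map keyed on term value equality (not the bigdata code). */
class DistinctTermDemo {
    private final Map<String, String> distinct = new HashMap<>();

    /** Returns the pre-existing equal term if one is buffered, else buffers and returns this one. */
    String getDistinctTerm(String term) {
        String existing = distinct.putIfAbsent(term, term);
        return existing != null ? existing : term;
    }

    /** Number of distinct terms currently buffered. */
    int size() {
        return distinct.size();
    }
}
```

Two equal but separately allocated terms resolve to the first instance seen, so only one copy is retained while the buffer holds it.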

handleStatement

protected void handleStatement(Resource s,
                               URI p,
                               Value o,
                               Resource c,
                               StatementEnum type)
Adds the values and the statement into the buffer.

Parameters:
s - The subject.
p - The predicate.
o - The object.
c - The context (may be null).
type - The statement type.
Throws:
IndexOutOfBoundsException - if the buffer capacity is exceeded.
See Also:
nearCapacity()


Copyright © 2006-2012 SYSTAP, LLC. All Rights Reserved.