com.bigdata.relation.accesspath
Class BlockingBuffer<E>

java.lang.Object
  extended by com.bigdata.relation.accesspath.BlockingBuffer<E>
All Implemented Interfaces:
IBlockingBuffer<E>, IBuffer<E>, IRunnableBuffer<E>
Direct Known Subclasses:
BlockingBufferWithStats

public class BlockingBuffer<E>
extends Object
implements IBlockingBuffer<E>

A buffer that will block when it is full. You write elements on the buffer and read them using iterator(). This class is safe for concurrent writes (multiple threads can use add(Object)), but the iterator returned by iterator() is not thread-safe (it assumes a single reader).

You MUST make sure that the thread which sets up the BlockingBuffer and submits the task that writes on the buffer also sets that task's Future on the BlockingBuffer (using setFuture(Future)). The iterator monitors the Future so that it can detect cancellation and rethrow any exception from the Future to the client. Failure to do this can lead to the iterator never terminating!
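To see why the Future matters, here is a minimal sketch of the reader side using only java.util.concurrent (it is not BlockingBuffer's implementation). The consumer polls with a short timeout and, whenever the queue is empty, checks the producer's Future, so a failed producer surfaces as an ExecutionException instead of leaving the reader blocked forever. The class FutureAwareConsumer, the 10 ms poll interval and the queue capacity are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical single-reader consumer that watches the producer's Future. */
final class FutureAwareConsumer {

    /**
     * Drains the queue until the producer is done and nothing is left.
     * A failed producer is reported by rethrowing its ExecutionException.
     */
    static <E> List<E> drain(BlockingQueue<E> queue, Future<?> producer)
            throws InterruptedException, ExecutionException {
        final List<E> out = new ArrayList<>();
        while (true) {
            // Wait briefly instead of forever so the Future can be re-checked.
            final E e = queue.poll(10, TimeUnit.MILLISECONDS);
            if (e != null) {
                out.add(e);
            } else if (producer.isDone()) {
                producer.get();     // throws ExecutionException if the producer failed
                queue.drainTo(out); // elements written just before completion
                return out;
            }
        }
    }

    /** Runs a producer that writes 0..n-1 and then optionally fails. */
    static List<Integer> run(int n, boolean fail) throws Exception {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(4);
        final ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            final Future<?> producer = exec.submit(() -> {
                for (int i = 0; i < n; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                if (fail) {
                    throw new IllegalStateException("producer failed");
                }
                return null;
            });
            return drain(queue, producer);
        } finally {
            exec.shutdownNow();
        }
    }
}
```

Without the isDone() check, a reader blocked in take() would wait forever once the producer died, which is exactly the non-terminating iterator this warning is about.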

Note: BlockingBuffer is used (a) for IAccessPath iterators that exceed the fully-buffered read threshold; (b) for high-level query with at least one join; (c) by the BigdataStatementIteratorImpl, which is used by the RDF DB for high-level query with no joins; and (d) by the BigdataSolutionResolverator, which is used by the RDF DB for high-level query that produces binding sets.

Version:
$Id: BlockingBuffer.java 6130 2012-03-15 10:31:25Z thompsonbry $
Author:
Bryan Thompson
TODO:
This class has internal complexities which arise from the need to coordinate elements being added to the buffer with elements being drained by the iterator without access to the internal lock of the backing queue. If we could use that internal lock then we could get rid of the spin loops and related cruft.

Nested Class Summary
protected  class BlockingBuffer.BlockingIterator
          An inner class that reads from the buffer.
 
Field Summary
static long DEFAULT_CONSUMER_CHUNK_TIMEOUT
          The default timeout in milliseconds during which chunks of elements may be combined in order to satisfy the desired minimum chunk size.
static TimeUnit DEFAULT_CONSUMER_CHUNK_TIMEOUT_UNIT
          The unit in which DEFAULT_CONSUMER_CHUNK_TIMEOUT is expressed (milliseconds).
static int DEFAULT_MINIMUM_CHUNK_SIZE
          The default minimum chunk size for the chunk combiner.
static int DEFAULT_PRODUCER_QUEUE_CAPACITY
          The default capacity for the internal Queue on which elements (or chunks of elements) are buffered.
protected static org.apache.log4j.Logger log
          Warning messages are emitted if either the producer or the consumer is stalled.
 
Constructor Summary
BlockingBuffer()
           
BlockingBuffer(BlockingQueue<E> queue, int minimumChunkSize, long chunkTimeout, TimeUnit chunkTimeoutUnit, boolean ordered)
          Core ctor.
BlockingBuffer(int capacity)
          Ctor automatically provisions an appropriate BlockingQueue.
BlockingBuffer(int capacity, int minimumChunkSize, long chunkTimeout, TimeUnit chunkTimeoutUnit)
           
 
Method Summary
 void abort(Throwable cause)
          Signal abnormal termination of the process writing on the buffer.
 void add(E e)
          Add an element to the buffer.
 boolean add(E e, long timeout, TimeUnit unit)
          Add element to the buffer.
 void clear()
          Clear the backing queue.
 void close()
          Closes the BlockingBuffer such that it will not accept new elements (this is a NOP unless the buffer is open).
static
<E> E
combineChunks(E chunk1, E chunk2)
          Combines two chunks in the order given (DOES NOT apply a merge sort).
 long flush()
          This is a NOP since the IBlockingBuffer.iterator() is the only way to consume data written on the buffer.
 long getChunksAddedCount()
          The #of chunks added to the buffer using add(Object).
 long getChunkTimeout()
          The maximum time to wait in nanoseconds for another chunk to come along so that it can be combined with the current chunk for next().
 long getElementsAddedCount()
          The #of elements added to the buffer using add(Object).
 long getElementsOnQueueCount()
          The #of elements on the queue.
 Future getFuture()
          The Future of the producer feeding the BlockingBuffer.
 int getMinimumChunkSize()
          The desired minimum chunk size for the chunk combiner.
 boolean isEmpty()
          True iff there are no elements in the buffer.
 boolean isOpen()
          Return true if the buffer is open.
 boolean isOrdered()
           
 IAsynchronousIterator<E> iterator()
          The iterator is NOT thread-safe and does NOT support remove().
 void reset()
          Reset the state of the buffer, including the counter whose value is reported by IBuffer.flush().
 void setFuture(Future future)
          Set the future of the producer feeding the BlockingBuffer.
 int size()
          This reports the #of items in the queue.
 String toString()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

log

protected static final org.apache.log4j.Logger log
Warning messages are emitted if either the producer or the consumer is stalled. When Category.isInfoEnabled() is true, those log messages will include stack traces which can help to identify the consumer or producer.


DEFAULT_PRODUCER_QUEUE_CAPACITY

public static final transient int DEFAULT_PRODUCER_QUEUE_CAPACITY
The default capacity for the internal Queue on which elements (or chunks of elements) are buffered.

See Also:
Constant Field Values

DEFAULT_MINIMUM_CHUNK_SIZE

public static final transient int DEFAULT_MINIMUM_CHUNK_SIZE
The default minimum chunk size for the chunk combiner.

See Also:
Constant Field Values

DEFAULT_CONSUMER_CHUNK_TIMEOUT

public static final transient long DEFAULT_CONSUMER_CHUNK_TIMEOUT
The default timeout in milliseconds during which chunks of elements may be combined in order to satisfy the desired minimum chunk size.

See Also:
Constant Field Values

DEFAULT_CONSUMER_CHUNK_TIMEOUT_UNIT

public static final transient TimeUnit DEFAULT_CONSUMER_CHUNK_TIMEOUT_UNIT
The unit in which DEFAULT_CONSUMER_CHUNK_TIMEOUT is expressed (milliseconds).

Constructor Detail

BlockingBuffer

public BlockingBuffer()

BlockingBuffer

public BlockingBuffer(int capacity)
Ctor automatically provisions an appropriate BlockingQueue.

Parameters:
capacity - The capacity of the buffer. When the generic type <E> is an array type, then this is the chunkOfChunksCapacity and small chunks will be automatically combined based on availability and latency. When zero (0) a SynchronousQueue will be used. Otherwise an ArrayBlockingQueue of the given capacity is used.
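A minimal sketch of the capacity rule stated for this constructor, using only java.util.concurrent; newQueue is a hypothetical helper, not part of BlockingBuffer. The four-argument constructor's documentation names LinkedBlockingDeque rather than ArrayBlockingQueue for the non-zero case, so treat the concrete queue class as unconfirmed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

final class QueueProvisioningSketch {

    /** Chooses a backing queue from the capacity, following the rule described above. */
    static <E> BlockingQueue<E> newQueue(int capacity) {
        if (capacity == 0) {
            // No buffering: each put() waits for a matching take().
            return new SynchronousQueue<>();
        }
        // Bounded FIFO; put() blocks once capacity elements are queued.
        // A negative capacity is rejected with IllegalArgumentException.
        return new ArrayBlockingQueue<>(capacity);
    }
}
```

With capacity zero there is no room at all, so a non-blocking offer() succeeds only if a reader is already waiting.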

BlockingBuffer

public BlockingBuffer(int capacity,
                      int minimumChunkSize,
                      long chunkTimeout,
                      TimeUnit chunkTimeoutUnit)
Parameters:
capacity - The capacity of the buffer. When the generic type <E> is an array type, then this is the chunkOfChunksCapacity and small chunks will be automatically combined based on availability and latency. When zero (0) a SynchronousQueue will be used. Otherwise a LinkedBlockingDeque of the given capacity is used.
minimumChunkSize - The desired minimum chunk size. When the elements stored in the buffer are chunks (i.e., arrays of some component type), elements will be combined together to form larger chunks until this minimumChunkSize is satisfied, the iterator() is exhausted, or the chunkTimeout is reached.
chunkTimeout - The maximum time to wait, expressed in chunkTimeoutUnit, for another chunk to come along so that it can be combined with the current chunk for next(). A value of ZERO (0) disables the chunk combiner.
chunkTimeoutUnit - The units in which the chunkTimeout is expressed.
TODO:
There has been little to no testing of BlockingBuffer in combination with a SynchronousQueue.
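The combining behavior described by minimumChunkSize and chunkTimeout can be sketched for int[] chunks as follows. This is an illustration under stated assumptions, not the class's actual algorithm: the helper name nextChunk is hypothetical, the first chunk is taken with a blocking take(), and a single deadline bounds the total time spent waiting for further chunks.

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class ChunkCombinerSketch {

    /**
     * Returns the next chunk, appending further queued chunks until
     * minimumChunkSize is reached or the chunk timeout expires.
     * A timeout of zero disables combining.
     */
    static int[] nextChunk(BlockingQueue<int[]> queue, int minimumChunkSize,
            long chunkTimeout, TimeUnit unit) throws InterruptedException {
        final long timeoutNanos = unit.toNanos(chunkTimeout);
        int[] chunk = queue.take(); // block for the first chunk
        if (timeoutNanos == 0L) {
            return chunk; // chunk combiner disabled
        }
        final long deadline = System.nanoTime() + timeoutNanos;
        while (chunk.length < minimumChunkSize) {
            final long remaining = deadline - System.nanoTime();
            if (remaining <= 0L) {
                break; // timeout expired
            }
            final int[] next = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (next == null) {
                break; // nothing else arrived in time
            }
            chunk = concat(chunk, next);
        }
        return chunk;
    }

    /** Appends b to a, preserving order. */
    private static int[] concat(int[] a, int[] b) {
        final int[] c = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, c, a.length, b.length);
        return c;
    }
}
```

Using one deadline rather than a fresh timeout per poll keeps the extra latency added by combining bounded by chunkTimeout, whatever the number of small chunks.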

BlockingBuffer

public BlockingBuffer(BlockingQueue<E> queue,
                      int minimumChunkSize,
                      long chunkTimeout,
                      TimeUnit chunkTimeoutUnit,
                      boolean ordered)
Core ctor.

Parameters:
queue - The queue on which elements will be buffered. Elements will be added to the queue and drained by the iterator(). When this is a BlockingDeque, elements of an array type will be combined by add(Object, long, TimeUnit) until the minimumChunkSize is satisfied.
minimumChunkSize - The desired minimum chunk size. When the elements stored in the buffer are chunks (i.e., arrays of some component type), elements will be combined together to form larger chunks until this minimumChunkSize is satisfied, the iterator() is exhausted, or the chunkTimeout is reached.
chunkTimeout - The maximum time to wait, expressed in chunkTimeoutUnit, for another chunk to come along so that it can be combined with the current chunk for next(). A value of ZERO (0) disables the chunk combiner.
chunkTimeoutUnit - The units in which the chunkTimeout is expressed.
ordered - When true the data are asserted to be ordered and a merge sort will be applied if chunks are combined such that the combined chunks will also be ordered (this has no effect unless the generic type of the buffer is an array type).
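When ordered is true, combined chunks must remain ordered, so two chunks are merged rather than concatenated. A minimal sketch of that merge step, assuming ascending int[] chunks (the class itself is generic over the element type, and OrderedChunkMerge is a hypothetical name):

```java
final class OrderedChunkMerge {

    /** Merges two ascending chunks into one ascending chunk; ties keep a's element first. */
    static int[] merge(int[] a, int[] b) {
        final int[] out = new int[a.length + b.length];
        int i = 0, j = 0, k = 0;
        while (i < a.length && j < b.length) {
            out[k++] = (a[i] <= b[j]) ? a[i++] : b[j++];
        }
        // Copy whichever input still has elements left.
        while (i < a.length) {
            out[k++] = a[i++];
        }
        while (j < b.length) {
            out[k++] = b[j++];
        }
        return out;
    }
}
```

The merge runs in time linear in the combined length, so keeping combined chunks ordered costs no more than copying them.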
Method Detail

getMinimumChunkSize

public final int getMinimumChunkSize()
The desired minimum chunk size for the chunk combiner.

See Also:
DEFAULT_MINIMUM_CHUNK_SIZE

getChunkTimeout

public final long getChunkTimeout()
The maximum time to wait in nanoseconds for another chunk to come along so that it can be combined with the current chunk for next(). A value of ZERO (0) disables the chunk combiner.

See Also:
DEFAULT_CONSUMER_CHUNK_TIMEOUT

setFuture

public void setFuture(Future future)
Set the future of the producer feeding the BlockingBuffer.

Specified by:
setFuture in interface IBlockingBuffer<E>
Parameters:
future - The Future.

getFuture

public Future getFuture()
The Future of the producer feeding the BlockingBuffer.

Specified by:
getFuture in interface IRunnableBuffer<E>
Returns:
The Future -or- null if no Future has been set.
TODO:
There should be a generic type for this.

toString

public String toString()
Overrides:
toString in class Object

isEmpty

public boolean isEmpty()
Description copied from interface: IBuffer
True iff there are no elements in the buffer.

Specified by:
isEmpty in interface IBuffer<E>

size

public int size()
This reports the #of items in the queue.

Specified by:
size in interface IBuffer<E>
See Also:
getElementsOnQueueCount()

isOpen

public boolean isOpen()
Description copied from interface: IRunnableBuffer
Return true if the buffer is open.

Specified by:
isOpen in interface IRunnableBuffer<E>

isOrdered

public boolean isOrdered()

close

public void close()
Closes the BlockingBuffer such that it will not accept new elements (this is a NOP unless the buffer is open). Once the buffer is closed, the BlockingBuffer.BlockingIterator will drain any elements remaining in the BlockingBuffer and then report false for BlockingIterator.hasNext() (this does NOT close the BlockingIterator).

Specified by:
close in interface IRunnableBuffer<E>
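The close() contract (writers are rejected, the reader drains what is left and then sees hasNext() return false) can be illustrated with a small hypothetical single-reader buffer. ClosableBufferSketch is not BlockingBuffer: it throws IllegalStateException where the real class throws BufferClosedException, and it waits with a timed poll.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical single-reader buffer: close() stops writers, the reader drains the rest. */
final class ClosableBufferSketch<E> {
    private final LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<>();
    private volatile boolean open = true;
    private E pending; // fetched by hasNext() but not yet returned by next()

    void add(E e) {
        if (!open) {
            throw new IllegalStateException("closed"); // stands in for BufferClosedException
        }
        queue.add(e);
    }

    /** Idempotent: closing an already closed buffer does nothing. */
    void close() {
        open = false;
    }

    /** Waits for an element; returns false once the buffer is closed and drained. */
    boolean hasNext() throws InterruptedException {
        while (pending == null) {
            // Read the flag before polling: if the buffer was already closed,
            // every element added before close() is visible to this poll.
            final boolean closed = !open;
            pending = queue.poll(10, TimeUnit.MILLISECONDS);
            if (pending == null && closed) {
                return false;
            }
        }
        return true;
    }

    E next() throws InterruptedException {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        final E e = pending;
        pending = null;
        return e;
    }
}
```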

clear

public void clear()
           throws IllegalStateException
Clear the backing queue.

Throws:
IllegalStateException - If the buffer is open.

abort

public void abort(Throwable cause)
Description copied from interface: IRunnableBuffer
Signal abnormal termination of the process writing on the buffer. The buffer will be closed. The iterator will report the cause via a wrapped exception the next time any method on its interface is invoked. The internal queue may be cleared once this method is invoked.

Specified by:
abort in interface IRunnableBuffer<E>
Parameters:
cause - The exception thrown by the process writing on the buffer.
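A hypothetical sketch of the abort(Throwable) contract: the first cause is recorded, the buffer is closed, queued elements are discarded, and the reader's next call throws an exception wrapping the cause. Wrapping in a plain RuntimeException is an assumption; the documentation does not name the wrapper type.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical buffer showing how abort(cause) can surface on the reader side. */
final class AbortableBufferSketch<E> {
    private final LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<>();
    private final AtomicReference<Throwable> cause = new AtomicReference<>();
    private volatile boolean open = true;

    void add(E e) {
        if (!open) {
            throw new IllegalStateException("closed"); // stands in for BufferClosedException
        }
        queue.add(e);
    }

    /** Records the first cause, closes the buffer and discards queued elements. */
    void abort(Throwable t) {
        cause.compareAndSet(null, t);
        open = false;
        queue.clear();
    }

    /** Next element, or null if none is queued; rethrows a recorded abort cause. */
    E poll() {
        final Throwable t = cause.get();
        if (t != null) {
            throw new RuntimeException("buffer aborted", t);
        }
        return queue.poll();
    }
}
```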

getChunksAddedCount

public long getChunksAddedCount()
The #of chunks added to the buffer using add(Object). This will be ZERO unless the generic type of the buffer is an array type.


getElementsAddedCount

public long getElementsAddedCount()
The #of elements added to the buffer using add(Object). When the generic type of the buffer is an array type, this will be the sum of the lengths of the arrays added to the buffer.


getElementsOnQueueCount

public long getElementsOnQueueCount()
The #of elements on the queue. When the queue uses a scalar element type this will track the size() very closely. However, when the queue uses an array element type, this will be the sum of the #of elements across all arrays on the queue.
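A sketch of how the three counters above could be maintained, assuming each queued item is either a scalar element (counted as one element) or an array chunk (counted as one chunk of Array.getLength elements). BufferCountersSketch and its onAdd/onTake hooks are hypothetical names; the real class may track these values differently.

```java
import java.lang.reflect.Array;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical counters in the spirit of the chunk and element counts above. */
final class BufferCountersSketch {
    final AtomicLong chunksAdded = new AtomicLong();
    final AtomicLong elementsAdded = new AtomicLong();
    final AtomicLong elementsOnQueue = new AtomicLong();

    /** #of elements an item represents: the array length for a chunk, else 1. */
    private static long weight(Object e) {
        return e.getClass().isArray() ? Array.getLength(e) : 1L;
    }

    /** Called when an item is added to the queue. */
    void onAdd(Object e) {
        if (e.getClass().isArray()) {
            chunksAdded.incrementAndGet(); // only array items count as chunks
        }
        final long n = weight(e);
        elementsAdded.addAndGet(n);
        elementsOnQueue.addAndGet(n);
    }

    /** Called when the reader takes an item off the queue. */
    void onTake(Object e) {
        elementsOnQueue.addAndGet(-weight(e));
    }
}
```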


add

public void add(E e)
Description copied from interface: IRunnableBuffer
Add an element to the buffer.

Note: This method is constrained to throw the specified exception if the buffer has been IRunnableBuffer.close()d.

Specified by:
add in interface IBuffer<E>
Specified by:
add in interface IRunnableBuffer<E>
Parameters:
e - The element
Throws:
BufferClosedException - if the buffer has been close()d.
RuntimeException - if the caller's Thread is interrupted. The RuntimeException will wrap the InterruptedException as its cause.
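A minimal sketch of the interrupt handling documented for add(E), written against a plain BlockingQueue; addOrWrap is a hypothetical helper. Restoring the thread's interrupt status before rethrowing is an assumption (a common Java idiom), not something the documentation states.

```java
import java.util.concurrent.BlockingQueue;

final class InterruptWrappingSketch {

    /** Blocking add that reports interruption as a RuntimeException wrapping the InterruptedException. */
    static <E> void addOrWrap(BlockingQueue<E> queue, E e) {
        try {
            queue.put(e); // blocks while the queue is full
        } catch (InterruptedException ex) {
            // put() cleared the interrupt flag when it threw; restore it for callers.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }
}
```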

add

public boolean add(E e,
                   long timeout,
                   TimeUnit unit)
            throws InterruptedException
Add element to the buffer.

Parameters:
e - The element.
timeout - The timeout.
unit - The unit in which the timeout is expressed.
Returns:
true iff the element was added to the buffer (false indicates that the timeout expired before the element could be added to the buffer).
Throws:
InterruptedException - if interrupted
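The boolean result of the timed add lets a producer notice a consumer that is not making room. A minimal sketch using BlockingQueue.offer(E, long, TimeUnit), with a hypothetical retry policy (addWithRetries and maxAttempts are assumptions); a real producer might log a stall warning where this sketch merely retries.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class TimedAddSketch {

    /**
     * Tries up to maxAttempts timed adds. Returns the number of timeouts that
     * preceded a successful add, or -1 if every attempt timed out.
     */
    static <E> int addWithRetries(BlockingQueue<E> queue, E e, long timeout,
            TimeUnit unit, int maxAttempts) throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (queue.offer(e, timeout, unit)) {
                return attempt;
            }
            // false: the consumer did not make room in time (it may be stalled).
        }
        return -1;
    }
}
```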

flush

public long flush()
Description copied from interface: IBlockingBuffer
This is a NOP since the IBlockingBuffer.iterator() is the only way to consume data written on the buffer.

Specified by:
flush in interface IBlockingBuffer<E>
Specified by:
flush in interface IBuffer<E>
Returns:
ZERO (0L)

reset

public void reset()
Description copied from interface: IBuffer
Reset the state of the buffer, including the counter whose value is reported by IBuffer.flush(). Any data in the buffer will be discarded.

Specified by:
reset in interface IBuffer<E>

iterator

public IAsynchronousIterator<E> iterator()
The iterator is NOT thread-safe and does NOT support remove().

Note: If the IAsynchronousIterator is ICloseableIterator.close()d before the Future of the process writing on the BlockingBuffer is done, then the Future will be canceled using Thread.interrupt(). Because a FileChannel is closed when a thread blocked in an IO operation on it is interrupted, this will cause the backing store to be asynchronously closed if the interrupt is detected during an IO. The backing store will be re-opened transparently, but there is overhead associated with that (locks must be re-acquired, etc.).

The most common reason to close an iterator early is that you only want to visit a limited #of elements. However, if you impose that limit using either IAccessPath.iterator(int, int) or an IRule with IQueryOptions, then most processes that produce IAsynchronousIterators will terminate automatically when they reach the desired limit, thereby avoiding interrupts. Those processes include IRule evaluation and IAccessPath scans where the #of elements to be visited exceeds the fully materialized chunk threshold.

Specified by:
iterator in interface IBlockingBuffer<E>
Returns:
The iterator (this is a singleton).
See Also:
BlockingBuffer.BlockingIterator.close()
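The two ways of stopping early can be contrasted in a small sketch using an ExecutorService (LimitSketch and both method names are hypothetical, and the queue capacity is arbitrary). In the first, the producer enforces the limit and finishes normally; in the second, the consumer stops reading and cancels the producer with Future.cancel(true), which interrupts the producer thread much as closing the iterator early does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class LimitSketch {

    /** The producer stops by itself at the limit, so no interrupt is issued. */
    static List<Integer> producerSideLimit(int limit) throws Exception {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        final ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            final Future<?> producer = exec.submit(() -> {
                for (int i = 0; i < limit; i++) {
                    queue.put(i);
                }
                return null;
            });
            final List<Integer> out = new ArrayList<>();
            for (int i = 0; i < limit; i++) {
                out.add(queue.take());
            }
            producer.get(); // completes normally
            return out;
        } finally {
            exec.shutdown();
        }
    }

    /** The consumer stops early and cancels an unbounded producer with an interrupt. */
    static boolean consumerSideLimit(int limit) throws Exception {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        final ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            final Future<?> producer = exec.submit(() -> {
                // Runs until interrupted; put() throws InterruptedException on cancel.
                for (int i = 0; !Thread.currentThread().isInterrupted(); i++) {
                    queue.put(i);
                }
                return null;
            });
            for (int i = 0; i < limit; i++) {
                queue.take();
            }
            producer.cancel(true); // interrupts the producer thread
            return producer.isCancelled();
        } finally {
            exec.shutdown();
        }
    }
}
```

Only the second variant interrupts the producer, and with a FileChannel-backed store that interrupt is what triggers the close and re-open overhead described in the note above.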

combineChunks

public static <E> E combineChunks(E chunk1,
                                  E chunk2)
Combines two chunks in the order given (DOES NOT apply a merge sort).

Parameters:
chunk1 - The first chunk.
chunk2 - The second chunk.
Returns:
The combined chunks.
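A plausible sketch of combineChunks for arbitrary array types, using java.lang.reflect.Array; it concatenates in the order given and does not sort. This is a reconstruction from the description, not the library's source, and it assumes both arguments are arrays whose elements can be stored in an array of chunk1's component type.

```java
import java.lang.reflect.Array;

final class CombineChunksSketch {

    /** Concatenates two arrays in the order given (no merge sort). */
    @SuppressWarnings("unchecked")
    static <E> E combineChunks(E chunk1, E chunk2) {
        // Array.getLength throws IllegalArgumentException if an argument is not an array.
        final int n1 = Array.getLength(chunk1);
        final int n2 = Array.getLength(chunk2);
        // Works for primitive arrays too, e.g. int.class yields an int[].
        final Object out = Array.newInstance(chunk1.getClass().getComponentType(), n1 + n2);
        System.arraycopy(chunk1, 0, out, 0, n1);
        System.arraycopy(chunk2, 0, out, n1, n2);
        return (E) out;
    }
}
```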


Copyright © 2006-2012 SYSTAP, LLC. All Rights Reserved.