com.bigdata.journal
Class WriteExecutorService

java.lang.Object
  extended by java.util.concurrent.AbstractExecutorService
      extended by java.util.concurrent.ThreadPoolExecutor
          extended by com.bigdata.journal.WriteExecutorService
All Implemented Interfaces:
Executor, ExecutorService

public class WriteExecutorService
extends ThreadPoolExecutor

A custom ThreadPoolExecutor used by the ConcurrencyManager to execute concurrent unisolated write tasks and perform group commits. Tasks extend AbstractTask. The caller receives a Future when they submit a task to the write service. The result of that Future is NOT available until the next group commit following the successful execution of the write task.
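
The relationship between task completion and the group commit can be illustrated with a small stand-alone sketch. This is not the WriteExecutorService implementation: the class GroupCommitFutureSketch and its methods are hypothetical, tasks run synchronously in the caller's thread for simplicity, and the "commit" does nothing except release the pending results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/** Hypothetical sketch: a task's Future completes only at the next group commit. */
public class GroupCommitFutureSketch {

    /** Completions for tasks that ran successfully but are not yet committed. */
    private final List<Runnable> pending = new ArrayList<>();

    /** Runs the task (synchronously, for simplicity) and defers its result. */
    public synchronized <T> Future<T> submit(Callable<T> task) {
        CompletableFuture<T> future = new CompletableFuture<>();
        try {
            T result = task.call();
            // Success: the task joins the commit group; the result is withheld.
            pending.add(() -> future.complete(result));
        } catch (Exception e) {
            // Failure: the task never joins a commit group and fails at once.
            future.completeExceptionally(e);
        }
        return future;
    }

    /** Releases the results of every task in the current commit group. */
    public synchronized int groupCommit() {
        int groupSize = pending.size();
        for (Runnable completion : pending) {
            completion.run();
        }
        pending.clear();
        return groupSize;
    }
}
```

In this sketch a caller that blocks in Future.get() after submit() would wait until some thread calls groupCommit(), which mirrors the behavior described above.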

Note: adding the thread name to the log messages for this class can aid debugging. You can do this using the log4j configuration.
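
As a rough illustration, a log4j 1.x properties file can include the thread name through the %t conversion character of PatternLayout. The appender name A1 is arbitrary, and the logger name below assumes the logger is named after the class, which this page does not confirm.

```properties
# Hypothetical log4j 1.x configuration; the appender name "A1" is arbitrary.
log4j.rootLogger=WARN, A1
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
# %t emits the name of the thread that generated the logging event.
log4j.appender.A1.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{1} - %m%n
# Assumption: the logger is named after the class.
log4j.logger.com.bigdata.journal.WriteExecutorService=INFO
```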

Note: the problem with running concurrent unisolated operations during a commit and relying on an "auto-commit" flag to indicate whether or not the index will participate is twofold. First, previous unisolated operations on the same index will not get committed if an operation is currently running, so we could wind up deferring check points of indices for quite a while. Second, if there is a problem with the commit and we have to abort, then any ongoing operations would still be using unisolated indices that could include write sets that were discarded - this would make abort non-atomic.

The ground state from which an unisolated operation begins needs to evolve after each unisolated operation that reaches its commit point successfully. This can be accomplished by holding onto the btree reference, or even just the address at which the metadata record for the btree was last written. We use AbstractJournal.getName2Addr() for this purpose.

However, if an unisolated write fails for any reason on a given index then we MUST use the last successful check point for that index. This is handled by doing an abort.

Note: Due to the way in which the BTree class is written, it "steals" child references when cloning an immutable node or leaf prior to making modifications. This means that we must reload the btree from a metadata record if we have to roll back due to an abort of some unisolated operation since the state of the BTree has been changed as a side effect in a non-reversible manner.

Note: Running Threads may be interrupted at arbitrary moments for a number of reasons by this class. The foremost example is a Thread that is executing an AbstractTask when a concurrent decision is made to discard the commit group, e.g., because another task in that commit group failed. Regardless of the reason, if the Thread is performing an NIO operation at the moment that the interrupt is noticed, then it will close the channel on which that operation was being performed. If you are using a disk-based BufferMode for the journal, then the interrupt just caused the backing FileChannel to be closed. In order to permit continued operations on the journal, the IRawStore MUST transparently re-open the channel. (The same problem can arise if you are using NIO for sockets or anything else that uses the Channel abstraction.)
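
The channel-closing behavior comes from the JDK's InterruptibleChannel contract and can be reproduced without the journal. The sketch below uses only java.nio; the class name ReopenOnInterruptSketch and the temporary file are hypothetical, and the re-open step only illustrates the idea of recovering the channel, not how any IRawStore actually does it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch: an interrupt closes a FileChannel, which is then re-opened. */
public class ReopenOnInterruptSketch {

    /** Returns true iff the interrupt closed the channel and a re-open could read again. */
    public static boolean demo() {
        try {
            Path file = Files.createTempFile("journal-sketch", ".bin");
            try {
                Files.write(file, new byte[] {1, 2, 3, 4});
                boolean closedByInterrupt = false;
                try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
                    // Simulate an interrupt arriving while the thread is doing channel I/O.
                    Thread.currentThread().interrupt();
                    try {
                        ch.read(ByteBuffer.allocate(4));
                    } catch (ClosedByInterruptException e) {
                        // The channel itself was closed, not just this one read.
                        closedByInterrupt = !ch.isOpen();
                    } finally {
                        Thread.interrupted(); // clear the interrupt status
                    }
                }
                // Re-open the backing file so that later operations can proceed.
                try (FileChannel reopened = FileChannel.open(file, StandardOpenOption.READ)) {
                    return closedByInterrupt && reopened.read(ByteBuffer.allocate(4)) > 0;
                }
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```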

Overflow handling

The WriteExecutorService invokes overflow() each time it does a group commit. Normally the WriteExecutorService does not quiesce before doing a group commit, and when it is not quiescent the ResourceManager can NOT overflow() the journal since concurrent tasks are still writing on the current journal. Therefore the ResourceManager monitors the IBufferStrategy.getExtent() of the live journal. When it decides that the live journal is large enough, it pause()s the WriteExecutorService and waits until overflow() is called with a quiescent WriteExecutorService. This effectively grants the ResourceManager exclusive access to the journal. It can then run overflow() to set up a new journal and tell the WriteExecutorService to resume() processing.

Version:
$Id: WriteExecutorService.java 4549 2011-05-25 19:53:44Z thompsonbry $
Author:
Bryan Thompson
TODO:
There should be a clear advantage to pipelining operations for the same index partition into the same commit group. That would maximize the reuse of the index buffers and minimize the concurrent demand for distinct indices. This is basically barging in on the write service based on an affinity for active indices. The trick is to not starve out indices which are not active. If the scope is limited to a commit group or a period of time then that might do it.

Nested Class Summary
 
Nested classes/interfaces inherited from class java.util.concurrent.ThreadPoolExecutor
ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy
 
Field Summary
protected  AtomicInteger activeTaskCountWithLocksHeld
           
protected static boolean DEBUG
          True iff the log level is DEBUG or less.
protected  long groupCommitTimeout
          The time in milliseconds that a group commit will await currently running tasks to join the commit group.
protected static boolean INFO
          True iff the log level is INFO or less.
protected static org.apache.log4j.Logger log
          Main log for the WriteExecutorService.
protected static boolean OVERFLOW_DEBUG
          True iff the overflowLog level is DEBUG or less.
protected static boolean OVERFLOW_INFO
          True iff the overflowLog level is INFO or less.
protected  long overflowLockRequestTimeout
          The time in milliseconds that a group commit will await an exclusive lock on the write service in order to perform synchronous overflow processing.
protected static org.apache.log4j.Logger overflowLog
          Uses the OverflowManager log for things relating to synchronous overflow processing.
 
Constructor Summary
WriteExecutorService(IResourceManager resourceManager, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit keepAliveUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, long groupCommitTimeout, long overflowLockRequestTimeout)
           
 
Method Summary
protected  void afterTask(AbstractTask r, Throwable t)
          This is executed after AbstractTask.doTask().
protected  void beforeExecute(Thread t, Runnable r)
          If task execution has been paused then awaits someone to call resume().
protected  void beforeTask(Thread t, AbstractTask r)
          Executed before AbstractTask.doTask().
 long getAbortCount()
          The #of aborts (not failed tasks) since the WriteExecutorService was started.
 int getActiveTaskCountWithLocksHeld()
          The instantaneous #of tasks that have acquired their locks and are executing concurrently on the write service.
 long getByteCountPerCommit()
          The #of bytes written by the last commit.
 int getCommitGroupSize()
          The #of tasks in the most recent commit group.
 long getGroupCommitCount()
          The #of group commits since the WriteExecutorService was started (all commits by this service are group commits).
 int getInternalLockQueueLength()
          The #of threads queued on the internal lock.
 NonBlockingLockManagerWithNewDesign<String> getLockManager()
          The object that coordinates exclusive access to the resources.
 int getMaxCommitGroupSize()
          The maximum #of tasks in any commit group.
 long getMaxCommitServiceTime()
          The maximum service time in milliseconds of the atomic commit.
 long getMaxCommitWaitingTime()
          The maximum waiting time in milliseconds from when a task completes successfully until the next group commit.
 int getMaxPoolSize()
          The maximum #of threads in the pool.
 long getMaxRunning()
          The maximum #of tasks that are concurrently executing without regard to whether or not the tasks have acquired their locks.
 long getOverflowCount()
          The #of times synchronous overflow processing has been performed.
 int getReadyCount()
          The #of tasks that are waiting to run but are blocked on the lock.
 long getRejectedExecutionCount()
          The #of rejected tasks.
 long getTaskCommittedCount()
          The #of tasks that (a) executed successfully and (b) have been committed.
 long getTaskFailedCount()
          The #of tasks that have failed.
 long getTaskSuccessCount()
          The #of tasks that have executed successfully (MIGHT NOT have been committed safely).
 void shutdown()
          Overridden to shut down the embedded lock manager service.
 List<Runnable> shutdownNow()
          Overridden to shut down the embedded lock manager service.
 String toString()
          A snapshot of the executor state.
 boolean tryLock(long timeout, TimeUnit unit)
          Acquires an exclusive lock on the write service.
 void unlock()
          Release the exclusive write lock.
 
Methods inherited from class java.util.concurrent.ThreadPoolExecutor
afterExecute, allowCoreThreadTimeOut, allowsCoreThreadTimeOut, awaitTermination, execute, finalize, getActiveCount, getCompletedTaskCount, getCorePoolSize, getKeepAliveTime, getLargestPoolSize, getMaximumPoolSize, getPoolSize, getQueue, getRejectedExecutionHandler, getTaskCount, getThreadFactory, isShutdown, isTerminated, isTerminating, prestartAllCoreThreads, prestartCoreThread, purge, remove, setCorePoolSize, setKeepAliveTime, setMaximumPoolSize, setRejectedExecutionHandler, setThreadFactory, terminated
 
Methods inherited from class java.util.concurrent.AbstractExecutorService
invokeAll, invokeAll, invokeAny, invokeAny, newTaskFor, newTaskFor, submit, submit, submit
 
Methods inherited from class java.lang.Object
clone, equals, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

log

protected static final org.apache.log4j.Logger log
Main log for the WriteExecutorService.


INFO

protected static final boolean INFO
True iff the log level is INFO or less.


DEBUG

protected static final boolean DEBUG
True iff the log level is DEBUG or less.


overflowLog

protected static final org.apache.log4j.Logger overflowLog
Uses the OverflowManager log for things relating to synchronous overflow processing.


OVERFLOW_INFO

protected static final boolean OVERFLOW_INFO
True iff the overflowLog level is INFO or less.


OVERFLOW_DEBUG

protected static final boolean OVERFLOW_DEBUG
True iff the overflowLog level is DEBUG or less.


groupCommitTimeout

protected final long groupCommitTimeout
The time in milliseconds that a group commit will await currently running tasks to join the commit group.


overflowLockRequestTimeout

protected final long overflowLockRequestTimeout
The time in milliseconds that a group commit will await an exclusive lock on the write service in order to perform synchronous overflow processing. This lock is requested IFF overflow processing SHOULD be performed. The lock timeout needs to be of significant duration or a lock request for a write service under heavy write load will time out, in which case an error will be logged. If overflow processing is not performed the live journal extent will grow without bound.


activeTaskCountWithLocksHeld

protected AtomicInteger activeTaskCountWithLocksHeld
Constructor Detail

WriteExecutorService

public WriteExecutorService(IResourceManager resourceManager,
                            int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit keepAliveUnit,
                            BlockingQueue<Runnable> queue,
                            ThreadFactory threadFactory,
                            long groupCommitTimeout,
                            long overflowLockRequestTimeout)
Parameters:
resourceManager -
corePoolSize -
maximumPoolSize -
keepAliveTime -
keepAliveUnit -
queue -
threadFactory -
groupCommitTimeout - The time in milliseconds that a group commit will await currently running tasks to join the commit group.
overflowLockRequestTimeout -
Method Detail

getLockManager

public NonBlockingLockManagerWithNewDesign<String> getLockManager()
The object that coordinates exclusive access to the resources.


getRejectedExecutionCount

public long getRejectedExecutionCount()
The #of rejected tasks.


getMaxPoolSize

public int getMaxPoolSize()
The maximum #of threads in the pool.


getMaxRunning

public long getMaxRunning()
The maximum #of tasks that are concurrently executing without regard to whether or not the tasks have acquired their locks.

Note: Since this does not reflect tasks executing concurrently with locks held it is not a measure of the true concurrency of tasks executing on the service.


getMaxCommitWaitingTime

public long getMaxCommitWaitingTime()
The maximum waiting time in milliseconds from when a task completes successfully until the next group commit.


getMaxCommitServiceTime

public long getMaxCommitServiceTime()
The maximum service time in milliseconds of the atomic commit.

See Also:
AbstractJournal.commit()

getInternalLockQueueLength

public int getInternalLockQueueLength()
The #of threads queued on the internal lock. These are (for the most part) threads waiting to start or stop during a group commit. However, you cannot use this measure to infer whether there are threads waiting to run which are being starved during a group commit or simply threads waiting to do their post-processing.


getCommitGroupSize

public int getCommitGroupSize()
The #of tasks in the most recent commit group. In order to be useful information this must be sampled and turned into a moving average.
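
One way to turn such instantaneous readings into a moving average is an exponentially weighted average fed by periodic samples. The sketch below is only an illustration: the class MovingAverageSketch and the smoothing weight are assumptions, and this page does not say how the counters are actually sampled.

```java
/** Hypothetical sketch: exponentially weighted moving average of sampled counter values. */
public class MovingAverageSketch {

    /** Weight given to the newest sample, in (0, 1]. */
    private final double alpha;
    private double average;
    private boolean primed;

    public MovingAverageSketch(double alpha) {
        if (alpha <= 0.0 || alpha > 1.0) {
            throw new IllegalArgumentException("alpha must be in (0, 1]");
        }
        this.alpha = alpha;
    }

    /** Folds one sampled value (e.g. a commit group size) into the average. */
    public synchronized double sample(long value) {
        if (!primed) {
            // The first sample initializes the average.
            average = value;
            primed = true;
        } else {
            average = alpha * value + (1.0 - alpha) * average;
        }
        return average;
    }

    public synchronized double get() {
        return average;
    }
}
```

A caller would typically invoke sample() from a scheduled task, passing the value returned by the getter being monitored. A smaller alpha smooths more aggressively but reacts more slowly to changes in load.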


getMaxCommitGroupSize

public int getMaxCommitGroupSize()
The maximum #of tasks in any commit group.


getGroupCommitCount

public long getGroupCommitCount()
The #of group commits since the WriteExecutorService was started (all commits by this service are group commits).


getByteCountPerCommit

public long getByteCountPerCommit()
The #of bytes written by the last commit. This must be sampled to turn it into useful information.


getAbortCount

public long getAbortCount()
The #of aborts (not failed tasks) since the WriteExecutorService was started. Aborts are serious events and occur IFF an IAtomicStore.commit() fails. Failed tasks do NOT result in an abort.


getTaskFailedCount

public long getTaskFailedCount()
The #of tasks that have failed. Task failure means that the write set(s) for the task are discarded and any indices on which it has written are rolled back. Task failure does NOT cause the commit group to be discarded. Rather, the failed task never joins a commit group and returns control immediately to the caller.


getTaskSuccessCount

public long getTaskSuccessCount()
The #of tasks that have executed successfully (MIGHT NOT have been committed safely).

See Also:
getTaskCommittedCount()

getTaskCommittedCount

public long getTaskCommittedCount()
The #of tasks that (a) executed successfully and (b) have been committed.


getOverflowCount

public long getOverflowCount()
The #of times synchronous overflow processing has been performed.


getActiveTaskCountWithLocksHeld

public int getActiveTaskCountWithLocksHeld()
The instantaneous #of tasks that have acquired their locks and are executing concurrently on the write service. This is the real measure of concurrent task execution on the write service. However, you need to sample this value and compute a moving average in order to turn it into useful information.

The returned value is limited by ThreadPoolExecutor.getActiveCount(). Note that ThreadPoolExecutor.getActiveCount() reports tasks which are waiting on their locks as well as those engaged in various pre- or post-processing.


getReadyCount

public int getReadyCount()
The #of tasks that are waiting to run but are blocked on the lock. This value represents the #of tasks which have been starved from concurrent execution. The main culprits for a high value here are group commit and the occasional synchronous overflow or purge resources (when someone has an exclusive lock on the write service).


beforeExecute

protected void beforeExecute(Thread t,
                             Runnable r)
If task execution has been paused then awaits someone to call resume().

Overrides:
beforeExecute in class ThreadPoolExecutor
Parameters:
t - The thread that will run the task.
r - The Runnable wrapping the AbstractTask - this is actually a FutureTask. See AbstractExecutorService.
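
Pausing in beforeExecute follows a pattern shown in the ThreadPoolExecutor documentation itself. The sketch below is a generic version of that pattern and not the code of this class: the name PausableExecutorSketch and its pause() and resume() methods are assumptions made for illustration.

```java
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: a ThreadPoolExecutor whose workers wait in beforeExecute while paused. */
public class PausableExecutorSketch extends ThreadPoolExecutor {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition unpaused = lock.newCondition();
    private boolean paused;

    public PausableExecutorSketch(int nthreads) {
        super(nthreads, nthreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            // Hold the worker here until someone calls resume().
            while (paused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            t.interrupt(); // preserve the interrupt for the task
        } finally {
            lock.unlock();
        }
    }

    public void pause() {
        lock.lock();
        try {
            paused = true;
        } finally {
            lock.unlock();
        }
    }

    public void resume() {
        lock.lock();
        try {
            paused = false;
            unpaused.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Returns true iff a task was held back while paused and ran after resume(). */
    public static boolean demo() {
        PausableExecutorSketch svc = new PausableExecutorSketch(1);
        try {
            svc.pause();
            Future<String> f = svc.submit(() -> "done");
            boolean heldBack;
            try {
                f.get(200, TimeUnit.MILLISECONDS);
                heldBack = false;
            } catch (TimeoutException e) {
                heldBack = true; // the worker is parked in beforeExecute
            }
            svc.resume();
            return heldBack && "done".equals(f.get(5, TimeUnit.SECONDS));
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            svc.shutdownNow();
        }
    }
}
```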

beforeTask

protected void beforeTask(Thread t,
                          AbstractTask r)
Executed before AbstractTask.doTask().

Parameters:
t - The thread in which that task will execute.
r - The AbstractTask.

afterTask

protected void afterTask(AbstractTask r,
                         Throwable t)
This is executed after AbstractTask.doTask(). If the task completed successfully (no exception thrown and its thread is not interrupted) then we invoke groupCommit(). Otherwise the write set of the task was already discarded by AbstractTask.InnerWriteServiceCallable and we do nothing.

Parameters:
r - The Callable wrapping the AbstractTask.
t - The exception thrown -or- null if the task completed successfully.

toString

public String toString()
A snapshot of the executor state.

Overrides:
toString in class Object

shutdown

public void shutdown()
Overridden to shut down the embedded lock manager service.

Specified by:
shutdown in interface ExecutorService
Overrides:
shutdown in class ThreadPoolExecutor

shutdownNow

public List<Runnable> shutdownNow()
Overridden to shut down the embedded lock manager service.

Specified by:
shutdownNow in interface ExecutorService
Overrides:
shutdownNow in class ThreadPoolExecutor

tryLock

public boolean tryLock(long timeout,
                       TimeUnit unit)
                throws InterruptedException
Acquires an exclusive lock on the write service.

The write service is paused for up to timeout units. During that time no new tasks will start. The lock will be granted if all running tasks complete before the timeout expires.

Note: The exclusive write lock is granted using the same lock that is used to coordinate all other activity of the write service. If the exclusive write lock is granted then the caller's thread will hold the lock and MUST release the lock using unlock().

Note: When the exclusive lock is granted there will be NO running tasks and the write service will be paused. This ensures that no task can run on the write service and that groupCommit will not attempt to grab the lock itself.

Note: If there is heavy write activity on the service then the timeout may well expire before the exclusive write lock becomes available. Further, the acquisition of the exclusive write lock will throttle concurrent write activity and negatively impact write performance if the system is heavily loaded by write tasks. Therefore, the write lock should be requested only when it is necessary and a significant value should be specified for the timeout (60s or more) to ensure that it is acquired.

Parameters:
timeout - The timeout.
unit - The unit in which the timeout is expressed.
Returns:
true iff the exclusive lock was acquired.
Throws:
InterruptedException
TODO:
This really should not be public. It was exposed to make it easy to force overflow of the service. We should be able to achieve the same ends by setting a flag and submitting a task which writes an empty record on the raw store just in case there is no task running.

unlock

public void unlock()
Release the exclusive write lock.

Throws:
IllegalMonitorStateException - if the current thread does not own the lock.
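
The tryLock(long, TimeUnit) and unlock() pair is meant to be used in the usual try/finally style. The sketch below shows that calling pattern against a stand-in: the interface WriteServiceLock, the ReentrantLock-backed standIn() and the helper runExclusively() are all hypothetical, and only the two method signatures are taken from this page.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of the tryLock()/unlock() calling pattern. */
public class ExclusiveLockSketch {

    /** Stand-in exposing only the two lock methods documented on this page. */
    public interface WriteServiceLock {
        boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException;
        void unlock();
    }

    /** A ReentrantLock-backed stand-in so that the sketch runs without the journal. */
    public static WriteServiceLock standIn() {
        final ReentrantLock lock = new ReentrantLock();
        return new WriteServiceLock() {
            @Override
            public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
                return lock.tryLock(timeout, unit);
            }

            @Override
            public void unlock() {
                // Throws IllegalMonitorStateException if the caller is not the owner.
                lock.unlock();
            }
        };
    }

    /** Runs the action under the exclusive lock; returns false if the lock was not acquired. */
    public static boolean runExclusively(WriteServiceLock svc, long timeout,
            TimeUnit unit, Runnable action) {
        boolean locked;
        try {
            locked = svc.tryLock(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // propagate the interrupt
            return false;
        }
        if (!locked) {
            return false; // timed out, e.g. under heavy write load
        }
        try {
            action.run();
            return true;
        } finally {
            svc.unlock(); // MUST be released by the thread that acquired it
        }
    }
}
```

Following the guidance for tryLock, a caller would pass a generous timeout, for example runExclusively(svc, 60, TimeUnit.SECONDS, action), and treat a false return as "try again later" rather than as an error.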


Copyright © 2006-2011 SYSTAP, LLC. All Rights Reserved.