com.bigdata.concurrent
Class NonBlockingLockManagerWithNewDesign<R extends Comparable<R>>

java.lang.Object
  extended by com.bigdata.concurrent.NonBlockingLockManagerWithNewDesign<R>
All Implemented Interfaces:
ICounterSetAccess

public abstract class NonBlockingLockManagerWithNewDesign<R extends Comparable<R>>
extends Object
implements ICounterSetAccess

This class coordinates a schedule among concurrent operations requiring exclusive access to shared resources. Whenever possible, the result is a concurrent schedule - that is, operations having non-overlapping lock requirements run concurrently while operations that have lock contentions are queued behind operations that currently have locks on the relevant resources. A ResourceQueue is created for each resource and used to block operations that are awaiting a lock. When those locks become available, ready(Runnable) will be invoked with the task.

The class will use an optional WAITS_FOR graph to detect deadlocks if locks are not being pre-declared (and hence deadlocks are possible).

Version:
$Id: NonBlockingLockManagerWithNewDesign.java 4280 2011-03-08 15:06:58Z thompsonbry $
Author:
Bryan Thompson
See Also:
ready(Runnable)
TODO:
2PL

Note: It is not really possible to have deadlocks without 2PL. Instead, what happens is that the maximum multi-programming capacity of the TxDag is temporarily exceeded. Review the unit tests again when supporting 2PL and verify that the deadlock handling logic works.

In order to support 2PL we need to decouple the NonBlockingLockManagerWithNewDesign.LockFutureTask from the transaction with which it is associated and use the latter in the TxDag. Otherwise each submit(Comparable[], Callable) will always look like a new transaction (2PL is impossible unless you can execute multiple tasks for the same transaction).

To introduce 2PL I need to create interfaces which extend Callable which are understood by this class. One such interface should provide a method by which a task can release its locks. For 2PL, either that interface or an extension of the interface would need to have a method by which a task could post new lock requests. Such a callable could be submitted directly. It would wait if it declared any precondition locks and otherwise pass through to ready(Runnable) immediately. We can't really just pass the task through again when it requests additional locks, so maybe the Tx would submit a task to the lock service each time it wanted to gain more or more locks and ready(Runnable) would change its run state..., Support escalation of operation priority based on time and scheduling of higher priority operations. the latter is done by queueing lock requests in front of pending requests for each resource on which an operation attempt to gain a lock. The former is just a dynamic adjustment of the position of the operation in the resource queue where it is awaiting a lock (an operation never awaits more than one lock at a time). This facility could be used to give priority to distributed transactions over local unisolated operations and to priviledge certain operations that have low latency requirements. This is not quite a "real-time" guarantee since the VM is not (normally) providing real-time guarantees and since we are not otherwise attempting to ensure anything except lower latency when compared to other operations awaiting their own locks.


Nested Class Summary
protected static class NonBlockingLockManagerWithNewDesign.Counters
          Counters for the NonBlockingLockManagerWithNewDesign.
static class NonBlockingLockManagerWithNewDesign.LockFutureTask<R extends Comparable<R>,T>
          FutureTask which executes once it holds its locks.
protected static class NonBlockingLockManagerWithNewDesign.ResourceQueue<R extends Comparable<R>,T extends NonBlockingLockManagerWithNewDesign.LockFutureTask<R,? extends Object>>
          Unbounded queue of operations waiting to gain an exclusive lock on a resource.
protected  class NonBlockingLockManagerWithNewDesign.StatisticsTask
          Class for tracking the average queue size of each ResourceQueue and various other moving averages for the service as a whole.
 
Field Summary
protected static boolean DEBUG
           
protected static boolean INFO
           
protected static org.apache.log4j.Logger log
           
 NonBlockingLockManagerWithNewDesign.StatisticsTask statisticsTask
          This Runnable should be submitted to a ScheduledExecutorService in order to track the average queue size for each active ResourceQueue and various moving averages pertaining to the lock service as a whole.
 
Constructor Summary
NonBlockingLockManagerWithNewDesign(int maxConcurrency, int maxLockTries, boolean predeclareLocks)
          Create a lock manager.
 
Method Summary
 CounterSet getCounters()
          Note: You MUST submit statisticsTask to a ScheduledExecutorService in order counter values which report moving averages to be maintained.
 Runnable getTaskWithLocks(R[] resource)
          Return the task holding all of the specified locks.
 boolean isLockHeldByTask(R lock, Runnable task)
          Return true if the lock is held by the task at the moment when it is inspected.
 boolean isOpen()
           
 boolean isShutdown()
           
 boolean isTerminated()
           
protected abstract  void ready(Runnable task)
          Method invoked when a task is ready to execute holding any locks which it declared to submit(Comparable[], Callable) or submit(Comparable[], Runnable, Object).
 void releaseLocksForTask(R[] resource)
          If there is a task holding ALL of the specified locks then its locks are released.
 void shutdown()
           
 void shutdownNow()
           
<T> Future<T>
submit(R[] resource, Callable<T> task)
          Submit a task for execution.
<T> Future<T>
submit(R[] resource, Runnable task, T val)
          Variant for a Runnable target.
 String toString()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

log

protected static final org.apache.log4j.Logger log

INFO

protected static final boolean INFO

DEBUG

protected static final boolean DEBUG

statisticsTask

public final NonBlockingLockManagerWithNewDesign.StatisticsTask statisticsTask
This Runnable should be submitted to a ScheduledExecutorService in order to track the average queue size for each active ResourceQueue and various moving averages pertaining to the lock service as a whole.

Constructor Detail

NonBlockingLockManagerWithNewDesign

public NonBlockingLockManagerWithNewDesign(int maxConcurrency,
                                           int maxLockTries,
                                           boolean predeclareLocks)
Create a lock manager. No concurrency limit imposed when predeclareLocks is true as deadlocks are impossible and we do not maintain a WAITS_FOR graph.

Parameters:
maxConcurrency - The maximum multi-programming level (ignored if predeclareLocks is true).
maxLockTries - The maximum #of times that a task whose lock requests would produce a deadlock will be retried. Deadlock is typically transient but the potential for deadlock can vary depending on the application. Note that deadlock CAN NOT arise if you are predeclaring and sorting the lock requests.
predeclareLocks - When true, operations MUST declare all locks before they begin to execute. This makes possible several efficiencies and by sorting the resources in each lock request into a common order we are able to avoid deadlocks entirely.
Method Detail

getCounters

public CounterSet getCounters()
Note: You MUST submit statisticsTask to a ScheduledExecutorService in order counter values which report moving averages to be maintained.

Note: A new instance is returned every time. This makes the pattern where the counters are "attached" to a hierarchy work since that has the side-effect of "detaching" them from the returned object.

Specified by:
getCounters in interface ICounterSetAccess

ready

protected abstract void ready(Runnable task)
Method invoked when a task is ready to execute holding any locks which it declared to submit(Comparable[], Callable) or submit(Comparable[], Runnable, Object). The implementation will normally submit the Runnable to an Executor. The Runnable wraps the original task and the task will automatically release its locks when it is done executing.

Note: Implementations SHOULD NOT cause the Runnable to execute in the caller's thread. That will cause this service to block while the task is executing. The implementation can safely submit the task to a ThreadPoolExecutor whose work queue is a SynchronousQueue as long as the the ThreadPoolExecutor has an unbounded pool size. Another option is to submit the task to a ThreadPoolExecutor whose work queue is unbounded queue, such as LinkedBlockingQueue when no queue capacity was specified. The SynchronousQueue may be the better choice since the ResourceQueues already provide an unbounded queue and the actual concurrency of the delegate will be bounded by the #of distinct resources for which tasks are actively contending for locks. See the discussion on queues at ThreadPoolExecutor.

Parameters:
task - The Callable or Runnable wrapped up as a NonBlockingLockManagerWithNewDesign.LockFutureTask.

isOpen

public boolean isOpen()

isShutdown

public boolean isShutdown()

isTerminated

public boolean isTerminated()

shutdown

public void shutdown()

shutdownNow

public void shutdownNow()

submit

public <T> Future<T> submit(R[] resource,
                            Callable<T> task)
Submit a task for execution. The task will wait until it holds the declared locks. It will then execute. This method is non-blocking. The caller must use FutureTask.get() to await the outcome.

Parameters:
resource - An array of resources whose locks are required to execute the task.
task - The task to be executed.
Throws:
IllegalArgumentException - if resource is null or if any element of that array is null.
IllegalArgumentException - if the task is null.
RejectedExecutionException - if the task can not be queued for execution (including if the service is not running or if a blocking queue was used and the queue is at capacity).

submit

public <T> Future<T> submit(R[] resource,
                            Runnable task,
                            T val)
Variant for a Runnable target.

Type Parameters:
T - The generic type of the value which will be returned by the Future.
Parameters:
resource - The declared locks.
task - The Runnable target.
val - The value to be returned by the Future.
Returns:
The Future for that task.
Throws:
IllegalArgumentException - if resource is null or if any element of that array is null.
IllegalArgumentException - if the task is null.
RejectedExecutionException - if the task can not be queued for execution (including if the service is not running or if a blocking queue was used and the queue is at capacity).

releaseLocksForTask

public final void releaseLocksForTask(R[] resource)
If there is a task holding ALL of the specified locks then its locks are released. This is intended to support workflows in which the task can release its locks before Runnable.run() is finished.

Parameters:
resource[] - The declared locks for the task.
Throws:
IllegalStateException - if there is no task which holds all the declared locks.

getTaskWithLocks

public Runnable getTaskWithLocks(R[] resource)
Return the task holding all of the specified locks.

Parameters:
resource - The locks.
Returns:
The task -or- null iff there is no such task.

isLockHeldByTask

public boolean isLockHeldByTask(R lock,
                                Runnable task)
Return true if the lock is held by the task at the moment when it is inspected.

Parameters:
lock - The lock.
task - The task.
Returns:
true if the lock was held by that task.

toString

public String toString()
Overrides:
toString in class Object


Copyright © 2006-2011 SYSTAP, LLC. All Rights Reserved.