com.bigdata.resources
Class AsynchronousOverflowTask

java.lang.Object
  extended by com.bigdata.resources.AsynchronousOverflowTask
All Implemented Interfaces:
Callable<Object>

public class AsynchronousOverflowTask
extends Object
implements Callable<Object>

This class examines the named indices defined on the journal identified by the lastCommitTime and, for each named index registered on that journal, determines which action (build, merge, split, join, or move, if any) applies and then schedules any necessary tasks based on that decision.

Each task has two phases:
  1. historical read from the lastCommitTime of the old journal
  2. unisolated task performing an atomic update of the index partition view and the metadata index
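A minimal sketch of that two-phase shape. The names `TwoPhaseOverflowTask`, `readHistorical` and `applyAtomicUpdate` are hypothetical; in this class the second phase is an AbstractAtomicUpdateTask submitted to the IConcurrencyManager, which the sketch replaces with a plain `ReentrantLock` standing in for the UNISOLATED index lock.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical sketch of a two-phase overflow task. Phase 1 reads the historical
 * view without locks; phase 2 publishes the result while holding an exclusive lock
 * (a stand-in for the UNISOLATED lock the concurrency manager would grant).
 */
abstract class TwoPhaseOverflowTask<R> implements Callable<R> {

    private final long lastCommitTime;          // commit point on the old journal
    private final ReentrantLock unisolatedLock; // stand-in for the index lock

    TwoPhaseOverflowTask(long lastCommitTime, ReentrantLock unisolatedLock) {
        this.lastCommitTime = lastCommitTime;
        this.unisolatedLock = unisolatedLock;
    }

    /** Phase 1: read-only work against the historical view (e.g. build a segment). */
    protected abstract R readHistorical(long commitTime) throws Exception;

    /** Phase 2: atomically update the partition view and the metadata index. */
    protected abstract void applyAtomicUpdate(R result) throws Exception;

    @Override
    public R call() throws Exception {
        R result = readHistorical(lastCommitTime); // no lock held here
        unisolatedLock.lock();
        try {
            applyAtomicUpdate(result);             // exclusive access
        } finally {
            unisolatedLock.unlock();
        }
        return result;
    }
}
```

The long-running read happens outside the lock, so the exclusive section stays short.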

Processing is divided into two stages:

chooseTasks()
This stage examines the named indices and decides what action (if any) will be applied to each index partition.
runTasks()
This stage reads on the historical state of the named index partitions, building, merging, splitting, joining, or moving their data as appropriate. When each task is finished, it submits and awaits the completion of an AbstractAtomicUpdateTask. The atomic update tasks use ITx.UNISOLATED operations on the live journal to make atomic updates to the index partition definitions and to the MetadataService and/or a remote data service where necessary.

Note: This task is invoked after an OverflowManager.overflow(). It is run on the ResourceManager's ExecutorService so that its execution is asynchronous with respect to the IConcurrencyManager. While it does not require any locks for its own processing stages, it relies on the OverflowManager.overflowAllowed flag to disallow additional overflow operations until it has completed. The various actions taken by this task are submitted to the IConcurrencyManager so that they will obtain the appropriate locks as necessary on the named indices.
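A minimal sketch of that gating, assuming a hypothetical `OverflowGate` class in which an `AtomicBoolean` stands in for OverflowManager.overflowAllowed. The synchronous overflow clears the flag, and the asynchronous work restores it in a `finally` block so that a failure cannot leave overflow permanently disabled.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of how an overflowAllowed-style flag serializes overflows. */
class OverflowGate {

    private final AtomicBoolean overflowAllowed = new AtomicBoolean(true);
    private final ExecutorService service = Executors.newSingleThreadExecutor();

    /** Returns null if an asynchronous overflow is still running. */
    Future<?> overflow(Runnable asyncWork) {
        if (!overflowAllowed.compareAndSet(true, false)) {
            return null; // previous asynchronous overflow has not finished
        }
        // ... the synchronous switch to the new journal would happen here ...
        return service.submit(() -> {
            try {
                asyncWork.run(); // build/merge/split/join/move processing
            } finally {
                overflowAllowed.set(true); // re-enable overflow even on failure
            }
        });
    }

    boolean isOverflowAllowed() {
        return overflowAllowed.get();
    }

    void shutdown() {
        service.shutdown();
    }
}
```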

Version:
$Id: AsynchronousOverflowTask.java 5284 2011-10-03 16:33:08Z thompsonbry $
Author:
Bryan Thompson
TODO:
consider side-effects of post-processing tasks (build, split, join, or move) on a distributed index rebuild operation. It is possible that the new index partitions may have been defined (but not yet registered in the metadata index) and that new index resources (on journals or index segment files) may have been defined. However, none of these operations should produce incoherent results so it should be possible to restore a coherent state of the metadata index by picking and choosing carefully. The biggest danger is choosing a new index partition which does not yet have all of its state on hand, but we have the LocalPartitionMetadata.getSourcePartitionId() which captures that.

If an index partition is moved (or split or joined) while an active transaction has a write set for that index partition on a data service then we need to move/split/join the transaction write set as well so that it stays aligned with the index partition definitions. In this way the validate and merge operations may be conducted in parallel for each index partition which participates in the transaction.

Field Summary
protected static org.apache.log4j.Logger log
           
 
Constructor Summary
AsynchronousOverflowTask(ResourceManager resourceManager, OverflowMetadata overflowMetadata)
           
 
Method Summary
 Object call()
          Note: This task is interrupted by OverflowManager.shutdownNow().
protected  List<AbstractTask> chooseJoins()
          Scans the registered named indices and decides which ones (if any) are undercapacity and should be joined.
protected  List<AbstractTask> chooseScatterSplits()
          Choose index partitions for scatter split operations.
protected  List<AbstractTask> chooseSplitBuildOrMerge(boolean compactingMerge)
          For each index (partition) that has not been handled, decide whether we will: split the index partition; perform a compacting merge (build an IndexSegment from the FusedView of the index partition); or perform an incremental build (build an IndexSegment from the writes absorbed by the mutable BTree on the old journal, which removes the dependency on the old journal as of its lastCommitTime). Note: Compacting merges are decided in two passes.
protected  List<AbstractTask> chooseTasks(boolean forceCompactingMerges)
          Examine each named index on the old journal and decide what, if anything, to do with that index.
protected  ILoadBalancerService getLoadBalancerService()
          Return the ILoadBalancerService if it can be discovered.
protected static boolean isNormalShutdown(ResourceManager resourceManager, Throwable t)
          Return true if t is one of several exceptions that are good indicators that the data service was shut down.
protected  void putUsed(String name, String action)
          Deprecated. This is no longer valid as many index partitions are entered onto BOTH the buildQueue and the mergeQueue rather than exclusively being assigned one task or the other.
protected  void runTasks(List<AbstractTask> tasks)
          Submit all tasks, await their completion, and check their futures for errors.
protected  void runTasksConcurrent(List<AbstractTask> tasks)
          Runs the overflow tasks in parallel, cancelling any tasks which have not completed if we run out of time.
protected  void runTasksInSingleThread(List<AbstractTask> tasks)
          Runs the overflow tasks one at a time, stopping when the journal needs to overflow again, when we run out of time, or when there are no more tasks to be executed.
protected  boolean shouldMove(ILoadBalancerService loadBalancerService)
          Figure out if this data service is considered to be highly utilized, in which case the data service should shed some index partitions.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

log

protected static final org.apache.log4j.Logger log
Constructor Detail

AsynchronousOverflowTask

public AsynchronousOverflowTask(ResourceManager resourceManager,
                                OverflowMetadata overflowMetadata)
Parameters:
resourceManager -
overflowMetadata -
Method Detail

putUsed

protected void putUsed(String name,
                       String action)
Deprecated. This is no longer valid as many index partitions are entered onto BOTH the buildQueue and the mergeQueue rather than exclusively being assigned one task or the other.

This method is invoked each time an index partition is "used" by assigning it to participate in some build, split, join, or move operation.

Parameters:
name - The name of the index partition.
Throws:
IllegalStateException - if the index partition was already used by some other operation.
TODO:
could be replaced by an index on BTreeMetadata in the OverflowMetadata object and BTreeMetadata#action

chooseScatterSplits

protected List<AbstractTask> chooseScatterSplits()
Choose index partitions for scatter split operations. The scatter split divides an index partition into N index partitions, one per data service, and then moves N-1 of the generated index partitions to other data services, leaving one index partition in place on this data service.
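A sketch of the assignment step only, under stated assumptions: `ScatterSplitPlanner`, `Piece` and the evenly spaced separators taken from a sorted key sample are hypothetical illustrations, not the split logic this class actually uses. Piece 0 stays on the local data service and pieces 1..N-1 become move targets.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a scatter split plan: one piece per data service. */
class ScatterSplitPlanner {

    /** One planned piece: its first key, its target data service, and whether it moves. */
    static final class Piece {
        final String leftSeparator;
        final String dataService;
        final boolean move;

        Piece(String leftSeparator, String dataService, boolean move) {
            this.leftSeparator = leftSeparator;
            this.dataService = dataService;
            this.move = move;
        }
    }

    /**
     * @param sortedKeys    sorted sample of the partition's keys (assumed input)
     * @param localService  the data service that hosts the partition today
     * @param otherServices the other data services, in preference order
     */
    static List<Piece> plan(List<String> sortedKeys, String localService,
                            List<String> otherServices) {
        // N = one piece per data service, but never more pieces than sampled keys.
        int n = Math.min(1 + otherServices.size(), sortedKeys.size());
        List<Piece> pieces = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            // Evenly spaced separator keys drawn from the sample.
            String separator = sortedKeys.get(i * sortedKeys.size() / n);
            boolean move = i > 0; // piece 0 stays here; the other N-1 are moved
            pieces.add(new Piece(separator, move ? otherServices.get(i - 1) : localService, move));
        }
        return pieces;
    }
}
```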


chooseJoins

protected List<AbstractTask> chooseJoins()
Scans the registered named indices and decides which ones (if any) are undercapacity and should be joined.

If the rightSibling of an undercapacity index partition is also local then a JoinIndexPartitionTask is created to join those index partitions and both index partitions will be marked as "used".

If the rightSibling of an undercapacity index partition is remote, then a MoveTask is created to move the undercapacity index partition to the remote data service and the undercapacity index partition will be marked as "used".
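A minimal sketch of the JOIN-or-MOVE decision and the "used" bookkeeping described above. `JoinPlanner`, `Action` and the map-based inputs are hypothetical; the real method works against the metadata index and creates JoinIndexPartitionTask and MoveTask instances instead of plain records.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of chooseJoins(): JOIN with a local right sibling, else MOVE. */
class JoinPlanner {

    /** A planned action on an undercapacity partition (illustrative only). */
    static final class Action {
        final String kind;      // "JOIN" or "MOVE"
        final String partition; // the undercapacity index partition
        final String other;     // right sibling (JOIN) or target data service (MOVE)

        Action(String kind, String partition, String other) {
            this.kind = kind;
            this.partition = partition;
            this.other = other;
        }
    }

    /**
     * @param undercapacity  undercapacity partition -> its right sibling (null if rightmost)
     * @param siblingService partition -> data service that hosts it
     * @param localService   this data service
     */
    static List<Action> chooseJoins(Map<String, String> undercapacity,
                                    Map<String, String> siblingService,
                                    String localService) {
        Set<String> used = new HashSet<>();
        List<Action> actions = new ArrayList<>();
        for (Map.Entry<String, String> e : undercapacity.entrySet()) {
            String partition = e.getKey();
            String sibling = e.getValue();
            if (sibling == null || used.contains(partition)) continue;
            String host = siblingService.get(sibling);
            if (localService.equals(host)) {
                if (used.contains(sibling)) continue;
                actions.add(new Action("JOIN", partition, sibling));
                used.add(partition); // both partitions are marked as "used"
                used.add(sibling);
            } else if (host != null) {
                actions.add(new Action("MOVE", partition, host));
                used.add(partition); // only the moved partition is "used"
            }
        }
        return actions;
    }
}
```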


getLoadBalancerService

protected ILoadBalancerService getLoadBalancerService()
Return the ILoadBalancerService if it can be discovered.

Returns:
the ILoadBalancerService if it can be discovered and otherwise null.

shouldMove

protected boolean shouldMove(ILoadBalancerService loadBalancerService)
Figure out if this data service is considered to be highly utilized, in which case the data service should shed some index partitions.

Note: We consult the load balancer service on this since it is able to put the load of this service into perspective by also considering the load on the other services in the federation.

Parameters:
loadBalancerService - The load balancer.

chooseTasks

protected List<AbstractTask> chooseTasks(boolean forceCompactingMerges)
                                  throws Exception
Examine each named index on the old journal and decide what, if anything, to do with that index. These indices are key range partitions of named scale-out indices. The LocalPartitionMetadata describes the key range partition and identifies the historical resources required to present a coherent view of that index partition.

Note: Overflow actions which define a new index partition (Split, Join, and Move) all require a phase (which is part of their atomic update tasks) in which they will block the application. This is necessary in order for them to "catch up" with buffered writes on the new journal - those writes need to be incorporated into the new index partition.

Compacting Merge

A compacting merge is performed when there are buffered writes, when the buffered writes were not simply copied onto the new journal during the atomic overflow operation, when the index partition is neither overcapacity (split) nor undercapacity (joined), and when the #of source index components for the index partition exceeds some threshold (~4). Also, we do not do a build if the index partitions will be moved.

Incremental Build

An incremental build is performed when there are buffered writes, when the buffered writes were not simply copied onto the new journal during the atomic overflow operation, when the index partition is neither overcapacity (split) nor undercapacity (joined), and when there are fewer than some threshold (~4) #of components in the index partition view. Also, we do not do a build if the index partition will be moved.

An incremental build is generally faster than a compacting merge because it only copies those writes that were buffered on the mutable BTree. However, an incremental build must copy ALL of those buffered tuples, including deleted tuples, so it can do more work than the number of live tuples suggests, and it does not reduce the rangeCount() since the deleted tuples are preserved.
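The rules above can be read as a small decision function. This is a sketch under assumptions: `BuildOrMergeDecider` is hypothetical, the precedence (move, then split, then join) is an illustrative choice, and a view whose component count exactly equals the threshold is arbitrarily assigned to the incremental build because the text only says "exceeds" and "fewer than".

```java
/** Hypothetical sketch of the per-partition build-versus-merge decision. */
class BuildOrMergeDecider {

    enum Action { NONE, MOVE, SPLIT, JOIN, COMPACTING_MERGE, INCREMENTAL_BUILD }

    /** Assumed default; the text only says "some threshold (~4)". */
    static final int DEFAULT_MERGE_THRESHOLD = 4;

    static Action decide(boolean bufferedWrites, boolean copiedDuringOverflow,
                         boolean overcapacity, boolean undercapacity,
                         boolean willMove, int sourceCount, int mergeThreshold) {
        if (willMove) return Action.MOVE;        // no build if the partition will move
        if (overcapacity) return Action.SPLIT;
        if (undercapacity) return Action.JOIN;
        if (!bufferedWrites || copiedDuringOverflow) {
            return Action.NONE;                  // nothing new to build from
        }
        // Too many components in the view: compact them; otherwise build the deltas.
        return sourceCount > mergeThreshold ? Action.COMPACTING_MERGE
                                            : Action.INCREMENTAL_BUILD;
    }
}
```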

Split

A split is considered when an index partition appears to be overcapacity. The split operation will inspect the index partition in more detail when it runs. If the index partition is not, in fact, large enough to warrant a split then a build will be performed instead (the build is treated more or less as a one-to-one split, but we do not assign a new partition identifier). An index partition which is WAY overcapacity can be split into more than 2 new index partitions.
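A sketch of how the number of pieces might be derived, assuming a hypothetical nominal `targetSize` per index partition (this class may size splits differently). A result of 1 corresponds to the "one-to-one split" case, where a build is done instead, and a partition that is WAY overcapacity yields more than 2 pieces.

```java
/** Hypothetical sketch: how many pieces an overcapacity partition is split into. */
class SplitSizer {

    /**
     * @param sizeOnDisk estimated size of the index partition (bytes)
     * @param targetSize assumed nominal size of one index partition (bytes)
     * @return the number of pieces; 1 means "not really overcapacity: build instead"
     */
    static int splitCount(long sizeOnDisk, long targetSize) {
        if (targetSize <= 0) {
            throw new IllegalArgumentException("targetSize must be positive");
        }
        // Ceiling division written to avoid overflow on (sizeOnDisk + targetSize).
        long n = sizeOnDisk / targetSize + (sizeOnDisk % targetSize == 0 ? 0 : 1);
        return (int) Math.max(1L, Math.min(n, Integer.MAX_VALUE));
    }
}
```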

Join

A join is considered when an index partition is undercapacity. Joins require both an undercapacity index partition and its rightSibling. Since the rightSeparatorKey for an index partition is also the key under which the rightSibling would be found, we use the rightSeparatorKey to look up the rightSibling of an index partition in the MetadataIndex. If that rightSibling is local (same ResourceManager) then we will JOIN the index partitions. Otherwise we will MOVE the undercapacity index partition to the IDataService on which its rightSibling was found.
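The right sibling lookup works because the metadata index is keyed by each partition's leftSeparatorKey, and a partition's rightSeparatorKey is exactly its right sibling's leftSeparatorKey. A minimal sketch, using `java.util.TreeMap` as a hypothetical stand-in for the MetadataIndex and `String` keys in place of the real unsigned byte[] keys; `MetadataIndexSketch` and `Locator` are illustrative names.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the MetadataIndex: leftSeparatorKey -> partition locator. */
class MetadataIndexSketch {

    static final class Locator {
        final int partitionId;
        final String dataService;
        final String leftSeparator;
        final String rightSeparator; // null means no upper bound (rightmost partition)

        Locator(int partitionId, String dataService, String leftSeparator, String rightSeparator) {
            this.partitionId = partitionId;
            this.dataService = dataService;
            this.leftSeparator = leftSeparator;
            this.rightSeparator = rightSeparator;
        }
    }

    private final TreeMap<String, Locator> index = new TreeMap<>();

    void add(Locator loc) {
        index.put(loc.leftSeparator, loc);
    }

    /** The partition spanning key is the one with the greatest leftSeparator <= key. */
    Locator find(String key) {
        Map.Entry<String, Locator> e = index.floorEntry(key);
        return e == null ? null : e.getValue();
    }

    /** Look up the right sibling under this partition's rightSeparatorKey. */
    Locator rightSibling(Locator loc) {
        return loc.rightSeparator == null ? null : find(loc.rightSeparator);
    }
}
```

Comparing `rightSibling(loc).dataService` with the local data service then selects JOIN or MOVE as described above.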

Move

We move index partitions around in order to make the use of CPU, RAM and DISK resources more even across the federation and prevent hosts or data services from being either under- or over-utilized. Index partition moves are necessary when a scale-out index is relatively new in order to distribute the index over more than a single data service. Likewise, index partition moves are important when a host is overloaded, especially when it is approaching resource exhaustion. However, index partition moves DO NOT release DISK space on a host since only the current state of the index partition is moved, not its historical states (which are on a mixture of journals and index segments).

We can choose which index partitions to move fairly liberally. Cold index partitions are not consuming any CPU/RAM/IO resources and moving them to another host will not affect the utilization of either the source or the target host. Moving an index partition which is "hot for write" can impose a noticeable latency because the "hot for write" partition will have absorbed more writes on the journal while we are moving the data from the old view and we will need to move those writes as well. When we move those writes the index will be unavailable for write until it appears on the target data service. Therefore we generally choose to move "warm" index partitions since it will introduce less latency when we temporarily suspend writes on the index partition.
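A minimal sketch of a "prefer warm" selection. Everything here is an assumption for illustration: `MoveCandidateChooser`, the per-partition write rate used as the activity measure, the `coldBelow`/`hotAbove` cut-offs, and the choice to take the most active warm partitions first (to shed the most load among the low-latency candidates).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: choose "warm" index partitions as move candidates. */
class MoveCandidateChooser {

    static List<String> chooseWarm(Map<String, Double> writesPerSecond,
                                   double coldBelow, double hotAbove, int maxMoves) {
        List<Map.Entry<String, Double>> warm = new ArrayList<>();
        for (Map.Entry<String, Double> e : writesPerSecond.entrySet()) {
            double rate = e.getValue();
            // Skip cold partitions (moving them frees nothing) and hot ones (long write stall).
            if (rate >= coldBelow && rate <= hotAbove) {
                warm.add(e);
            }
        }
        // Most active warm partitions first; ties broken by name for a stable order.
        warm.sort(Map.Entry.<String, Double>comparingByValue(Comparator.reverseOrder())
                .thenComparing(Map.Entry.<String, Double>comparingByKey()));
        List<String> chosen = new ArrayList<>();
        for (int i = 0; i < Math.min(maxMoves, warm.size()); i++) {
            chosen.add(warm.get(i).getKey());
        }
        return chosen;
    }
}
```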

Indices typically have many commit points, and any one of them could become "hot for read". However, moving an index partition is not going to reduce the load on the old node for historical reads since we only move the current state of the index, not its history. Nodes that are hot spots for historical reads should be handled by increasing their replication count and reading from the secondary data services. Note that ITx.READ_COMMITTED and ITx.UNISOLATED both count against the "live" index - read-committed reads always track the most recent state of the index partition and would be moved if the index partition was moved.

Bottom line: if a node is hot for historical read then increase the replication count and read from failover services. If a node is hot for read-committed and unisolated operations then move one or more of the warm read-committed/unisolated index partitions to a node with less utilization.

Index partitions that get a lot of action are NOT candidates for moves unless the node itself is either overutilized, about to exhaust its DISK, or other nodes are at very low utilization. We always prefer to move the "warm" index partitions instead.

DISK exhaustion

Running out of DISK space causes an urgent condition and can lead to failure of all services on the same host. Therefore, when a host is close to exhausting its DISK space: (a) it MUST notify the ILoadBalancerService; (b) temporary files SHOULD be purged; (c) it MAY choose to shed indices that are "hot for write" since that will slow down the rate at which the disk space is consumed; (d) index partitions may be aggressively moved off of the LDS; (e) the transaction service MAY reduce the retention period; and (f) as a last resort, the transaction service MAY invalidate read locks, which implies that read or read-write transactions will be aborted.

FIXME implement suggestions for handling cases when we are nearing DISK exhaustion (aggressive release of resources, which should not depend on asynchronous overflow but rather be part of a monitoring thread or an inspection task run in each group commit).

FIXME read locks for read-committed operations. For example, queries to the mds should use a read-historical tx so that overflow in the mds will not cause the views to be discarded during data service overflow. In fact, we can just create a read-historical transaction when we start data service overflow and then pass it into the rest of the process, aborting that tx when overflow is complete.

FIXME make the atomic update tasks truly atomic using full transactions and/or distributed locks and correcting actions.

Throws:
Exception

chooseSplitBuildOrMerge

protected List<AbstractTask> chooseSplitBuildOrMerge(boolean compactingMerge)
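For each index (partition) that has not been handled, decide whether we will: split the index partition; perform a compacting merge (build an IndexSegment from the FusedView of the index partition); or perform an incremental build (build an IndexSegment from the writes absorbed by the mutable BTree on the old journal, which removes the dependency on the old journal as of its lastCommitTime).

Note: Compacting merges are decided in two passes. First mandatory compacting merges and splits are identified and a "merge" priority is computed for the remaining index partitions. In the second pass, we consume the remaining index partitions in "merge priority" order, assigning compacting merge tasks until we reach the maximum #of compacting merges to be performed in a given asynchronous overflow operation.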
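A sketch of the two-pass selection using a `PriorityQueue`. `MergeScheduler`, `Candidate`, the `mandatory` flag and a numeric `mergePriority` (higher first) are hypothetical; how the priority is computed is not described here, and counting mandatory merges against the limit is an assumption. Splits identified in the first pass are omitted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch of two-pass compacting merge selection. */
class MergeScheduler {

    static final class Candidate {
        final String name;
        final boolean mandatory;    // must be merged in this overflow
        final double mergePriority; // higher means more in need of a merge

        Candidate(String name, boolean mandatory, double mergePriority) {
            this.name = name;
            this.mandatory = mandatory;
            this.mergePriority = mergePriority;
        }
    }

    /** Returns the partitions chosen for compacting merges; the rest would get builds. */
    static List<String> chooseMerges(List<Candidate> candidates, int maxMerges) {
        List<String> merges = new ArrayList<>();
        PriorityQueue<Candidate> queue = new PriorityQueue<>(
                Comparator.comparingDouble((Candidate c) -> c.mergePriority).reversed());
        // Pass 1: mandatory merges are always taken; everything else is queued.
        for (Candidate c : candidates) {
            if (c.mandatory) {
                merges.add(c.name);
            } else {
                queue.add(c);
            }
        }
        // Pass 2: consume in priority order until the per-overflow limit is reached.
        while (!queue.isEmpty() && merges.size() < maxMerges) {
            merges.add(queue.poll().name);
        }
        return merges;
    }
}
```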

Parameters:
compactingMerge - When true a compacting merge will be performed for all index partitions.
Returns:
The list of tasks.

FIXME Should schedule builds for all remaining shards and then prioritize merges. Merge should do split (or scatter split) [or cause it to be scheduled] if size on disk exceeds threshold after the merge. If running a build, then withdraw the merge task until the build is complete and then reschedule the merge task. Merges can run across overflow processing unless that specific merge is already a clear candidate for a split (200M+ on the disk), and even then we will not lock out overflow if the journal is 2x overextended when we start the merge.

Remove all support for splits from this method. Splits are decided by CompactingMergeTask. Run the split logic against the IndexSegment to choose the separatorKeys. Then submit the split task.


call

public Object call()
            throws Exception
Note: This task is interrupted by OverflowManager.shutdownNow(). Therefore it tests Thread.isInterrupted() and returns immediately if it has been interrupted.
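A minimal sketch of that contract: return immediately if interrupted, always return null, and log rather than throw. `InterruptAwareOverflowTask` and the wrapped `work` callable are hypothetical, and `java.util.logging` stands in for the log4j logger this class actually uses.

```java
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch of an overflow task that never throws from call(). */
class InterruptAwareOverflowTask implements Callable<Object> {

    private static final Logger log =
            Logger.getLogger(InterruptAwareOverflowTask.class.getName());

    private final Callable<?> work; // stand-in for chooseTasks() followed by runTasks()

    InterruptAwareOverflowTask(Callable<?> work) {
        this.work = work;
    }

    /** Always returns null; failures are logged because nobody inspects the Future. */
    @Override
    public Object call() {
        if (Thread.currentThread().isInterrupted()) {
            return null; // e.g. interrupted by shutdownNow(): do nothing
        }
        try {
            work.call();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
            log.log(Level.SEVERE, "Asynchronous overflow processing failed", e);
        }
        return null;
    }
}
```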

Specified by:
call in interface Callable<Object>
Returns:
The return value is always null.
Throws:
Exception - This implementation does not throw anything since there is no one to catch the exception. Instead it logs exceptions at a high log level.

runTasks

protected void runTasks(List<AbstractTask> tasks)
                 throws InterruptedException
Submit all tasks, await their completion, and check their futures for errors.
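A minimal sketch of the submit-then-check pattern with a standard `ExecutorService`. `TaskRunner` is a hypothetical name, and unlike this method it returns the number of failed tasks (so the behavior is easy to check) instead of logging each failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical sketch: submit every task, then await and check each Future. */
class TaskRunner {

    static int runTasks(ExecutorService service, List<? extends Callable<?>> tasks)
            throws InterruptedException {
        // Submit everything first so the tasks can run concurrently.
        List<Future<?>> futures = new ArrayList<>();
        for (Callable<?> task : tasks) {
            futures.add(service.submit(task));
        }
        int failures = 0;
        for (Future<?> f : futures) {
            try {
                f.get(); // blocks until this task completes
            } catch (ExecutionException e) {
                failures++; // real code would log e.getCause()
            }
        }
        return failures;
    }
}
```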

Throws:
InterruptedException

runTasksInSingleThread

protected void runTasksInSingleThread(List<AbstractTask> tasks)
                               throws InterruptedException
Runs the overflow tasks one at a time, stopping when the journal needs to overflow again, when we run out of time, or when there are no more tasks to be executed.
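A sketch of the sequential variant, assuming a hypothetical `shouldOverflow` check (standing in for "the journal needs to overflow again") and a deadline in `System.nanoTime()` units supplied through an injectable clock. `SingleThreadRunner` is an illustrative name.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/** Hypothetical sketch: run tasks one at a time until overflow or timeout. */
class SingleThreadRunner {

    /** Returns the number of tasks that were run. */
    static int runInSingleThread(List<? extends Callable<?>> tasks,
                                 BooleanSupplier shouldOverflow,
                                 LongSupplier nanoClock, long deadlineNanos)
            throws InterruptedException {
        int ran = 0;
        for (Callable<?> task : tasks) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            if (shouldOverflow.getAsBoolean()) {
                break; // the journal needs to overflow again
            }
            if (nanoClock.getAsLong() - deadlineNanos >= 0) {
                break; // out of time (overflow-safe nanoTime comparison)
            }
            try {
                task.call();
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                // One failed task does not stop the others (real code would log it).
            }
            ran++;
        }
        return ran;
    }
}
```

In production the clock would be `System::nanoTime`; injecting it keeps the stopping rules testable.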

Throws:
InterruptedException

runTasksConcurrent

protected void runTasksConcurrent(List<AbstractTask> tasks)
                           throws InterruptedException
Runs the overflow tasks in parallel, cancelling any tasks which have not completed if we run out of time. A dedicated thread pool is allocated for this purpose. Depending on the configuration, it will be either a cached thread pool (full parallelism) or a fixed thread pool (limited parallelism).
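A minimal sketch of the concurrent variant. `ExecutorService.invokeAll(tasks, timeout, unit)` is a standard JDK call that cancels every task not completed when it returns, which matches "cancelling any tasks which have not completed if we run out of time". `ConcurrentRunner` and the `maxThreads <= 0` convention for selecting the cached pool are hypothetical.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: run tasks in a dedicated pool with an overall timeout. */
class ConcurrentRunner {

    /** Returns the number of tasks cancelled because time ran out. */
    static int runConcurrent(List<? extends Callable<Object>> tasks, int maxThreads,
                             long timeout, TimeUnit unit) throws InterruptedException {
        ExecutorService pool = maxThreads <= 0
                ? Executors.newCachedThreadPool()         // full parallelism
                : Executors.newFixedThreadPool(maxThreads); // limited parallelism
        try {
            // invokeAll cancels every task that has not completed when it returns.
            List<Future<Object>> futures = pool.invokeAll(tasks, timeout, unit);
            int cancelled = 0;
            for (Future<Object> f : futures) {
                if (f.isCancelled()) {
                    cancelled++;
                }
            }
            return cancelled;
        } finally {
            pool.shutdownNow(); // release the dedicated pool's threads
        }
    }
}
```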

Parameters:
tasks -
Throws:
InterruptedException
See Also:
OverflowManager.overflowTasksConcurrent

isNormalShutdown

protected static boolean isNormalShutdown(ResourceManager resourceManager,
                                          Throwable t)
Return true if t is one of several exceptions that are good indicators that the data service was shut down.
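A minimal sketch of such a check, under assumptions: the exception types tested below (interrupts, cancellation, rejected submissions) are a plausible illustrative set rather than this method's actual list, and `serviceIsRunning` is a hypothetical stand-in for querying the ResourceManager's state. `ShutdownClassifier` is an illustrative name.

```java
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of an isNormalShutdown()-style classifier. */
class ShutdownClassifier {

    private static final int MAX_CAUSE_DEPTH = 32; // guards against cyclic cause chains

    static boolean isNormalShutdown(BooleanSupplier serviceIsRunning, Throwable t) {
        if (!serviceIsRunning.getAsBoolean()) {
            return true; // any error raised once shutdown has begun is expected
        }
        // Walk the cause chain looking for typical shutdown symptoms.
        Throwable cause = t;
        for (int depth = 0; cause != null && depth < MAX_CAUSE_DEPTH; depth++) {
            if (cause instanceof InterruptedException
                    || cause instanceof ClosedByInterruptException
                    || cause instanceof CancellationException
                    || cause instanceof RejectedExecutionException) {
                return true;
            }
            cause = cause.getCause();
        }
        return false;
    }
}
```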



Copyright © 2006-2011 SYSTAP, LLC. All Rights Reserved.