com.bigdata.service.mapred
Class AbstractMaster

java.lang.Object
  extended by com.bigdata.service.mapred.AbstractMaster
Direct Known Subclasses:
EmbeddedMaster, Master

public abstract class AbstractMaster
extends Object

Abstract base class implementing the master for running map/reduce jobs.

Map/reduce is a functional programming style in which a program is broken down into a map and a reduce operation. Those operations are trivially parallelized and distributed across one or more worker tasks on available hosts. In general, there are M map tasks and N reduce tasks for each map/reduce job. The map operation, the intermediate store, and the reduce tasks are distinct services. In particular, this means that the job state resides with the master while the intermediate state for the reduce tasks is decoupled from the reduce logic.

Logically, each map operation processes a key-value pair, writing a set of intermediate key-value pairs as its output. In practice, the inputs are often drawn from files in a (networked) file system, but other possible sources include chaining map/reduce jobs or reading from an index. It is important that the keys output by the map operation capture all necessary "uniqueness" distinctions. For example, the key is often formed from an "application key" followed by the map task identifier and, finally, the map task local tuple counter. The map operation outputs are buffered and automatically partitioned into N temporary stores (one per reduce task) using a hash function of the key modulo the number of reduce tasks.
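The key layout and hash partitioning described above can be sketched as follows. This is a minimal sketch that assumes a 16-byte UUID for the map task identifier and an 8-byte big-endian long for the local tuple counter; the actual encoding is not specified here, and `MapKeys` is a hypothetical name. One deliberate choice: the sketch hashes only the application key, so every tuple sharing an application key lands in the same reduce partition (hashing the full composite key would scatter those tuples across reduce tasks).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

final class MapKeys {

    /** Builds appKey || mapTask (16 bytes) || localCounter (8 bytes, big-endian). */
    static byte[] compositeKey(byte[] appKey, UUID mapTask, long localCounter) {
        ByteBuffer buf = ByteBuffer.allocate(appKey.length + 16 + 8); // big-endian by default
        buf.put(appKey);
        buf.putLong(mapTask.getMostSignificantBits());
        buf.putLong(mapTask.getLeastSignificantBits());
        buf.putLong(localCounter); // non-negative counters keep numeric order under unsigned compare
        return buf.array();
    }

    /** Reduce partition in [0, nReduce), computed from the application key only (an assumption). */
    static int partition(byte[] appKey, int nReduce) {
        if (nReduce <= 0) throw new IllegalArgumentException("nReduce must be positive");
        return Math.floorMod(Arrays.hashCode(appKey), nReduce);
    }

    public static void main(String[] args) {
        byte[] app = "apple".getBytes(StandardCharsets.UTF_8);
        UUID task = UUID.randomUUID();
        byte[] key = compositeKey(app, task, 0L);
        System.out.println(key.length); // 29 = 5 + 16 + 8
        System.out.println(partition(app, 4)); // same partition for every tuple keyed by "apple"
    }
}
```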

Each reduce task reads the intermediate store having data for the intermediate key-value partition assigned to that reduce task. The keys in those partitions are essentially random, since they are assigned to partitions by the (user-defined) hash function. However, the intermediate store guarantees that the key-value pairs will be fully ordered. The reduce task is then run on the total order, writing its outputs onto a local output file or loading the data into scale-out indices.
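For variable-length byte[] keys, the total order a reduce task sees is an unsigned lexicographic order. The sketch below uses a `java.util.TreeMap` with an unsigned comparator as a stand-in for the B+Tree intermediate store; this substitution is an assumption for illustration, and `UnsignedKeyOrder` is a hypothetical name. Note that `0x80` sorts after `0x01`, which a signed byte comparison would get wrong.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

final class UnsignedKeyOrder {

    /** Unsigned lexicographic order on byte[]; a proper prefix sorts before its extensions. */
    static final Comparator<byte[]> UNSIGNED = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xff, b[i] & 0xff); // compare as 0..255
            if (c != 0) return c;
        }
        return Integer.compare(a.length, b.length);
    };

    /** Stand-in for one reduce partition's intermediate store. */
    static TreeMap<byte[], byte[]> newPartitionStore() {
        return new TreeMap<>(UNSIGNED);
    }

    public static void main(String[] args) {
        TreeMap<byte[], byte[]> store = newPartitionStore();
        store.put(new byte[] {(byte) 0x80}, new byte[] {1});
        store.put(new byte[] {0x01}, new byte[] {2});
        store.put(new byte[] {0x01, 0x00}, new byte[] {3});
        // Iterates in unsigned order: [1], then [1, 0], then [-128] (i.e. 0x80).
        for (Map.Entry<byte[], byte[]> e : store.entrySet()) {
            System.out.println(Arrays.toString(e.getKey()));
        }
    }
}
```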

The map/reduce master selects the map and reduce services on which to run a map/reduce job and is responsible for re-assigning tasks that fail or that are taking too long. The master attempts to use reduce services that are "close" to the intermediate data (in terms of the network topology) but not necessarily on the same host. This decouples the reduce operation from the input state and lets us fail over to another reduce service as necessary.
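Re-assigning tasks that fail or run too long can be sketched with the standard `java.util.concurrent` API: each attempt is submitted to an `ExecutorService`, a `Future.get` timeout marks a slow attempt, and that attempt is cancelled before the retry. `RetryingMaster`, its counters, and the retry and timeout parameters are hypothetical; the counters only mirror the spirit of `getMapRetryCount()` and related methods. A real master would presumably resubmit to a different service rather than to the same local executor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class RetryingMaster {
    long retryCount;   // counts each retry of each task
    long failedCount;  // tasks that never succeeded
    long successCount; // tasks that eventually succeeded

    /** Runs the task, retrying up to maxRetries times on failure or timeout. */
    boolean runWithRetry(ExecutorService exec, Callable<?> task, int maxRetries,
                         long timeout, TimeUnit unit) throws InterruptedException {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) retryCount++;
            Future<?> f = exec.submit(task);
            try {
                f.get(timeout, unit);
                successCount++;
                return true;
            } catch (ExecutionException | TimeoutException e) {
                f.cancel(true); // interrupt an attempt that failed or is taking too long
            }
        }
        failedCount++;
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        RetryingMaster m = new RetryingMaster();
        AtomicInteger calls = new AtomicInteger();
        // Fails on the first call and succeeds on the second.
        Callable<Void> flaky = () -> {
            if (calls.incrementAndGet() == 1) throw new IllegalStateException("transient");
            return null;
        };
        System.out.println(m.runWithRetry(exec, flaky, 3, 1, TimeUnit.SECONDS)); // true
        System.out.println(m.retryCount); // 1
        exec.shutdownNow();
    }
}
```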

Design alternatives

Note: There are several design alternatives for reduce state and behavior, including:
  1. Map writes on stores local to the reduce service via RMI. The reduce local stores are created when a map/reduce job starts and released when it ends.
  2. Map and reduce use DataServices as the store files. A set of DataServices would either be dedicated to map/reduce tasks or multiplexed with other operations. If dedicated data services are used, then use BufferMode.Disk and set the overflow threshold quite high since large temporary data sets are to be expected. (The advantage of this design is that state and behavior are isolated: if a service fails you can just re-run the specific tasks since the state is not lost; you can have state failover using the existing DataService infrastructure.)
  3. Map writes sorted data for each reduce partition on local disk; reduce copies the data from the fan-in map services and either uses a merge sort or a fused view to provide a total ordering. The map output stores are created when a map operation starts and released when the reduce operation ends. The reduce store (if any) is created when the reduce operation starts and released when it ends. (The advantage here is that you mostly read and write on local storage, except for copying data from the map service outputs to the reduce service inputs. The disadvantage is that state is local to services such that a map server failure during reduce setup could lead to the re-execution of the entire job.)
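Alternative 3 relies on combining several already-sorted runs (one per fan-in map service) into a single total order. A minimal k-way merge using a `PriorityQueue` as a min-heap is sketched below; `SortedRunMerge` is a hypothetical name, and the runs are plain in-memory lists rather than copied map service outputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class SortedRunMerge {

    /** Merges already-sorted runs into one sorted list; duplicates from different runs are kept. */
    static <T> List<T> merge(List<List<T>> runs, Comparator<? super T> cmp) {
        // Heap entries are {runIndex, positionInRun}, ordered by the element they point at.
        PriorityQueue<int[]> heap = new PriorityQueue<int[]>(
                (x, y) -> cmp.compare(runs.get(x[0]).get(x[1]), runs.get(y[0]).get(y[1])));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) heap.add(new int[] {r, 0});
        }
        List<T> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<T> run = runs.get(top[0]);
            out.add(run.get(top[1]));
            if (top[1] + 1 < run.size()) heap.add(new int[] {top[0], top[1] + 1});
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = Arrays.asList(
                Arrays.asList(1, 4, 7), Arrays.asList(2, 5), Arrays.asList(3, 6, 8));
        System.out.println(merge(runs, Comparator.naturalOrder())); // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

Each step costs O(log k) for k runs, so the reduce side can produce the total order without re-sorting the full partition.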

Another design choice was the use of B+Trees for the intermediate stores. The B+Tree automatically forces the tuples into a total order within each reduce partition. The use of buffering (potentially across multiple map tasks for a single map job) and group commit help to improve bulk insert performance on the index. The most obvious alternative would be to use an external sort program to sort the tuples in a reduce partition before the reduce job executes. The keys are variable length (unsigned) byte[]s and the values are variable length byte[]s, so the standard Un*x sort utilities will not work. While many sort algorithms preserve duplicate keys, the B+Tree does not. Therefore we make sure that the application key is extended to include a map task identifier and map task local tuple counter so that key collision can only arise when the same map task executes more than once - for example, when retrying a failed or long running map task. In such cases the most recent run of the task will simply overwrite the earlier run.
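The overwrite-on-retry behavior can be checked with a small sketch. It uses readable `String` keys instead of byte[] for brevity, and `TreeMap.put` stands in for a B+Tree insert that replaces an existing tuple with the same key; `RetryOverwrite` and its methods are hypothetical.

```java
import java.util.TreeMap;

final class RetryOverwrite {

    /** Readable stand-in for appKey || mapTaskId || localCounter. */
    static String key(String appKey, String mapTaskId, long counter) {
        return appKey + '|' + mapTaskId + '|' + String.format("%019d", counter);
    }

    /** Writes the outputs of one run of a map task into the reduce partition store. */
    static void runMapTask(TreeMap<String, String> store, String taskId, String[] words, String runLabel) {
        long counter = 0; // map task local tuple counter, restarted for each run
        for (String w : words) {
            store.put(key(w, taskId, counter++), runLabel); // same key on retry => overwrite
        }
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        String[] words = {"b", "a", "b"};
        runMapTask(store, "task-1", words, "first-run");
        runMapTask(store, "task-1", words, "retry"); // the same task re-executed
        runMapTask(store, "task-2", new String[] {"a"}, "first-run");
        System.out.println(store.size()); // 4: the retry overwrote task-1's 3 tuples
        store.forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

This yields a clean overwrite only if the re-executed task emits the same keys as the earlier run. If a retry emits fewer tuples, tuples from the earlier run with higher counters would survive.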

Version:
$Id: AbstractMaster.java 2265 2009-10-26 12:51:06Z thompsonbry $
FIXME Explore the parameter space (m, n, # map services, # reduce services, buffer mode, flush on commit, overflow, group commit, job size, # of machines). Look at job completion rate, scaling, bytes read/written per second, etc.
FIXME Get this working with a distributed federation as well.
FIXME Rework to use local writes for intermediate values and send intermediate results to result services (unless already local). Look at options for a total sort, including an in-memory sort of the component intermediate results, or buffering intermediate results into blocks on the map service and then doing a merged read or merge sort on those blocks, either on the map service or on the reduce service, when all blocks are combined into a total order.
Author:
Bryan Thompson
TODO:
  1. Offer an option to buffer map outputs across map tasks within the service to increase the size of write operations and improve performance.

     Note: Any time that we buffer output tuples across tasks, we face the possibility that we must re-execute those tasks if the service fails, since their state has not yet been transferred to the reduce worker.

     If the intermediate state is not too large, then we could sort each reduce partition split from each map worker (in memory) and then merge sort the reduce partition splits.

  2. Offer options for the behavior in the face of failed map tasks.

     One option is that the task is simply retried: partial data may be present if the task never succeeds, and if the task eventually succeeds then the data will reflect the full execution of that task (a purely additive model with atomic append only on the individual reduce stores and not across the reduce stores on which a given map task writes).

     Another alternative is that the output from map tasks that do not complete within the allowed number of retries will not participate in the reduce task. To guarantee this, we need a distributed, transactional, atomic "append" of tuples across all reduce stores on which a map task writes. Since there will never be collisions between different map tasks (the map task UUID is part of the key), we could optimize this distributed transaction in a number of ways. First, write-write conflicts cannot arise. Second, we could avoid the use of real transactions (since there will never be conflicts) if we support the concept of a distributed "unisolated" atomic commit.

  3. Map/reduce with suitable temporary storage could likely be reused to do distributed joins. Take a look at the client index view for how iterators are being range partitioned and see if this could be refactored for a distributed join.

  4. Map and reduce could be abstracted as decomposed tasks reading and writing data in parallel. Map/reduce then becomes a 2-stage chain with slight specialization for its stages (e.g., hash partitioning output tuples and writing onto a sorted store for map, while reduce reads from a sorted store). This generalization might require letting the user write the run() method that is executed by the task worker on the service so that more input and output options are enabled.

  5. Generalize IMapSource so that we can read input 2-tuples from a variety of sources, including indices, other map/reduce jobs, etc. This interface currently assumes that the inputs for the map operations are files in a (networked) file system.

  6. If the map task's writes onto the reduce input files are to be atomic (so the map task has no effect unless it completes successfully), then we need to use a transaction, since we are actually writing onto N distinct reduce input files. (The writes on the individual reduce input files will always be atomic, but a failure of some operations would leave part of the effect of the map task in some of the reduce input files.)

  7. A good example of a map/reduce job would be bulk loading a large Lehigh Benchmark data set. The map job would parse the files that were local on each host, distributing triples across the cluster using a hash function. The reduce job would sort the triples arriving at each host and then bulk load those triples into the indices.

     Another example would be to bulk index terms in documents, etc.

  8. There may well be optimizations available for partitioned bulk index builds from the reduce operation (the resulting index data would then be migrated to the appropriate data service and merged into the current index view for the appropriate index partition).

  9. When reading from a sparse column store, the definition of a row is different, since the key is formed from the column name, application key, and timestamp.

  10. For at least GOM, we need to deserialize rows from byte[]s, so we need to have the (de-)serializer for the application-level value on hand.

  11. Consider a BFS: a bigdata file system using the local file system to store files and a bigdata federation to store metadata. The file system would support atomic append, read, write, and delete operations. This should be trivial to implement over the existing infrastructure (failover would require sending the files around as well as the file metadata, so that would be a little more complex). BFS could then be used as a distributed file system for fan-in to map/reduce jobs.

     While Java clients for BFS might be easy enough, what challenges are involved in supporting BFS for Windows and Un*x clients?


Field Summary
static org.apache.log4j.Logger log
           
protected  IJobAndTaskService<MapJobMetadata,AbstractMapTask>[] mapServices
          The map services.
protected  Format percent
           
protected  IJobAndTaskService<ReduceJobMetadata,AbstractReduceTask>[] reduceServices
          The reduce services.
 
Constructor Summary
AbstractMaster(MapReduceJob job, IBigdataClient client)
          Setup the master to run a map/reduce job.
 
Method Summary
 long getMapElapsedTime()
          The elapsed time for the map operation (the clock stops once the map operation is over).
 long getMapFailedCount()
          The #of map tasks that permanently failed.
 double getMapPercentSuccess()
          The percent success for the map operation [0:1].
 long getMapRetryCount()
          The #of map tasks that were retried (this counts each retry of each task).
 long getMapSuccessCount()
          The #of map tasks that eventually succeeded.
 long getMapTaskCount()
          The #of map tasks submitted so far (retries are not counted).
 long getReduceElapsedTime()
          The elapsed time for the reduce operation (the clock stops once the reduce operation is over).
 long getReduceFailedCount()
          The #of reduce tasks that permanently failed.
 double getReducePercentSuccess()
          The percent success for the reduce operation [0:1].
 long getReduceRetryCount()
          The #of reduce tasks that were retried (this counts each retry of each task).
 long getReduceSuccessCount()
          The #of reduce tasks that eventually succeeded.
 long getReduceTaskCount()
          The #of reduce tasks submitted so far (retries are not counted).
protected  double map()
          Distribute the map tasks and wait for them to complete.
protected  double reduce()
          Distribute the reduce tasks and wait for them to complete.
 AbstractMaster run(double minMapSuccessRate, double minReduceSuccessRate)
          Run the job.
protected  void setUp()
          Select the map, reduce, and data services to be used by the job.
protected  void setUpDataStores()
          Setup intermediate indices on the data services (one per reduce task).
 String status()
          A summary of the current job state.
protected  void tearDown()
          Tear down the embedded services.
protected  void terminate()
          Terminate the job.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

log

public static final transient org.apache.log4j.Logger log

mapServices

protected IJobAndTaskService<MapJobMetadata,AbstractMapTask>[] mapServices
The map services.


reduceServices

protected IJobAndTaskService<ReduceJobMetadata,AbstractReduceTask>[] reduceServices
The reduce services.


percent

protected final Format percent

Constructor Detail

AbstractMaster

public AbstractMaster(MapReduceJob job,
                      IBigdataClient client)
Setup the master to run a map/reduce job.

Parameters:
job - The map/reduce job to execute.
client - The client used to read/write data stored in a federation.

Method Detail

getMapTaskCount

public long getMapTaskCount()
The #of map tasks submitted so far (retries are not counted).


getReduceTaskCount

public long getReduceTaskCount()
The #of reduce tasks submitted so far (retries are not counted).


getMapElapsedTime

public long getMapElapsedTime()
The elapsed time for the map operation (the clock stops once the map operation is over).


getReduceElapsedTime

public long getReduceElapsedTime()
The elapsed time for the reduce operation (the clock stops once the reduce operation is over).


getMapSuccessCount

public long getMapSuccessCount()
The #of map tasks that eventually succeeded.


getReduceSuccessCount

public long getReduceSuccessCount()
The #of reduce tasks that eventually succeeded.


getMapRetryCount

public long getMapRetryCount()
The #of map tasks that were retried (this counts each retry of each task).


getReduceRetryCount

public long getReduceRetryCount()
The #of reduce tasks that were retried (this counts each retry of each task).


getMapFailedCount

public long getMapFailedCount()
The #of map tasks that permanently failed.


getReduceFailedCount

public long getReduceFailedCount()
The #of reduce tasks that permanently failed.


setUp

protected void setUp()
Select the map, reduce, and data services to be used by the job.

Throws:
IllegalStateException - if mapServices and reduceServices have not been initialized before calling this method.

setUpDataStores

protected void setUpDataStores()
Setup intermediate indices on the data services (one per reduce task). The same data service MAY be used for more than one reduce task.

TODO:
In order for the map and reduce tasks to have automatic failover, they need to query the metadata service if the given data service fails and find the replacement data service (this presumes that a media redundancy chain is in effect).
FIXME Replace with local writes and then replicate/move the data to the reduce hosts.
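Because the same data service MAY host the intermediate index for more than one reduce task, any N-to-K mapping works. A round-robin assignment is sketched below; round-robin is an assumption (the selection policy is not described here), and `ReduceStoreAssignment` and the service names are placeholders.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ReduceStoreAssignment {

    /** Assigns reduce partition i to services.get(i % K); a service may host several partitions. */
    static Map<Integer, String> assign(int nReduce, List<String> services) {
        if (services.isEmpty()) throw new IllegalArgumentException("no data services");
        Map<Integer, String> out = new LinkedHashMap<>(); // preserves partition order
        for (int i = 0; i < nReduce; i++) {
            out.put(i, services.get(i % services.size()));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(assign(5, Arrays.asList("ds0", "ds1")));
        // {0=ds0, 1=ds1, 2=ds0, 3=ds1, 4=ds0}
    }
}
```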

tearDown

protected void tearDown()
Tear down the embedded services.


terminate

protected void terminate()
Terminate the job.


map

protected double map()
              throws InterruptedException
Distribute the map tasks and wait for them to complete.

Returns:
The percentage of tasks that completed successfully.
Throws:
InterruptedException - if the executing map tasks are interrupted.
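"Distribute the tasks, wait for them to complete, and report the fraction that succeeded" can be sketched with `ExecutorService.invokeAll`, which blocks until every submitted task has completed. `FanOut.runAll` is hypothetical: it does not retry, and it treats an empty task list as fully successful, which is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FanOut {

    /** Runs all tasks, waits for every one, and returns the fraction [0:1] that completed normally. */
    static <T> double runAll(ExecutorService exec, List<Callable<T>> tasks) throws InterruptedException {
        if (tasks.isEmpty()) return 1.0; // assumption: an empty job counts as fully successful
        int ok = 0;
        for (Future<T> f : exec.invokeAll(tasks)) { // invokeAll blocks until all tasks are done
            try {
                f.get(); // already complete; rethrows the task's failure, if any
                ok++;
            } catch (ExecutionException e) {
                // the task threw: counted as a failure
            }
        }
        return (double) ok / tasks.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            final int id = i;
            tasks.add(() -> {
                if (id == 3) throw new IllegalStateException("task " + id + " failed");
                return id;
            });
        }
        System.out.println(runAll(exec, tasks)); // 0.75
        exec.shutdown();
    }
}
```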

reduce

protected double reduce()
                 throws InterruptedException
Distribute the reduce tasks and wait for them to complete.

Returns:
The percentage of tasks that completed successfully.
Throws:
InterruptedException - if the executing tasks are interrupted.

getMapPercentSuccess

public double getMapPercentSuccess()
The percent success for the map operation [0:1].


getReducePercentSuccess

public double getReducePercentSuccess()
The percent success for the reduce operation [0:1].


status

public String status()
A summary of the current job state.


run

public AbstractMaster run(double minMapSuccessRate,
                          double minReduceSuccessRate)
Run the job.

Returns:
this
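The parameter names of `run` suggest a gate on each phase's success rate. The following is a hypothetical reading, not the documented behavior: reduce starts only if the map success rate meets `minMapSuccessRate`, and the job is reported as failed if the reduce rate falls below `minReduceSuccessRate`. `JobGate` and `Outcome` are invented names, and each `DoubleSupplier` stands in for `map()` or `reduce()`.

```java
import java.util.function.DoubleSupplier;

enum Outcome { SUCCEEDED, MAP_BELOW_THRESHOLD, REDUCE_BELOW_THRESHOLD }

final class JobGate {

    /** Hypothetical: run map, stop if its success rate is too low, then run reduce and check it. */
    static Outcome run(DoubleSupplier map, DoubleSupplier reduce,
                       double minMapSuccessRate, double minReduceSuccessRate) {
        checkRate(minMapSuccessRate);
        checkRate(minReduceSuccessRate);
        if (map.getAsDouble() < minMapSuccessRate) {
            return Outcome.MAP_BELOW_THRESHOLD; // reduce is never started
        }
        if (reduce.getAsDouble() < minReduceSuccessRate) {
            return Outcome.REDUCE_BELOW_THRESHOLD;
        }
        return Outcome.SUCCEEDED;
    }

    /** Rates are fractions in [0:1], matching getMapPercentSuccess(). */
    private static void checkRate(double r) {
        if (!(r >= 0.0 && r <= 1.0)) throw new IllegalArgumentException("rate must be in [0:1]: " + r);
    }

    public static void main(String[] args) {
        System.out.println(run(() -> 1.0, () -> 0.9, 0.95, 0.95)); // REDUCE_BELOW_THRESHOLD
        System.out.println(run(() -> 0.5, () -> 1.0, 0.95, 0.95)); // MAP_BELOW_THRESHOLD
    }
}
```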


Copyright © 2006-2009 SYSTAP, LLC. All Rights Reserved.