java.lang.Object
  extended by com.bigdata.service.mapred.AbstractMaster
public abstract class AbstractMaster
Abstract base class implementing the master for running map/reduce jobs.
Map/reduce is a functional programming style in which a program is broken down into a map and a reduce operation. Those operations are trivially parallelized and distributed across one or more worker tasks on available hosts. In general, there are M map tasks and N reduce tasks for each map/reduce job. The map operation, the intermediate store, and the reduce tasks are distinct services. In particular, this means that the job state resides with the master while the intermediate state for the reduce tasks is decoupled from the reduce logic.
Logically, each map operation processes a key-value pair, writing a set of intermediate key-value pairs as its output. In practice, the inputs are often drawn from files in a (networked) file system, but other possible sources include chaining map/reduce jobs or reading from an index. It is important that the keys output by the map operation capture all necessary "uniqueness" distinctions. For example, the key is often formed from an "application key" followed by the map task identifier and, finally, the map task local tuple counter. The map operation outputs are buffered and automatically partitioned into N temporary stores (one per reduce task) using a hash function of the key modulo the number of reduce tasks.
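The key layout and partitioning described above can be sketched in Java as follows. This is a minimal illustration, not the bigdata implementation: the class `IntermediateKeys`, its methods, the 16-byte encoding of the map task UUID, and the 8-byte big-endian tuple counter are all assumptions. The sketch hashes only the application key, on the assumption that every tuple sharing an application key must reach the same reduce task; hashing the full key (task identifier and counter included) would scatter such tuples across partitions. Because the application key here has no length prefix or terminator, application keys of different lengths are not guaranteed to stay grouped in sort order; a real encoding would have to address that.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.UUID;

// Hypothetical sketch of the intermediate key layout; not the bigdata API.
final class IntermediateKeys {

    private IntermediateKeys() {}

    /**
     * Builds [applicationKey | mapTaskId (16 bytes) | tupleCounter (8 bytes)].
     * The fixed-width suffix keeps tuples from different map tasks, or different
     * tuples of one task, distinct even when their application keys are equal.
     */
    static byte[] compositeKey(byte[] applicationKey, UUID mapTaskId, long tupleCounter) {
        ByteBuffer buf = ByteBuffer.allocate(applicationKey.length + 16 + 8); // big-endian by default
        buf.put(applicationKey);
        buf.putLong(mapTaskId.getMostSignificantBits());
        buf.putLong(mapTaskId.getLeastSignificantBits());
        buf.putLong(tupleCounter); // non-negative counters sort correctly as unsigned bytes
        return buf.array();
    }

    /** Assumed partitioning rule: hash(applicationKey) modulo the number of reduce tasks. */
    static int reducePartition(byte[] applicationKey, int nReduceTasks) {
        return Math.floorMod(Arrays.hashCode(applicationKey), nReduceTasks);
    }
}
```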
Each reduce task reads the intermediate store that holds the key-value partition assigned to that reduce task. The keys in those partitions are essentially random, since they are assigned to partitions by the (user-defined) hash function. However, the intermediate store guarantees that the key-value pairs will be fully ordered. The reduce task is then run on that total order, writing its outputs onto a local output file or loading the data into scale-out indices.
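The total order a reduce task sees can be approximated in memory with a `java.util.TreeMap` ordered by unsigned lexicographic comparison of the `byte[]` keys (`Arrays.compareUnsigned`, available since Java 9). The class name `ReducePartitionStore` is hypothetical; it stands in for the intermediate store only to show the ordering, and, like a B+Tree, it replaces a tuple whose key is already present.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for one reduce partition's intermediate store.
final class ReducePartitionStore {

    // Unsigned lexicographic order, so 0xFF sorts after 0x01 and a prefix sorts first.
    private final TreeMap<byte[], byte[]> tuples =
            new TreeMap<>((a, b) -> Arrays.compareUnsigned(a, b));

    /** Inserts a tuple; a tuple with an identical key replaces the earlier value. */
    void put(byte[] key, byte[] value) {
        tuples.put(key, value);
    }

    int size() {
        return tuples.size();
    }

    /** Keys in the total order in which a reduce task would visit them. */
    List<byte[]> keysInOrder() {
        return new ArrayList<>(tuples.keySet());
    }
}
```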
The map/reduce master selects the map and reduce services on which to run a map/reduce job and is responsible for re-assigning tasks that fail or that are taking too long. The master attempts to use reduce services that are "close" to the intermediate data (in terms of the network topology) but not necessarily on the same host; this decouples the reduce operation from the input state and lets us fail over to another reduce service as necessary.
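Re-assigning failed or slow tasks can be sketched with a `java.util.concurrent` executor: each attempt is bounded by `Future.get(timeout, unit)`, and an attempt that throws or times out is cancelled and submitted again. The `RetryingRunner` class, its counters, and the fixed attempt limit are hypothetical; a real master would re-assign the task to a different map or reduce service rather than resubmitting it to the same local pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical retry loop; not the AbstractMaster implementation.
final class RetryingRunner {

    long retryCount;  // every re-submission of a task is counted
    long failedCount; // tasks that never succeeded within maxAttempts

    private final ExecutorService executor = Executors.newCachedThreadPool();

    /** Runs the task, re-submitting it when it throws or exceeds the timeout. */
    boolean runWithRetries(Callable<?> task, long timeout, TimeUnit unit, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attempt > 1) {
                retryCount++;
            }
            Future<?> future = executor.submit(task);
            try {
                future.get(timeout, unit);
                return true;
            } catch (ExecutionException | TimeoutException e) {
                future.cancel(true); // interrupt a failed or straggling attempt, then retry
            } catch (InterruptedException e) {
                future.cancel(true);
                Thread.currentThread().interrupt(); // preserve the caller's interrupt status
                return false;
            }
        }
        failedCount++;
        return false;
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```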
One design choice was the use of DataServices as the store files. A set of DataServices would either be dedicated to map/reduce tasks or multiplexed with other operations. If dedicated data services are used, then use BufferMode.Disk and set the overflow threshold quite high, since large temporary data sets are to be expected. (The advantage of this design is that state and behavior are isolated: if a service fails, you can just re-run the specific tasks since the state is not lost, and you can have state failover using the existing DataService infrastructure.)

Another design choice was the use of B+Trees for the intermediate stores. The B+Tree automatically forces the tuples into a total order within each reduce partition. Buffering (potentially across multiple map tasks for a single map job) and group commit help to improve bulk insert performance on the index. The most obvious alternative would be to use an external sort program to sort the tuples in a reduce partition before the reduce job executes. However, the keys are variable-length (unsigned) byte[]s and the values are variable-length byte[]s, so the standard Un*x sort utilities will not work. Also, while many sort algorithms preserve duplicate keys, the B+Tree does not. Therefore we make sure that the application key is extended to include a map task identifier and a map task local tuple counter, so that a key collision can only arise when the same map task executes more than once, for example, when retrying a failed or long-running map task. In such cases the most recent run of the task simply overwrites the earlier run.
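For comparison with the B+Tree approach, the sort-based alternative can be approximated by a k-way merge of splits that are each already sorted in unsigned byte order. `SplitMerger` is an illustrative sketch, not part of bigdata. Unlike the B+Tree, it keeps duplicate keys, which is why the B+Tree design needs the map task identifier and tuple counter in the key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical k-way merge of pre-sorted reduce partition splits.
final class SplitMerger {

    private SplitMerger() {}

    /** Current head of one sorted split plus the iterator over its remaining keys. */
    private static final class Cursor {
        byte[] head;
        final Iterator<byte[]> rest;

        Cursor(byte[] head, Iterator<byte[]> rest) {
            this.head = head;
            this.rest = rest;
        }
    }

    /** Merges splits sorted by unsigned byte order; duplicate keys are preserved. */
    static List<byte[]> merge(List<List<byte[]>> sortedSplits) {
        PriorityQueue<Cursor> heap =
                new PriorityQueue<>((a, b) -> Arrays.compareUnsigned(a.head, b.head));
        for (List<byte[]> split : sortedSplits) {
            Iterator<byte[]> it = split.iterator();
            if (it.hasNext()) {
                heap.add(new Cursor(it.next(), it));
            }
        }
        List<byte[]> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor c = heap.poll(); // smallest head across all splits
            out.add(c.head);
            if (c.rest.hasNext()) {
                c.head = c.rest.next();
                heap.add(c); // re-insert with its new head
            }
        }
        return out;
    }
}
```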
Note: Any time that we buffer output tuples across tasks we are faced with the possibility that we must re-execute those tasks if the service fails since their state has not been transferred to the reduce worker.
If the intermediate state is not too large, then we could do a sort on each reduce partition split from each map worker (in memory) and then a merge sort of the reduce partition splits.

Offer options for the behavior in the face of failed map tasks.
One option is that the task is simply retried: partial data may be present if the task never succeeds, and if the task eventually succeeds then the data will reflect the full execution of that task (a purely additive model with atomic append only on the individual reduce stores and not across the reduce stores on which a given map task writes).
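The purely additive model can be illustrated with one sorted store per reduce partition, where each store accepts a batch as a unit but nothing spans stores. The `AdditiveRetryModel` class and its `failAfter` parameter (which simulates a failure after some partitions were written) are hypothetical, and the single-threaded `putAll` only stands in for an atomic append. A failed attempt leaves partial data in the earlier partitions; a later successful retry rewrites the same keys, so the stores end up reflecting one full execution of the task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of per-store atomic appends with no cross-store atomicity.
final class AdditiveRetryModel {

    private static final Comparator<byte[]> UNSIGNED = (a, b) -> Arrays.compareUnsigned(a, b);

    /** One sorted store per reduce partition. */
    final List<TreeMap<byte[], byte[]>> reduceStores = new ArrayList<>();

    AdditiveRetryModel(int nReduceTasks) {
        for (int i = 0; i < nReduceTasks; i++) {
            TreeMap<byte[], byte[]> store = new TreeMap<>(UNSIGNED);
            reduceStores.add(store);
        }
    }

    /**
     * Writes one map task's buffered output partition by partition. Each putAll is
     * treated as the atomic append on a single store; a simulated failure before
     * partition failAfter leaves the earlier partitions already written.
     */
    void appendMapOutput(List<Map<byte[], byte[]>> perPartition, int failAfter) {
        for (int p = 0; p < perPartition.size(); p++) {
            if (p == failAfter) {
                throw new IllegalStateException("simulated failure before partition " + p);
            }
            reduceStores.get(p).putAll(perPartition.get(p)); // same key => overwrite
        }
    }
}
```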
Another alternative is that the output from map tasks that do not complete within the allowed number of retries will not participate in the reduce task. To guarantee this, we need a distributed, transactional, atomic "append" of tuples across all reduce stores on which a map task writes. Since there will never be collisions between different map tasks (the map task UUID is part of the key), we could optimize this distributed transaction in a number of ways. First, write-write conflicts cannot arise. Second, we could avoid the use of real transactions (since there will never be conflicts) if we support the concept of a distributed "unisolated" atomic commit.

Map/reduce with suitable temporary storage could likely be reused to do distributed joins. Take a look at how the client index view range-partitions its iterators and see whether that could be refactored for a distributed join.

Map and reduce could be abstracted as decomposed tasks that read and write data in parallel. Map/reduce then becomes a 2-stage chain with slight specialization for its stages (e.g., map hash-partitions its output tuples and writes them onto a sorted store, while reduce reads from a sorted store). This generalization might require letting the user write the run() method that the task worker executes on the service, so that more input and output options are enabled.

Generalize IMapSource so that we can read input 2-tuples from a variety of sources, including indices, other map/reduce jobs, etc. This interface currently assumes that the inputs for the map operations are files in a (networked) file system.

If and only if the map task's writes on the reduce input files are to be atomic (so the map task has no effect unless it completes successfully), then we need to use a transaction, since we are actually writing onto N distinct reduce input files. (The writes on each individual reduce input file will always be atomic, but a failure of some of those writes would leave part of the effect of the map task in some of the reduce input files.)

A good example of a map/reduce job would be bulk loading a large Lehigh Benchmark data set. The map job would parse the files that are local to each host, distributing triples across the cluster using a hash function. The reduce job would sort the triples arriving at each host and then bulk load those triples into the indices.
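The Lehigh Benchmark example can be sketched as two plain Java functions: the map side routes each parsed triple to a host by hashing one of its terms, and the reduce side sorts the triples that arrived before the bulk load. Hashing on the subject and sorting in subject/predicate/object order are assumptions made for illustration; the text only says that a hash function distributes the triples and that each host sorts them. `TriplePartitioner` and `Triple` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the map (partition) and reduce (sort) sides of a bulk load.
final class TriplePartitioner {

    /** A parsed RDF statement; terms are kept as strings for simplicity. */
    static final class Triple {
        final String s, p, o;

        Triple(String s, String p, String o) {
            this.s = s;
            this.p = p;
            this.o = o;
        }
    }

    /** Assumed bulk-load order: subject, then predicate, then object. */
    static final Comparator<Triple> SPO =
            Comparator.<Triple, String>comparing(t -> t.s)
                    .thenComparing(t -> t.p)
                    .thenComparing(t -> t.o);

    /** Map side: route each triple to a host by hashing its subject (an assumption). */
    static List<List<Triple>> partition(List<Triple> parsed, int nHosts) {
        List<List<Triple>> byHost = new ArrayList<>();
        for (int i = 0; i < nHosts; i++) {
            byHost.add(new ArrayList<>());
        }
        for (Triple t : parsed) {
            byHost.get(Math.floorMod(t.s.hashCode(), nHosts)).add(t);
        }
        return byHost;
    }

    /** Reduce side: sort the triples that arrived at one host before bulk loading. */
    static void sortForBulkLoad(List<Triple> arrived) {
        arrived.sort(SPO);
    }
}
```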
Another example would be to bulk index terms in documents, etc.

There may well be optimizations available for partitioned bulk index builds from the reduce operation, which would then be migrated to the appropriate data service and merged into the current index view for the appropriate index partition.

When reading from a sparse column store, the definition of a row is different, since the key is formed from the column name, application key, and timestamp.

For GOM at least, we need to deserialize rows from byte[]s, so we need to have the (de-)serializer for the application-level value on hand.

Consider a BFS, a bigdata file system using the local file system to store files and a bigdata federation to store metadata. The file system would support atomic append, read, write, and delete operations. This should be trivial to implement over the existing infrastructure (failover would require sending the files around as well as the file metadata, so that would be a little more complex). BFS could then be used as a distributed file system for fan-in to map/reduce jobs.
While Java clients for BFS might be easy enough, what challenges are involved in supporting BFS for Windows and Un*x clients?
Field Summary

| Modifier and Type | Field | Description |
|---|---|---|
| static org.apache.log4j.Logger | log | |
| protected IJobAndTaskService<MapJobMetadata,AbstractMapTask>[] | mapServices | The map services. |
| protected Format | percent | |
| protected IJobAndTaskService<ReduceJobMetadata,AbstractReduceTask>[] | reduceServices | The reduce services. |
Constructor Summary

| Constructor | Description |
|---|---|
| AbstractMaster(MapReduceJob job, IBigdataClient client) | Set up the master to run a map/reduce job. |
Method Summary

| Modifier and Type | Method | Description |
|---|---|---|
| long | getMapElapsedTime() | The elapsed time for the map operation (the clock stops once the map operation is over). |
| long | getMapFailedCount() | The number of map tasks that permanently failed. |
| double | getMapPercentSuccess() | The percent success for the map operation [0:1]. |
| long | getMapRetryCount() | The number of map tasks that were retried (this counts each retry of each task). |
| long | getMapSuccessCount() | The number of map tasks that eventually succeeded. |
| long | getMapTaskCount() | The number of map tasks submitted so far (retries are not counted). |
| long | getReduceElapsedTime() | The elapsed time for the reduce operation (the clock stops once the reduce operation is over). |
| long | getReduceFailedCount() | The number of reduce tasks that permanently failed. |
| double | getReducePercentSuccess() | The percent success for the reduce operation [0:1]. |
| long | getReduceRetryCount() | The number of reduce tasks that were retried (this counts each retry of each task). |
| long | getReduceSuccessCount() | The number of reduce tasks that eventually succeeded. |
| long | getReduceTaskCount() | The number of reduce tasks submitted so far (retries are not counted). |
| protected double | map() | Distribute the map tasks and wait for them to complete. |
| protected double | reduce() | Distribute the reduce tasks and wait for them to complete. |
| AbstractMaster | run(double minMapSuccessRate, double minReduceSuccessRate) | Run the job. |
| protected void | setUp() | Select the map, reduce, and data services to be used by the job. |
| protected void | setUpDataStores() | Set up intermediate indices on the data services (one per reduce task). |
| String | status() | A summary of the current job state. |
| protected void | tearDown() | Tear down the embedded services. |
| protected void | terminate() | Terminate the job. |
| Methods inherited from class java.lang.Object |
|---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
| Field Detail |
|---|
public static final transient org.apache.log4j.Logger log
protected IJobAndTaskService<MapJobMetadata,AbstractMapTask>[] mapServices
protected IJobAndTaskService<ReduceJobMetadata,AbstractReduceTask>[] reduceServices
protected final Format percent
| Constructor Detail |
|---|
public AbstractMaster(MapReduceJob job,
IBigdataClient client)
Parameters:
job - The map/reduce job to execute.
client - The client used to read/write data stored in a federation.

| Method Detail |
|---|
public long getMapTaskCount()
public long getReduceTaskCount()
public long getMapElapsedTime()
public long getReduceElapsedTime()
public long getMapSuccessCount()
public long getReduceSuccessCount()
public long getMapRetryCount()
public long getReduceRetryCount()
public long getMapFailedCount()
public long getReduceFailedCount()
protected void setUp()
Throws:
IllegalStateException - if mapServices and reduceServices have not been initialized before calling this method.
protected void setUpDataStores()
protected void tearDown()
protected void terminate()
protected double map()
throws InterruptedException
Throws:
InterruptedException - if the executing map tasks are interrupted.
protected double reduce()
throws InterruptedException
Throws:
InterruptedException - if the executing tasks are interrupted.
public double getMapPercentSuccess()
public double getReducePercentSuccess()
public String status()
public AbstractMaster run(double minMapSuccessRate,
double minReduceSuccessRate)
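run takes minimum success rates for the map and reduce phases, and the getters above report per-phase counters with percent success in [0:1]. The following hypothetical `PhaseCounters` class sketches how such counters might relate. It assumes percent success is successCount / taskCount and treats a phase with no submitted tasks as 0; neither assumption comes from the Javadoc, and this is not the actual AbstractMaster code.

```java
// Hypothetical per-phase counters; not the AbstractMaster implementation.
final class PhaseCounters {

    long taskCount;    // tasks submitted (retries are not counted)
    long successCount; // tasks that eventually succeeded
    long retryCount;   // each retry of each task
    long failedCount;  // tasks that permanently failed

    /** Assumed definition: fraction of submitted tasks that succeeded, in [0:1]. */
    double percentSuccess() {
        return taskCount == 0 ? 0d : (double) successCount / taskCount;
    }

    /** True if this phase met the caller's minimum success rate. */
    boolean meets(double minSuccessRate) {
        return percentSuccess() >= minSuccessRate;
    }
}
```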