com.bigdata.service.jini.master
Class TaskMaster<S extends TaskMaster.JobState,T extends Callable<U>,U>

java.lang.Object
  extended by com.bigdata.service.jini.master.TaskMaster<S,T,U>
Type Parameters:
S - The generic type of the TaskMaster.JobState.
T - The generic type of the client task.
U - The generic type of the value returned by the client task.
All Implemented Interfaces:
Callable<Void>
Direct Known Subclasses:
MappedTaskMaster, ThroughputMaster

public abstract class TaskMaster<S extends TaskMaster.JobState,T extends Callable<U>,U>
extends Object
implements Callable<Void>

Utility class for executing a distributed job. The master creates a set of client tasks, submits each task to an IDataService for execution, and awaits their Futures. The master is configured through the TaskMaster.ConfigurationOptions. To run a master, you instantiate a concrete subclass of this class and specify its TaskMaster.ConfigurationOptions using the fully qualified class name of that master implementation class. You supply the client task by implementing newClientTask(int).
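The create, submit, and await pattern described above can be sketched using only java.util.concurrent. This is a minimal illustration under stated assumptions, not the bigdata implementation: MiniMaster, its nclients parameter, and the local thread pool (standing in for submission to remote data services) are hypothetical. Only the method names newClientTask(int) and notifyOutcome(int, U) mirror methods documented on this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical, local-only stand-in for the master/client pattern. */
abstract class MiniMaster<T extends Callable<U>, U> implements Callable<Void> {

    /** Assumed for the sketch: the number of client tasks to run. */
    private final int nclients;

    protected MiniMaster(final int nclients) {
        this.nclients = nclients;
    }

    /** Factory for the client task, mirroring newClientTask(int). */
    protected abstract T newClientTask(int clientNum);

    /** Outcome callback, mirroring notifyOutcome(int, U); NOP by default. */
    protected void notifyOutcome(final int clientNum, final U value) {
        // NOP
    }

    @Override
    public Void call() throws InterruptedException, ExecutionException {
        // A local thread pool stands in for the remote data services.
        final ExecutorService service =
                Executors.newFixedThreadPool(Math.max(1, Math.min(nclients, 4)));
        try {
            // Create and submit one task per client.
            final List<Future<U>> futures = new ArrayList<>();
            for (int i = 0; i < nclients; i++) {
                futures.add(service.submit(newClientTask(i)));
            }
            // Await each Future and hand its value to the callback.
            for (int i = 0; i < nclients; i++) {
                notifyOutcome(i, futures.get(i).get());
            }
            return null;
        } finally {
            // Interrupts any task still running if we exit early.
            service.shutdownNow();
        }
    }
}
```

A concrete subclass only has to implement newClientTask(int). Overriding notifyOutcome(int, U) is optional and is the natural place to aggregate per-client results.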

Version:
$Id$
Author:
Bryan Thompson
TODO:
could refactor the task to a task sequence easily enough, perhaps using some of the rule step logic. That would be an interesting twist on a parallel datalog.

Nested Class Summary
static interface TaskMaster.ConfigurationOptions
          Configuration options for the TaskMaster and derived classes.
static class TaskMaster.DiscoveredServices
          Class used to return the discovered services.
static class TaskMaster.JobState
          State describing the job to be executed.
 
Field Summary
protected  JiniFederation<?> fed
          The federation (from the ctor).
protected static org.apache.log4j.Logger log
           
 
Constructor Summary
protected TaskMaster(JiniFederation<?> fed)
           
 
Method Summary
protected  boolean allDone()
          Check the futures.
protected  void attachPerformanceCounters(CounterSet counterSet)
          Attach to the counters reported by the client to the LBS.
protected  void awaitAll()
          Await the completion of the Futures.
protected  void beginJob(S jobState)
          Callback invoked when the job is ready to execute and is holding the ZLock for the TaskMaster.JobState.
 Void call()
          Waits a bit to discover a minimum number of data services.
protected  void cancelAll(boolean mayInterruptIfRunning)
          Cancel the futures.
protected  void detachPerformanceCounters()
          Detach the performance counters for the job.
protected  void error(S jobState, Throwable t)
          Callback invoked if an exception is thrown during the job execution.
 void execute()
          Execute the master.
protected  void forceOverflow()
          Force overflow on all discovered IDataService.
 JiniFederation<?> getFederation()
          The federation (from the ctor).
 S getJobState()
          The TaskMaster.JobState which is either set from the Configuration (new job) or read from zookeeper (existing job) and thereafter unchanging.
protected  Future<Void> innerMain()
          Runs the master.
protected abstract  T newClientTask(int clientNum)
          Return a client to be executed on a remote data service.
protected abstract  S newJobState(String component, net.jini.config.Configuration config)
          Return a TaskMaster.JobState.
protected  void notifyOutcome(int clientNum, U value)
          Callback for the master to consume the outcome of the client's Future (default is NOP).
protected  void runJob()
          Start the client tasks and await their futures.
protected  ZLock setupJob()
          Sets up the TaskMaster.JobState in zookeeper, including the assignment of service UUIDs to each client.
protected  void startClients()
          Distributes the clients to the services on which they will execute and records their Futures in TaskMaster.JobState.futures.
protected  void success(S jobState)
          Callback invoked when the job is done executing (normal completion) but is still holding the ZLock for the TaskMaster.JobState.
protected  void tearDownJob(S jobState, ZLock zlock)
          Callback invoked when the job is done executing (any completion) but has not yet released the ZLock for the TaskMaster.JobState.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

log

protected static final org.apache.log4j.Logger log

fed

protected final JiniFederation<?> fed
The federation (from the ctor).

Constructor Detail

TaskMaster

protected TaskMaster(JiniFederation<?> fed)
              throws net.jini.config.ConfigurationException
Parameters:
fed - The federation.
Throws:
net.jini.config.ConfigurationException
Method Detail

getFederation

public JiniFederation<?> getFederation()
The federation (from the ctor).


getJobState

public S getJobState()
The TaskMaster.JobState which is either set from the Configuration (new job) or read from zookeeper (existing job) and thereafter unchanging.


innerMain

protected final Future<Void> innerMain()
Runs the master. A normal kill (SIGTERM) or ^C (SIGINT) will cancel the job, including any running clients.

Returns:
The Future for the master. Use Future.get() to await the outcome of the master.
Throws:
InterruptedException
ExecutionException

execute

public void execute()
             throws InterruptedException,
                    ExecutionException
Execute the master. If the master is interrupted, including by the signal handler installed by innerMain(), then the client tasks will be cancelled. A simple main() can be written as follows:
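 public static void main(final String[] args) throws Exception {

     // Connect to the federation.
     final JiniFederation<?> fed = new JiniClient(args).connect();

     try {

         // MyMaster is a concrete subclass of TaskMaster.
         final TaskMaster<?, ?, ?> task = new MyMaster(fed);

         // Blocks until the job completes, fails, or is interrupted.
         task.execute();

     } finally {

         // Always disconnect from the federation.
         fed.shutdown();

     }

 }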
 
Where MyMaster is a concrete subclass of TaskMaster.

Throws:
InterruptedException
ExecutionException

call

public final Void call()
                throws Exception
Waits a bit to discover a minimum number of data services, then allocates the clients to those data services. More than one client may be allocated to the same data service.

Specified by:
call in interface Callable<Void>
Returns:
null
Throws:
Exception
See Also:
execute()
TODO:
In my experience zookeeper (at least 3.0.1 and 3.1.0) has a tendency to drop sessions for the java client when under even moderate swapping. Because of this I am not verifying that the TaskMaster retains the ZLock for the job throughout its run. Doing so at this point is just begging for an aborted run.
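The initial discovery wait performed by call() can be pictured as a polling loop with a deadline. The sketch below is an assumption about the general shape of such a wait, not the actual implementation: ServiceDiscoveryWait, awaitServices, and all of its parameters are hypothetical names, and the IntSupplier stands in for whatever mechanism reports the number of discovered data services.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.IntSupplier;

/** Hypothetical helper: wait until enough services have been discovered. */
final class ServiceDiscoveryWait {

    private ServiceDiscoveryWait() {
    }

    /**
     * Polls the supplier until it reports at least minCount services.
     *
     * @return the number of services seen when the minimum was reached.
     * @throws TimeoutException if the minimum is not reached in time.
     */
    static int awaitServices(final IntSupplier discovered, final int minCount,
            final long timeoutMillis, final long pollMillis)
            throws InterruptedException, TimeoutException {

        final long deadline =
                System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);

        while (true) {
            final int n = discovered.getAsInt();
            if (n >= minCount) {
                return n; // enough services: proceed to allocate clients
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException(
                        "found " + n + " of " + minCount + " services");
            }
            Thread.sleep(pollMillis); // back off before polling again
        }
    }
}
```

Checking the count before the deadline means a wait with a very short timeout still succeeds when the services are already visible.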

runJob

protected void runJob()
               throws Exception,
                      InterruptedException
Start the client tasks and await their futures.

Throws:
Exception - Client execution problem.
InterruptedException - Master interrupted awaiting clients.

startClients

protected void startClients()
                     throws IOException
Distributes the clients to the services on which they will execute and records their Futures in TaskMaster.JobState.futures. The kind of service on which the clients run is determined by TaskMaster.JobState.clientsTemplate, but the service must implement IRemoteExecutor. Clients are assigned to services using the stable, ordered assignment in TaskMaster.JobState.clientServiceUUIDs. If there are more clients than services, then some services will be tasked with more than one client. If there is a problem submitting the clients, then any clients already submitted are canceled and the original exception is thrown out of this method.

Throws:
IOException - If there is an RMI problem submitting the clients to the IRemoteExecutors.
net.jini.config.ConfigurationException
See Also:
TaskMaster.JobState.futures
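This page does not spell out how the stable ordered assignment is computed. One plausible scheme, shown here purely as an assumption, sorts the service UUIDs and assigns client i to service i modulo the number of services, so that services receive more than one client when clients outnumber services. ClientAssignment and assign are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

/** Hypothetical sketch of a stable, ordered client-to-service assignment. */
final class ClientAssignment {

    private ClientAssignment() {
    }

    /**
     * Returns an array whose i-th element is the service UUID for client i.
     * Sorting first makes the result independent of discovery order.
     */
    static UUID[] assign(final int nclients, final List<UUID> services) {
        if (services.isEmpty()) {
            throw new IllegalArgumentException("no services");
        }
        // Sort a copy so the caller's list is left untouched.
        final List<UUID> sorted = new ArrayList<>(services);
        Collections.sort(sorted);

        final UUID[] assignment = new UUID[nclients];
        for (int i = 0; i < nclients; i++) {
            // Wrap around when there are more clients than services.
            assignment[i] = sorted.get(i % sorted.size());
        }
        return assignment;
    }
}
```

Because the assignment depends only on the client number and the sorted UUIDs, a master that resumes an existing job would compute the same mapping, provided the same set of services is supplied.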

awaitAll

protected void awaitAll()
                 throws ExecutionException,
                        InterruptedException
Await the completion of the Futures. If any client fails then the remaining clients will be canceled.

Throws:
IllegalStateException - if TaskMaster.JobState.futures is null.
ExecutionException - for the first client whose failure is noticed.
InterruptedException - if the master is interrupted while awaiting the Futures.
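The contract above (await every Future, cancel the rest on the first failure noticed) can be sketched with plain java.util.concurrent types. AwaitAllSketch is a hypothetical name and the futures map is passed as a parameter for the sake of a self-contained example. The real method takes no arguments and, per the Throws list, reads TaskMaster.JobState.futures.

```java
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical sketch of "await all, cancel the rest on first failure". */
final class AwaitAllSketch {

    private AwaitAllSketch() {
    }

    static <U> void awaitAll(final Map<Integer, Future<U>> futures)
            throws ExecutionException, InterruptedException {

        if (futures == null) {
            throw new IllegalStateException("futures not set");
        }

        boolean ok = false;
        try {
            // Futures are awaited in iteration order, so the exception that
            // propagates is the first failure noticed, not necessarily the
            // first failure to occur.
            for (Future<U> f : futures.values()) {
                f.get();
            }
            ok = true;
        } finally {
            if (!ok) {
                // A client failed or the master was interrupted: cancel all.
                // Cancelling an already completed Future has no effect.
                for (Future<U> f : futures.values()) {
                    f.cancel(true /* mayInterruptIfRunning */);
                }
            }
        }
    }
}
```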

success

protected void success(S jobState)
                throws Exception
Callback invoked when the job is done executing (normal completion) but is still holding the ZLock for the TaskMaster.JobState. The default implementation destroys the znodes for the job since it is done executing.

Throws:
Exception

error

protected void error(S jobState,
                     Throwable t)
Callback invoked if an exception is thrown during the job execution. The TaskMaster.JobState.endMillis is set by this method, the client tasks are canceled, and an error message is logged. By default, the znode for the job is not destroyed. You can destroy a terminated job using zookeeper or automatically destroy a pre-existing job when re-submitting the job using TaskMaster.ConfigurationOptions.DELETE_JOB.

Note: This method should not throw anything since that could cause the root cause of the job error to be masked.

Parameters:
jobState - The TaskMaster.JobState.
t - The exception.

tearDownJob

protected void tearDownJob(S jobState,
                           ZLock zlock)
                    throws Exception
Callback invoked when the job is done executing (any completion) but has not yet released the ZLock for the TaskMaster.JobState. The default implementation releases the ZLock. It may be extended to handle other cleanup.

Throws:
Exception
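The callback descriptions suggest a try / catch / finally ordering: beginJob, then runJob, then success on normal completion or error on failure, and tearDownJob in every case. The skeleton below is a sketch of that ordering inferred from the descriptions, not a copy of call(). JobLifecycleSketch and its trace list are hypothetical, and the jobState and zlock arguments of the real callbacks are omitted to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical skeleton showing one plausible ordering of the callbacks. */
abstract class JobLifecycleSketch {

    /** Records the callbacks in the order they ran (for illustration only). */
    final List<String> trace = new ArrayList<>();

    protected void beginJob() throws Exception {
        trace.add("beginJob");
    }

    /** The job body; stands in for starting clients and awaiting futures. */
    protected abstract void runJob() throws Exception;

    protected void success() throws Exception {
        trace.add("success");
    }

    /** Must not throw, so the root cause of the failure is not masked. */
    protected void error(final Throwable t) {
        trace.add("error");
    }

    protected void tearDownJob() throws Exception {
        trace.add("tearDownJob");
    }

    public final void run() throws Exception {
        try {
            beginJob();
            runJob();
            success(); // normal completion only
        } catch (Throwable t) {
            error(t);  // any failure, including one thrown by success()
            throw t;   // rethrow the original exception unchanged
        } finally {
            tearDownJob(); // any completion
        }
    }
}
```

Note that an exception thrown from tearDownJob in this sketch would replace the original exception, which is one reason to keep teardown code defensive.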

newJobState

protected abstract S newJobState(String component,
                                 net.jini.config.Configuration config)
                                                      throws net.jini.config.ConfigurationException
Return a TaskMaster.JobState.

Parameters:
component - The component.
config - The configuration.
Returns:
The TaskMaster.JobState.
Throws:
net.jini.config.ConfigurationException

newClientTask

protected abstract T newClientTask(int clientNum)
Return a client to be executed on a remote data service. The client can obtain access to the IBigdataFederation when it executes on the remote data service if it implements IDataServiceCallable. You can use AbstractClientTask as a starting point.

Parameters:
clientNum - The client number.
Returns:
The client task.
See Also:
AbstractClientTask

beginJob

protected void beginJob(S jobState)
                 throws Exception
Callback invoked when the job is ready to execute and is holding the ZLock for the TaskMaster.JobState. This may be extended to register indices, etc. The default implementation handles the setup of the optional index partition metadata dumps.

Throws:
Exception
See Also:
TaskMaster.ConfigurationOptions.INDEX_DUMP_DIR, TaskMaster.ConfigurationOptions.INDEX_DUMP_NAMESPACE

setupJob

protected ZLock setupJob()
                  throws org.apache.zookeeper.KeeperException,
                         InterruptedException,
                         TimeoutException
Sets up the TaskMaster.JobState in zookeeper, including the assignment of service UUIDs to each client. jobState will be replaced with the TaskMaster.JobState read from zookeeper if a pre-existing job is found in zookeeper.

Returns:
The global lock for the master running the job.
Throws:
org.apache.zookeeper.KeeperException
InterruptedException
TimeoutException
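The new job versus existing job behaviour can be illustrated with an in-memory map standing in for zookeeper. This is only an analogy under loud assumptions: JobStateStoreSketch, its String-valued state, and its method names are hypothetical, and nothing here models the ZLock, sessions, or the real znode layout.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory stand-in for job state kept in zookeeper. */
final class JobStateStoreSketch {

    /** Job name to stored state; plays the role of the job's znode. */
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /**
     * Returns the state the master should run with: the stored state when
     * the job already exists, otherwise the candidate, which is now stored.
     */
    String setup(final String jobName, final String candidate) {
        final String existing = store.putIfAbsent(jobName, candidate);
        return existing != null ? existing : candidate;
    }

    /** Removes the stored state, as when a finished job is destroyed. */
    void destroy(final String jobName) {
        store.remove(jobName);
    }
}
```

Using putIfAbsent makes the check and the write a single atomic step, so two callers racing to set up the same job name agree on one state.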

detachPerformanceCounters

protected void detachPerformanceCounters()
Detach the performance counters for the job.

TODO:
This only removes the counters held in local memory, not those on the LBS, so it is not much help. It would only be useful if we re-ran the same job within the same JVM instance.

attachPerformanceCounters

protected void attachPerformanceCounters(CounterSet counterSet)
Attach to the counters reported by the client to the LBS.


allDone

protected boolean allDone()
                   throws InterruptedException,
                          ExecutionException
Check the futures.

Returns:
true when no more tasks are running.
Throws:
ExecutionException
InterruptedException
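A non-blocking check of this kind can be sketched as follows. AllDoneSketch is a hypothetical name and the futures are passed in as a parameter, whereas the real method takes no arguments. Calling get() only on Futures that are already done means the check never blocks but still surfaces a client failure as an ExecutionException.

```java
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical sketch of a non-blocking "are all tasks finished" check. */
final class AllDoneSketch {

    private AllDoneSketch() {
    }

    static boolean allDone(final Collection<? extends Future<?>> futures)
            throws InterruptedException, ExecutionException {

        boolean done = true;
        for (Future<?> f : futures) {
            if (f.isDone()) {
                // Returns immediately; rethrows the task's failure, if any.
                // A cancelled Future throws CancellationException (unchecked).
                f.get();
            } else {
                done = false; // at least one task is still running
            }
        }
        return done;
    }
}
```

Scanning every Future, rather than returning at the first one still running, lets a failure in a later task be noticed even while earlier tasks are in progress.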

cancelAll

protected void cancelAll(boolean mayInterruptIfRunning)
Cancel the futures.

Parameters:
mayInterruptIfRunning - If the tasks for the futures may be interrupted.

forceOverflow

protected void forceOverflow()
Force overflow on all discovered IDataService.

See Also:
TaskMaster.ConfigurationOptions.FORCE_OVERFLOW
TODO:
Ideally this operation would run exactly once, on the master that actually executes the clients, even when there are multiple masters. Multiple master support is not really all there yet, and there are interactions with how the client tasks handle multiple instances of themselves, so this is all forward looking.

notifyOutcome

protected void notifyOutcome(int clientNum,
                             U value)
Callback for the master to consume the outcome of the client's Future (default is NOP).

Parameters:
clientNum - The client number.
value - The value returned by the Future.


Copyright © 2006-2011 SYSTAP, LLC. All Rights Reserved.