com.bigdata.service.mapred
Class AbstractJobAndTaskService<M extends IJobMetadata,T extends ITask>

java.lang.Object
  extended by com.bigdata.service.mapred.AbstractJobAndTaskService<M,T>
All Implemented Interfaces:
IServiceShutdown, IJobAndTaskService<M,T>, Remote
Direct Known Subclasses:
MapService, ReduceService

public abstract class AbstractJobAndTaskService<M extends IJobMetadata,T extends ITask>
extends Object
implements IServiceShutdown, IJobAndTaskService<M,T>

Abstract base class for services implementing IJobAndTaskService.

Concrete subclasses must:

FIXME: reduce the visibility of threadPoolSize and jobs, both of which are currently used by some unit tests.

Version:
$Id: AbstractJobAndTaskService.java 2265 2009-10-26 12:51:06Z thompsonbry $
Author:
Bryan Thompson

Nested Class Summary
static class AbstractJobAndTaskService.AbstractTaskWorker<M,T extends ITask>
          Abstract base class for task workers running in the MapService or the ReduceService.
static class AbstractJobAndTaskService.Options
          Options for AbstractJobAndTaskService(Properties)
 
Field Summary
protected  ExecutorService cancelService
          A service used by the HeartbeatMonitorTask to cancel jobs.
static String ERR_JOB_EXISTS
          Text of the error message used when the UUID of a job is already known to the service.
static String ERR_NO_JOB_IDENTIFIER
          Text of the error message used when the UUID of a job is not available from the IJobMetadata provided to IJobAndTaskService.startJob(IJobMetadata).
static String ERR_NO_SUCH_JOB
          Text of the error message used when the UUID of a job is not known to this service.
protected  Timer heartbeatMonitor
          Service that looks for heartbeat messages and cancels a job if it misses more than N heartbeats.
protected  long heartbeatPeriod
          The heartbeat period (ms).
protected  long heartbeatTimeout
          The heartbeat timeout (ms).
 Map<UUID,JobState<M>> jobs
          Running jobs.
static org.apache.log4j.Logger log
           
protected  ExecutorService taskService
          Queue of executing ITasks.
 int threadPoolSize
          The size of the thread pool.
protected  ScheduledExecutorService timeoutService
          Service that looks for task execution timeouts and cancels a task if it exceeds its timeout.
 
Constructor Summary
protected AbstractJobAndTaskService(Properties properties)
           
 
Method Summary
 boolean cancel(UUID job, UUID task)
          Cancel a task.
 void cancelJob(UUID job)
          Terminate a job.
 void destroy()
           
 Outcome[] drain(UUID job)
          Drains the Outcomes for all completed tasks.
abstract  IBigdataClient getBigdataClient()
           
 int heartbeat(UUID job)
          The service will IJobAndTaskService.cancelJob(UUID) the job if it does not continue to receive heartbeats for that job.
 boolean isOpen()
          Return true iff the service is running.
protected  JobState<M> newJobState(UUID job, M metadata, IBigdataClient client)
          Factory for job state.
protected abstract  AbstractJobAndTaskService.AbstractTaskWorker<M,T> newTaskWorker(JobState<M> jobState, T task)
          Factory for task workers.
 void shutdown()
          The service will no longer accept new requests, but existing requests will be processed (synchronous).
 void shutdownNow()
          The service will no longer accept new requests and will make a best effort attempt to terminate all existing requests and return ASAP.
 void startJob(M jobMetadata)
          Declare a job.
 void submit(UUID job, T task, long timeout)
          Submit a task to be executed as part of a job (asynchronous).
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface com.bigdata.service.mapred.IJobAndTaskService
getServiceUUID
 

Field Detail

log

public static final transient org.apache.log4j.Logger log

ERR_NO_JOB_IDENTIFIER

public static final String ERR_NO_JOB_IDENTIFIER
Text of the error message used when the UUID of a job is not available from the IJobMetadata provided to IJobAndTaskService.startJob(IJobMetadata).

See Also:
Constant Field Values

ERR_JOB_EXISTS

public static final String ERR_JOB_EXISTS
Text of the error message used when the UUID of a job is already known to the service.

See Also:
Constant Field Values

ERR_NO_SUCH_JOB

public static final String ERR_NO_SUCH_JOB
Text of the error message used when the UUID of a job is not known to this service.

See Also:
Constant Field Values

threadPoolSize

public final int threadPoolSize
The size of the thread pool.


taskService

protected final ExecutorService taskService
Queue of executing ITasks.

See Also:
AbstractJobAndTaskService.Options.THREAD_POOL_SIZE

timeoutService

protected final ScheduledExecutorService timeoutService
Service that looks for task execution timeouts and cancels a task if it exceeds its timeout.


heartbeatMonitor

protected final Timer heartbeatMonitor
Service that looks for heartbeat messages and cancels a job if it misses more than N heartbeats.


heartbeatTimeout

protected final long heartbeatTimeout
The heartbeat timeout (ms). A job will be cancelled after this much time without receiving a heartbeat. See AbstractJobAndTaskService.Options.HEARTBEAT_TIMEOUT


heartbeatPeriod

protected final long heartbeatPeriod
The heartbeat period (ms). The HeartbeatMonitorTask will run every N milliseconds, where N is the heartbeatPeriod. See AbstractJobAndTaskService.Options.HEARTBEAT_PERIOD


cancelService

protected final ExecutorService cancelService
A service used by the HeartbeatMonitorTask to cancel jobs. Handing cancellation off to this service keeps the latency of the timer task low.
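The interplay of heartbeatMonitor, heartbeatTimeout, heartbeatPeriod and cancelService can be sketched as follows. This is a minimal illustration, not the bigdata HeartbeatMonitorTask: it assumes a map of last-heartbeat timestamps per job and a hypothetical Consumer<UUID> callback standing in for cancelJob(UUID). The timer task only detects stale jobs; the actual cancellation is handed to cancelService so the timer thread is never blocked.

```java
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Sketch of a heartbeat monitor; field names mirror the Javadoc, the logic is assumed. */
class HeartbeatMonitorSketch {
    final long heartbeatTimeout; // ms without a heartbeat before a job is cancelled
    final long heartbeatPeriod;  // ms between runs of the monitor task
    final Map<UUID, Long> lastHeartbeat = new ConcurrentHashMap<>();
    final Timer heartbeatMonitor = new Timer("heartbeatMonitor", true); // daemon timer
    final ExecutorService cancelService = Executors.newSingleThreadExecutor();
    final Consumer<UUID> cancelJob; // hypothetical stand-in for cancelJob(UUID)

    HeartbeatMonitorSketch(long heartbeatTimeout, long heartbeatPeriod, Consumer<UUID> cancelJob) {
        this.heartbeatTimeout = heartbeatTimeout;
        this.heartbeatPeriod = heartbeatPeriod;
        this.cancelJob = cancelJob;
    }

    /** Record a heartbeat for the job. */
    void heartbeat(UUID job) {
        lastHeartbeat.put(job, System.currentTimeMillis());
    }

    void start() {
        heartbeatMonitor.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                long now = System.currentTimeMillis();
                for (Map.Entry<UUID, Long> e : lastHeartbeat.entrySet()) {
                    UUID job = e.getKey();
                    long last = e.getValue();
                    // remove(key, value) fails if a newer heartbeat arrived meanwhile.
                    if (now - last > heartbeatTimeout && lastHeartbeat.remove(job, last)) {
                        // Hand off so a slow cancellation does not delay the timer thread.
                        cancelService.submit(() -> cancelJob.accept(job));
                    }
                }
            }
        }, heartbeatPeriod, heartbeatPeriod);
    }

    void stop() {
        heartbeatMonitor.cancel();
        cancelService.shutdown();
    }
}
```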


jobs

public final Map<UUID,JobState<M extends IJobMetadata>> jobs
Running jobs.

Constructor Detail

AbstractJobAndTaskService

protected AbstractJobAndTaskService(Properties properties)
Parameters:
properties - See AbstractJobAndTaskService.Options
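The constructor reads its configuration from a Properties object. As a rough sketch of that pattern only, the class below parses a thread pool size, heartbeat timeout and heartbeat period. The property names and default values are hypothetical placeholders; the real keys are the constants defined in AbstractJobAndTaskService.Options, whose values are not reproduced on this page.

```java
import java.util.Properties;

/** Sketch of Properties-driven configuration; keys and defaults are hypothetical. */
class OptionsSketch {
    // Hypothetical keys, for illustration only (see AbstractJobAndTaskService.Options).
    static final String THREAD_POOL_SIZE = "threadPoolSize";
    static final String HEARTBEAT_TIMEOUT = "heartbeatTimeout";
    static final String HEARTBEAT_PERIOD = "heartbeatPeriod";

    final int threadPoolSize;
    final long heartbeatTimeout; // ms
    final long heartbeatPeriod;  // ms

    OptionsSketch(Properties properties) {
        // Hypothetical defaults used when a key is absent.
        threadPoolSize = Integer.parseInt(properties.getProperty(THREAD_POOL_SIZE, "10"));
        heartbeatTimeout = Long.parseLong(properties.getProperty(HEARTBEAT_TIMEOUT, "5000"));
        heartbeatPeriod = Long.parseLong(properties.getProperty(HEARTBEAT_PERIOD, "1000"));
        // Reject nonsensical values up front rather than failing later.
        if (threadPoolSize <= 0) throw new IllegalArgumentException(THREAD_POOL_SIZE + " must be positive");
        if (heartbeatTimeout <= 0) throw new IllegalArgumentException(HEARTBEAT_TIMEOUT + " must be positive");
        if (heartbeatPeriod <= 0) throw new IllegalArgumentException(HEARTBEAT_PERIOD + " must be positive");
    }
}
```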
Method Detail

isOpen

public boolean isOpen()
Description copied from interface: IServiceShutdown
Return true iff the service is running.

Specified by:
isOpen in interface IServiceShutdown

shutdown

public void shutdown()
Description copied from interface: IServiceShutdown
The service will no longer accept new requests, but existing requests will be processed (synchronous). This method should await the termination of pending requests, but no longer than the timeout specified by IServiceShutdown.Options.SHUTDOWN_TIMEOUT. Implementations SHOULD be synchronized. If the service is already shutdown, then this method should be a NOP.

Specified by:
shutdown in interface IServiceShutdown

shutdownNow

public void shutdownNow()
Description copied from interface: IServiceShutdown
The service will no longer accept new requests and will make a best effort attempt to terminate all existing requests and return ASAP. This method should terminate any asynchronous processing, release all resources and return immediately. Implementations SHOULD be synchronized. If the service is already shutdown, then this method should be a NOP.

Specified by:
shutdownNow in interface IServiceShutdown
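The contracts of shutdown() and shutdownNow() map naturally onto java.util.concurrent.ExecutorService. The sketch below assumes a single task pool and a fixed millisecond timeout standing in for IServiceShutdown.Options.SHUTDOWN_TIMEOUT. Falling back to shutdownNow() when the timeout expires is a design choice of this sketch, not something the contract requires.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch of the IServiceShutdown contract on top of an ExecutorService. */
class ShutdownSketch {
    final ExecutorService taskService = Executors.newFixedThreadPool(2);
    final long shutdownTimeoutMillis; // stands in for IServiceShutdown.Options.SHUTDOWN_TIMEOUT
    private boolean open = true;

    ShutdownSketch(long shutdownTimeoutMillis) {
        this.shutdownTimeoutMillis = shutdownTimeoutMillis;
    }

    synchronized boolean isOpen() {
        return open;
    }

    /** Stop accepting work, then wait (bounded) for queued and running tasks. */
    synchronized void shutdown() {
        if (!open) return; // already shut down: NOP
        open = false;
        taskService.shutdown();
        try {
            if (!taskService.awaitTermination(shutdownTimeoutMillis, TimeUnit.MILLISECONDS)) {
                taskService.shutdownNow(); // timeout expired: interrupt what is left
            }
        } catch (InterruptedException e) {
            taskService.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        }
    }

    /** Stop accepting work and interrupt running tasks without waiting. */
    synchronized void shutdownNow() {
        if (!open) return; // already shut down: NOP
        open = false;
        taskService.shutdownNow();
    }
}
```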

destroy

public void destroy()

getBigdataClient

public abstract IBigdataClient getBigdataClient()
Returns:
The client used to read/write data.
TODO:
The IBigdataClient should be initialized from the IJobMetadata on a per-job basis. This will let us: (a) reuse these services across federations; and (b) avoid mishaps when the client is connected to one federation and tries to discover services connected to another federation. Of course, this does not matter until we support multiple federations.

This approach requires a weak value hash map of the clients and a hard reference to the client in the job. Once no job references a client, that client should be disconnected from its federation. Rather than always minting a client for a job, we could also first check the hash map for an existing client for the same federation. Will the client explicitly disconnect from federations, or should that happen automatically (perhaps after a delay) when there is no longer a hard reference to a given federation?

It also makes sense to allow a different client for input and output so that a map/reduce job can cross federations.


newJobState

protected JobState<M> newJobState(UUID job,
                                  M metadata,
                                  IBigdataClient client)
Factory for job state.

Parameters:
job - The job identifier.
metadata - The job metadata.
client - Used to access the IBigdataFederation when reading or writing data.
Returns:
A JobState object.

newTaskWorker

protected abstract AbstractJobAndTaskService.AbstractTaskWorker<M,T> newTaskWorker(JobState<M> jobState,
                                                                                   T task)
Factory for task workers.

Parameters:
jobState - The job state maintained by this service.
task - The task to be executed.
Returns:
The worker that will execute a given task.
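newTaskWorker(JobState, T) is a factory method: the base class owns the thread pool and the submission path, while each concrete service (MapService, ReduceService) decides how a task is executed. The sketch below illustrates only that pattern. The type names are hypothetical, and a Callable<String> stands in for AbstractJobAndTaskService.AbstractTaskWorker.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Base service: owns the pool, delegates worker creation to subclasses. */
abstract class TaskServiceSketch<S, T> {
    final ExecutorService taskService = Executors.newFixedThreadPool(2);

    /** Factory method; a Callable stands in for AbstractTaskWorker. */
    protected abstract Callable<String> newTaskWorker(S jobState, T task);

    /** The submission path is shared; only the worker differs per subclass. */
    Future<String> submit(S jobState, T task) {
        return taskService.submit(newTaskWorker(jobState, task));
    }
}

/** Hypothetical subclass in the spirit of MapService; it just formats its input. */
class EchoService extends TaskServiceSketch<String, Integer> {
    @Override
    protected Callable<String> newTaskWorker(String jobState, Integer task) {
        return () -> jobState + ":" + task;
    }
}
```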

startJob

public void startJob(M jobMetadata)
Description copied from interface: IJobAndTaskService
Declare a job. The service will now accept tasks for the job. The client MUST send IJobAndTaskService.heartbeat(UUID) messages or the service will cancel the job.

Specified by:
startJob in interface IJobAndTaskService<M extends IJobMetadata,T extends ITask>
Parameters:
jobMetadata - The service specific job metadata.

cancelJob

public void cancelJob(UUID job)
Description copied from interface: IJobAndTaskService
Terminate a job. The service will no longer accept tasks for this job. Any running tasks will be terminated. This method SHOULD be invoked whether a job is terminated normally or aborted. It will be invoked by the service in any case if the service no longer receives IJobAndTaskService.heartbeat(UUID) messages for this job.

Specified by:
cancelJob in interface IJobAndTaskService<M extends IJobMetadata,T extends ITask>
Parameters:
job - The job identifier.
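The bookkeeping behind startJob(M) and cancelJob(UUID), together with the ERR_NO_JOB_IDENTIFIER, ERR_JOB_EXISTS and ERR_NO_SUCH_JOB messages, can be sketched with a concurrent map keyed by job UUID. This is an illustration under assumptions: the message strings are placeholders (the real constant values are not shown here), the exception types are chosen for the sketch, and SketchJobState is a hypothetical stand-in for JobState.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified stand-in for JobState. */
class SketchJobState {
    final UUID job;
    SketchJobState(UUID job) { this.job = job; }
}

/** Sketch of startJob/cancelJob bookkeeping; not the bigdata implementation. */
class JobRegistrySketch {
    // Placeholder messages; the real ERR_* constant values are not reproduced here.
    static final String ERR_NO_JOB_IDENTIFIER = "No job identifier";
    static final String ERR_JOB_EXISTS = "Job exists";
    static final String ERR_NO_SUCH_JOB = "No such job";

    final Map<UUID, SketchJobState> jobs = new ConcurrentHashMap<>();

    void startJob(UUID job) {
        if (job == null) throw new IllegalArgumentException(ERR_NO_JOB_IDENTIFIER);
        // putIfAbsent is atomic, so two concurrent startJob calls cannot both succeed.
        if (jobs.putIfAbsent(job, new SketchJobState(job)) != null) {
            throw new IllegalStateException(ERR_JOB_EXISTS);
        }
    }

    void cancelJob(UUID job) {
        // Once removed, later calls for this job see ERR_NO_SUCH_JOB.
        if (jobs.remove(job) == null) throw new IllegalStateException(ERR_NO_SUCH_JOB);
        // A full implementation would also cancel the job's queued and running tasks here.
    }
}
```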

heartbeat

public int heartbeat(UUID job)
              throws IOException
Description copied from interface: IJobAndTaskService
The service will IJobAndTaskService.cancelJob(UUID) the job if it does not continue to receive heartbeats for that job. The service timeout for a job is on the order of a few seconds, but clients are encouraged to send heartbeats every 100ms.

Note: Clients may also use this message to monitor the progression of tasks for the specified job.

Specified by:
heartbeat in interface IJobAndTaskService<M extends IJobMetadata,T extends ITask>
Parameters:
job - The job identifier.
Returns:
The #of completed tasks for that job.
Throws:
IOException
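On the client side, the heartbeat obligation amounts to a periodic call. The sketch below assumes a hypothetical HeartbeatTarget interface that mirrors only the heartbeat(UUID) method, and uses a ScheduledExecutorService at a caller-chosen period (for example the 100ms suggested above). It records the returned count of completed tasks, which clients may use to monitor progress.

```java
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the heartbeat(UUID) method of IJobAndTaskService. */
interface HeartbeatTarget {
    int heartbeat(UUID job) throws IOException;
}

/** Sends heartbeats for one job until closed. */
class HeartbeatClientSketch implements AutoCloseable {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    volatile int lastCompleted = -1; // #of completed tasks reported by the last heartbeat

    HeartbeatClientSketch(HeartbeatTarget service, UUID job, long periodMillis) {
        timer.scheduleAtFixedRate(() -> {
            try {
                // The return value doubles as a progress report.
                lastCompleted = service.heartbeat(job);
            } catch (IOException e) {
                // Catch so the schedule keeps running: an exception escaping run()
                // would suppress all later executions. A missed beat is tolerated.
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```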

drain

public Outcome[] drain(UUID job)
Description copied from interface: IJobAndTaskService
Drains the Outcomes for all completed tasks. Once drained, those Outcomes are no longer available from the service.

Specified by:
drain in interface IJobAndTaskService<M extends IJobMetadata,T extends ITask>
Parameters:
job - The job identifier.
Returns:
The task Outcomes.
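The drain semantics (each Outcome is handed out at most once) can be sketched with a concurrent queue. Assumptions: a generic type O stands in for Outcome, a List is returned instead of Outcome[] to avoid generic array creation, and heartbeat() here reports the cumulative number of completed tasks, which is one reading of "the #of completed tasks for that job".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of per-job outcome buffering; O stands in for Outcome. */
class OutcomeBufferSketch<O> {
    private final Queue<O> outcomes = new ConcurrentLinkedQueue<>();
    private final AtomicInteger completed = new AtomicInteger();

    /** Called by a worker when its task finishes, whatever its status. */
    void taskDone(O outcome) {
        outcomes.add(outcome);
        completed.incrementAndGet();
    }

    /** Assumed heartbeat semantics: cumulative #of completed tasks. */
    int heartbeat() {
        return completed.get();
    }

    /** Removes and returns the buffered outcomes; each is returned at most once. */
    List<O> drain() {
        List<O> out = new ArrayList<>();
        for (O o; (o = outcomes.poll()) != null; ) {
            out.add(o);
        }
        return out;
    }
}
```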

submit

public void submit(UUID job,
                   T task,
                   long timeout)
Description copied from interface: IJobAndTaskService
Submit a task to be executed as part of a job (asynchronous). The task will be placed into a queue and will begin executing once there is an available worker. When the task completes with any Status, it will be reported by IJobAndTaskService.heartbeat(UUID) and its Outcome will be made available by IJobAndTaskService.drain(UUID).

Specified by:
submit in interface IJobAndTaskService<M extends IJobMetadata,T extends ITask>
Parameters:
job - The job identifier.
task - The task to be executed.
timeout - When non-zero, the timeout in milliseconds for this task once it begins to execute on the service (this controls the actual maximum run time of the task rather than the time since the task was submitted or since the job was started).
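Because the timeout bounds the actual run time rather than the time spent queued, the timer should be armed when the task starts, not when it is submitted. The sketch below shows one way to do that with a FutureTask and a ScheduledExecutorService named after the timeoutService field; it is an illustration, not the bigdata implementation, and it assumes tasks respond to interruption.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of per-task execution timeouts measured from the start of execution. */
class TimeoutSubmitSketch {
    final ExecutorService taskService = Executors.newFixedThreadPool(2);
    final ScheduledExecutorService timeoutService = Executors.newSingleThreadScheduledExecutor();

    /** A timeout of 0 means no limit. */
    Future<?> submit(Runnable task, long timeoutMillis) {
        AtomicReference<FutureTask<Void>> self = new AtomicReference<>();
        FutureTask<Void> ft = new FutureTask<>(() -> {
            if (timeoutMillis > 0) {
                // Arm the timer only now that the task is actually running.
                FutureTask<Void> me = self.get();
                timeoutService.schedule(() -> me.cancel(true), timeoutMillis, TimeUnit.MILLISECONDS);
            }
            task.run();
        }, null);
        self.set(ft); // set before execute(), so the task always sees itself
        taskService.execute(ft);
        return ft;
    }
}
```

A cancel scheduled for a task that has already finished is harmless: cancel(true) on a completed FutureTask simply returns false.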

cancel

public boolean cancel(UUID job,
                      UUID task)
Description copied from interface: IJobAndTaskService
Cancel a task. If the task is queued, it will not be run. If it is running, it will be cancelled. If the task has already completed, the service will ignore this message.

Specified by:
cancel in interface IJobAndTaskService<M extends IJobMetadata,T extends ITask>
Parameters:
job - The job identifier.
task - The task identifier.
Returns:
false if the task could not be cancelled (per Future.cancel(boolean)).
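The return value follows java.util.concurrent.Future.cancel(boolean): cancelling a queued task succeeds and prevents it from running, while cancelling a completed task returns false. The small demonstration below uses a single-threaded executor so that one task is guaranteed to be queued behind another; it illustrates the JDK semantics only, not the service itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Demonstrates Future.cancel(boolean) on a queued task and on a completed task. */
class CancelDemo {
    /** Returns {cancelQueued, cancelCompleted}. */
    static boolean[] run() throws Exception {
        ExecutorService single = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        // Occupies the only worker thread until released.
        Future<?> blocker = single.submit(() -> { release.await(); return null; });
        // Queued behind the blocker, so it has not started yet.
        Future<?> queued = single.submit(() -> { });
        boolean cancelQueued = queued.cancel(true); // true: it will never run
        release.countDown();
        blocker.get();
        boolean cancelCompleted = blocker.cancel(true); // false: already completed
        single.shutdown();
        return new boolean[] { cancelQueued, cancelCompleted };
    }
}
```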


Copyright © 2006-2009 SYSTAP, LLC. All Rights Reserved.