Hadoop Heartbeat, Task Tracker, Task Runner exploration notes

HeartBeat Mechanism

Task Assignment (pg 192, Hadoop The Definitive Guide, 3rd Edition)

Tasktrackers run a simple loop that periodically sends heartbeat method calls to the jobtracker. Heartbeats tell the jobtracker that a tasktracker is alive, and they double as a channel for messages: as part of the heartbeat, a tasktracker indicates whether it is ready to run a new task.

Inside the heartbeat loop, the task tracker sends each heartbeat with this call:

       // Send the heartbeat and process the jobtracker's directives
       HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
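To make the shape of this loop concrete, here is a minimal sketch of a periodic heartbeat sender. It is not the TaskTracker code: `HeartbeatTargetSketch`, `HeartbeatLoopSketch`, and the string "directives" are hypothetical names invented for illustration, and the loop is bounded to a fixed number of rounds so that it terminates, whereas the real loop keeps going for as long as the tasktracker runs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the RPC call to the jobtracker; not a Hadoop type. */
interface HeartbeatTargetSketch {
    String heartbeat(long now);
}

class HeartbeatLoopSketch {
    /**
     * Sends `rounds` heartbeats, sleeping intervalMs between them, and
     * returns the directives received, in order.
     */
    static List<String> run(HeartbeatTargetSketch target, int rounds, long intervalMs) {
        List<String> directives = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            long now = System.currentTimeMillis();
            directives.add(target.heartbeat(now));   // send and collect the reply
            if (i < rounds - 1) {
                try {
                    Thread.sleep(intervalMs);        // wait until the next heartbeat is due
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;                           // stop the loop when interrupted
                }
            }
        }
        return directives;
    }

    public static void main(String[] args) {
        // A fake jobtracker that always answers with the same directive.
        List<String> replies = run(now -> "no-op", 3, 10);
        System.out.println(replies.size() + " heartbeats sent");
    }
}
```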

The information about the task tracker is transmitted in a status object. The code below, from the transmitHeartBeat() method, builds it:

   //
   // Check if the last heartbeat got through...
   // if so then build the heartbeat information for the JobTracker;
   // else resend the previous status information.
   //
   if (status == null) {
     synchronized (this) {
       status = new TaskTrackerStatus(taskTrackerName, localHostname,
                                      httpPort,
                                      cloneAndResetRunningTaskStatuses(
                                        sendCounters),
                                      failures,
                                      maxMapSlots,
                                      maxReduceSlots);
     }
   } else {
     LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
              "' with reponseId '" + heartbeatResponseId);
   }
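The "build a new status or resend the previous one" rule in the comment above can be sketched in isolation. This is only an illustration under assumptions: `StatusResendSketch` and `Transport` are hypothetical names, the status is a plain string, and clearing the pending status after a successful send is our assumption about how the field gets back to null (the excerpt above does not show that part).

```java
import java.util.ArrayList;
import java.util.List;

class StatusResendSketch {
    /** Hypothetical transport: returns true if the heartbeat got through. */
    interface Transport {
        boolean send(String status);
    }

    private String pendingStatus;   // null means the last heartbeat got through
    private int built = 0;          // how many fresh status objects were built

    /** Builds a fresh status only if there is no unsent one left over. */
    String nextStatus() {
        if (pendingStatus == null) {
            built++;
            pendingStatus = "status-" + built;
        }
        return pendingStatus;
    }

    /** Sends the current status; on success forces a rebuild next time (assumption). */
    boolean transmit(Transport transport) {
        boolean ok = transport.send(nextStatus());
        if (ok) {
            pendingStatus = null;
        }
        return ok;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        StatusResendSketch tracker = new StatusResendSketch();
        tracker.transmit(s -> { sent.add(s); return false; });  // lost heartbeat
        tracker.transmit(s -> { sent.add(s); return true; });   // resend succeeds
        tracker.transmit(s -> { sent.add(s); return true; });   // fresh status
        System.out.println(sent);
    }
}
```

In this sketch a lost heartbeat causes the same status to be sent again, and only a successful send lets the next iteration build a new one.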

The TaskTrackerStatus object records the maximum number of map and reduce slots on the node and has methods that compute how many of those slots are still available.

The constructor for the class:

public TaskTrackerStatus(String trackerName, String host,
                          int httpPort, List<TaskStatus> taskReports,
                          int failures, int maxMapTasks,
                          int maxReduceTasks) {
   this.trackerName = trackerName;
   this.host = host;
   this.httpPort = httpPort;
   this.taskReports = new ArrayList<TaskStatus>(taskReports);
   this.failures = failures;
   this.maxMapTasks = maxMapTasks;
   this.maxReduceTasks = maxReduceTasks;
   this.resStatus = new ResourceStatus();
   this.healthStatus = new TaskTrackerHealthStatus();
 }

Methods for calculating the number of available map slots (the calculation for reduce slots follows the same procedure):

 /**
  * Get the maximum map slots for this node.
  * @return the maximum map slots for this node
  */
 public int getMaxMapSlots() {
   return maxMapTasks;
 }
 /**
  * Get the number of occupied map slots.
  * @return the number of occupied map slots
  */
 public int countOccupiedMapSlots() {
   int mapSlotsCount = 0;
   for (TaskStatus ts : taskReports) {
     if (ts.getIsMap() && isTaskRunning(ts)) {
       mapSlotsCount += ts.getNumSlots();
     }
   }
   return mapSlotsCount;
 }
 /**
  * Get available map slots.
  * @return available map slots
  */
 public int getAvailableMapSlots() {
   return getMaxMapSlots() - countOccupiedMapSlots();
 }
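A small worked example of this arithmetic may help. The sketch below mirrors the three methods above on a simplified, hypothetical `TaskReport` class (it is not Hadoop's TaskStatus, and the numbers are made up): only running map tasks count against the map slots, and the available count is the maximum minus the occupied count.

```java
import java.util.Arrays;
import java.util.List;

class SlotAccountingSketch {
    /** Hypothetical, simplified task report; not Hadoop's TaskStatus. */
    static final class TaskReport {
        final boolean isMap;
        final boolean running;
        final int numSlots;

        TaskReport(boolean isMap, boolean running, int numSlots) {
            this.isMap = isMap;
            this.running = running;
            this.numSlots = numSlots;
        }
    }

    /** Sums the slots held by map tasks that are currently running. */
    static int countOccupiedMapSlots(List<TaskReport> reports) {
        int occupied = 0;
        for (TaskReport report : reports) {
            if (report.isMap && report.running) {
                occupied += report.numSlots;
            }
        }
        return occupied;
    }

    /** Available slots are the node maximum minus the occupied slots. */
    static int availableMapSlots(int maxMapSlots, List<TaskReport> reports) {
        return maxMapSlots - countOccupiedMapSlots(reports);
    }

    public static void main(String[] args) {
        List<TaskReport> reports = Arrays.asList(
            new TaskReport(true, true, 1),    // running map, 1 slot
            new TaskReport(true, true, 2),    // running map, 2 slots
            new TaskReport(false, true, 1),   // running reduce, ignored here
            new TaskReport(true, false, 1));  // finished map, ignored
        // 4 max slots - (1 + 2) occupied = 1 available
        System.out.println(availableMapSlots(4, reports));
    }
}
```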

Once the TaskTrackerStatus object is set up and ready, it has to be transferred to the JobTracker through an RPC call.

The lines in the TaskTracker class (org.apache.hadoop.mapred.TaskTracker) that make the remote procedure call and get back the response:

   //
   // Xmit the heartbeat
   //
   HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                             justStarted,
                                                             justInited,
                                                             askForNewTask,
                                                             heartbeatResponseId);
   //
   // The heartbeat got through successfully!
   //
   heartbeatResponseId = heartbeatResponse.getResponseId();
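The response ID that travels with each heartbeat lets the receiving side tell a fresh heartbeat from a retransmitted one. The sketch below shows one way such a scheme can work. It is an assumption for illustration, not code copied from the JobTracker: `ResponseIdSketch`, `Response`, and the string directives are hypothetical. The server remembers the last response per tracker and replays it when the incoming ID shows that the tracker never received it.

```java
import java.util.HashMap;
import java.util.Map;

class ResponseIdSketch {
    /** Hypothetical response: an ID plus a directive for the tracker. */
    static final class Response {
        final short responseId;
        final String directive;

        Response(short responseId, String directive) {
            this.responseId = responseId;
            this.directive = directive;
        }
    }

    private final Map<String, Response> lastResponse = new HashMap<>();
    private int handled = 0;   // number of heartbeats that produced new work

    Response heartbeat(String trackerName, short responseId) {
        Response previous = lastResponse.get(trackerName);
        if (previous != null && previous.responseId != responseId) {
            // The tracker still carries an older ID, so our last reply was lost:
            // replay it instead of handing out new directives.
            return previous;
        }
        handled++;
        Response next = new Response((short) (responseId + 1), "directive-" + handled);
        lastResponse.put(trackerName, next);
        return next;
    }

    public static void main(String[] args) {
        ResponseIdSketch server = new ResponseIdSketch();
        Response first = server.heartbeat("tt1", (short) 0);
        Response replay = server.heartbeat("tt1", (short) 0);              // reply was lost
        Response second = server.heartbeat("tt1", first.responseId);       // reply received
        System.out.println(first.directive + " " + replay.directive + " " + second.directive);
    }
}
```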

Now we move away from the TaskTracker and over to the JobTracker.

The RPC is made possible by the InterTrackerProtocol interface, which is shared by jobClient (on the TaskTracker side) and the JobTracker:


/**
 * Protocol that a TaskTracker and the central JobTracker use to communicate.
 * The JobTracker is the Server, which implements this protocol.
 */
@KerberosInfo(
    serverPrincipal = JobTracker.JT_USER_NAME,
    clientPrincipal = TaskTracker.TT_USER_NAME)
interface InterTrackerProtocol extends VersionedProtocol {
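The idea of one interface shared by both ends can be sketched with a plain Java dynamic proxy. Everything here is hypothetical (`TrackerProtocolSketch`, `ServerSketch`, `RpcProxySketch`, the string directives), and the "network" is just a direct method call, so this only illustrates the pattern of a client holding a proxy typed as the protocol interface while the server implements it. It does not show how Hadoop's own RPC layer is built.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Hypothetical protocol shared by client and server; not InterTrackerProtocol. */
interface TrackerProtocolSketch {
    String heartbeat(String trackerName, int freeMapSlots);
}

/** The server side implements the protocol directly. */
class ServerSketch implements TrackerProtocolSketch {
    @Override
    public String heartbeat(String trackerName, int freeMapSlots) {
        return freeMapSlots > 0 ? "LAUNCH_TASK" : "NO_OP";
    }
}

class RpcProxySketch {
    /**
     * Returns a client-side proxy for the protocol. A real RPC layer would
     * serialize the call and send it over the wire; here the handler simply
     * forwards the call to the server object in the same JVM.
     */
    static TrackerProtocolSketch connect(TrackerProtocolSketch server) {
        InvocationHandler forward = (proxy, method, args) -> method.invoke(server, args);
        return (TrackerProtocolSketch) Proxy.newProxyInstance(
            TrackerProtocolSketch.class.getClassLoader(),
            new Class<?>[] { TrackerProtocolSketch.class },
            forward);
    }

    public static void main(String[] args) {
        TrackerProtocolSketch client = connect(new ServerSketch());
        System.out.println(client.heartbeat("tt1", 2));
    }
}
```

The caller only ever sees the interface, which is why the TaskTracker code can write `jobClient.heartbeat(...)` as if it were a local call.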


/**
 * JobTracker is the central location for submitting and
 * tracking MR jobs in a network environment.
 */
public class JobTracker implements MRConstants, InterTrackerProtocol,
    JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol,
    RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,


In the JobTracker, the method

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean restarted,
                                                boolean initialContact,
                                                boolean acceptNewTasks,
                                                short responseId)

is invoked to receive the heartbeat. The JobTracker then uses the information to assign tasks through a task scheduler. Besides the default FIFO scheduler, there are two other schedulers, the capacity scheduler and the fair scheduler:

     List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
     if (tasks == null ) {
       tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
     }
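The snippet above suggests an ordering: setup and cleanup tasks are considered first, and the pluggable scheduler is consulted only when there are none. A minimal sketch of that fallback follows, with hypothetical names (`TaskSchedulerSketch`, `HeartbeatAssignmentSketch`) and strings standing in for tasks.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical pluggable scheduler; not Hadoop's TaskScheduler class. */
interface TaskSchedulerSketch {
    List<String> assignTasks(String trackerName);
}

class HeartbeatAssignmentSketch {
    /**
     * Returns the setup/cleanup tasks if there are any (non-null),
     * otherwise falls back to whatever the scheduler assigns.
     */
    static List<String> tasksFor(String trackerName,
                                 List<String> setupAndCleanupTasks,
                                 TaskSchedulerSketch scheduler) {
        List<String> tasks = setupAndCleanupTasks;
        if (tasks == null) {
            tasks = scheduler.assignTasks(trackerName);
        }
        return tasks;
    }

    public static void main(String[] args) {
        TaskSchedulerSketch fifo = tracker -> Arrays.asList("map-1 on " + tracker);
        System.out.println(tasksFor("tt1", Arrays.asList("job-setup"), fifo));
        System.out.println(tasksFor("tt1", null, fifo));
    }
}
```

Because the scheduler sits behind an interface, swapping the FIFO, capacity, or fair policy does not change the heartbeat handling around it.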


In the task scheduler (we looked at the fair scheduler), the assignment loop checks its ending conditions like this:

   while (true) {
     // Computing the ending conditions for the loop
     // Reject a task type if one of the following condition happens
     // 1. number of assigned task reaches per heartbeat limit
     // 2. number of running tasks reaches runnable tasks
     // 3. task is rejected by the LoadManager.canAssign
     if (!mapRejected) {
       if (mapsAssigned == mapCapacity ||
           runningMaps == runnableMaps ||
           !loadMgr.canAssignMap(tts, runnableMaps, totalMapSlots)) {
         eventLog.log("INFO", "Can't assign another MAP to " + trackerName);
         mapRejected = true;
       }
     }
     if (!reduceRejected) {
       if (reducesAssigned == reduceCapacity ||
           runningReduces == runnableReduces ||
           !loadMgr.canAssignReduce(tts, runnableReduces, totalReduceSlots)) {
         eventLog.log("INFO", "Can't assign another REDUCE to " + trackerName);
         reduceRejected = true;
       }
     }
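To see how the three ending conditions interact, here is a reduced sketch that handles only map tasks. It is a simplification under assumptions: `AssignmentLoopSketch` is a hypothetical name, a plain free-slot counter stands in for the LoadManager check, and "assigning" a task just increments counters.

```java
class AssignmentLoopSketch {
    /**
     * Returns how many map tasks get assigned in one heartbeat.
     * The loop stops as soon as one of three conditions holds:
     *   1. the per-heartbeat limit (mapCapacity) is reached,
     *   2. every runnable map is already running,
     *   3. the tracker has no free slot left (stand-in for the load manager).
     */
    static int assignMaps(int mapCapacity, int runningMaps, int runnableMaps, int freeSlots) {
        int mapsAssigned = 0;
        boolean mapRejected = false;
        while (!mapRejected) {
            if (mapsAssigned >= mapCapacity
                    || runningMaps >= runnableMaps
                    || freeSlots <= 0) {
                mapRejected = true;          // no further map for this heartbeat
            } else {
                mapsAssigned++;              // hand out one more map task
                runningMaps++;
                freeSlots--;
            }
        }
        return mapsAssigned;
    }

    public static void main(String[] args) {
        System.out.println(assignMaps(2, 0, 10, 5));  // stops at the per-heartbeat limit
        System.out.println(assignMaps(5, 3, 4, 5));   // stops when nothing else is runnable
        System.out.println(assignMaps(5, 0, 10, 2));  // stops when the slots run out
    }
}
```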



Parallel processing of input Key-Value Pairs

Tracing starts from the run method in Child.java:

  public Object run() throws Exception {
    try {
      // use job-specified working directory

      taskFinal.run(job, umbilical);        // run the task
    } finally {

        (logLocation, taskid, isCleanup, logIsSegmented(job));
      TaskLogsTruncater trunc = new TaskLogsTruncater(defaultConf);
      trunc.truncateLogs(new JVMInfo(

            taskFinal.isTaskCleanupTask()), Arrays.asList(taskFinal)));

    return null;


The run method in MapTask.java then runs the mapper:

  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;
    // start thread that will handle communication with parent
    TaskReporter reporter = new TaskReporter(getProgress(), umbilical,

    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);
    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);

    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);

    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);

    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);


runNewMapper() in MapTask.java in turn calls the run() method of the Mapper class (Mapper.java):



public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {

    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
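The loop quoted above pulls one key-value pair at a time from the context and hands it to map(), so within a single map task the pairs are processed sequentially. The sketch below reproduces that pattern without any Hadoop dependency. `MapperRunSketch` and its tiny `Context` are hypothetical stand-ins, and the map function (emit each line with its length) is made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MapperRunSketch {
    /** Hypothetical, minimal context: iterates over input lines and collects output. */
    static final class Context {
        private final List<String> input;
        private int position = -1;
        final List<String> output = new ArrayList<>();

        Context(List<String> input) {
            this.input = input;
        }

        /** Advances to the next record; returns false when the input is exhausted. */
        boolean nextKeyValue() {
            position++;
            return position < input.size();
        }

        Integer getCurrentKey() {
            return position;              // the record index serves as the key
        }

        String getCurrentValue() {
            return input.get(position);
        }

        void write(String key, int value) {
            output.add(key + "=" + value);
        }
    }

    /** Example map function: emits each line together with its length. */
    static void map(Integer key, String value, Context context) {
        context.write(value, value.length());
    }

    /** Same shape as the loop above: one map() call per input pair, in order. */
    static void run(Context context) {
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    }

    public static void main(String[] args) {
        Context context = new Context(Arrays.asList("a", "bcd"));
        run(context);
        System.out.println(context.output);
    }
}
```

Any parallelism inside one task would therefore have to come from overriding run(), which is what the "expert users" note in the Javadoc hints at.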




