Hadoop Research Notes: Multiple InputSplits in the Same Map Task

This post summarizes my research into the Hadoop source code. I was investigating the easiest way to parallelize I/O, deserialization, and computation within a single Map Task.

Currently, the Hadoop JobTracker accepts a job from the user and breaks it down into a large number of map tasks and a few reduce tasks. Each Map Task is assigned a single input split and processes that split in its own JVM. The input split is determined by a user-specified InputFormat, and its size can be set as an input argument in the Hadoop job driver (the user-defined main class).
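
To make the split-size setting concrete, here is a minimal sketch of how a file-based InputFormat can derive the split size from the block size and the configured minimum and maximum. The clamp formula mirrors what I recall of FileInputFormat.computeSplitSize in the new API. The class SplitSizeSketch, the method countSplits, and the example numbers are my own assumptions, and the real code also allows some slack on the last split, which is ignored here.

```java
// Sketch only: class and method names are illustrative, not Hadoop APIs.
class SplitSizeSketch {

    // Clamp the block size between the configured minimum and maximum split size.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Number of splits (and therefore map tasks) for one file, ignoring the
    // slack that real implementations allow on the last split.
    static long countSplits(long fileLength, long splitSize) {
        if (fileLength == 0) {
            return 0;
        }
        return (fileLength + splitSize - 1) / splitSize; // ceiling division
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Assumed example: 64 MB blocks, maximum split size capped at 32 MB.
        long splitSize = computeSplitSize(64 * mb, 1, 32 * mb);
        System.out.println("split size = " + splitSize / mb + " MB");
        System.out.println("map tasks  = " + countSplits(1000 * mb, splitSize));
    }
}
```

Since each split becomes one Map Task, a smaller split size means more map tasks for the same input.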

Map Tasks are assigned to nodes through the TaskTracker process. There is one TaskTracker on each physical node. Each TaskTracker knows the capacity of its node, that is, how many map tasks and reduce tasks can run in parallel. It communicates with the JobTracker through a heartbeat signal and requests tasks when it has a free slot.

The TaskTracker has a TaskLauncher, and the TaskLauncher has a run method:


    public void run() {
      while (!Thread.interrupted()) {
        try {
          TaskInProgress tip;
          Task task;
          synchronized (tasksToLaunch) {
            while (tasksToLaunch.isEmpty()) {
              ...
            }
            //get the TIP
            tip = tasksToLaunch.remove(0);
            task = tip.getTask();
            LOG.info("Trying to launch : " + tip.getTask().getTaskID() +
                     " which needs " + task.getNumSlotsRequired() + " slots");
          }
          //wait for free slots to run
          synchronized (numFreeSlots) {
            boolean canLaunch = true;
            while (numFreeSlots.get() < task.getNumSlotsRequired()) {
              //Make sure that there is no kill task action for this task!
              //We are not locking tip here, because it would reverse the
              //locking order!
              //Also, Lock for the tip is not required here! because :
              // 1. runState of TaskStatus is volatile
              // 2. Any notification is not missed because notification is
              // synchronized on numFreeSlots. So, while we are doing the check,
              // if the tip is half way through the kill(), we don't miss
              // notification for the following wait().
              if (!tip.canBeLaunched()) {
                //got killed externally while still in the launcher queue
                LOG.info("Not blocking slots for " + task.getTaskID()
                    + " as it got killed externally. Task's state is "
                    + tip.getRunState());
                canLaunch = false;
                ...
              }
              LOG.info("TaskLauncher : Waiting for " + task.getNumSlotsRequired() +
                       " to launch " + task.getTaskID() + ", currently we have " +
                       numFreeSlots.get() + " free slots");
              ...
            }
            if (!canLaunch) {
              ...
            }
            LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
                     " and trying to launch "+tip.getTask().getTaskID() +
                     " which needs " + task.getNumSlotsRequired() + " slots");
            numFreeSlots.set(numFreeSlots.get() - task.getNumSlotsRequired());
            assert (numFreeSlots.get() >= 0);
          }
          synchronized (tip) {
            //to make sure that there is no kill task action for this
            if (!tip.canBeLaunched()) {
              //got killed externally while still in the launcher queue
              LOG.info("Not launching task " + task.getTaskID() + " as it got"
                  + " killed externally. Task's state is " + tip.getRunState());
              ...
            }
            tip.slotTaken = true;
          }
          //got a free slot. launch the task
          ...
        } catch (InterruptedException e) {
          return; // ALL DONE
        } catch (Throwable th) {
          LOG.error("TaskLauncher error " +
                    ...);
        }
      }
    }
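
The core of the loop above is a guarded wait on the free-slot counter: the launcher blocks until enough slots are free, and whoever releases slots wakes it up. Below is a minimal, self-contained sketch of that pattern. SlotPool, acquire, release, and demo are hypothetical names of mine. The real TaskLauncher additionally handles kill requests and a task queue, which are left out here.

```java
// Sketch of the guarded-wait pattern; not the TaskTracker implementation.
class SlotPool {
    private int freeSlots;

    SlotPool(int totalSlots) {
        this.freeSlots = totalSlots;
    }

    // Block until `needed` slots are free, then take them.
    synchronized void acquire(int needed) throws InterruptedException {
        while (freeSlots < needed) {
            wait(); // gives up the monitor until release() calls notifyAll()
        }
        freeSlots -= needed;
    }

    // Return slots to the pool and wake up any waiting launcher.
    synchronized void release(int count) {
        freeSlots += count;
        notifyAll();
    }

    synchronized int free() {
        return freeSlots;
    }

    // Two slots, both taken; a launcher thread needs one and gets it
    // only after a slot is released. Returns the free-slot count at the end.
    static int demo() {
        SlotPool pool = new SlotPool(2);
        try {
            pool.acquire(2);
            Thread launcher = new Thread(() -> {
                try {
                    pool.acquire(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            launcher.start();
            pool.release(1); // a task finished, so its slot becomes free
            launcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.free();
    }

    public static void main(String[] args) {
        System.out.println("free slots after demo: " + demo());
    }
}
```

The condition is re-checked in a while loop rather than an if, which is the same reason the original code loops around its wait on numFreeSlots.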





The startNewTask call localizes the job and then launches the task:

    RunningJob rjob = localizeJob(tip);
    ...
    // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
    launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob);

We trace into the launchTask call:

    if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
      ...
    }
    setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));


This leads to TaskRunner, the main class responsible for executing the task. We move on to the TaskRunner constructor:

    public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker,
                      JobConf conf, TaskTracker.RunningJob rjob
                      ) throws IOException {
      this.tip = tip; //the task assigned to the runner
      this.t = tip.getTask(); //information from the task assigned to the runner
      this.tracker = tracker; //has reference to the tracker that created it
      this.conf = conf;
      this.mapOutputFile = new MapOutputFile();
      ...
      this.jvmManager = tracker.getJvmManagerInstance();
      this.localdirs = conf.getLocalDirs();
      taskDistributedCacheManager = rjob.distCacheMgr;
    }


The task is executed in the run method of TaskRunner:

    public final void run() {
      ...
      launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
      ...
    }


We follow the call into launchJvmAndWait:

    void launchJvmAndWait(List <String> setup, Vector<String> vargs, File stdout,
        File stderr, long logSize, File workDir)
        throws InterruptedException, IOException {
      jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout,
          stderr, logSize, workDir, conf));
      synchronized (lock) {
        while (!done) {
          ...
        }
      }
    }





Following the call into JvmManager, we reach the reapJvm method. The key question is how the task's input information is transferred to the specific Map Task.

    spawnNewJvm(jobId, env, t);

    private void spawnNewJvm(JobID jobId, JvmEnv env,
        TaskRunner t) {
      JvmRunner jvmRunner = new JvmRunner(env, jobId, t.getTask());
      jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
      //spawn the JVM in a new thread. Note that there will be very little
      //extra overhead of launching the new thread for a new JVM since
      //most of the cost is involved in launching the process. Moreover,
      //since we are going to be using the JVM for running many tasks,
      //the thread launch cost becomes trivial when amortized over all
      //tasks. Doing it this way also keeps code simple.
      ...
      jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
      setRunningTaskForJvm(jvmRunner.jvmId, t);
      ...
    }




Stay patient: we now move to the JvmRunner class, which is the interface to the running task. The key component of JvmRunner is the runChild method. Notice that the Task object is obtained from the runner. The method then calls the TaskTracker's TaskController to launch the task:

    public void runChild(JvmEnv env) throws IOException, InterruptedException{
      int exitCode = 0;
      try {
        ...
        TaskRunner runner = jvmToRunningTask.get(jvmId);
        if (runner != null) {
          Task task = runner.getTask();
          //Launch the task controller to run task JVM
          String user = task.getUser();
          TaskAttemptID taskAttemptId = task.getTaskID();
          String taskAttemptIdStr = task.isTaskCleanupTask() ?
              (taskAttemptId.toString() + TaskTracker.TASK_CLEANUP_SUFFIX) :
              ...
          exitCode = tracker.getTaskController().launchTask(user,
              jvmId.jobId.toString(), taskAttemptIdStr, env.setup,
              env.vargs, env.workDir, env.stdout.toString(),
              ...);
        }
        ...



We go to the TaskTracker's TaskController to see how the task is actually created. In the launchTask method, a command line is constructed to launch a new JVM, and it is executed by a shell executor:

    File jvm =                                  // use same jvm as parent
        new File(new File(System.getProperty("java.home"), "bin"), "java");
    ...
    command.add("-Dhadoop.log.dir=" + TaskLog.getBaseLogDir());
    ...
    command.add("-Djava.library.path=" +
        ...);
    command.add(JobLocalizer.class.getName());  // main of JobLocalizer
    ...
    // add the task tracker's reporting address
    ...
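
As a rough illustration of this step, the sketch below builds a command line for a child JVM in the same shape: the same java binary as the parent, the classpath, a -D property, then the main class and its arguments. ChildJvmCommand, buildCommand, the property name example.log.dir, and the example argument values are all hypothetical. Hadoop runs its command through a shell executor, whereas the comment in main uses the standard ProcessBuilder.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: mirrors the shape of the command construction, not Hadoop's code.
class ChildJvmCommand {

    static List<String> buildCommand(String mainClass, String logDir, List<String> args) {
        List<String> command = new ArrayList<>();
        // Use the same JVM binary as the parent process.
        File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java");
        command.add(jvm.toString());
        command.add("-classpath");
        command.add(System.getProperty("java.class.path"));
        command.add("-Dexample.log.dir=" + logDir); // hypothetical property name
        command.add(mainClass);                     // main class of the child JVM
        command.addAll(args);                       // e.g. the address to report back to
        return command;
    }

    public static void main(String[] args) {
        List<String> command = buildCommand(
            "example.ChildMain", "/tmp/logs", Arrays.asList("localhost", "12345"));
        System.out.println(String.join(" ", command));
        // To actually start the child process one could use:
        //   new ProcessBuilder(command).inheritIO().start().waitFor();
    }
}
```

Everything the child needs to know at startup has to travel through this argument list, which is why the arguments are the place to look for how task information reaches the child.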



Now that a JVM has been launched, the key to understanding what happens next lies in the main class of that JVM.

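Which main class is launched? Note that the excerpt above names JobLocalizer as its main class, which appears to belong to the job localization step. For the task JVM itself, the main class is Child.java. We can trace this to the following line in TaskRunner, which sets up the JVM arguments (vargs) used to launch the JVM:

    vargs.add(Child.class.getName());  // main of Child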

In the child process's main function, we can trace down to the single call that runs the task:

    public Object run() throws Exception {
      try {
        // use job-specified working directory
        ...
        taskFinal.run(job, umbilical);        // run the task
      } finally {….

We are almost there. A Task can be either a Map Task or a Reduce Task. Note also that the umbilical interface, which gives the child remote access to the TaskTracker, is passed in as an argument so that the progress of the task can be tracked.
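
To picture this relationship, here is a toy model: a task receives an umbilical object and reports progress through it while it runs. The names Umbilical, SketchTask, SketchMapTask, UmbilicalDemo, and the two-argument statusUpdate are simplified stand-ins of mine. The real umbilical is an RPC protocol with a much larger interface.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model; these types are illustrative and not the Hadoop classes.
interface Umbilical {
    void statusUpdate(String taskId, float progress);
}

abstract class SketchTask {
    final String taskId;

    SketchTask(String taskId) {
        this.taskId = taskId;
    }

    // Same shape as taskFinal.run(job, umbilical), minus the job configuration.
    abstract void run(Umbilical umbilical);
}

class SketchMapTask extends SketchTask {
    private final int records;

    SketchMapTask(String taskId, int records) {
        super(taskId);
        this.records = records;
    }

    @Override
    void run(Umbilical umbilical) {
        for (int i = 1; i <= records; i++) {
            // A real task would process record i here.
            umbilical.statusUpdate(taskId, (float) i / records);
        }
    }
}

class UmbilicalDemo {
    // Runs a map task against an in-memory umbilical and returns the reported progress.
    static List<Float> runAndCollect(int records) {
        List<Float> reports = new ArrayList<>();
        Umbilical parent = (taskId, progress) -> reports.add(progress);
        new SketchMapTask("attempt_0", records).run(parent);
        return reports;
    }

    public static void main(String[] args) {
        System.out.println(runAndCollect(4));
    }
}
```

In this model the parent side is just a lambda collecting values; in Hadoop the same calls would cross a process boundary.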

This converges with my earlier search, which went backwards from the Mapper function to where everything started. In MapTask:

    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }


In runNewMapper, we finally reach the user-defined mapper class:

    mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                   input, output, committer,
                                                   reporter, split);
    ...
    input.initialize(split, mapperContext);
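
What happens once the context is built and the input is initialized is, in essence, a simple loop: the mapper pulls records from the record reader and calls map on each one. The sketch below models that loop with stand-in types. SketchReader, SketchMapper, and WordLengthMapper are hypothetical, and an Iterator replaces the real RecordReader. It follows the setup, map-per-record, cleanup shape of the new-API Mapper.run as I understand it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Stand-in for a RecordReader over one split: hands out one record at a time.
class SketchReader {
    private final Iterator<String> it;
    private String current;

    SketchReader(List<String> records) {
        this.it = records.iterator();
    }

    boolean nextKeyValue() {
        if (!it.hasNext()) {
            return false;
        }
        current = it.next();
        return true;
    }

    String getCurrentValue() {
        return current;
    }
}

// Stand-in for the user-facing Mapper base class.
abstract class SketchMapper {
    final List<String> output = new ArrayList<>();

    void setup() {}

    abstract void map(String value);

    void cleanup() {}

    // The driving loop: one map() call per record in the split.
    void run(SketchReader input) {
        setup();
        while (input.nextKeyValue()) {
            map(input.getCurrentValue());
        }
        cleanup();
    }
}

class WordLengthMapper extends SketchMapper {
    @Override
    void map(String value) {
        output.add(value + ":" + value.length());
    }

    public static void main(String[] args) {
        WordLengthMapper mapper = new WordLengthMapper();
        mapper.run(new SketchReader(Arrays.asList("map", "reduce")));
        System.out.println(mapper.output);
    }
}
```

The loop only ever sees one reader, which is why a task is tied to a single split unless the reader itself is made to span several.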


In the next post, we will explore how the split information gets passed through. The point is that if we want to process multiple splits in a single map task, we need to be able to pass multiple split paths instead of just one. The key lies in MapTask.java and Child.java, where the arguments to the Child JVM are parsed.
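
As a thought experiment for where this is heading, the sketch below shows one way a single task could work through several splits while overlapping the read of the next split with computation on the current one. Everything here is hypothetical: MultiSplitSketch, readSplit, compute, and the use of in-memory int arrays in place of real split paths. It only illustrates the intended overlap and says nothing about how this would be wired into Hadoop.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class MultiSplitSketch {

    // Stand-in for I/O plus deserialization of one split.
    static int[] readSplit(int[] rawSplit) {
        return Arrays.copyOf(rawSplit, rawSplit.length);
    }

    // Stand-in for the map computation over one deserialized split.
    static long compute(int[] records) {
        long sum = 0;
        for (int r : records) {
            sum += r;
        }
        return sum;
    }

    // Process all splits in one task, prefetching split i+1 while computing on split i.
    static long processAll(List<int[]> splits) {
        if (splits.isEmpty()) {
            return 0;
        }
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            long total = 0;
            Future<int[]> pending = io.submit(() -> readSplit(splits.get(0)));
            for (int i = 0; i < splits.size(); i++) {
                int[] current = pending.get();   // wait for the current read to finish
                if (i + 1 < splits.size()) {
                    final int next = i + 1;
                    pending = io.submit(() -> readSplit(splits.get(next)));
                }
                total += compute(current);       // runs while the next read is in flight
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a split", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("reading a split failed", e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        List<int[]> splits =
            Arrays.asList(new int[] {1, 2, 3}, new int[] {4, 5}, new int[] {6});
        System.out.println("total = " + processAll(splits));
    }
}
```

A single I/O thread is used so that reads stay sequential while computation proceeds on the calling thread.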


