How does killing a task work in Hadoop?

This post summarizes how Hadoop kills a task when something goes wrong with it (it ran for too long, etc.). This matters to me because I run multiple virtual JVMs (each hosting a map task) inside a single physical JVM: when any one of those JVMs gets killed, I need to make sure all the tasks running in it are killed as well.

The call comes from TaskTracker.java, in the kill() method:


/**
 * Something went wrong and the task must be killed.
 * @param wasFailure was it a failure (versus a kill request)?
 * @throws InterruptedException
 */
public synchronized void kill(boolean wasFailure) throws IOException, InterruptedException {
    if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
        taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
        isCleaningup()) {
        wasKilled = true;
        if (wasFailure) {
            failures += 1;
        }
        // runner could be null if task-cleanup attempt is not localized yet
        if (runner != null) {
            runner.kill();
        }
        ....

The runner.kill() call kills the current task runner. (A TaskRunner is responsible for executing a single map or reduce task.)

/**
 * Kill the child process
 * @throws InterruptedException
 * @throws IOException
 */
public void kill() throws IOException, InterruptedException {
    killed = true;
    jvmManager.taskKilled(this);
    signalDone();
}

This leads to JvmManager. (JvmManager manages all the task JVMs running on the same physical node; it stores the JVM-id-to-task mappings and other bookkeeping information.)


public void taskKilled(TaskRunner tr) throws IOException, InterruptedException {
    if (tr.getTask().isMapTask()) {
        mapJvmManager.taskKilled(tr);
    } else {
        reduceJvmManager.taskKilled(tr);
    }
}

The map- or reduce-side manager (JvmManagerForType) then performs the actual kill action:


synchronized public void taskKilled(TaskRunner tr) throws IOException, InterruptedException {
    JVMId jvmId = runningTaskToJvm.remove(tr);
    if (jvmId != null) {
        jvmToRunningTask.remove(jvmId);
        killJvm(jvmId);
    }
}
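
The maps touched here (runningTaskToJvm, jvmToRunningTask) hint at the bookkeeping this per-type manager keeps. Below is a minimal sketch of those maps and how a kill walks them; the field names are taken from the quoted code, but the placeholder types and everything else are simplified and this is not the real Hadoop class:

import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the per-type manager's bookkeeping, not the real Hadoop class.
// Placeholder types stand in for Hadoop's TaskRunner, JVMId and JvmRunner.
class JvmBookkeepingSketch {

    static class TaskRunner { }
    static class JvmId { }
    static class JvmRunner {
        void kill() { /* send TERM/KILL to the child process, as shown later */ }
    }

    // task <-> JVM mappings, named after the fields in the quoted code
    private final Map<TaskRunner, JvmId> runningTaskToJvm = new HashMap<>();
    private final Map<JvmId, TaskRunner> jvmToRunningTask = new HashMap<>();
    private final Map<JvmId, JvmRunner> jvmIdToRunner = new HashMap<>();

    // Mirrors the quoted taskKilled(): forget the task, then kill its JVM.
    synchronized void taskKilled(TaskRunner tr) {
        JvmId jvmId = runningTaskToJvm.remove(tr);
        if (jvmId != null) {
            jvmToRunningTask.remove(jvmId);
            killJvm(jvmId);
        }
    }

    // Collapses the killJvm()/killJvmRunner() pair shown further down:
    // look up the JvmRunner that owns the JVM, kill it, and forget it.
    synchronized void killJvm(JvmId jvmId) {
        JvmRunner runner = jvmIdToRunner.remove(jvmId);
        if (runner != null) {
            runner.kill();
        }
    }
}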

The per-type taskKilled() then uses the killJvm(JVMId jvmId) method, shown a bit further down. First, for context, here is JvmRunner.runChild(), the method that launches the child task JVM in the first place; note the kill() call in its finally block once the child has exited:

public void runChild(JvmEnv env) throws IOException, InterruptedException {
    int exitCode = 0;
    try {
        env.vargs.add(Integer.toString(jvmId.getId()));
        TaskRunner runner = jvmToRunningTask.get(jvmId);
        if (runner != null) {
            Task task = runner.getTask();
            // Launch the task controller to run task JVM
            String user = task.getUser();
            TaskAttemptID taskAttemptId = task.getTaskID();
            String taskAttemptIdStr = task.isTaskCleanupTask() ?
                (taskAttemptId.toString() + TaskTracker.TASK_CLEANUP_SUFFIX) :
                taskAttemptId.toString();
            exitCode = tracker.getTaskController().launchTask(user,
                jvmId.jobId.toString(), taskAttemptIdStr, env.setup, env.vargs,
                env.workDir, env.stdout.toString(), env.stderr.toString());
        }
    } catch (IOException ioe) {
        // do nothing
        // error and output are appropriately redirected
    } finally { // handle the exit code
        // although the process has exited before we get here,
        // make sure the entire process group has also been killed.
        kill();
        updateOnJvmExit(jvmId, exitCode);
        LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode +
            ". Number of tasks it ran: " + numTasksRan);
        deleteWorkDir(tracker, firstTask);
    }
}
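
Two things about the shape of runChild() are worth noting: launchTask() returns only after the child JVM has exited, and the finally block then calls kill() anyway so that the entire process group is reaped even on an abnormal exit. Here is the same launch-wait-cleanup shape in plain Java, with ProcessBuilder standing in for the TaskController; the child command and work directory are made up, and this is a sketch rather than Hadoop code:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Launch-wait-cleanup sketch in the shape of runChild(), with ProcessBuilder
// standing in for the TaskController. Not Hadoop code; the command is a placeholder.
public class RunChildSketch {

    public static void main(String[] args) throws IOException, InterruptedException {
        // a throwaway work directory standing in for env.workDir
        File workDir = Files.createTempDirectory("task-workdir").toFile();
        int exitCode = -1;
        Process child = null;
        try {
            // stand-in for TaskController.launchTask(): build and start the child JVM
            ProcessBuilder pb = new ProcessBuilder("java", "-version");
            pb.directory(workDir)
              .redirectOutput(new File(workDir, "stdout"))   // env.stdout
              .redirectError(new File(workDir, "stderr"));   // env.stderr
            child = pb.start();
            exitCode = child.waitFor();  // launchTask() likewise returns only after the child exits
        } finally {
            // mirrors runChild()'s finally block: make sure the child is really gone,
            // record the exit code, then clean up the work directory
            if (child != null && child.isAlive()) {
                child.destroyForcibly();
            }
            System.out.println("child JVM exited with exit code " + exitCode);
            for (File f : workDir.listFiles()) {
                f.delete();
            }
            workDir.delete();  // deleteWorkDir(tracker, firstTask) in the real code
        }
    }
}

With that digression done, back to the kill path: killJvm() looks up the JvmRunner that owns the JVM to be killed.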

 


synchronized public void killJvm(JVMId jvmId) throws IOException, InterruptedException {
    JvmRunner jvmRunner;
    if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
        killJvmRunner(jvmRunner);
    }
}

This calls into the JvmRunner. A JvmRunner is responsible for the process of a single JVM, so a JvmManager manages multiple JvmRunners. The JvmRunner is also what performs the initial spawn of the JVM; a reduced sketch of this shape follows the next snippet.


private synchronized void killJvmRunner(JvmRunner jvmRunner) throws IOException, InterruptedException {
    jvmRunner.kill();
    removeJvm(jvmRunner.jvmId);
}
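
Putting these pieces together, a JvmRunner is roughly a thread that spawns the child JVM via runChild() and can later be asked to kill it. A very reduced sketch of that shape (names mirror the quoted code, but this is not the real Hadoop class):

// Very reduced sketch of the shape described above: one JvmRunner thread per
// child JVM; run() spawns the JVM and waits for it, kill() tears it down.
class JvmRunnerSketch extends Thread {

    private final String jvmId;          // stand-in for Hadoop's JVMId
    private volatile boolean killed = false;

    JvmRunnerSketch(String jvmId) {
        this.jvmId = jvmId;
    }

    @Override
    public void run() {
        // the real run() calls runChild(env), which launches the child JVM
        // and only returns once that JVM has exited (see the snippet above)
        System.out.println("spawning child JVM " + jvmId);
    }

    synchronized void kill() {
        if (!killed) {
            // the real kill() signals the child process: TERM, then KILL
            // after a grace period (shown in the next snippet)
            killed = true;
        }
    }
}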

Another place that uses the kill() signal is at the end of a task's execution: the finally block of runChild() above calls kill() once the child JVM has exited.


Here are the bare bones of the kill action: sending Signal.TERM and, after a grace period, Signal.KILL to the child process.


synchronized void kill() throws IOException, InterruptedException {
    if (!killed) {
        TaskController controller = tracker.getTaskController();
        // Check initial context before issuing a kill to prevent situations
        // where kill is issued before task is launched.
        String pidStr = jvmIdToPid.get(jvmId);
        if (pidStr != null) {
            String user = env.conf.getUser();
            int pid = Integer.parseInt(pidStr);
            // start a thread that will kill the process dead
            if (sleeptimeBeforeSigkill > 0) {
                new DelayedProcessKiller(user, pid, sleeptimeBeforeSigkill,
                                         Signal.KILL).start();
                controller.signalTask(user, pid, Signal.TERM);
            } else {
                controller.signalTask(user, pid, Signal.KILL);
            }
        } else {
            LOG.info(String.format("JVM Not killed %s but just removed", jvmId.toString()));
        }
        killed = true;
    }
}
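
The TERM-then-KILL dance deserves a closer look: the child first gets a chance to exit cleanly, and only after sleeptimeBeforeSigkill has elapsed is it forcibly removed. Below is a stand-alone sketch of the same pattern using plain ProcessHandle; it is not Hadoop's DelayedProcessKiller, and the grace period value is arbitrary:

import java.util.concurrent.TimeUnit;

// Stand-alone sketch of the TERM-then-KILL pattern, with ProcessHandle standing
// in for TaskController.signalTask(). Not Hadoop's DelayedProcessKiller.
public class DelayedKillSketch {

    // Ask the process to exit (roughly Signal.TERM on Unix), then force-kill it
    // (roughly Signal.KILL) if it is still alive after the grace period.
    static void killWithGracePeriod(long pid, long gracePeriodMs) {
        ProcessHandle.of(pid).ifPresent(handle -> {
            handle.destroy();                               // polite request first
            handle.onExit()
                  .orTimeout(gracePeriodMs, TimeUnit.MILLISECONDS)
                  // still running when the grace period runs out: the hard kill
                  .exceptionally(timedOut -> {
                      handle.destroyForcibly();
                      return handle;
                  });
        });
    }

    public static void main(String[] args) throws Exception {
        // spawn a throwaway child process just to have something to kill
        Process child = new ProcessBuilder("sleep", "60").start();
        killWithGracePeriod(child.pid(), 5_000);            // arbitrary 5s grace period
        System.out.println("child exited with code " + child.waitFor());
    }
}

The point of the two-step signal is that the TERM lets the child JVM run its shutdown hooks and flush output, while the KILL simply removes whatever is still around once the grace period runs out.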
