Further changes to make parallel JVM in Hadoop work

This post looks into possible changes to fix an exception that I am seeing when running parallel JVM in Hadoop. The issue is that some map attempts crash, so the reduce tasks cannot read the output of those map tasks and therefore cannot make progress.

The exception is the following:

14/02/19 16:05:42 INFO mapred.JobClient: Task Id : attempt_201402191558_0001_m_000335_0, Status : FAILED
java.lang.Throwable: Child Error
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271)
Caused by: java.io.IOException: Task process exit with nonzero status of 143.
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:258)
14/02/19 16:05:42 WARN mapred.JobClient: Error reading task outputhttp://cn-0093.stic.rice.edu:25080/tasklog?plaintext=true&attemptid=attempt_201402191558_0001_m_000335_0&filter=stdout
14/02/19 16:05:42 WARN mapred.JobClient: Error reading task outputhttp://cn-0093.stic.rice.edu:25080/tasklog?plaintext=true&attemptid=attempt_201402191558_0001_m_000335_0&filter=stderr

The failure of the map task attempt results in a reduce task failure, as we can see in the following log message:

./cn-0092/hadoop-yz17/mapred/local/userlogs/job_201402191558_0001/attempt_201402191558_0001_r_000002_0/syslog:234:2014-02-19 16:05:46,710 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring obsolete output of FAILED map-task: 'attempt_201402191558_0001_m_000335_0'

It ignores the output from the failed task.

The exception originates from the following code in TaskRunner.java:


launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
if (exitCodeSet) {
    if (!killed && exitCode != 0) {
        if (exitCode == 65) {
            tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
        }
        throw new IOException("Task process exit with nonzero status of " + exitCode + ".");
    }
}

The reason for throwing the exception is that a task returned from the launchJvmAndWait(…) call with a nonzero exit code, but its killed flag was not set. Normally, if a task is killed, the killed flag should be set to true.

This is problematic for the current implementation because we keep multiple JvmIds running in the same JVM. If a task is killed by terminating its JVM, all other tasks running in the same JVM are terminated as well. However, the system never registers that those other tasks are no longer running.

I propose to change the system by creating a group of JvmIds that run in the same physical JVM, so that when a task is killed, the system knows the other tasks running in that JVM are killed as well.
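The grouping could be sketched roughly as follows. This is a hypothetical illustration, not actual Hadoop code: the class name, the use of String for JVM ids, and the pid maps are all my own stand-ins for whatever the modified JvmManager would keep.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: track which logical JVM ids share one physical
// JVM process, so that killing the process can be propagated to every
// sibling task instead of only the one being killed.
public class JvmIdGroup {
    // physical process id -> logical JVM ids running inside it
    private final Map<Integer, Set<String>> pidToJvmIds = new HashMap<>();
    // reverse lookup: logical JVM id -> physical process id
    private final Map<String, Integer> jvmIdToPid = new HashMap<>();

    public synchronized void register(String jvmId, int pid) {
        pidToJvmIds.computeIfAbsent(pid, p -> new HashSet<>()).add(jvmId);
        jvmIdToPid.put(jvmId, pid);
    }

    // Return every logical JVM id that dies with the same physical
    // process, so their killed flags can all be set before the exit
    // code is examined.
    public synchronized Set<String> siblingsOf(String jvmId) {
        Integer pid = jvmIdToPid.get(jvmId);
        if (pid == null) {
            return new HashSet<>();
        }
        return new HashSet<>(pidToJvmIds.get(pid));
    }

    public static void main(String[] args) {
        JvmIdGroup group = new JvmIdGroup();
        group.register("jvm_0001_m_01", 4242);
        group.register("jvm_0001_m_02", 4242);
        System.out.println(group.siblingsOf("jvm_0001_m_01").size()); // 2
    }
}
```

With this lookup in place, the kill path can flag every sibling before the shared process exits, so no task later appears to have died unexpectedly.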

The error message indicates an exit code of 143 (killed by an external process). In successful runs, JVMs often exit with 143 as well. So it is not the error code itself that is the problem; the problem is that the JVM exited with 143 while the killed flag was not set.
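Exit code 143 is the POSIX convention of 128 + the signal number, here SIGTERM (15). A minimal standalone demonstration (assumes a Linux/POSIX system with a `sleep` binary on the PATH):

```java
// Demonstrates where the 143 in the Hadoop log comes from: a process
// terminated by SIGTERM reports exit status 128 + 15 = 143.
public class ExitCode143 {
    public static void main(String[] args) throws Exception {
        Process p = new ProcessBuilder("sleep", "60").start();
        p.destroy();               // sends SIGTERM on POSIX platforms
        int code = p.waitFor();
        System.out.println(code);  // prints 143 on Linux
    }
}
```

This is exactly the code the TaskTracker's kill() produces when it terminates a task JVM, which is why a killed-but-unflagged task surfaces as "nonzero status of 143".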

The code that sets the killed flag is in the runChild method of JvmManager.java, which we modified heavily before.


finally { // handle the exit code
    // although the process has exited before we get here,
    // make sure the entire process group has also been killed.
    kill();
    updateOnJvmExit(jvmId, exitCode);
    LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode + ". Number of tasks it ran: " + numTasksRan);
    deleteWorkDir(tracker, firstTask);
}

The kill() call basically sends a terminate signal to the process associated with the JVM. One thing that I want to verify is that multiple JvmIds are mapped to the same pid (since we are running multiple JvmIds in one process).


synchronized private void updateOnJvmExit(JVMId jvmId, int exitCode) {
    removeJvm(jvmId);
    TaskRunner t = jvmToRunningTask.remove(jvmId);
    if (t != null) {
        runningTaskToJvm.remove(t);
        if (exitCode != 0) {
            t.setExitCode(exitCode);
        }
        t.signalDone();
    }
}
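One possible direction for the fix, shown here as a self-contained sketch rather than real Hadoop code (the class, the String group key, and the simplified TaskRunner are all my own stand-ins): on JVM exit, signal every task that was running in the same physical JVM, not just the single TaskRunner mapped to the exiting JvmId.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed fix: updateOnJvmExit propagates
// the exit code and the done signal to all co-located tasks, so none
// of them is left looking like an unexplained crash.
public class GroupedJvmManager {
    static class TaskRunner {
        int exitCode = 0;
        boolean done = false;
        void setExitCode(int c) { exitCode = c; }
        void signalDone() { done = true; }
    }

    // all tasks co-located in one physical JVM, keyed by a group id
    private final Map<String, List<TaskRunner>> jvmGroupToTasks = new HashMap<>();

    void register(String groupId, TaskRunner t) {
        jvmGroupToTasks.computeIfAbsent(groupId, g -> new ArrayList<>()).add(t);
    }

    synchronized void updateOnJvmExit(String groupId, int exitCode) {
        List<TaskRunner> tasks = jvmGroupToTasks.remove(groupId);
        if (tasks == null) return;
        for (TaskRunner t : tasks) {    // every sibling, not just one
            if (exitCode != 0) {
                t.setExitCode(exitCode);
            }
            t.signalDone();
        }
    }
}
```

Compared to the original method above, the only structural change is that the jvmId-to-single-TaskRunner lookup becomes a group lookup that iterates over all siblings.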

The cases when a JVM is killed can be found in the documentation of JvmManager.java:

//Cases when a JVM is killed:
// (1) the JVM under consideration belongs to the same job
//     (passed in the argument). In this case, kill only when
//     the JVM ran all the tasks it was scheduled to run (in terms
//     of count).
// (2) the JVM under consideration belongs to a different job and is
//     currently not busy
//But in both the above cases, we see if we can assign the current
//task to an idle JVM (hence we continue the loop even on a match)

Since we are turning on JVM reuse, a JVM will never be terminated because it ran all the tasks it is supposed to run (there is no task count limit in this case). This is shown in the bit of code below from JobConf.java:


/**
 * Get the number of tasks that a spawned JVM should execute
 */
public int getNumTasksToExecutePerJvm() {
    return getInt("mapred.job.reuse.jvm.num.tasks", 1);
}
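For reference, in Hadoop 1.x this is the property that controls JVM reuse, set in mapred-site.xml. The default of 1 gives each task its own JVM; a value of -1 lets a JVM run an unlimited number of tasks of the same job, which is the no-task-count situation described above:

```xml
<!-- mapred-site.xml: enable unlimited JVM reuse (Hadoop 1.x) -->
<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
  <value>-1</value>
</property>
```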

This entry was posted in Habanero Java, Hadoop Research, Uncategorized.
