JVM Reuse Implementation in Hadoop MapReduce

This post delves into the implementation of JVM reuse in the Hadoop MapReduce system. I looked through the code in JvmManager.java and Child.java to identify the design and implementation that enable JVM reuse.
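As background, JVM reuse is opt-in per job. Here is a minimal sketch of how a job would turn it on with the classic (Hadoop 1.x) API; the class and job name are placeholders:

import org.apache.hadoop.mapred.JobConf;

public class JvmReuseConfig {
  public static JobConf configure() {
    JobConf conf = new JobConf(JvmReuseConfig.class); // placeholder job class
    conf.setJobName("jvm-reuse-example");             // placeholder name

    // Run up to 10 tasks sequentially in each spawned JVM;
    // -1 means no limit (reuse each JVM for as many tasks as possible).
    conf.setNumTasksToExecutePerJvm(10);
    // Equivalent property form:
    // conf.setInt("mapred.job.reuse.jvm.num.tasks", 10);
    return conf;
  }
}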

On the JvmManager side, the relevant code is in the reapJvm method:


private synchronized void reapJvm(
    TaskRunner t, JvmEnv env) throws IOException, InterruptedException {
  if (t.getTaskInProgress().wasKilled()) {
    //the task was killed in-flight
    //no need to do the rest of the operations
    return;
  }
  boolean spawnNewJvm = false;
  JobID jobId = t.getTask().getJobID();
  //Check whether there is a free slot to start a new JVM.
  //,or, Kill a (idle) JVM and launch a new one
  //When this method is called, we *must*
  // (1) spawn a new JVM (if we are below the max)
  // (2) find an idle JVM (that belongs to the same job), or,
  // (3) kill an idle JVM (from a different job)
  // (the order of return is in the order above)
  int numJvmsSpawned = jvmIdToRunner.size();
  JvmRunner runnerToKill = null;
  if (numJvmsSpawned >= maxJvms) {
    //go through the list of JVMs for all jobs.
    Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter =
        jvmIdToRunner.entrySet().iterator();
    while (jvmIter.hasNext()) {
      JvmRunner jvmRunner = jvmIter.next().getValue();
      JobID jId = jvmRunner.jvmId.getJobId();
      //look for a free JVM for this job; if one exists then just break
      if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()){
        setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM
        LOG.info("No new JVM spawned for jobId/taskid: " +
            jobId+"/"+t.getTask().getTaskID() +
            ". Attempting to reuse: " + jvmRunner.jvmId);
        return;
      }
      //... remainder of the method (cases (1) and (3): spawning a new
      //JVM, or killing an idle JVM from another job) is omitted here

The key here is that there are a few cases: if an idle JVM belonging to the same job exists, the task is simply mapped to that JVM's ID. What is interesting is that setRunningTaskForJvm doesn't actually do much:


synchronized public void setRunningTaskForJvm(JVMId jvmId,
    TaskRunner t) {
  jvmToRunningTask.put(jvmId, t);
  runningTaskToJvm.put(t, jvmId);
  jvmIdToRunner.get(jvmId).setBusy(true);
}

This merely creates a mapping; it doesn't communicate with the JVM at this point. The JVM later picks up the task itself (pull rather than push), as the following segment from Child.java shows:


while (true) {
  taskid = null;
  currentJobSegmented = true;

  JvmTask myTask = umbilical.getTask(context);
  if (myTask.shouldDie()) {
    break;
  } else {
    if (myTask.getTask() == null) {
      taskid = null;
      currentJobSegmented = true;

      if (++idleLoopCount >= SLEEP_LONGER_COUNT) {
        //we sleep for a bigger interval when we don't receive
        //tasks for a while
        Thread.sleep(1500);
      } else {
        Thread.sleep(500);
      }
      continue;
    }
  }
  idleLoopCount = 0;
  task = myTask.getTask();
  //... the fetched task is then set up and run; remainder omitted

The while loop continuously pulls tasks assigned to this JVM from the TaskTracker through the umbilical protocol.
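To connect the two sides: the umbilical getTask call lands on the TaskTracker, which answers the poll from the reservation that setRunningTaskForJvm recorded. Below is a condensed, illustrative sketch of that server side, not the actual TaskTracker.getTask method; I've folded JvmManager's bookkeeping inline using the jvmIdToRunner and jvmToRunningTask maps shown above:

// Condensed, illustrative sketch of the TaskTracker side (not the real
// TaskTracker.getTask). It answers the Child's poll from the mapping
// that setRunningTaskForJvm populated.
public synchronized JvmTask getTask(JvmContext context) throws IOException {
  JVMId jvmId = context.jvmId;

  // A JVM the manager no longer tracks is told to exit:
  // shouldDie == true makes the Child break out of its loop.
  if (!jvmIdToRunner.containsKey(jvmId)) {
    return new JvmTask(null, true);
  }

  // Look up the task that reapJvm/setRunningTaskForJvm reserved for this JVM.
  TaskRunner runner = jvmToRunningTask.get(jvmId);
  if (runner == null) {
    // Nothing assigned yet: the Child sleeps and polls again.
    return new JvmTask(null, false);
  }
  return new JvmTask(runner.getTask(), false);
}

Note the consequence of the pull model: the reservation made in reapJvm only takes effect the next time the idle JVM polls, so the Child's sleep intervals (500 ms, backing off to 1500 ms after SLEEP_LONGER_COUNT idle loops) bound how quickly a reused JVM picks up its next task.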
