Running Multiple Hadoop Map/Reduce Tasks simultaneously in the same JVM – Code Analysis

In the last phase, I was able to establish a connection from the TaskTracker to the Child JVM and pass in the arguments needed for task execution. This enables me to assign multiple tasks to the same JVM by sending in task-specific information.

Now I am working on the next phase: once the Child JVM has all the information needed for multiple tasks, I will dig into the Child JVM code and locate the variables that are task specific. The analysis focuses on the Child.java file in the Hadoop 1.0.3 distribution.

In the Child's main program, local variables can be associated with each task; the static variables are the more problematic ones. The tricky static variables appear in the following segment:


// Create the JobConf and determine if this job gets segmented task logs
final JobConf job = new JobConf(task.getJobFile());
currentJobSegmented = logIsSegmented(job);

isCleanup = task.isTaskCleanupTask();

// reset the statistics for the task
FileSystem.clearStatistics();

// Set credentials
job.setCredentials(defaultConf.getCredentials());

//forcefully turn off caching for localfs. All cached FileSystems
//are closed during the JVM shutdown. We do certain
//localfs operations in the shutdown hook, and we don't
//want the localfs to be "closed"
job.setBoolean("fs.file.impl.disable.cache", false);
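Both currentJobSegmented and isCleanup are static fields of Child, so a second task launched in the same JVM would overwrite the values the first task is still using. One possible direction, shown as a minimal sketch below, is to move these flags into a per-attempt holder keyed by the task attempt ID; the TaskState class and its method names are my own illustration, not Hadoop code.

// Sketch only: keeps the flags that Child.java currently stores in static
// fields (currentJobSegmented, isCleanup) in one holder per task attempt,
// so tasks sharing a JVM cannot overwrite each other's values.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TaskState {
  // one entry per task attempt, keyed by the attempt ID string
  private static final Map<String, TaskState> STATES =
      new ConcurrentHashMap<String, TaskState>();

  final boolean isCleanup;      // replaces the static isCleanup
  final boolean jobSegmented;   // replaces the static currentJobSegmented

  private TaskState(boolean isCleanup, boolean jobSegmented) {
    this.isCleanup = isCleanup;
    this.jobSegmented = jobSegmented;
  }

  // called when the Child receives a new task over the umbilical
  public static void register(String attemptId, boolean isCleanup, boolean jobSegmented) {
    STATES.put(attemptId, new TaskState(isCleanup, jobSegmented));
  }

  // looked up by the thread that runs the task
  public static TaskState get(String attemptId) {
    return STATES.get(attemptId);
  }

  // dropped once the task finishes
  public static void remove(String attemptId) {
    STATES.remove(attemptId);
  }
}

FileSystem.clearStatistics() is also JVM-wide, so the per-task statistics reset would either have to be dropped or replaced by per-task bookkeeping.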

 

The next set of variables is easier to deal with, since they are hopefully specific to the task: setting up the working directory and the mapred local directory both take a task-specific (attempt) directory as input. The one piece that deserves a closer look, sketched after the code below, is the work directory, which is derived from the JVM-wide current working directory (cwd).


// setup the child's mapred-local-dir. The child is now sandboxed and
// can only see files down and under attemtdir only.
TaskRunner.setupChildMapredLocalDirs(task, job);

// setup the child's attempt directories
localizeTask(task, job, logLocation);

//setupWorkDir actually sets up the symlinks for the distributed
//cache. After a task exits we wipe the workdir clean, and hence
//the symlinks have to be rebuilt.
TaskRunner.setupWorkDir(job, new File(cwd));

//create the index file so that the log files
//are viewable immediately
TaskLog.syncLogs(logLocation, taskid, isCleanup, logIsSegmented(job));
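The call that worries me most here is TaskRunner.setupWorkDir(job, new File(cwd)): the current working directory is shared by every task in the JVM, and the comment above says the workdir is wiped clean when a task exits, which would destroy the distributed-cache symlinks of a task that is still running. A minimal sketch of one alternative, giving each attempt its own work directory under the shared cwd, follows; the PerTaskWorkDirs class and its method name are hypothetical, not Hadoop code.

// Sketch only: derive a separate work directory per task attempt under the
// JVM's shared cwd, so that wiping one task's work directory on exit cannot
// disturb the symlinks of another task still running in the same JVM.
import java.io.File;
import java.io.IOException;

public class PerTaskWorkDirs {

  // create <cwd>/<attemptId>/work for one task attempt and return it
  public static File createWorkDir(String cwd, String attemptId) throws IOException {
    File workDir = new File(new File(cwd, attemptId), "work");
    if (!workDir.isDirectory() && !workDir.mkdirs()) {
      throw new IOException("could not create work dir " + workDir);
    }
    return workDir;
  }
}

TaskRunner.setupWorkDir(job, workDir) could then be handed the per-attempt directory instead of new File(cwd); cleanup after a task would also have to be scoped to that directory, taking care to remove distributed-cache symlinks without following them.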

The problematic segment is here:


public Object run() throws Exception {
  try {
    // use job-specified working directory
    FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
    taskFinal.run(job, umbilical);        // run the task
  } finally {
    TaskLog.syncLogs(logLocation, taskid, isCleanup, logIsSegmented(job));
    TaskLogsTruncater trunc = new TaskLogsTruncater(defaultConf);
    trunc.truncateLogs(new JVMInfo(
        TaskLog.getAttemptDir(taskFinal.getTaskID(),
          taskFinal.isTaskCleanupTask()), Arrays.asList(taskFinal)));
  }
  return null;
}

It appears that the working directory might be tied to the job rather than to the task. We have to make sure that two tasks from the same job will not interfere with each other, and at the moment there is no obvious way to tell whether that happens.
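One way to get an answer is a small standalone check. FileSystem.get() hands out instances from a JVM-wide cache keyed by file system scheme, authority, and user, not by task, so two tasks of the same job running in one JVM should receive the same object. The sketch below (the class name and paths are mine, and it assumes the default local file system configuration) makes that sharing visible:

// Diagnostic sketch, not part of Hadoop: simulate two tasks of the same job
// asking for "their" FileSystem and working directory inside one JVM.
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class FsSharingCheck {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();

    // Both calls go through FileSystem's cache, so they are expected to
    // return the very same instance for tasks of the same job.
    FileSystem fsTask1 = FileSystem.get(job);
    FileSystem fsTask2 = FileSystem.get(job);
    System.out.println("same FileSystem instance: " + (fsTask1 == fsTask2));

    // If they are the same instance, a setWorkingDirectory() call made on
    // behalf of one task is immediately visible to the other.
    fsTask1.setWorkingDirectory(new Path("/tmp/task1"));
    System.out.println("task 2 now sees working dir: "
        + fsTask2.getWorkingDirectory());
  }
}

If the instance is shared and the second reference observes the directory set through the first, the setWorkingDirectory call in run() will have to be made task-safe before tasks can share a JVM, for example by always using fully qualified paths instead of relying on the shared working directory.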

TaskLog and the metrics initialization should be fine as they are, since they create separate directories for separate tasks; a quick check is sketched below.
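The finally block above already calls TaskLog.getAttemptDir(...), and distinct attempt IDs should map onto distinct log directories. A quick sketch of such a check follows; it is placed in the org.apache.hadoop.mapred package so it can reach TaskLog, and the attempt ID strings and log root are made up.

// Sketch: confirm that two task attempts get separate log directories.
package org.apache.hadoop.mapred;

import java.io.File;

public class AttemptLogDirCheck {
  public static void main(String[] args) {
    // log root assumed for this standalone check
    System.setProperty("hadoop.log.dir", "/tmp/hadoop-logs");

    TaskAttemptID a1 = TaskAttemptID.forName("attempt_201212180000_0001_m_000000_0");
    TaskAttemptID a2 = TaskAttemptID.forName("attempt_201212180000_0001_m_000001_0");

    File dir1 = TaskLog.getAttemptDir(a1, false);
    File dir2 = TaskLog.getAttemptDir(a2, false);

    System.out.println(dir1);
    System.out.println(dir2);
    System.out.println("separate directories: " + !dir1.equals(dir2));
  }
}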

We are going to work under these assumptions and try to produce a prototype by Wed, Dec 18th.
