Apache Mahout Clustering Algorithms Implementation

This post analyzes the implementation of several clustering algorithms, including KMeans and FuzzyKMeans, in the Apache Mahout package (http://mahout.apache.org), which runs on Apache Hadoop. I look at the memory footprint and other performance factors of the implementation.

Please refer to the previous post for a general description of the KMeans algorithm and its MapReduce implementation.

First, we look at the package org.apache.mahout.clustering.kmeans.

There is always a driver class, org.apache.mahout.clustering.kmeans.KMeansDriver. It is responsible for setting up the program, especially the parameters specific to the KMeans application. A key method in KMeansDriver.java is buildClusters, which sets up the cluster classifier, the distance measure, and other parameters:


public static Path buildClusters(Configuration conf, Path input, Path clustersIn, Path output,
    DistanceMeasure measure, int maxIterations, String delta, boolean runSequential)
    throws IOException, InterruptedException, ClassNotFoundException {
  double convergenceDelta = Double.parseDouble(delta);
  List<Cluster> clusters = new ArrayList<Cluster>();
  KMeansUtil.configureWithClusterInfo(conf, clustersIn, clusters);

  if (clusters.isEmpty()) {
    throw new IllegalStateException("No input clusters found in " + clustersIn + ". Check your -c argument.");
  }

  Path priorClustersPath = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
  ClusteringPolicy policy = new KMeansClusteringPolicy(convergenceDelta);
  ClusterClassifier prior = new ClusterClassifier(clusters, policy);
  prior.writeToSeqFiles(priorClustersPath);

  if (runSequential) {
    ClusterIterator.iterateSeq(conf, input, priorClustersPath, output, maxIterations);
  } else {
    ClusterIterator.iterateMR(conf, input, priorClustersPath, output, maxIterations);
  }
  return output;
}

The key line is at the end:


ClusterIterator.iterateMR(conf, input, priorClustersPath, output, maxIterations);

This call chains together multiple MapReduce jobs, running one job per iteration of the clustering algorithm.

The key source code for the KMeans map task is in the CIMapper class in the package org.apache.mahout.clustering.iterator. Before going into it, we look at the key implementation components of the ClusterIterator class:


while (iteration <= numIterations) {
  conf.set(PRIOR_PATH_KEY, priorPath.toString());

  String jobName = "Cluster Iterator running iteration " + iteration + " over priorPath: " + priorPath;
  Job job = new Job(conf, jobName);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(ClusterWritable.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(ClusterWritable.class);

  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setMapperClass(CIMapper.class);
  job.setReducerClass(CIReducer.class);

  FileInputFormat.addInputPath(job, inPath);
  clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
  priorPath = clustersOut;
  FileOutputFormat.setOutputPath(job, clustersOut);

  job.setJarByClass(ClusterIterator.class);
  // record the start time of the iteration
  startTime = System.currentTimeMillis();
  if (!job.waitForCompletion(true)) {
    throw new InterruptedException("Cluster Iteration " + iteration + " failed processing " + priorPath);
  }
  // record the end time and elapsed time of the iteration
  endTime = System.currentTimeMillis();
  elapsedTime = endTime - startTime;

  // accumulate the elapsed time of the iterations
  totalElapsedTime += elapsedTime;

  log.info("iteration " + iteration + " took " + elapsedTime / 1000 + " seconds");
  // ... (remainder of the loop body, including the convergence check and iteration increment, omitted)
}

We can see that the CIMapper and CIReducer classes are used for each job. Going into the CIMapper class:


@Override
protected void setup(Context context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  String priorClustersPath = conf.get(ClusterIterator.PRIOR_PATH_KEY);
  classifier = new ClusterClassifier();
  classifier.readFromSeqFiles(conf, new Path(priorClustersPath));
  policy = classifier.getPolicy();
  policy.update(classifier);
  super.setup(context);
}

In the setup phase, the mapper reads in the cluster information (in this case, the cluster centroids) produced by the previous iteration, or the initial clusters on the first iteration. It also sets up the clustering policy.
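
In effect, every mapper deserializes the full set of prior clusters into memory before it processes any points. Below is a hedged sketch of what that loading step amounts to; the iterator helpers (SequenceFileDirValueIterable, PathType, PathFilters) are my assumption of the relevant Mahout API, and this is not the verbatim readFromSeqFiles implementation.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;

public class LoadPriorClustersSketch {
  // Walk the prior-clusters directory and collect every cluster into an in-memory list.
  public static List<Cluster> load(Configuration conf, Path priorPath) {
    List<Cluster> models = new ArrayList<Cluster>();
    for (ClusterWritable cw : new SequenceFileDirValueIterable<ClusterWritable>(
        priorPath, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
      models.add(cw.getValue());  // each value wraps one cluster (center, radius, counts)
    }
    return models;
  }
}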

Next, let's take a look at the map function, which contains the core of the algorithm:

@Override
protected void map(WritableComparable<?> key, VectorWritable value, Context context)
    throws IOException, InterruptedException {
  Vector probabilities = classifier.classify(value.get());
  Vector selections = policy.select(probabilities);
  for (Iterator<Element> it = selections.iterateNonZero(); it.hasNext();) {
    Element el = it.next();
    classifier.train(el.index(), value.get(), el.get());
  }
}

It streams through the data points (vectors) and trains the classifier, that is, it assigns each point to a cluster. We analyze this function carefully, as the key information about the algorithm and its memory footprint can be found here:

Vector probabilities = classifier.classify(value.get());
Vector selections = policy.select(probabilities);

For each data point, the above two calls calculate the probability that the point belongs to each cluster and then select the cluster with the highest probability.
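
For plain k-means, the selection boils down to a hard assignment: pick the index of the largest entry and give it weight 1.0, leaving every other entry at zero. Here is a minimal sketch of that behavior using Mahout's math Vector API; it is my paraphrase of what the k-means policy's select() does, not the verbatim source.

import org.apache.mahout.math.SequentialAccessSparseVector;
import org.apache.mahout.math.Vector;

public class HardSelectSketch {
  // Return a sparse weight vector with a single 1.0 at the winning cluster's index.
  public static Vector select(Vector probabilities) {
    int best = probabilities.maxValueIndex();  // cluster with the highest score
    Vector weights = new SequentialAccessSparseVector(probabilities.size());
    weights.set(best, 1.0);                    // hard assignment: winner takes all
    return weights;
  }
}

A fuzzier policy (e.g. FuzzyKMeans) can return non-zero weights for several clusters, which is presumably why the map function iterates over all non-zero entries of selections rather than assuming a single winner.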

To see how the probabilities are computed, we go into ClusterClassifier.java:

@Override
public Vector classify(Vector instance) {
  return policy.classify(instance, this);
}

The policy used here is KMeansClusteringPolicy, which extends AbstractClusteringPolicy. AbstractClusteringPolicy contains the actual implementation of classify:


@Override
public Vector classify(Vector data, ClusterClassifier prior) {
  List<Cluster> models = prior.getModels();
  int i = 0;
  Vector pdfs = new DenseVector(models.size());
  for (Cluster model : models) {
    pdfs.set(i++, model.pdf(new VectorWritable(data)));
  }
  return pdfs.assign(new TimesFunction(), 1.0 / pdfs.zSum());
}

To see how the probability density function (pdf) value is calculated, we trace the call to model.pdf(new VectorWritable(data)).

In this case, a model is a cluster, which extends the DistanceMeasureCluster class:


@Override
public double pdf(VectorWritable vw) {
  return 1 / (1 + measure.distance(vw.get(), getCenter()));
}

As we can see, it uses the supplied distance measure to calculate the distance between the current data point and the cluster center. For example, a point at distance 3 from the center gets a pdf value of 1 / (1 + 3) = 0.25, while a point exactly at the center gets 1. The center is stored in a field in AbstractCluster.java (DistanceMeasureCluster extends AbstractCluster):


public abstract class AbstractCluster implements Cluster {

 // cluster persistent state

...

private Vector center;

...


As a result, we know that the mapper keeps the old cluster centers in memory, along with the running statistics for the points it assigns to each cluster during the current iteration.
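
As a rough back-of-the-envelope estimate (my assumption: dense, double-precision vectors, and each cluster object holding a handful of d-dimensional vectors such as the center, the radius, and the S1/S2 accumulators discussed below), the per-mapper state is on the order of k clusters × 4 vectors × d dimensions × 8 bytes, i.e. roughly 32·k·d bytes. Notably, this is independent of the number of input points, because the points themselves are streamed through map() one at a time and never retained.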

 

The rest of the mapper code

for (Iterator<Element> it = selections.iterateNonZero(); it.hasNext();) {
  Element el = it.next();
  classifier.train(el.index(), value.get(), el.get());
}

Adds the data point's vector value to the selected cluster's accumulator vector and increments its point count. The implementation details of the train step can be traced to AbstractCluster.java.
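
Concretely, each cluster keeps running sums that are sufficient to recompute its center (and radius) at the end of the iteration. Below is a hedged sketch of the bookkeeping the train/observe step amounts to, based on the S0/S1/S2 fields in AbstractCluster; the class and method names here are illustrative, not the verbatim source.

import org.apache.mahout.math.Vector;

class ObserveSketch {
  private double s0;  // S0: total weight observed (for k-means, simply the point count)
  private Vector s1;  // S1: weighted sum of the observed points
  private Vector s2;  // S2: weighted sum of the squared points (used for the radius)

  void observe(Vector x, double weight) {
    s0 += weight;
    Vector wx = x.times(weight);
    s1 = (s1 == null) ? wx : s1.plus(wx);
    Vector wx2 = x.times(x).times(weight);
    s2 = (s2 == null) ? wx2 : s2.plus(wx2);
  }

  // Once all points have been observed, the updated center is simply S1 / S0.
  Vector computeCenter() {
    return s1.divide(s0);
  }
}

The CIReducer (discussed in the comments below) appears to merge these partial sums from different mappers for the same cluster and then finalize the new centers when the classifier is closed.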


In the cleanup step, the mapper emits the clusters it has accumulated, with their updated statistics, to the context:



@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
  List<Cluster> clusters = classifier.getModels();
  ClusterWritable cw = new ClusterWritable();
  for (int index = 0; index < clusters.size(); index++) {
    cw.setValue(clusters.get(index));
    context.write(new IntWritable(index), cw);
  }
  super.cleanup(context);
}


8 Responses to Apache Mahout Clustering Algorithms Implementation

  1. Thank you for the great documents. 🙂
    I will be expecting your next post.

    I have a question.
    I am especially interested in creating my own MapReduce job doing K-means clustering for large data sets.
    In case I have to pre-process my data set (e.g., parsing and converting the data into vectors in my own way), I am unable to use the KMeansDriver class directly.
    Rather, I should implement alternatives to KMeansDriver, CIMapper, CIReducer, and so on to run K-Means clustering.
    Am I right?

    • Hmm, I am not very sure whether you absolutely have to write your own vector class, so take my comments with a grain of salt. I believe you might be able to just use the existing vector and cluster classes without rewriting much of the code.

      You might need to write your own preprocessing operations to convert your documents into the specific vector classes. Even in Mahout, you have to do a preprocessing step to parse documents into the right data structures. Look at the quick start section of this page (it is outdated, but the order in which you process the input and the command lines to run still work; I tested that):
      http://mahout.apache.org/users/clustering/k-means-clustering.html

      Get the Reuters dataset
      Run org.apache.lucene.benchmark.utils.ExtractReuters to generate reuters-out from reuters-sgm(the downloaded archive)
      Run seqdirectory to convert reuters-out to SequenceFile format
      Run seq2sparse to convert SequenceFiles to sparse vector format
      Finally, run kMeans with 20 clusters.

      You have to convert the text files into SequenceFile format, then into sparse vector format. Maybe you could do the same for your own documents.

      Hope this helps!

      Yunming

      • Thanks for the kind response, Yunming.

        The problem is, I need to pre-process about 70 GB or more of logs.
        And I want to run all the above procedures as a sequence
        (not executing the commands one by one).

        So what I thought is as follows:

        0. Create my own Driver to load multiple MapReduce Jobs.

        1. Run the first MapReduce Job to parse my logs and create vectors line by line.
        There will be Mappers only, and each Mapper will store the vectors in SequenceFile format.

        2. Run the second MapReduce Job to read the vectors from the SequenceFiles and run K-means clustering. Each Mapper will calculate the distance between each point and some number of centroids. The Reducer will assign each point to a specific centroid, update the centroids based on the assigned points, and store the results in SequenceFile format.

        3. The Driver checks the convergence delta to determine whether to proceed further or not.
        It will calculate the differences between the previous centroids and the updated centroids.

        4. Run multiple K-Means clustering Jobs until the centroid differences converge.

        5. Display clustering results.

        I wondered whether I can reuse the KMeansDriver provided by Mahout.
        If possible, I could run the second MapReduce Job (KMeans clustering) using KMeansDriver, and it would save me some effort.
        In this case, there would be a driver (KMeansDriver) inside a driver (my own), so I doubt it is possible.
        Then I would have to write it all on my own...

        If I miss something, kindly let me know.

        Thank you in advance.

  2. It has been a while since I actually ran the jobs. I think you might be able to use a script to do step 2, and another script to do step 3?

    Sorry, I can’t be of much help here. I was mostly looking into the implementation for CIMapper. (It seems like you can still reuse the CIMapper, CIReducer class?)

  3. walter white says:

    Hello yunming,
    Could you explain what the CIReducer is doing in the reduce function, please?

    protected void reduce(IntWritable key, Iterable<ClusterWritable> values, Context context)
        throws IOException, InterruptedException {
      Iterator<ClusterWritable> iter = values.iterator();
      ClusterWritable first = null;
      while (iter.hasNext()) {
        ClusterWritable cw = iter.next();
        if (first == null) {
          first = cw;
        } else {
          first.getValue().observe(cw.getValue());
        }
      }
      List<Cluster> models = new ArrayList<Cluster>();
      models.add(first.getValue());
      classifier = new ClusterClassifier(models, policy);
      classifier.close();
      context.write(key, first);
    }

    Thanks a lot.

    • I am not sure about the specific snippet. Sorry.

      • walter white says:

        Hello Yunming,

        I went through this code yesterday, and what I understand is that it will read each of the clusters that were trained in the map() function, and in the classifier.close() function it will calculate the variables S0, S1, and S2, and also whether that specific cluster has converged or not.

        Best regards!
