This is a post introducing the KMeans clustering algorithm and explaining a MapReduce implementation of it.
KMeans is a classical clustering algorithm. Clustering algorithms partition a given data set into a number of clusters. Let’s put the algorithm in the context of an application: given all the news articles published today, the algorithm groups the articles that belong to the same topic, such as sports, politics, or movies.
The idea behind KMeans clustering is to:
 1. Define k centroids (they can be chosen randomly, but the centroids should be far away from each other if possible).
 2. Take each point in the data set and associate it with the nearest centroid.
 3. Once all the points are assigned to one of the k clusters, recalculate the centroids of the k clusters.
 4. Go back to step 2 with the updated centroid locations. Repeat steps 2 and 3 until the centroids no longer move.
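The steps above can be sketched in plain Java (no Hadoop), assuming one-dimensional points for brevity; the class and method names (`KMeansSketch`, `kmeans`, `nearest`) are made up for illustration, not taken from the post:

```java
import java.util.Arrays;

// Minimal 1-D KMeans (Lloyd's algorithm) illustrating the four steps above.
public class KMeansSketch {

    // Runs KMeans until the centroids stop moving (or maxIter is hit).
    static double[] kmeans(double[] points, double[] initCentroids, int maxIter) {
        double[] centroids = initCentroids.clone(); // step 1: initial centroids
        for (int iter = 0; iter < maxIter; iter++) {
            double[] sums = new double[centroids.length];
            int[] counts = new int[centroids.length];
            // Step 2: assign each point to its nearest centroid.
            for (double p : points) {
                int best = nearest(centroids, p);
                sums[best] += p;
                counts[best]++;
            }
            // Step 3: recompute each centroid as the mean of its assigned points.
            double[] updated = new double[centroids.length];
            boolean moved = false;
            for (int c = 0; c < centroids.length; c++) {
                updated[c] = counts[c] == 0 ? centroids[c] : sums[c] / counts[c];
                if (updated[c] != centroids[c]) moved = true;
            }
            centroids = updated;
            // Step 4: stop once the centroids no longer move.
            if (!moved) break;
        }
        return centroids;
    }

    // Index of the centroid closest to point p.
    static int nearest(double[] centroids, double p) {
        int best = 0;
        for (int c = 1; c < centroids.length; c++)
            if (Math.abs(p - centroids[c]) < Math.abs(p - centroids[best])) best = c;
        return best;
    }

    public static void main(String[] args) {
        double[] points = {1.0, 1.2, 0.8, 9.0, 9.2, 8.8};
        double[] result = kmeans(points, new double[]{0.0, 10.0}, 100);
        System.out.println(Arrays.toString(result)); // centroids end up near 1.0 and 9.0
    }
}
```

With the sample data, the two centroids settle at the means of the two obvious groups after a single assignment pass, then stop moving.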
To implement the algorithm in a MapReduce fashion:
In the map phase, each map task reads a shared file containing all the cluster centroids; this file is typically stored in the Hadoop distributed cache. Each map task also reads a small slice of the input data points. For each data point, it assigns the point to the cluster whose centroid is closest.
An optimization is to create a local set of k centroid accumulators that store the new coordinates. Data points processed by the same map task are then accumulated into these new cluster centroids locally. This reduces the total number of output key,val pairs of the map task from NumberOfDataPoints to K (the number of clusters), which leads to a significantly shorter shuffle time.
In the reduce phase, the partial centroid coordinates output by each map task are further reduced to produce the new centroid of each cluster.
Multiple MapReduce jobs are chained together to simulate the multiple iterations of the algorithm.
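The map-side accumulation can be sketched without any Hadoop types as follows; `CentroidAccumulator`, `Partial`, `mapTask`, and `reduce` are hypothetical names for illustration, and points are one-dimensional for brevity:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the map-side accumulation: each "map task" emits only K
// (sum, count) pairs instead of one pair per data point.
public class CentroidAccumulator {

    // One partial result per cluster: running coordinate sum and point count.
    static class Partial {
        double sum;
        long count;
    }

    // Map phase (one task's input slice): assign each point to its nearest
    // centroid and fold it into the local accumulator, giving K outputs total.
    static Partial[] mapTask(double[] slice, double[] centroids) {
        Partial[] acc = new Partial[centroids.length];
        for (int c = 0; c < acc.length; c++) acc[c] = new Partial();
        for (double p : slice) {
            int best = 0;
            for (int c = 1; c < centroids.length; c++)
                if (Math.abs(p - centroids[c]) < Math.abs(p - centroids[best])) best = c;
            acc[best].sum += p;
            acc[best].count++;
        }
        return acc;
    }

    // Reduce phase: merge the partial sums/counts from all map tasks and
    // divide to get the new centroid of each cluster.
    static double[] reduce(List<Partial[]> partials, int k) {
        double[] sums = new double[k];
        long[] counts = new long[k];
        for (Partial[] acc : partials)
            for (int c = 0; c < k; c++) {
                sums[c] += acc[c].sum;
                counts[c] += acc[c].count;
            }
        double[] centroids = new double[k];
        for (int c = 0; c < k; c++)
            centroids[c] = counts[c] == 0 ? 0.0 : sums[c] / counts[c];
        return centroids;
    }

    public static void main(String[] args) {
        double[] centroids = {0.0, 10.0};
        List<Partial[]> partials = new ArrayList<>();
        partials.add(mapTask(new double[]{1.0, 2.0, 9.0}, centroids));  // map task 1
        partials.add(mapTask(new double[]{3.0, 8.0, 11.0}, centroids)); // map task 2
        double[] updated = reduce(partials, 2);
        System.out.println(updated[0] + " " + updated[1]); // new centroids near 2.0 and 9.33
    }
}
```

Note that each map task here contributes only K pairs to the shuffle regardless of how many points it processed, which is exactly the reduction described above.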
References
 Prof. Chris Jermaine’s summer Big Data Institute Program
 http://home.deib.polimi.it/matteucc/Clustering/tutorial_html/kmeans.html

Parallel K-Means Clustering Based on MapReduce (http://link.springer.com/chapter/10.1007/978-3-642-10665-1_71). I am sure you can find a free version online too.
Hello yummingZhang,
I liked your post. Could you please explain more about the optimization to the map phase, or give a reference that explains this optimization?
Also, do you know if Mahout is doing this optimization?
Thank you
Hi,
Yes, essentially the optimization reduces the number of key,val pairs output from the map task.
Assume you stream through N key,val pairs (unclassified documents) in each map task and group them into M clusters. Without the optimization you will have N output key,val pairs, but with the optimization there should be only M output key,val pairs.
Yes, Mahout has an accumulator that accumulates the results too.
Hello Yunming,
Thanks a lot for replying, very nice from you.
Is that optimization happening in the cleanup() function or in the map() function? Could you please mention which part does this optimization?
By the way, I noticed that once the clusters converge, you produce the clusteredpoints folder. This goes through ClusterClassificationDriver, which executes another MapReduce job with only a mapper function and no reducer. Do you know what ClusterClassificationMapper.class does?
Thanks a lot !!
The optimization accumulates in the map phase and outputs in the cleanup phase.
Here is my code for what I meant; newClusters is the accumulator:
public class KMeansMapper extends ParMapper {
    // this is the list of current clusters
    private static final ArrayList<VectorizedObject> oldClusters = new ArrayList<>(0);
    // this is the list of clusters that come out of the current iteration
    private ArrayList<VectorizedObject> newClusters = new ArrayList<>(0);
    // ...
}
In the map phase, you accumulate the results into the new clusters.
In the cleanup phase, you output the new clusters object:
protected void cleanup(Context context) throws IOException, InterruptedException {
    System.err.println("memory usage in clean up method");
    printMemoryUsage();
    long startt = System.currentTimeMillis();
    // go through each cluster and send it to the reducer
    for (VectorizedObject i : newClusters) {
        // this deals with writing the current cluster to the reducer
        Text key = new Text();
        Text value = new Text();
        key.set(i.getKey());
        value.set(i.writeOut());
        context.write(key, value);
    }
    long endt = System.currentTimeMillis();
    double elapsedt = endt - startt;
    System.out.println("clean up took " + elapsedt);
}
I am not sure about how the classification driver works.
Thank you so much Yunming!
Here is a very easy-to-read paper on KMeans for MapReduce.
The optimization I described is implemented as a combiner function in this paper, holding (1) the partial sums of each cluster's vector and (2) the number of samples in each cluster.
“Parallel K-Means Clustering Based on MapReduce” by Weizhong Zhao, Huifang Ma, and Qing He,
The Key Laboratory of Intelligent Information Processing, Institute of Computing Technology, Chinese Academy of Sciences,
and the Graduate University of Chinese Academy of Sciences.
You can search for it on Google Scholar. It has the pseudocode in there too.