- 
                Notifications
    You must be signed in to change notification settings 
- Fork 773
Apply initial centroids on Spark Kmeans workload. #187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
… hold the same convergence condition on both MapReduce and Spark benchmark.
| cc @hqzizania | 
| .setK(k) | ||
| .setMaxIterations(iterations) | ||
| .setRuns(runs) | ||
| .setInitialModel(initModel) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use KMeans.RANDOM? What is the difference?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KMeans.RANDOM is from Spark MLLib and KMeans.setInitialModel shares the initial centroids generated by HiBench GenKMeansDataset. To get a meaningful comparison, we have to pick one of approaches and apply it on both of MapReduce and Spark benchmarks. In this PR, we select the latter approach, HiBench GenKMeansDataset, to generate the random centroids based-on normal (Gaussian) distribution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ye, the random centroids would be generated by takeSample without a normal distribution in Spark Kmeans even if KMeans.RANDOM is used. If getting a meaningful comparison between MapReduce and Spark benchmarks is the core value of HiBench, I agree with your approach, it looks like not enough for the performance of Spark K-Means algorithm implementation by itself though. @carsonwang
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if k != number of centroids in initiModel, Spark KMeans will throw exception:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: mismatched cluster count
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hqzizania, this should be an expected behavior. If Spark KMeans uses HiBench generated initial model, the parameter k must has the matched value as defined in the initiModel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pfxuan , please see 10-data-scale-profile.conf. The num_of_clusters used to generate the input data and the k might be different. This is one of the concerns for using the input centroids as the initial model. The MR version is currently using the input centroids and doesn't pass the k parameter. We'd prefer to using the k parameter as this is the expected number of clusters.
As @hqzizania mentioned, the problem in Java Kmeans is the RRD is not cached. Can you please have a check with the scala version which supports both KMeas || and Random. It seems there is no huge computation cost when initializing the mode as the RDD is cached? If this is the case, we can fix the Java version and also pass -k to the MR version. This should make all of them comparable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did the performance test on scala version Kmeans. The size of input data is about 100 GB across 4 nodes. The running time on Random and Parallel is almost same, which took about 4 mins for running 3 iterations including 1.3 mins on centroid initialization. So there is about 48.1% overhead when using either the Spark version of Random or Parallel.  As a comparison, the implementation of this PR only took about 2.4 mins for 3 iterations with almost zero-overhead on initialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In addition, Mahout version of random initialization is a sequential rather than MapReduce-based implementation. I passed -k 20 parameter to the MR benchmark, and it took 18.8 mins to generate 20 initial centroids using only one CPU core. To make a reasonable comparison, I think it would be better to keep the original HiBench generator for all Kmeans benchmarks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @pfxuan , I just got chance to run KMean with Random initialization by passing --initMode Random in run.sh. In my test, the random initialization ran much faster and I saw much less stages. The Parallel initialization is slow as there are many stages like collect and aggregate which run several iterations. But these stages are not expected when using Random initialization. Can you take a look at the Spark UI to see if there is any difference?
| @carsonwang, @hqzizania any update on this PR? Thanks! | 
| @hqzizania are you suggesting to merge this PR? | 
| This is a good way to get a meaningful comparison between MapReduce and Spark. But unfortunately it will not use "k-means||", a specific and excellent initialization parallel algorithm in Spark KMeans, which is very time consuming but can get better initial centroids than "randomly choosing" to speed up the convergence. Thus, the benchmark results can't completely reflect  the performance of spark KMeans, since the important feature "k-means||" is not used. | 
| I think it is easier to update the KMeans MR version to make them comparable. Simply pass a  | 
| Is HiBench a performance benchmark suite for big data execution engines or for big data applications? I probably misunderstood the design goal of HiBench benchmark suite, and please correct me if i'm wrong. I was thinking HiBench is a performance benchmark tool for characterizing and measuring the efficiency of big data execution engines/runners (MapReduce/Spark/Flink/Storm). Thus, to get a fair enough comparison, it's better to apply a similar workload with an exactly same initial condition (e.g. randomly chosen centroids) over different engines/runners, which eventually requires a matched algorithm with a very close implementation for each of those engines/runners. Otherwise, the comparison of the use of two different workloads is something like the comparison of apples and oranges. The comparison mentioned in the previous discussion looks more like an algorithm and implementation oriented benchmark, which suggests that we should use a fixed convergence condition rather than a fixed algorithm, implementation and initial conditions to benchmark each of those engines/runners. It makes sense to get a highly efficient implementation for our production system by using such measurement rule. But this might lead an imprecise evaluation on big data execution engines/runners, and makes HiBench become a benchmark suite for the big data applications. Any thoughts and comments will be much appreciated? BIG THANKS FOR ALL OF U! | 
| OK, thanks for the above explanation on Hibench design goal. As @carsonwang suggested, choosing random centroids is a more elegant way? | 
| @hqzizania, if we use application built-in generator, it'll result two different random sets on centroids and thus provides the same input sample with two different initial conditions on MapReduce and Spark benchmarks. If this difference can be safely ignored comparing with the overall Kmeans computation cost, the computation cost on generating random centroids may or may not be ignored. I did a quick check. It looks like Mahout and Spark MLLib use a slight different implmenetation on generating random centroids. Would you be able to confirm if its computation cost, especially the cost used on I/O part, is equivalent? Because I know the Spark  Thank you so much! | 
| @pfxuan  ooops, a key problem is that the RDD is not cached in Java code. I suggest it should be cached before transformed into KMeans like scala version, since Spark is a memory-based computing engine. Even if a very very large RDD cannot be cached, Spark only use RDD  In other hand, we can support both  | 
76a5a5f    to
    cb633ac      
    Compare
  
    
This PR makes HiBench hold the same convergence condition on both MapReduce and Spark benchmarks. Otherwise, Spark will use scalable K-means++ as default implementation, which results a huge computational cost and make Spark Kmeans performance totally incomparable with MapReduce. e.g. the existing Spark Kmeans performance is actually lower than MapReduce benchmark with a same problem size. As a comparison, the improved version enable Spark to achieve >4 speedups using 2 iterations.