From e3166088a0a0dfae14fe82943014d551155bdfc9 Mon Sep 17 00:00:00 2001
From: jiafu zhang
Date: Wed, 14 Apr 2021 16:33:10 +0800
Subject: [PATCH 1/3] Use memory data instead of data reading from hadoop

Signed-off-by: jiafu zhang
---
 .../sparkbench/micro/ScalaRepartition.scala   | 43 +++++++++++++------
 1 file changed, 30 insertions(+), 13 deletions(-)

diff --git a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala
index f8efc6966..b159859b0 100644
--- a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala
+++ b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala
@@ -33,26 +33,27 @@ object ScalaRepartition {
   def main(args: Array[String]) {
     if (args.length != 4) {
       System.err.println(
-        s"Usage: $ScalaRepartition "
+        s"Usage: $ScalaRepartition "
       )
       System.exit(1)
     }
 
     val cache = toBoolean(args(2), ("CACHE_IN_MEMORY"))
+    if (!cache) {
+      throw new IllegalArgumentException("CACHE_IN_MEMORY should be set to true")
+    }
+
+    val mapSize = toLong(args(0), ("MAP_SIZE"))
+    MockedData.dataSize = mapSize
+
     val disableOutput = toBoolean(args(3), ("DISABLE_OUTPUT"))
 
     val sparkConf = new SparkConf().setAppName("ScalaRepartition")
     val sc = new SparkContext(sparkConf)
 
-    val data = sc.newAPIHadoopFile[Text, Text, TeraInputFormat](args(0)).map {
-      case (k,v) => k.copyBytes ++ v.copyBytes
-    }
+    val mapParallelism = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism)
 
-    if (cache) {
-      data.persist(StorageLevel.MEMORY_ONLY)
-      data.count()
-    }
+    val data = sc.parallelize(Range(0, mapParallelism - 1))
 
-    val mapParallelism = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism)
     val reduceParallelism = IOCommon.getProperty("hibench.default.shuffle.parallelism")
       .getOrElse((mapParallelism / 2).toString).toInt
 
@@ -79,16 +80,27 @@ object ScalaRepartition {
     }
   }
 
+  private def toLong(str: String, parameterName: String): Long = {
+    try {
+      str.toLong
+    } catch {
+      case e: IllegalArgumentException =>
+        throw new IllegalArgumentException(
+          s"Unrecognizable parameter ${parameterName}: ${str}, should be integer")
+    }
+  }
+
   // Save a CoalescedRDD than RDD.repartition API
-  private def repartition(previous: RDD[Array[Byte]], numReducers: Int): ShuffledRDD[Int, Array[Byte], Array[Byte]] = {
+  private def repartition(previous: RDD[Int], numReducers: Int):
+    ShuffledRDD[Int, Array[Byte], Array[Byte]] = {
     /** Distributes elements evenly across output partitions, starting from a random partition. */
-    val distributePartition = (index: Int, items: Iterator[Array[Byte]]) => {
+    val distributePartition = (index: Int, items: Iterator[Int]) => {
       var position = new Random(hashing.byteswap32(index)).nextInt(numReducers)
-      items.map { t =>
+      items.map { i =>
         // Note that the hash code of the key will just be the key itself. The HashPartitioner
         // will mod it with the number of total partitions.
         position = position + 1
-        (position, t)
+        (position, MockedData.data)
       }
     } : Iterator[(Int, Array[Byte])]
 
@@ -97,4 +109,9 @@
       new HashPartitioner(numReducers))
   }
 
+  object MockedData {
+    var dataSize: Long = _
+    lazy val data: Array[Byte] = Seq(0, dataSize).map(i => i.toByte).toArray
+  }
+
 }
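A note on the MockedData helper added by this patch: Seq(0, dataSize) is a two-element sequence, so MockedData.data is always a 2-byte array rather than a MAP_SIZE-byte buffer, and Range(0, mapParallelism - 1) yields one element fewer than the number of map tasks. A minimal sketch of a buffer that is actually sized by dataSize (not part of the committed patch; it assumes the size fits in an Int) could look like:

  object MockedData {
    var dataSize: Long = _
    // Allocate dataSize bytes, filling each position with its low-order byte.
    lazy val data: Array[Byte] = Array.tabulate(dataSize.toInt)(i => i.toByte)
  }
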
From 880640d11310682ed5eb4e4be6e09a367311921e Mon Sep 17 00:00:00 2001
From: jiafu zhang
Date: Mon, 30 Aug 2021 01:21:20 +0000
Subject: [PATCH 2/3] Failed to prepare sql/aggregation due to code error #682

Signed-off-by: jiafu zhang
---
 autogen/src/main/java/HiBench/Visit.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/autogen/src/main/java/HiBench/Visit.java b/autogen/src/main/java/HiBench/Visit.java
index cd5fc2692..0492aac80 100644
--- a/autogen/src/main/java/HiBench/Visit.java
+++ b/autogen/src/main/java/HiBench/Visit.java
@@ -44,7 +44,7 @@ public class Visit {
 
 			// examples of user agents
 			BufferedReader br =
-					new BufferedReader(new FileReader(cachePath.toString()));
+					new BufferedReader(new FileReader(cachePath.toUri().getPath()));
 			StringBuffer all = new StringBuffer();
 			String line = null;
@@ -57,7 +57,7 @@ public class Visit {
 		} else if (cachePath.getName().contains(HiveData.countryf)) {
 			// examples of country codes
 			BufferedReader br =
-					new BufferedReader(new FileReader(cachePath.toString()));
+					new BufferedReader(new FileReader(cachePath.toUri().getPath()));
 			StringBuffer all = new StringBuffer();
 			String line = null;
@@ -70,7 +70,7 @@ public class Visit {
 		} else if (cachePath.getName().contains(HiveData.searchkeyf)) {
 			// examples of search keys
 			BufferedReader br =
-					new BufferedReader(new FileReader(cachePath.toString()));
+					new BufferedReader(new FileReader(cachePath.toUri().getPath()));
 			StringBuffer all = new StringBuffer();
 			String line = null;
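A note on the Visit.java change: org.apache.hadoop.fs.Path.toString() keeps any URI scheme in the string (for example "file:/tmp/..."), which java.io.FileReader then treats as part of a literal file name, while Path.toUri().getPath() returns only the path component that the local filesystem understands. A small sketch of the difference, assuming hadoop-common on the classpath and a hypothetical cache file location:

  import org.apache.hadoop.fs.Path

  val cachePath = new Path("file:/tmp/hibench/uagentf")  // hypothetical path with an explicit scheme
  println(cachePath.toString)       // "file:/tmp/hibench/uagentf" -- not a usable java.io file name
  println(cachePath.toUri.getPath)  // "/tmp/hibench/uagentf"      -- a plain local path for FileReader
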
From 50af25f9afbc53b555a41e888124f8127d52bbc7 Mon Sep 17 00:00:00 2001
From: jiafu zhang
Date: Mon, 30 Aug 2021 01:30:26 +0000
Subject: [PATCH 3/3] Revert "Use memory data instead of data reading from hadoop"

This reverts commit e3166088a0a0dfae14fe82943014d551155bdfc9.
---
 .../sparkbench/micro/ScalaRepartition.scala   | 43 ++++++-------------
 1 file changed, 13 insertions(+), 30 deletions(-)

diff --git a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala
index b159859b0..f8efc6966 100644
--- a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala
+++ b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala
@@ -33,27 +33,26 @@ object ScalaRepartition {
   def main(args: Array[String]) {
     if (args.length != 4) {
       System.err.println(
-        s"Usage: $ScalaRepartition "
+        s"Usage: $ScalaRepartition "
       )
       System.exit(1)
     }
 
     val cache = toBoolean(args(2), ("CACHE_IN_MEMORY"))
-    if (!cache) {
-      throw new IllegalArgumentException("CACHE_IN_MEMORY should be set to true")
-    }
-
-    val mapSize = toLong(args(0), ("MAP_SIZE"))
-    MockedData.dataSize = mapSize
-
     val disableOutput = toBoolean(args(3), ("DISABLE_OUTPUT"))
 
     val sparkConf = new SparkConf().setAppName("ScalaRepartition")
     val sc = new SparkContext(sparkConf)
 
-    val mapParallelism = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism)
+    val data = sc.newAPIHadoopFile[Text, Text, TeraInputFormat](args(0)).map {
+      case (k,v) => k.copyBytes ++ v.copyBytes
+    }
 
-    val data = sc.parallelize(Range(0, mapParallelism - 1))
+    if (cache) {
+      data.persist(StorageLevel.MEMORY_ONLY)
+      data.count()
+    }
 
+    val mapParallelism = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism)
     val reduceParallelism = IOCommon.getProperty("hibench.default.shuffle.parallelism")
       .getOrElse((mapParallelism / 2).toString).toInt
 
@@ -80,27 +79,16 @@ object ScalaRepartition {
     }
   }
 
-  private def toLong(str: String, parameterName: String): Long = {
-    try {
-      str.toLong
-    } catch {
-      case e: IllegalArgumentException =>
-        throw new IllegalArgumentException(
-          s"Unrecognizable parameter ${parameterName}: ${str}, should be integer")
-    }
-  }
-
   // Save a CoalescedRDD than RDD.repartition API
-  private def repartition(previous: RDD[Int], numReducers: Int):
-    ShuffledRDD[Int, Array[Byte], Array[Byte]] = {
+  private def repartition(previous: RDD[Array[Byte]], numReducers: Int): ShuffledRDD[Int, Array[Byte], Array[Byte]] = {
     /** Distributes elements evenly across output partitions, starting from a random partition. */
-    val distributePartition = (index: Int, items: Iterator[Int]) => {
+    val distributePartition = (index: Int, items: Iterator[Array[Byte]]) => {
       var position = new Random(hashing.byteswap32(index)).nextInt(numReducers)
-      items.map { i =>
+      items.map { t =>
         // Note that the hash code of the key will just be the key itself. The HashPartitioner
         // will mod it with the number of total partitions.
         position = position + 1
-        (position, MockedData.data)
+        (position, t)
       }
     } : Iterator[(Int, Array[Byte])]
 
@@ -109,9 +97,4 @@
       new HashPartitioner(numReducers))
   }
 
-  object MockedData {
-    var dataSize: Long = _
-    lazy val data: Array[Byte] = Seq(0, dataSize).map(i => i.toByte).toArray
-  }
-
 }