From 95a10493dc5717404eb64e0347e07e1ba0de5cfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= Date: Mon, 22 Nov 2021 23:22:20 +0100 Subject: [PATCH 1/2] Let the load generators scale linearly The load generators divided the data volume by three and rounded the result. Thus, e.g., loads 2, 3, and 4 all performed the same number of iterations and resulted in the same produced data rate. Now the data volume is used directly and is no longer divided. Further, the iteration now starts at one, so that the data volume is on a ratio scale: a data volume of 0 is the absolute zero and produces no data. --- .../src/main/scala/ingest/ConstantRatePublisher.scala | 2 +- .../src/main/scala/ingest/FaultyEventPublisher.scala | 2 +- .../src/main/scala/ingest/SingleBurstPublisher.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/data-stream-generator/src/main/scala/ingest/ConstantRatePublisher.scala b/data-stream-generator/src/main/scala/ingest/ConstantRatePublisher.scala index 824e5a3..6ae5208 100644 --- a/data-stream-generator/src/main/scala/ingest/ConstantRatePublisher.scala +++ b/data-stream-generator/src/main/scala/ingest/ConstantRatePublisher.scala @@ -60,7 +60,7 @@ class ConstantRatePublisher(sparkSession: SparkSession, kafkaProperties: Propert 0.to(9).foreach { microBatch => //SUPPOSED TO LAST 100 MS smallGroupsList.foreach { smallList => //SUPPOSED TO LAST 5 MS smallList.foreach { observation => - 0.to(Math.round(ConfigUtils.dataVolume.toInt/3.0).toInt).foreach { volumeIteration => + 1.to(ConfigUtils.dataVolume).foreach { volumeIteration => if (observation.message.contains("flow")) { flowStats.mark() val msg = new ProducerRecord[String, String]( diff --git a/data-stream-generator/src/main/scala/ingest/FaultyEventPublisher.scala b/data-stream-generator/src/main/scala/ingest/FaultyEventPublisher.scala index 5a28d05..b828763 100644 --- a/data-stream-generator/src/main/scala/ingest/FaultyEventPublisher.scala +++ b/data-stream-generator/src/main/scala/ingest/FaultyEventPublisher.scala 
@@ -64,7 +64,7 @@ class FaultyEventPublisher(sparkSession: SparkSession, kafkaProperties: Properti 0.to(9).foreach { microBatch => //SUPPOSED TO LAST 100 MS smallGroupsList.foreach { smallList => //SUPPOSED TO LAST 5 MS smallList.foreach { observation => - 0.to(Math.round(ConfigUtils.dataVolume.toInt / 3.0).toInt).foreach { volumeIteration => + 1.to(ConfigUtils.dataVolume).foreach { volumeIteration => if (sendError & index == 0 & ConfigUtils.publisherNb.toInt == 1) { // at minute 15 send an error val msg = new ProducerRecord[String, String]( ConfigUtils.flowTopic, diff --git a/data-stream-generator/src/main/scala/ingest/SingleBurstPublisher.scala b/data-stream-generator/src/main/scala/ingest/SingleBurstPublisher.scala index 62d6774..837d33b 100644 --- a/data-stream-generator/src/main/scala/ingest/SingleBurstPublisher.scala +++ b/data-stream-generator/src/main/scala/ingest/SingleBurstPublisher.scala @@ -78,7 +78,7 @@ class SingleBurstPublisher(sparkSession: SparkSession, kafkaProperties: Properti 0.to(9).foreach { microBatch => //SUPPOSED TO LAST 100 MS smallGroupsList.foreach { smallList => //SUPPOSED TO LAST 5 MS smallList.foreach { observation: Observation => - 0.to(Math.round(volume/3.0).toInt).foreach { volumeIteration => + 1.to(ConfigUtils.dataVolume).foreach { volumeIteration => if (observation.message.contains("flow")) { flowStats.mark() val msg = new ProducerRecord[String, String]( From e4257be75172a1efcb4933e258bfa4faec076161 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= Date: Tue, 23 Nov 2021 08:51:13 +0100 Subject: [PATCH 2/2] Use the volume from the method in SingleBurstPublisher --- .../src/main/scala/ingest/SingleBurstPublisher.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-stream-generator/src/main/scala/ingest/SingleBurstPublisher.scala b/data-stream-generator/src/main/scala/ingest/SingleBurstPublisher.scala index 837d33b..859815f 100644 --- 
a/data-stream-generator/src/main/scala/ingest/SingleBurstPublisher.scala +++ b/data-stream-generator/src/main/scala/ingest/SingleBurstPublisher.scala @@ -78,7 +78,7 @@ class SingleBurstPublisher(sparkSession: SparkSession, kafkaProperties: Properti 0.to(9).foreach { microBatch => //SUPPOSED TO LAST 100 MS smallGroupsList.foreach { smallList => //SUPPOSED TO LAST 5 MS smallList.foreach { observation: Observation => - 1.to(ConfigUtils.dataVolume).foreach { volumeIteration => + 1.to(volume).foreach { volumeIteration => if (observation.message.contains("flow")) { flowStats.mark() val msg = new ProducerRecord[String, String](