diff --git a/data-stream-generator/src/main/scala/ingest/ConstantRatePublisher.scala b/data-stream-generator/src/main/scala/ingest/ConstantRatePublisher.scala index 824e5a3..6ae5208 100644 --- a/data-stream-generator/src/main/scala/ingest/ConstantRatePublisher.scala +++ b/data-stream-generator/src/main/scala/ingest/ConstantRatePublisher.scala @@ -60,7 +60,7 @@ class ConstantRatePublisher(sparkSession: SparkSession, kafkaProperties: Propert 0.to(9).foreach { microBatch => //SUPPOSED TO LAST 100 MS smallGroupsList.foreach { smallList => //SUPPOSED TO LAST 5 MS smallList.foreach { observation => - 0.to(Math.round(ConfigUtils.dataVolume.toInt/3.0).toInt).foreach { volumeIteration => + 1.to(ConfigUtils.dataVolume).foreach { volumeIteration => if (observation.message.contains("flow")) { flowStats.mark() val msg = new ProducerRecord[String, String]( diff --git a/data-stream-generator/src/main/scala/ingest/FaultyEventPublisher.scala b/data-stream-generator/src/main/scala/ingest/FaultyEventPublisher.scala index 5a28d05..b828763 100644 --- a/data-stream-generator/src/main/scala/ingest/FaultyEventPublisher.scala +++ b/data-stream-generator/src/main/scala/ingest/FaultyEventPublisher.scala @@ -64,7 +64,7 @@ class FaultyEventPublisher(sparkSession: SparkSession, kafkaProperties: Properti 0.to(9).foreach { microBatch => //SUPPOSED TO LAST 100 MS smallGroupsList.foreach { smallList => //SUPPOSED TO LAST 5 MS smallList.foreach { observation => - 0.to(Math.round(ConfigUtils.dataVolume.toInt / 3.0).toInt).foreach { volumeIteration => + 1.to(ConfigUtils.dataVolume).foreach { volumeIteration => if (sendError & index == 0 & ConfigUtils.publisherNb.toInt == 1) { // at minute 15 send an error val msg = new ProducerRecord[String, String]( ConfigUtils.flowTopic, diff --git a/data-stream-generator/src/main/scala/ingest/SingleBurstPublisher.scala b/data-stream-generator/src/main/scala/ingest/SingleBurstPublisher.scala index 62d6774..859815f 100644 --- a/data-stream-generator/src/main/scala/ingest/SingleBurstPublisher.scala +++ b/data-stream-generator/src/main/scala/ingest/SingleBurstPublisher.scala @@ -78,7 +78,7 @@ class SingleBurstPublisher(sparkSession: SparkSession, kafkaProperties: Properti 0.to(9).foreach { microBatch => //SUPPOSED TO LAST 100 MS smallGroupsList.foreach { smallList => //SUPPOSED TO LAST 5 MS smallList.foreach { observation: Observation => - 0.to(Math.round(volume/3.0).toInt).foreach { volumeIteration => + 1.to(volume).foreach { volumeIteration => if (observation.message.contains("flow")) { flowStats.mark() val msg = new ProducerRecord[String, String](