diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
new file mode 100644
index 00000000..4be57c5b
--- /dev/null
+++ b/.git-blame-ignore-revs
@@ -0,0 +1,2 @@
+# Scala Steward: Reformat with scalafmt 3.8.6
+8c760d2bb03d68d0179748429ec00f92f014108c
diff --git a/.scalafmt.conf b/.scalafmt.conf
index 0bef4732..d082f60e 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -1,4 +1,4 @@
-version = 3.8.2
+version = 3.8.6
 runner.dialect = scala213
 preset = default
 align.preset = more
diff --git a/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala b/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala
index a592b16d..ac47547a 100644
--- a/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala
+++ b/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala
@@ -213,33 +213,33 @@ class BackupClient[T <: KafkaConsumerInterface](maybeS3Settings: Option[S3Settin
       for {
         exists <- checkObjectExists(previousState.previousKey)
       } yield
-      // The backupToStorageTerminateSink gets called in response to finding in progress multipart uploads. If an S3 object exists
-      // the same key that means that in fact the upload has already been completed so in this case lets not do anything
-      if (exists) {
-        logger.debug(
-          s"Previous upload with uploadId: ${previousState.stateDetails.state.uploadId} and key: ${previousState.previousKey} doesn't actually exist, skipping terminating"
-        )
-        Sink.ignore
-      } else {
-        logger.info(
-          s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId: ${previousState.stateDetails.state.uploadId}"
-        )
-        val sink = S3
-          .resumeMultipartUploadWithHeaders(
-            s3Config.dataBucket,
-            previousState.previousKey,
-            previousState.stateDetails.state.uploadId,
-            previousState.stateDetails.state.parts,
-            s3Headers = s3Headers,
-            chunkingParallelism = 1
+        // The backupToStorageTerminateSink gets called in response to finding in progress multipart uploads. If an S3 object exists
+        // the same key that means that in fact the upload has already been completed so in this case lets not do anything
+        if (exists) {
+          logger.debug(
+            s"Previous upload with uploadId: ${previousState.stateDetails.state.uploadId} and key: ${previousState.previousKey} doesn't actually exist, skipping terminating"
           )
-
-        val base =
-          sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
-
-        maybeS3Settings
-          .fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
-      }
+          Sink.ignore
+        } else {
+          logger.info(
+            s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId: ${previousState.stateDetails.state.uploadId}"
+          )
+          val sink = S3
+            .resumeMultipartUploadWithHeaders(
+              s3Config.dataBucket,
+              previousState.previousKey,
+              previousState.stateDetails.state.uploadId,
+              previousState.stateDetails.state.parts,
+              s3Headers = s3Headers,
+              chunkingParallelism = 1
+            )
+
+          val base =
+            sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
+
+          maybeS3Settings
+            .fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
+        }
     }
   }
diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedKafkaConsumerInterface.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedKafkaConsumerInterface.scala
index 978f5063..cfcae1f5 100644
--- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedKafkaConsumerInterface.scala
+++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedKafkaConsumerInterface.scala
@@ -76,9 +76,8 @@ class MockedKafkaConsumerInterface(kafkaData: Source[ReducedConsumerRecord, NotU
     val finalSource =
       if (handleOffsets) {
         source.filter { reducedConsumerRecord =>
-          (commitStorage.isEmpty || {
-            reducedConsumerRecord.offset > commitStorage.getLast
-          }) && {
+          (commitStorage.isEmpty ||
+            reducedConsumerRecord.offset > commitStorage.getLast) && {
             (stopAfterDuration, Option(firstReducedConsumerRecord.get())) match {
               case (Some(afterDuration), Some(firstRecord)) =>
                 val difference =