From cd4434c3768eef3f6a91c3bfc04b052793f55152 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 3 Apr 2024 21:50:23 +0300 Subject: [PATCH 1/3] chore: refactor RSM copyLogSegment RSM upload steps are messy and hard to read, mainly because of the encryption metadata. By moving the instantiation to the data key and aad, we can separate the upload stages clearly. --- .../RemoteStorageManagerTest.java | 31 ++- .../tieredstorage/RemoteStorageManager.java | 202 +++++++++++------- .../manifest/SegmentEncryptionMetadataV1.java | 6 + 3 files changed, 149 insertions(+), 90 deletions(-) diff --git a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java index 5e4da2d6d..a64d9ffd0 100644 --- a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java +++ b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java @@ -57,7 +57,6 @@ import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException; import io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache; import io.aiven.kafka.tieredstorage.fetch.cache.MemoryChunkCache; -import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1; import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder; import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex; import io.aiven.kafka.tieredstorage.manifest.serde.EncryptionSerdeModule; @@ -480,16 +479,15 @@ void testTransformingIndexes(final boolean encryption) { "storage.root", targetDir.toString(), "encryption.enabled", Boolean.toString(encryption) )); - final SegmentEncryptionMetadataV1 encryptionMetadata; + final DataKeyAndAAD maybeEncryptionKey; if (encryption) { config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID); config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID); config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".public.key.file", publicKeyPem.toString()); config.put("encryption.key.pairs." 
+ KEY_ENCRYPTION_KEY_ID + ".private.key.file", privateKeyPem.toString()); - final var dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD(); - encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad); + maybeEncryptionKey = aesEncryptionProvider.createDataKeyAndAAD(); } else { - encryptionMetadata = null; + maybeEncryptionKey = null; } rsm.configure(config); @@ -499,7 +497,7 @@ void testTransformingIndexes(final boolean encryption) { IndexType.OFFSET, new ByteArrayInputStream(bytes), bytes.length, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); assertThat(is).isNotEmpty(); @@ -511,21 +509,21 @@ void testTransformingIndexes(final boolean encryption) { IndexType.TIMESTAMP, new ByteArrayInputStream(bytes), bytes.length, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); rsm.transformIndex( IndexType.LEADER_EPOCH, new ByteArrayInputStream(bytes), bytes.length, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); rsm.transformIndex( IndexType.PRODUCER_SNAPSHOT, new ByteArrayInputStream(bytes), bytes.length, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); final var index = segmentIndexBuilder.build(); @@ -545,14 +543,15 @@ void testTransformingEmptyIndexes(final boolean encryption) { "storage.root", targetDir.toString(), "encryption.enabled", Boolean.toString(encryption) )); - SegmentEncryptionMetadataV1 encryptionMetadata = null; + final DataKeyAndAAD maybeEncryptionKey; if (encryption) { config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID); config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID); config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".public.key.file", publicKeyPem.toString()); config.put("encryption.key.pairs." 
+ KEY_ENCRYPTION_KEY_ID + ".private.key.file", privateKeyPem.toString()); - final var dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD(); - encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad); + maybeEncryptionKey = aesEncryptionProvider.createDataKeyAndAAD(); + } else { + maybeEncryptionKey = null; } rsm.configure(config); @@ -561,7 +560,7 @@ void testTransformingEmptyIndexes(final boolean encryption) { IndexType.OFFSET, InputStream.nullInputStream(), 0, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); assertThat(is).isEmpty(); @@ -572,21 +571,21 @@ void testTransformingEmptyIndexes(final boolean encryption) { IndexType.TIMESTAMP, InputStream.nullInputStream(), 0, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); rsm.transformIndex( IndexType.LEADER_EPOCH, InputStream.nullInputStream(), 0, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); rsm.transformIndex( IndexType.PRODUCER_SNAPSHOT, InputStream.nullInputStream(), 0, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); final var index = segmentIndexBuilder.build(); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index b96484044..9098e1f98 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -223,35 +223,39 @@ public Optional copyLogSegmentData(final RemoteLogSegmentMetadat final long startedMs = time.milliseconds(); try { - SegmentEncryptionMetadataV1 encryptionMetadata = null; final boolean requiresCompression = requiresCompression(logSegmentData); - - final ChunkIndex chunkIndex; - try (final InputStream logSegmentInputStream = Files.newInputStream(logSegmentData.logSegment())) { - TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration( - logSegmentInputStream, chunkSize); - if (requiresCompression) { - transformEnum = new CompressionChunkEnumeration(transformEnum); - } - if (encryptionEnabled) { - final DataKeyAndAAD dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD(); - transformEnum = new EncryptionChunkEnumeration( - transformEnum, - () -> aesEncryptionProvider.encryptionCipher(dataKeyAndAAD)); - encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad); - } - final TransformFinisher transformFinisher = - new TransformFinisher(transformEnum, remoteLogSegmentMetadata.segmentSizeInBytes()); - uploadSegmentLog(remoteLogSegmentMetadata, transformFinisher, customMetadataBuilder); - chunkIndex = transformFinisher.chunkIndex(); + final DataKeyAndAAD maybeEncryptionKey; + if (encryptionEnabled) { + maybeEncryptionKey = aesEncryptionProvider.createDataKeyAndAAD(); + } else { + maybeEncryptionKey = null; } + // upload segment + final ChunkIndex chunkIndex = uploadSegmentLog( + remoteLogSegmentMetadata, + logSegmentData, + requiresCompression, + maybeEncryptionKey, + customMetadataBuilder + ); + + // upload indexes final SegmentIndexesV1 segmentIndexes = uploadIndexes( - remoteLogSegmentMetadata, logSegmentData, encryptionMetadata, customMetadataBuilder); - final SegmentManifest segmentManifest = new SegmentManifestV1( - chunkIndex, segmentIndexes, requiresCompression, encryptionMetadata, remoteLogSegmentMetadata); - uploadManifest(remoteLogSegmentMetadata, segmentManifest, customMetadataBuilder); + remoteLogSegmentMetadata, + 
logSegmentData, + maybeEncryptionKey, + customMetadataBuilder + ); + // upload manifest + uploadManifest( + remoteLogSegmentMetadata, + chunkIndex, + segmentIndexes, + requiresCompression, + maybeEncryptionKey, + customMetadataBuilder); } catch (final Exception e) { throw new RemoteStorageException(e); } @@ -267,10 +271,81 @@ public Optional copyLogSegmentData(final RemoteLogSegmentMetadat return customMetadata; } + boolean requiresCompression(final LogSegmentData logSegmentData) { + boolean requiresCompression = false; + if (compressionEnabled) { + if (compressionHeuristic) { + try { + final File segmentFile = logSegmentData.logSegment().toFile(); + final boolean alreadyCompressed = SegmentCompressionChecker.check(segmentFile); + requiresCompression = !alreadyCompressed; + } catch (final InvalidRecordBatchException e) { + // Log and leave value as false to upload uncompressed. + log.warn("Failed to check compression on log segment: {}", logSegmentData.logSegment(), e); + } + } else { + requiresCompression = true; + } + } + return requiresCompression; + } + + ChunkIndex uploadSegmentLog( + final RemoteLogSegmentMetadata remoteLogSegmentMetadata, + final LogSegmentData logSegmentData, + final boolean requiresCompression, + final DataKeyAndAAD maybeEncryptionKey, + final SegmentCustomMetadataBuilder customMetadataBuilder + ) throws IOException, StorageBackendException { + final var fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG); + + try (final var logSegmentInputStream = Files.newInputStream(logSegmentData.logSegment())) { + final var transformEnum = transformation(logSegmentInputStream, requiresCompression, maybeEncryptionKey); + final var transformFinisher = new TransformFinisher( + transformEnum, + remoteLogSegmentMetadata.segmentSizeInBytes() + ); + + try (final var sis = transformFinisher.toInputStream()) { + final var bytes = uploader.upload(sis, fileKey); + metrics.recordObjectUpload( + remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(), + ObjectKeyFactory.Suffix.LOG, + bytes + ); + customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.LOG, bytes); + + log.debug("Uploaded segment log for {}, size: {}", remoteLogSegmentMetadata, bytes); + } + return transformFinisher.chunkIndex(); + } + } + + private TransformChunkEnumeration transformation( + final InputStream logSegmentInputStream, + final boolean requiresCompression, + final DataKeyAndAAD maybeEncryptionKey + ) { + TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration( + logSegmentInputStream, + chunkSize + ); + if (requiresCompression) { + transformEnum = new CompressionChunkEnumeration(transformEnum); + } + if (encryptionEnabled) { + transformEnum = new EncryptionChunkEnumeration( + transformEnum, + () -> aesEncryptionProvider.encryptionCipher(maybeEncryptionKey) + ); + } + return transformEnum; + } + private SegmentIndexesV1 uploadIndexes( final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final LogSegmentData segmentData, - final SegmentEncryptionMetadataV1 encryptionMeta, + final DataKeyAndAAD maybeEncryptionKey, final SegmentCustomMetadataBuilder customMetadataBuilder ) throws IOException, RemoteStorageException, StorageBackendException { final List indexes = new ArrayList<>(IndexType.values().length); @@ -281,7 +356,7 @@ private SegmentIndexesV1 uploadIndexes( IndexType.OFFSET, closableInputStreamHolder.add(Files.newInputStream(segmentData.offsetIndex())), indexSize(segmentData.offsetIndex()), - encryptionMeta, + 
maybeEncryptionKey, segmentIndexBuilder ); indexes.add(offsetIndex); @@ -289,7 +364,7 @@ private SegmentIndexesV1 uploadIndexes( IndexType.TIMESTAMP, closableInputStreamHolder.add(Files.newInputStream(segmentData.timeIndex())), indexSize(segmentData.timeIndex()), - encryptionMeta, + maybeEncryptionKey, segmentIndexBuilder ); indexes.add(timeIndex); @@ -297,7 +372,7 @@ private SegmentIndexesV1 uploadIndexes( IndexType.PRODUCER_SNAPSHOT, closableInputStreamHolder.add(Files.newInputStream(segmentData.producerSnapshotIndex())), indexSize(segmentData.producerSnapshotIndex()), - encryptionMeta, + maybeEncryptionKey, segmentIndexBuilder ); indexes.add(producerSnapshotIndex); @@ -305,7 +380,7 @@ private SegmentIndexesV1 uploadIndexes( IndexType.LEADER_EPOCH, closableInputStreamHolder.add(new ByteBufferInputStream(segmentData.leaderEpochIndex())), segmentData.leaderEpochIndex().remaining(), - encryptionMeta, + maybeEncryptionKey, segmentIndexBuilder ); indexes.add(leaderEpoch); @@ -314,7 +389,7 @@ private SegmentIndexesV1 uploadIndexes( IndexType.TRANSACTION, closableInputStreamHolder.add(Files.newInputStream(segmentData.transactionIndex().get())), indexSize(segmentData.transactionIndex().get()), - encryptionMeta, + maybeEncryptionKey, segmentIndexBuilder ); indexes.add(transactionIndex); @@ -361,56 +436,19 @@ private Optional buildCustomMetadata(final SegmentCustomMetadata } } - boolean requiresCompression(final LogSegmentData logSegmentData) { - boolean requiresCompression = false; - if (compressionEnabled) { - if (compressionHeuristic) { - try { - final File segmentFile = logSegmentData.logSegment().toFile(); - final boolean alreadyCompressed = SegmentCompressionChecker.check(segmentFile); - requiresCompression = !alreadyCompressed; - } catch (final InvalidRecordBatchException e) { - // Log and leave value as false to upload uncompressed. 
- log.warn("Failed to check compression on log segment: {}", logSegmentData.logSegment(), e); - } - } else { - requiresCompression = true; - } - } - return requiresCompression; - } - - private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, - final TransformFinisher transformFinisher, - final SegmentCustomMetadataBuilder customMetadataBuilder) - throws IOException, StorageBackendException { - final ObjectKey fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG); - try (final var sis = transformFinisher.toInputStream()) { - final var bytes = uploader.upload(sis, fileKey); - metrics.recordObjectUpload( - remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(), - ObjectKeyFactory.Suffix.LOG, - bytes - ); - customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.LOG, bytes); - - log.debug("Uploaded segment log for {}, size: {}", remoteLogSegmentMetadata, bytes); - } - } - InputStream transformIndex(final IndexType indexType, final InputStream index, final int size, - final SegmentEncryptionMetadata encryptionMetadata, + final DataKeyAndAAD maybeEncryptionKey, final SegmentIndexesV1Builder segmentIndexBuilder) { log.debug("Transforming index {} with size {}", indexType, size); if (size > 0) { TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(index, size); if (encryptionEnabled) { - final var dataKeyAndAAD = new DataKeyAndAAD(encryptionMetadata.dataKey(), encryptionMetadata.aad()); transformEnum = new EncryptionChunkEnumeration( transformEnum, - () -> aesEncryptionProvider.encryptionCipher(dataKeyAndAAD)); + () -> aesEncryptionProvider.encryptionCipher(maybeEncryptionKey) + ); } final var transformFinisher = new TransformFinisher(transformEnum, size); final var inputStream = transformFinisher.nextElement(); @@ -430,10 +468,26 @@ private Chunk singleChunk(final ChunkIndex chunkIndex) { return chunks.get(0); } - private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, - final SegmentManifest segmentManifest, - final SegmentCustomMetadataBuilder customMetadataBuilder) - throws StorageBackendException, IOException { + void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, + final ChunkIndex chunkIndex, + final SegmentIndexesV1 segmentIndexes, + final boolean requiresCompression, + final DataKeyAndAAD maybeEncryptionKey, + final SegmentCustomMetadataBuilder customMetadataBuilder + ) throws StorageBackendException, IOException { + final SegmentEncryptionMetadataV1 maybeEncryptionMetadata; + if (maybeEncryptionKey != null) { + maybeEncryptionMetadata = new SegmentEncryptionMetadataV1(maybeEncryptionKey); + } else { + maybeEncryptionMetadata = null; + } + final SegmentManifest segmentManifest = new SegmentManifestV1( + chunkIndex, + segmentIndexes, + requiresCompression, + maybeEncryptionMetadata, + remoteLogSegmentMetadata + ); final String manifest = mapper.writeValueAsString(segmentManifest); final ObjectKey manifestObjectKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentEncryptionMetadataV1.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentEncryptionMetadataV1.java index 26bcc3765..b4594f778 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentEncryptionMetadataV1.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentEncryptionMetadataV1.java @@ -21,6 
+21,8 @@ import java.util.Arrays; import java.util.Objects; +import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD; + import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -37,6 +39,10 @@ public SegmentEncryptionMetadataV1(@JsonProperty(value = "dataKey", required = t this.aad = Objects.requireNonNull(aad, "aad cannot be null"); } + public SegmentEncryptionMetadataV1(final DataKeyAndAAD dataKeyAndAAD) { + this(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad); + } + @Override public int ivSize() { return IV_SIZE; From cdffc7f77da12f087b23b60bae1918daa7777f56 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 4 Apr 2024 11:32:03 +0300 Subject: [PATCH 2/3] feat: remove orphan objects if copy fails at any point Given that there are at least 3 objects uploaded per segment, and it could fail at any stage, the plugin should guard that the chances for orphan objects is minimal. --- .../tieredstorage/RemoteStorageManager.java | 22 +++-- .../RemoteStorageManagerTest.java | 88 +++++++++++++++++++ 2 files changed, 105 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index 9098e1f98..bac574a43 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -257,6 +257,12 @@ public Optional copyLogSegmentData(final RemoteLogSegmentMetadat maybeEncryptionKey, customMetadataBuilder); } catch (final Exception e) { + try { + // best effort on removing orphan files + tryDeleteSegmentObjects(remoteLogSegmentMetadata); + } catch (final Exception ignored) { + // ignore all exceptions + } throw new RemoteStorageException(e); } @@ -342,7 +348,7 @@ private TransformChunkEnumeration transformation( return transformEnum; } - private SegmentIndexesV1 uploadIndexes( + SegmentIndexesV1 uploadIndexes( final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final LogSegmentData segmentData, final DataKeyAndAAD maybeEncryptionKey, @@ -661,10 +667,7 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment final long startedMs = time.milliseconds(); try { - final Set keys = Arrays.stream(ObjectKeyFactory.Suffix.values()) - .map(s -> objectKeyFactory.key(remoteLogSegmentMetadata, s)) - .collect(Collectors.toSet()); - deleter.delete(keys); + tryDeleteSegmentObjects(remoteLogSegmentMetadata); } catch (final Exception e) { metrics.recordSegmentDeleteError(remoteLogSegmentMetadata.remoteLogSegmentId() .topicIdPartition().topicPartition()); @@ -678,6 +681,15 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment log.info("Deleting log segment data for completed successfully {}", remoteLogSegmentMetadata); } + private void tryDeleteSegmentObjects( + final RemoteLogSegmentMetadata remoteLogSegmentMetadata + ) throws StorageBackendException { + final Set keys = Arrays.stream(ObjectKeyFactory.Suffix.values()) + .map(s -> objectKeyFactory.key(remoteLogSegmentMetadata, s)) + .collect(Collectors.toSet()); + deleter.delete(keys); + } + @Override public void close() { metrics.close(); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java index 9ef568e6f..7c75183f0 100644 --- 
a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java @@ -18,15 +18,19 @@ import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.Map; +import java.util.Optional; import java.util.stream.Stream; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.log.remote.storage.LogSegmentData; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteStorageException; @@ -36,6 +40,7 @@ import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -45,8 +50,12 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; class RemoteStorageManagerTest { @@ -221,6 +230,85 @@ void fetchSegmentNonInterruptionExceptionWhenGettingSegment( .hasRootCauseInstanceOf(exceptionClass); } + @Test + void deleteObjectsWhenUploadFails( + @TempDir final Path partitionDir + ) throws IOException, StorageBackendException, RemoteStorageException { + // given a sample local segment to be uploaded + final var segmentPath = Files.createFile(partitionDir.resolve("0000.log")); + final var segmentContent = "test"; + Files.writeString(segmentPath, segmentContent); + final var indexPath = Files.createFile(partitionDir.resolve("0000.index")); + final var timeIndexPath = Files.createFile(partitionDir.resolve("0000.timeindex")); + final var producerSnapshotPath = Files.createFile(partitionDir.resolve("0000.snapshot")); + final var logSegmentData = new LogSegmentData( + segmentPath, + indexPath, + timeIndexPath, + Optional.empty(), + producerSnapshotPath, + ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8)) + ); + + final var remoteLogSegmentMetadata = new RemoteLogSegmentMetadata( + REMOTE_SEGMENT_ID, 0, 1L, + 0, 0, 0, segmentContent.length(), Map.of(0, 0L)); + + final var remotePartitionPath = targetDir.resolve(TOPIC_ID_PARTITION.topic() + "-" + TOPIC_ID) + .resolve(String.valueOf(TOPIC_ID_PARTITION.partition())); + + final var config = Map.of( + "chunk.size", "1", + "storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage", + "storage.root", targetDir.toString() + ); + rsm.configure(config); + rsm = spy(rsm); + + // when first upload fails + doThrow(IOException.class).when(rsm).uploadSegmentLog(any(), any(), anyBoolean(), any(), any()); + + assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData)) + 
.isInstanceOf(RemoteStorageException.class) + .hasRootCauseInstanceOf(IOException.class); + + // then no files stored in remote + assertThat(remotePartitionPath).doesNotExist(); + + // fallback to real method + doCallRealMethod().when(rsm).uploadSegmentLog(any(), any(), anyBoolean(), any(), any()); + + // when second upload fails + doThrow(IOException.class).when(rsm).uploadIndexes(any(), any(), any(), any()); + + assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData)) + .isInstanceOf(RemoteStorageException.class) + .hasRootCauseInstanceOf(IOException.class); + + // then no files stored in remote + assertThat(remotePartitionPath).doesNotExist(); + + // fallback to real method + doCallRealMethod().when(rsm).uploadIndexes(any(), any(), any(), any()); + + // when third upload fails + doThrow(IOException.class).when(rsm).uploadManifest(any(), any(), any(), anyBoolean(), any(), any()); + + assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData)) + .isInstanceOf(RemoteStorageException.class) + .hasRootCauseInstanceOf(IOException.class); + + // then no files stored in remote + assertThat(remotePartitionPath).doesNotExist(); + + // fallback to real method + doCallRealMethod().when(rsm).uploadManifest(any(), any(), any(), anyBoolean(), any(), any()); + + // when all good + rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData); + assertThat(Files.list(remotePartitionPath)).hasSize(3); + } + static Stream provideNonInterruptionExceptions() { return Stream.of( arguments(null, Exception.class), From 5726bc97ad8f1adb3c8fc30b87894e009187d6ca Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 11 Apr 2024 11:45:37 +0300 Subject: [PATCH 3/3] refactor: decouple original size from chunking Chunking is defined by original chunk size, not original content size. This PR removed the scenario where these two sizes are conflated (e.g. passing original size zero to disable chunking, etc.). --- .../tieredstorage/RemoteStorageManager.java | 21 +++--- .../BaseDetransformChunkEnumeration.java | 3 +- .../BaseTransformChunkEnumeration.java | 10 ++- .../transform/TransformFinisher.java | 44 +++++++----- .../transform/TransformsEndToEndTest.java | 68 ++++++++++--------- 5 files changed, 81 insertions(+), 65 deletions(-) diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index bac574a43..46c5a7d8f 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -457,8 +457,19 @@ InputStream transformIndex(final IndexType indexType, ); } final var transformFinisher = new TransformFinisher(transformEnum, size); + // Getting next element and expecting that it is the only one. + // No need to get a sequenced input stream final var inputStream = transformFinisher.nextElement(); - segmentIndexBuilder.add(indexType, singleChunk(transformFinisher.chunkIndex()).range().size()); + final var chunkIndex = transformFinisher.chunkIndex(); + // by getting a chunk index, means that the transformation is completed. + if (chunkIndex == null) { + throw new IllegalStateException("Chunking disabled when single chunk is expected"); + } + if (chunkIndex.chunks().size() != 1) { + // not expected, as next element run once. 
But for safety + throw new IllegalStateException("Number of chunks different than 1, single chunk is expected"); + } + segmentIndexBuilder.add(indexType, chunkIndex.chunks().get(0).range().size()); return inputStream; } else { segmentIndexBuilder.add(indexType, 0); @@ -466,14 +477,6 @@ InputStream transformIndex(final IndexType indexType, } } - private Chunk singleChunk(final ChunkIndex chunkIndex) { - final var chunks = chunkIndex.chunks(); - if (chunks.size() != 1) { - throw new IllegalStateException("Single chunk expected when transforming indexes"); - } - return chunks.get(0); - } - void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final ChunkIndex chunkIndex, final SegmentIndexesV1 segmentIndexes, diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseDetransformChunkEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseDetransformChunkEnumeration.java index ad2a3983c..a91948ce8 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseDetransformChunkEnumeration.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseDetransformChunkEnumeration.java @@ -33,7 +33,8 @@ * both transformed and not. We rely on this information for determining the transformed chunks borders * in the input stream. We also can tell if the input stream has too few bytes. * - *
An empty list of chunks means no chunking has been applied to the incoming stream. + *
An empty list of chunks means no chunking has been applied to the incoming stream + * and all content should be returned at once. */ public class BaseDetransformChunkEnumeration implements DetransformChunkEnumeration { private final InputStream inputStream; diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java index 3c781eb81..fde97243a 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java @@ -32,12 +32,10 @@ public class BaseTransformChunkEnumeration implements TransformChunkEnumeration private byte[] chunk = null; - public BaseTransformChunkEnumeration(final InputStream inputStream) { - this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null"); - - this.originalChunkSize = 0; - } - + /** + * @param inputStream original content + * @param originalChunkSize chunk size from the original content. If zero, it disables chunking. + */ public BaseTransformChunkEnumeration(final InputStream inputStream, final int originalChunkSize) { this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null"); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java index 6f1d9c50f..44c6b50cb 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java @@ -27,8 +27,6 @@ import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndexBuilder; import io.aiven.kafka.tieredstorage.manifest.index.VariableSizeChunkIndexBuilder; -// TODO test transforms and detransforms with property-based tests - /** * The transformation finisher. * @@ -36,34 +34,46 @@ * so that it could be used in {@link SequenceInputStream}. * *
It's responsible for building the chunk index. + * The chunk index is empty (i.e. null) if chunking has been disabled (i.e. chunk size is zero), + * but could also have a single chunk if the chunk size is equal or higher to the original file size. + * Otherwise, the chunk index will contain more than one chunk. */ public class TransformFinisher implements Enumeration { private final TransformChunkEnumeration inner; private final AbstractChunkIndexBuilder chunkIndexBuilder; - private final int originalFileSize; private ChunkIndex chunkIndex = null; - public TransformFinisher(final TransformChunkEnumeration inner) { - this(inner, 0); - } - - public TransformFinisher(final TransformChunkEnumeration inner, final int originalFileSize) { + public TransformFinisher( + final TransformChunkEnumeration inner, + final int originalFileSize + ) { this.inner = Objects.requireNonNull(inner, "inner cannot be null"); - this.originalFileSize = originalFileSize; if (originalFileSize < 0) { throw new IllegalArgumentException( "originalFileSize must be non-negative, " + originalFileSize + " given"); } + this.chunkIndexBuilder = chunkIndexBuilder(inner, inner.originalChunkSize(), originalFileSize); + } + + private static AbstractChunkIndexBuilder chunkIndexBuilder( + final TransformChunkEnumeration inner, + final int originalChunkSize, + final int originalFileSize + ) { final Integer transformedChunkSize = inner.transformedChunkSize(); - if (originalFileSize == 0) { - this.chunkIndexBuilder = null; - } else if (transformedChunkSize == null) { - this.chunkIndexBuilder = new VariableSizeChunkIndexBuilder(inner.originalChunkSize(), originalFileSize); + if (transformedChunkSize == null) { + return new VariableSizeChunkIndexBuilder( + originalChunkSize, + originalFileSize + ); } else { - this.chunkIndexBuilder = new FixedSizeChunkIndexBuilder( - inner.originalChunkSize(), originalFileSize, transformedChunkSize); + return new FixedSizeChunkIndexBuilder( + originalChunkSize, + originalFileSize, + transformedChunkSize + ); } } @@ -75,7 +85,7 @@ public boolean hasMoreElements() { @Override public InputStream nextElement() { final var chunk = inner.nextElement(); - if (chunkIndexBuilder != null) { + if (inner.originalChunkSize() > 0) { if (hasMoreElements()) { this.chunkIndexBuilder.addChunk(chunk.length); } else { @@ -87,7 +97,7 @@ public InputStream nextElement() { } public ChunkIndex chunkIndex() { - if (chunkIndex == null && originalFileSize > 0) { + if (chunkIndex == null && inner.originalChunkSize() > 0) { throw new IllegalStateException("Chunk index was not built, was finisher used?"); } return this.chunkIndex; diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java index cad7355d0..21c0dff1e 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java @@ -70,39 +70,43 @@ void compressionAndEncryption(final int chunkSize) throws IOException { private void test(final int chunkSize, final boolean compression, final boolean encryption) throws IOException { // Transform. 
- TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration( - new ByteArrayInputStream(original), chunkSize); - if (compression) { - transformEnum = new CompressionChunkEnumeration(transformEnum); - } - if (encryption) { - transformEnum = new EncryptionChunkEnumeration(transformEnum, AesKeyAwareTest::encryptionCipherSupplier); - } - final var transformFinisher = chunkSize == 0 - ? new TransformFinisher(transformEnum) - : new TransformFinisher(transformEnum, ORIGINAL_SIZE); - final byte[] uploadedData; - final ChunkIndex chunkIndex; - try (final var sis = transformFinisher.toInputStream()) { - uploadedData = sis.readAllBytes(); - chunkIndex = transformFinisher.chunkIndex(); - } + try (final var inputStream = new ByteArrayInputStream(original)) { + TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(inputStream, chunkSize); + if (compression) { + transformEnum = new CompressionChunkEnumeration(transformEnum); + } + if (encryption) { + transformEnum = new EncryptionChunkEnumeration( + transformEnum, + AesKeyAwareTest::encryptionCipherSupplier + ); + } + final var transformFinisher = new TransformFinisher(transformEnum, ORIGINAL_SIZE); + final byte[] uploadedData; + final ChunkIndex chunkIndex; + try (final var sis = transformFinisher.toInputStream()) { + uploadedData = sis.readAllBytes(); + chunkIndex = transformFinisher.chunkIndex(); + } - // Detransform. - DetransformChunkEnumeration detransformEnum = chunkIndex == null - ? new BaseDetransformChunkEnumeration(new ByteArrayInputStream(uploadedData)) - : new BaseDetransformChunkEnumeration(new ByteArrayInputStream(uploadedData), chunkIndex.chunks()); - if (encryption) { - detransformEnum = new DecryptionChunkEnumeration( - detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier); - } - if (compression) { - detransformEnum = new DecompressionChunkEnumeration(detransformEnum); - } - final var detransformFinisher = new DetransformFinisher(detransformEnum); - try (final var sis = detransformFinisher.toInputStream()) { - final byte[] downloaded = sis.readAllBytes(); - assertThat(downloaded).isEqualTo(original); + // Detransform. + try (final var uploadedStream = new ByteArrayInputStream(uploadedData)) { + DetransformChunkEnumeration detransformEnum = chunkIndex == null + ? new BaseDetransformChunkEnumeration(uploadedStream) + : new BaseDetransformChunkEnumeration(uploadedStream, chunkIndex.chunks()); + if (encryption) { + detransformEnum = new DecryptionChunkEnumeration( + detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier); + } + if (compression) { + detransformEnum = new DecompressionChunkEnumeration(detransformEnum); + } + final var detransformFinisher = new DetransformFinisher(detransformEnum); + try (final var sis = detransformFinisher.toInputStream()) { + final byte[] downloaded = sis.readAllBytes(); + assertThat(downloaded).isEqualTo(original); + } + } } } }
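For reference, a minimal usage sketch of the reworked transform classes, mirroring the end-to-end test above. It is not part of the series; the constructors and accessors come from the diffs, while the class name and helper method here are hypothetical, and the null-chunk-index handling follows the new TransformFinisher Javadoc (a chunk size of zero disables chunking).

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
import io.aiven.kafka.tieredstorage.transform.BaseTransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.TransformFinisher;

class TransformFinisherUsageSketch {

    // Transforms `original` with the given chunk size and returns how many chunks
    // were recorded in the chunk index. A chunk size of zero disables chunking:
    // per the updated TransformFinisher Javadoc, chunkIndex() is then null and the
    // content is emitted as a single element.
    static int transformAndCountChunks(final byte[] original, final int chunkSize) throws IOException {
        try (final InputStream in = new ByteArrayInputStream(original)) {
            final var transformEnum = new BaseTransformChunkEnumeration(in, chunkSize);
            // The original file size is passed separately and no longer doubles as
            // the "chunking enabled" flag.
            final TransformFinisher finisher = new TransformFinisher(transformEnum, original.length);
            try (final InputStream sis = finisher.toInputStream()) {
                sis.readAllBytes(); // drain the stream so the chunk index gets built
            }
            final ChunkIndex chunkIndex = finisher.chunkIndex();
            return chunkIndex == null ? 0 : chunkIndex.chunks().size();
        }
    }
}

A caller that needs exactly one chunk (as transformIndex does for index files) can assert that the returned count is 1.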