diff --git a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
index 5e4da2d6d..a64d9ffd0 100644
--- a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
+++ b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
@@ -57,7 +57,6 @@
 import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException;
 import io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache;
 import io.aiven.kafka.tieredstorage.fetch.cache.MemoryChunkCache;
-import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
 import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder;
 import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
 import io.aiven.kafka.tieredstorage.manifest.serde.EncryptionSerdeModule;
@@ -480,16 +479,15 @@ void testTransformingIndexes(final boolean encryption) {
             "storage.root", targetDir.toString(),
             "encryption.enabled", Boolean.toString(encryption)
         ));
-        final SegmentEncryptionMetadataV1 encryptionMetadata;
+        final DataKeyAndAAD maybeEncryptionKey;
         if (encryption) {
             config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID);
             config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID);
             config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".public.key.file", publicKeyPem.toString());
             config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".private.key.file", privateKeyPem.toString());
-            final var dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
-            encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
+            maybeEncryptionKey = aesEncryptionProvider.createDataKeyAndAAD();
         } else {
-            encryptionMetadata = null;
+            maybeEncryptionKey = null;
         }
         rsm.configure(config);
 
@@ -499,7 +497,7 @@ void testTransformingIndexes(final boolean encryption) {
             IndexType.OFFSET,
             new ByteArrayInputStream(bytes),
             bytes.length,
-            encryptionMetadata,
+            maybeEncryptionKey,
             segmentIndexBuilder
         );
         assertThat(is).isNotEmpty();
@@ -511,21 +509,21 @@ void testTransformingIndexes(final boolean encryption) {
             IndexType.TIMESTAMP,
             new ByteArrayInputStream(bytes),
             bytes.length,
-            encryptionMetadata,
+            maybeEncryptionKey,
             segmentIndexBuilder
         );
         rsm.transformIndex(
             IndexType.LEADER_EPOCH,
             new ByteArrayInputStream(bytes),
             bytes.length,
-            encryptionMetadata,
+            maybeEncryptionKey,
             segmentIndexBuilder
         );
         rsm.transformIndex(
             IndexType.PRODUCER_SNAPSHOT,
             new ByteArrayInputStream(bytes),
             bytes.length,
-            encryptionMetadata,
+            maybeEncryptionKey,
             segmentIndexBuilder
         );
         final var index = segmentIndexBuilder.build();
@@ -545,14 +543,15 @@ void testTransformingEmptyIndexes(final boolean encryption) {
             "storage.root", targetDir.toString(),
             "encryption.enabled", Boolean.toString(encryption)
         ));
-        SegmentEncryptionMetadataV1 encryptionMetadata = null;
+        final DataKeyAndAAD maybeEncryptionKey;
         if (encryption) {
             config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID);
             config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID);
             config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".public.key.file", publicKeyPem.toString());
             config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".private.key.file", privateKeyPem.toString());
-            final var dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
-            encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
+            maybeEncryptionKey = aesEncryptionProvider.createDataKeyAndAAD();
+        } else {
+            maybeEncryptionKey = null;
         }
         rsm.configure(config);
 
@@ -561,7 +560,7 @@ void testTransformingEmptyIndexes(final boolean encryption) {
             IndexType.OFFSET,
             InputStream.nullInputStream(),
             0,
-            encryptionMetadata,
+            maybeEncryptionKey,
             segmentIndexBuilder
         );
         assertThat(is).isEmpty();
@@ -572,21 +571,21 @@ void testTransformingEmptyIndexes(final boolean encryption) {
             IndexType.TIMESTAMP,
             InputStream.nullInputStream(),
             0,
-            encryptionMetadata,
+            maybeEncryptionKey,
             segmentIndexBuilder
         );
         rsm.transformIndex(
             IndexType.LEADER_EPOCH,
             InputStream.nullInputStream(),
             0,
-            encryptionMetadata,
+            maybeEncryptionKey,
             segmentIndexBuilder
         );
         rsm.transformIndex(
             IndexType.PRODUCER_SNAPSHOT,
             InputStream.nullInputStream(),
             0,
-            encryptionMetadata,
+            maybeEncryptionKey,
             segmentIndexBuilder
         );
         final var index = segmentIndexBuilder.build();
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
index b96484044..46c5a7d8f 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
@@ -223,36 +223,46 @@ public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadat
         final long startedMs = time.milliseconds();
 
         try {
-            SegmentEncryptionMetadataV1 encryptionMetadata = null;
             final boolean requiresCompression = requiresCompression(logSegmentData);
-
-            final ChunkIndex chunkIndex;
-            try (final InputStream logSegmentInputStream = Files.newInputStream(logSegmentData.logSegment())) {
-                TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(
-                    logSegmentInputStream, chunkSize);
-                if (requiresCompression) {
-                    transformEnum = new CompressionChunkEnumeration(transformEnum);
-                }
-                if (encryptionEnabled) {
-                    final DataKeyAndAAD dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
-                    transformEnum = new EncryptionChunkEnumeration(
-                        transformEnum,
-                        () -> aesEncryptionProvider.encryptionCipher(dataKeyAndAAD));
-                    encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
-                }
-                final TransformFinisher transformFinisher =
-                    new TransformFinisher(transformEnum, remoteLogSegmentMetadata.segmentSizeInBytes());
-                uploadSegmentLog(remoteLogSegmentMetadata, transformFinisher, customMetadataBuilder);
-                chunkIndex = transformFinisher.chunkIndex();
+            final DataKeyAndAAD maybeEncryptionKey;
+            if (encryptionEnabled) {
+                maybeEncryptionKey = aesEncryptionProvider.createDataKeyAndAAD();
+            } else {
+                maybeEncryptionKey = null;
             }
 
+            // upload segment
+            final ChunkIndex chunkIndex = uploadSegmentLog(
+                remoteLogSegmentMetadata,
+                logSegmentData,
+                requiresCompression,
+                maybeEncryptionKey,
+                customMetadataBuilder
+            );
+
+            // upload indexes
             final SegmentIndexesV1 segmentIndexes = uploadIndexes(
-                remoteLogSegmentMetadata, logSegmentData, encryptionMetadata, customMetadataBuilder);
-            final SegmentManifest segmentManifest = new SegmentManifestV1(
-                chunkIndex, segmentIndexes, requiresCompression, encryptionMetadata, remoteLogSegmentMetadata);
-            uploadManifest(remoteLogSegmentMetadata, segmentManifest, customMetadataBuilder);
+                remoteLogSegmentMetadata,
+                logSegmentData,
+                maybeEncryptionKey,
+                customMetadataBuilder
+            );
+            // upload manifest
+            uploadManifest(
+                remoteLogSegmentMetadata,
+                chunkIndex,
+                segmentIndexes,
+                requiresCompression,
+                maybeEncryptionKey,
+                customMetadataBuilder);
         } catch (final Exception e) {
+            try {
+                // best effort on removing orphan files
+                tryDeleteSegmentObjects(remoteLogSegmentMetadata);
+            } catch (final Exception ignored) {
+                // ignore all exceptions
+            }
             throw new RemoteStorageException(e);
         }
 
@@ -267,10 +277,81 @@ public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadat
         return customMetadata;
     }
 
-    private SegmentIndexesV1 uploadIndexes(
+    boolean requiresCompression(final LogSegmentData logSegmentData) {
+        boolean requiresCompression = false;
+        if (compressionEnabled) {
+            if (compressionHeuristic) {
+                try {
+                    final File segmentFile = logSegmentData.logSegment().toFile();
+                    final boolean alreadyCompressed = SegmentCompressionChecker.check(segmentFile);
+                    requiresCompression = !alreadyCompressed;
+                } catch (final InvalidRecordBatchException e) {
+                    // Log and leave value as false to upload uncompressed.
+                    log.warn("Failed to check compression on log segment: {}", logSegmentData.logSegment(), e);
+                }
+            } else {
+                requiresCompression = true;
+            }
+        }
+        return requiresCompression;
+    }
+
+    ChunkIndex uploadSegmentLog(
+        final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+        final LogSegmentData logSegmentData,
+        final boolean requiresCompression,
+        final DataKeyAndAAD maybeEncryptionKey,
+        final SegmentCustomMetadataBuilder customMetadataBuilder
+    ) throws IOException, StorageBackendException {
+        final var fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG);
+
+        try (final var logSegmentInputStream = Files.newInputStream(logSegmentData.logSegment())) {
+            final var transformEnum = transformation(logSegmentInputStream, requiresCompression, maybeEncryptionKey);
+            final var transformFinisher = new TransformFinisher(
+                transformEnum,
+                remoteLogSegmentMetadata.segmentSizeInBytes()
+            );
+
+            try (final var sis = transformFinisher.toInputStream()) {
+                final var bytes = uploader.upload(sis, fileKey);
+                metrics.recordObjectUpload(
+                    remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
+                    ObjectKeyFactory.Suffix.LOG,
+                    bytes
+                );
+                customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.LOG, bytes);
+
+                log.debug("Uploaded segment log for {}, size: {}", remoteLogSegmentMetadata, bytes);
+            }
+            return transformFinisher.chunkIndex();
+        }
+    }
+
+    private TransformChunkEnumeration transformation(
+        final InputStream logSegmentInputStream,
+        final boolean requiresCompression,
+        final DataKeyAndAAD maybeEncryptionKey
+    ) {
+        TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(
+            logSegmentInputStream,
+            chunkSize
+        );
+        if (requiresCompression) {
+            transformEnum = new CompressionChunkEnumeration(transformEnum);
+        }
+        if (encryptionEnabled) {
+            transformEnum = new EncryptionChunkEnumeration(
+                transformEnum,
+                () -> aesEncryptionProvider.encryptionCipher(maybeEncryptionKey)
+            );
+        }
+        return transformEnum;
+    }
+
+    SegmentIndexesV1 uploadIndexes(
         final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
         final LogSegmentData segmentData,
-        final SegmentEncryptionMetadataV1 encryptionMeta,
+        final DataKeyAndAAD maybeEncryptionKey,
         final SegmentCustomMetadataBuilder customMetadataBuilder
     ) throws IOException, RemoteStorageException, StorageBackendException {
         final List<InputStream> indexes = new ArrayList<>(IndexType.values().length);
@@ -281,7 +362,7 @@ private SegmentIndexesV1 uploadIndexes(
             IndexType.OFFSET,
             closableInputStreamHolder.add(Files.newInputStream(segmentData.offsetIndex())),
             indexSize(segmentData.offsetIndex()),
-            encryptionMeta,
+            maybeEncryptionKey,
             segmentIndexBuilder
         );
         indexes.add(offsetIndex);
@@ -289,7 +370,7 @@ private SegmentIndexesV1 uploadIndexes(
             IndexType.TIMESTAMP,
             closableInputStreamHolder.add(Files.newInputStream(segmentData.timeIndex())),
             indexSize(segmentData.timeIndex()),
-            encryptionMeta,
+            maybeEncryptionKey,
             segmentIndexBuilder
         );
         indexes.add(timeIndex);
@@ -297,7 +378,7 @@ private SegmentIndexesV1 uploadIndexes(
             IndexType.PRODUCER_SNAPSHOT,
             closableInputStreamHolder.add(Files.newInputStream(segmentData.producerSnapshotIndex())),
             indexSize(segmentData.producerSnapshotIndex()),
-            encryptionMeta,
+            maybeEncryptionKey,
             segmentIndexBuilder
         );
         indexes.add(producerSnapshotIndex);
@@ -305,7 +386,7 @@ private SegmentIndexesV1 uploadIndexes(
             IndexType.LEADER_EPOCH,
             closableInputStreamHolder.add(new ByteBufferInputStream(segmentData.leaderEpochIndex())),
             segmentData.leaderEpochIndex().remaining(),
-            encryptionMeta,
+            maybeEncryptionKey,
             segmentIndexBuilder
         );
         indexes.add(leaderEpoch);
@@ -314,7 +395,7 @@ private SegmentIndexesV1 uploadIndexes(
                 IndexType.TRANSACTION,
                 closableInputStreamHolder.add(Files.newInputStream(segmentData.transactionIndex().get())),
                 indexSize(segmentData.transactionIndex().get()),
-                encryptionMeta,
+                maybeEncryptionKey,
                 segmentIndexBuilder
             );
             indexes.add(transactionIndex);
@@ -361,60 +442,34 @@ private Optional<CustomMetadata> buildCustomMetadata(final SegmentCustomMetadata
         }
     }
 
-    boolean requiresCompression(final LogSegmentData logSegmentData) {
-        boolean requiresCompression = false;
-        if (compressionEnabled) {
-            if (compressionHeuristic) {
-                try {
-                    final File segmentFile = logSegmentData.logSegment().toFile();
-                    final boolean alreadyCompressed = SegmentCompressionChecker.check(segmentFile);
-                    requiresCompression = !alreadyCompressed;
-                } catch (final InvalidRecordBatchException e) {
-                    // Log and leave value as false to upload uncompressed.
-                    log.warn("Failed to check compression on log segment: {}", logSegmentData.logSegment(), e);
-                }
-            } else {
-                requiresCompression = true;
-            }
-        }
-        return requiresCompression;
-    }
-
-    private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                                  final TransformFinisher transformFinisher,
-                                  final SegmentCustomMetadataBuilder customMetadataBuilder)
-        throws IOException, StorageBackendException {
-        final ObjectKey fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG);
-        try (final var sis = transformFinisher.toInputStream()) {
-            final var bytes = uploader.upload(sis, fileKey);
-            metrics.recordObjectUpload(
-                remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
-                ObjectKeyFactory.Suffix.LOG,
-                bytes
-            );
-            customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.LOG, bytes);
-
-            log.debug("Uploaded segment log for {}, size: {}", remoteLogSegmentMetadata, bytes);
-        }
-    }
-
     InputStream transformIndex(final IndexType indexType,
                                final InputStream index,
                                final int size,
-                               final SegmentEncryptionMetadata encryptionMetadata,
+                               final DataKeyAndAAD maybeEncryptionKey,
                                final SegmentIndexesV1Builder segmentIndexBuilder) {
         log.debug("Transforming index {} with size {}", indexType, size);
         if (size > 0) {
             TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(index, size);
             if (encryptionEnabled) {
-                final var dataKeyAndAAD = new DataKeyAndAAD(encryptionMetadata.dataKey(), encryptionMetadata.aad());
                 transformEnum = new EncryptionChunkEnumeration(
                     transformEnum,
-                    () -> aesEncryptionProvider.encryptionCipher(dataKeyAndAAD));
+                    () -> aesEncryptionProvider.encryptionCipher(maybeEncryptionKey)
+                );
             }
             final var transformFinisher = new TransformFinisher(transformEnum, size);
+            // Getting next element and expecting that it is the only one.
+            // No need to get a sequenced input stream
             final var inputStream = transformFinisher.nextElement();
-            segmentIndexBuilder.add(indexType, singleChunk(transformFinisher.chunkIndex()).range().size());
+            final var chunkIndex = transformFinisher.chunkIndex();
+            // by getting a chunk index, means that the transformation is completed.
+            if (chunkIndex == null) {
+                throw new IllegalStateException("Chunking disabled when single chunk is expected");
+            }
+            if (chunkIndex.chunks().size() != 1) {
+                // not expected, as next element run once. But for safety
+                throw new IllegalStateException("Number of chunks different than 1, single chunk is expected");
+            }
+            segmentIndexBuilder.add(indexType, chunkIndex.chunks().get(0).range().size());
             return inputStream;
         } else {
             segmentIndexBuilder.add(indexType, 0);
@@ -422,18 +477,26 @@ InputStream transformIndex(final IndexType indexType,
         }
     }
 
-    private Chunk singleChunk(final ChunkIndex chunkIndex) {
-        final var chunks = chunkIndex.chunks();
-        if (chunks.size() != 1) {
-            throw new IllegalStateException("Single chunk expected when transforming indexes");
+    void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                        final ChunkIndex chunkIndex,
+                        final SegmentIndexesV1 segmentIndexes,
+                        final boolean requiresCompression,
+                        final DataKeyAndAAD maybeEncryptionKey,
+                        final SegmentCustomMetadataBuilder customMetadataBuilder
+    ) throws StorageBackendException, IOException {
+        final SegmentEncryptionMetadataV1 maybeEncryptionMetadata;
+        if (maybeEncryptionKey != null) {
+            maybeEncryptionMetadata = new SegmentEncryptionMetadataV1(maybeEncryptionKey);
+        } else {
+            maybeEncryptionMetadata = null;
         }
-        return chunks.get(0);
-    }
-
-    private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                                final SegmentManifest segmentManifest,
-                                final SegmentCustomMetadataBuilder customMetadataBuilder)
-        throws StorageBackendException, IOException {
+        final SegmentManifest segmentManifest = new SegmentManifestV1(
+            chunkIndex,
+            segmentIndexes,
+            requiresCompression,
+            maybeEncryptionMetadata,
+            remoteLogSegmentMetadata
+        );
         final String manifest = mapper.writeValueAsString(segmentManifest);
         final ObjectKey manifestObjectKey =
             objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);
@@ -607,10 +670,7 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment
         final long startedMs = time.milliseconds();
 
         try {
-            final Set<ObjectKey> keys = Arrays.stream(ObjectKeyFactory.Suffix.values())
-                .map(s -> objectKeyFactory.key(remoteLogSegmentMetadata, s))
-                .collect(Collectors.toSet());
-            deleter.delete(keys);
+            tryDeleteSegmentObjects(remoteLogSegmentMetadata);
         } catch (final Exception e) {
             metrics.recordSegmentDeleteError(remoteLogSegmentMetadata.remoteLogSegmentId()
                 .topicIdPartition().topicPartition());
@@ -624,6 +684,15 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment
         log.info("Deleting log segment data for completed successfully {}", remoteLogSegmentMetadata);
     }
 
+    private void tryDeleteSegmentObjects(
+        final RemoteLogSegmentMetadata remoteLogSegmentMetadata
+    ) throws StorageBackendException {
+        final Set<ObjectKey> keys = Arrays.stream(ObjectKeyFactory.Suffix.values())
+            .map(s -> objectKeyFactory.key(remoteLogSegmentMetadata, s))
+            .collect(Collectors.toSet());
+        deleter.delete(keys);
+    }
+
     @Override
     public void close() {
         metrics.close();
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentEncryptionMetadataV1.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentEncryptionMetadataV1.java
index 26bcc3765..b4594f778 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentEncryptionMetadataV1.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentEncryptionMetadataV1.java
@@ -21,6 +21,8 @@
 import java.util.Arrays;
 import java.util.Objects;
 
+import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
@@ -37,6 +39,10 @@ public SegmentEncryptionMetadataV1(@JsonProperty(value = "dataKey", required = t
         this.aad = Objects.requireNonNull(aad, "aad cannot be null");
     }
 
+    public SegmentEncryptionMetadataV1(final DataKeyAndAAD dataKeyAndAAD) {
+        this(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
+    }
+
     @Override
     public int ivSize() {
         return IV_SIZE;
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseDetransformChunkEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseDetransformChunkEnumeration.java
index ad2a3983c..a91948ce8 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseDetransformChunkEnumeration.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseDetransformChunkEnumeration.java
@@ -33,7 +33,8 @@
  * both transformed and not. We rely on this information for determining the transformed chunks borders
  * in the input stream. We also can tell if the input stream has too few bytes.
  *
- * <p>An empty list of chunks means no chunking has been applied to the incoming stream.
+ * <p>An empty list of chunks means no chunking has been applied to the incoming stream
+ * and all content should be returned at once.
  */
 public class BaseDetransformChunkEnumeration implements DetransformChunkEnumeration {
     private final InputStream inputStream;
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java
index 3c781eb81..fde97243a 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java
@@ -32,12 +32,10 @@ public class BaseTransformChunkEnumeration implements TransformChunkEnumeration
 
     private byte[] chunk = null;
 
-    public BaseTransformChunkEnumeration(final InputStream inputStream) {
-        this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null");
-
-        this.originalChunkSize = 0;
-    }
-
+    /**
+     * @param inputStream original content
+     * @param originalChunkSize chunk size from the original content. If zero, it disables chunking.
+     */
     public BaseTransformChunkEnumeration(final InputStream inputStream,
                                          final int originalChunkSize) {
         this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null");
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java
index 6f1d9c50f..44c6b50cb 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java
@@ -27,8 +27,6 @@
 import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndexBuilder;
 import io.aiven.kafka.tieredstorage.manifest.index.VariableSizeChunkIndexBuilder;
 
-// TODO test transforms and detransforms with property-based tests
-
 /**
  * The transformation finisher.
  *
@@ -36,34 +34,46 @@
  * so that it could be used in {@link SequenceInputStream}.
 *
  * <p>It's responsible for building the chunk index.
+ * The chunk index is empty (i.e. null) if chunking has been disabled (i.e. chunk size is zero),
+ * but could also have a single chunk if the chunk size is equal or higher to the original file size.
+ * Otherwise, the chunk index will contain more than one chunk.
  */
 public class TransformFinisher implements Enumeration<InputStream> {
     private final TransformChunkEnumeration inner;
     private final AbstractChunkIndexBuilder chunkIndexBuilder;
-    private final int originalFileSize;
 
     private ChunkIndex chunkIndex = null;
 
-    public TransformFinisher(final TransformChunkEnumeration inner) {
-        this(inner, 0);
-    }
-
-    public TransformFinisher(final TransformChunkEnumeration inner, final int originalFileSize) {
+    public TransformFinisher(
+        final TransformChunkEnumeration inner,
+        final int originalFileSize
+    ) {
         this.inner = Objects.requireNonNull(inner, "inner cannot be null");
-        this.originalFileSize = originalFileSize;
         if (originalFileSize < 0) {
             throw new IllegalArgumentException(
                 "originalFileSize must be non-negative, " + originalFileSize + " given");
         }
 
+        this.chunkIndexBuilder = chunkIndexBuilder(inner, inner.originalChunkSize(), originalFileSize);
+    }
+
+    private static AbstractChunkIndexBuilder chunkIndexBuilder(
+        final TransformChunkEnumeration inner,
+        final int originalChunkSize,
+        final int originalFileSize
+    ) {
         final Integer transformedChunkSize = inner.transformedChunkSize();
-        if (originalFileSize == 0) {
-            this.chunkIndexBuilder = null;
-        } else if (transformedChunkSize == null) {
-            this.chunkIndexBuilder = new VariableSizeChunkIndexBuilder(inner.originalChunkSize(), originalFileSize);
+        if (transformedChunkSize == null) {
+            return new VariableSizeChunkIndexBuilder(
+                originalChunkSize,
+                originalFileSize
+            );
         } else {
-            this.chunkIndexBuilder = new FixedSizeChunkIndexBuilder(
-                inner.originalChunkSize(), originalFileSize, transformedChunkSize);
+            return new FixedSizeChunkIndexBuilder(
+                originalChunkSize,
+                originalFileSize,
+                transformedChunkSize
+            );
         }
     }
 
@@ -75,7 +85,7 @@ public boolean hasMoreElements() {
     @Override
     public InputStream nextElement() {
         final var chunk = inner.nextElement();
-        if (chunkIndexBuilder != null) {
+        if (inner.originalChunkSize() > 0) {
             if (hasMoreElements()) {
                 this.chunkIndexBuilder.addChunk(chunk.length);
             } else {
@@ -87,7 +97,7 @@ public InputStream nextElement() {
     }
 
     public ChunkIndex chunkIndex() {
-        if (chunkIndex == null && originalFileSize > 0) {
+        if (chunkIndex == null && inner.originalChunkSize() > 0) {
             throw new IllegalStateException("Chunk index was not built, was finisher used?");
         }
         return this.chunkIndex;
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
index 9ef568e6f..7c75183f0 100644
--- a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
+++ b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
@@ -18,15 +18,19 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Stream;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.LogSegmentData;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
@@ -36,6 +40,7 @@
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
 
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -45,8 +50,12 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 class RemoteStorageManagerTest {
@@ -221,6 +230,85 @@ void fetchSegmentNonInterruptionExceptionWhenGettingSegment(
             .hasRootCauseInstanceOf(exceptionClass);
     }
 
+    @Test
+    void deleteObjectsWhenUploadFails(
+        @TempDir final Path partitionDir
+    ) throws IOException, StorageBackendException, RemoteStorageException {
+        // given a sample local segment to be uploaded
+        final var segmentPath = Files.createFile(partitionDir.resolve("0000.log"));
+        final var segmentContent = "test";
+        Files.writeString(segmentPath, segmentContent);
+        final var indexPath = Files.createFile(partitionDir.resolve("0000.index"));
+        final var timeIndexPath = Files.createFile(partitionDir.resolve("0000.timeindex"));
+        final var producerSnapshotPath = Files.createFile(partitionDir.resolve("0000.snapshot"));
+        final var logSegmentData = new LogSegmentData(
+            segmentPath,
+            indexPath,
+            timeIndexPath,
+            Optional.empty(),
+            producerSnapshotPath,
+            ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8))
+        );
+
+        final var remoteLogSegmentMetadata = new RemoteLogSegmentMetadata(
+            REMOTE_SEGMENT_ID, 0, 1L,
+            0, 0, 0, segmentContent.length(), Map.of(0, 0L));
+
+        final var remotePartitionPath = targetDir.resolve(TOPIC_ID_PARTITION.topic() + "-" + TOPIC_ID)
+            .resolve(String.valueOf(TOPIC_ID_PARTITION.partition()));
+
+        final var config = Map.of(
+            "chunk.size", "1",
+            "storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
+            "storage.root", targetDir.toString()
+        );
+        rsm.configure(config);
+        rsm = spy(rsm);
+
+        // when first upload fails
+        doThrow(IOException.class).when(rsm).uploadSegmentLog(any(), any(), anyBoolean(), any(), any());
+
+        assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData))
+            .isInstanceOf(RemoteStorageException.class)
+            .hasRootCauseInstanceOf(IOException.class);
+
+        // then no files stored in remote
+        assertThat(remotePartitionPath).doesNotExist();
+
+        // fallback to real method
+        doCallRealMethod().when(rsm).uploadSegmentLog(any(), any(), anyBoolean(), any(), any());
+
+        // when second upload fails
+        doThrow(IOException.class).when(rsm).uploadIndexes(any(), any(), any(), any());
+
+        assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData))
+            .isInstanceOf(RemoteStorageException.class)
+            .hasRootCauseInstanceOf(IOException.class);
+
+        // then no files stored in remote
+        assertThat(remotePartitionPath).doesNotExist();
+
+        // fallback to real method
+        doCallRealMethod().when(rsm).uploadIndexes(any(), any(), any(), any());
+
+        // when third upload fails
+        doThrow(IOException.class).when(rsm).uploadManifest(any(), any(), any(), anyBoolean(), any(), any());
+
+        assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData))
+            .isInstanceOf(RemoteStorageException.class)
+            .hasRootCauseInstanceOf(IOException.class);
+
+        // then no files stored in remote
+        assertThat(remotePartitionPath).doesNotExist();
+
+        // fallback to real method
+        doCallRealMethod().when(rsm).uploadManifest(any(), any(), any(), anyBoolean(), any(), any());
+
+        // when all good
+        rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData);
+        assertThat(Files.list(remotePartitionPath)).hasSize(3);
+    }
+
     static Stream<Arguments> provideNonInterruptionExceptions() {
         return Stream.of(
             arguments(null, Exception.class),
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java
index cad7355d0..21c0dff1e 100644
--- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java
+++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java
@@ -70,39 +70,43 @@ void compressionAndEncryption(final int chunkSize) throws IOException {
 
     private void test(final int chunkSize, final boolean compression, final boolean encryption) throws IOException {
         // Transform.
-        TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(
-            new ByteArrayInputStream(original), chunkSize);
-        if (compression) {
-            transformEnum = new CompressionChunkEnumeration(transformEnum);
-        }
-        if (encryption) {
-            transformEnum = new EncryptionChunkEnumeration(transformEnum, AesKeyAwareTest::encryptionCipherSupplier);
-        }
-        final var transformFinisher = chunkSize == 0
-            ? new TransformFinisher(transformEnum)
-            : new TransformFinisher(transformEnum, ORIGINAL_SIZE);
-        final byte[] uploadedData;
-        final ChunkIndex chunkIndex;
-        try (final var sis = transformFinisher.toInputStream()) {
-            uploadedData = sis.readAllBytes();
-            chunkIndex = transformFinisher.chunkIndex();
-        }
+        try (final var inputStream = new ByteArrayInputStream(original)) {
+            TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(inputStream, chunkSize);
+            if (compression) {
+                transformEnum = new CompressionChunkEnumeration(transformEnum);
+            }
+            if (encryption) {
+                transformEnum = new EncryptionChunkEnumeration(
+                    transformEnum,
+                    AesKeyAwareTest::encryptionCipherSupplier
+                );
+            }
+            final var transformFinisher = new TransformFinisher(transformEnum, ORIGINAL_SIZE);
+            final byte[] uploadedData;
+            final ChunkIndex chunkIndex;
+            try (final var sis = transformFinisher.toInputStream()) {
+                uploadedData = sis.readAllBytes();
+                chunkIndex = transformFinisher.chunkIndex();
+            }
 
-        // Detransform.
-        DetransformChunkEnumeration detransformEnum = chunkIndex == null
-            ? new BaseDetransformChunkEnumeration(new ByteArrayInputStream(uploadedData))
-            : new BaseDetransformChunkEnumeration(new ByteArrayInputStream(uploadedData), chunkIndex.chunks());
-        if (encryption) {
-            detransformEnum = new DecryptionChunkEnumeration(
-                detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier);
-        }
-        if (compression) {
-            detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
-        }
-        final var detransformFinisher = new DetransformFinisher(detransformEnum);
-        try (final var sis = detransformFinisher.toInputStream()) {
-            final byte[] downloaded = sis.readAllBytes();
-            assertThat(downloaded).isEqualTo(original);
+            // Detransform.
+            try (final var uploadedStream = new ByteArrayInputStream(uploadedData)) {
+                DetransformChunkEnumeration detransformEnum = chunkIndex == null
+                    ? new BaseDetransformChunkEnumeration(uploadedStream)
+                    : new BaseDetransformChunkEnumeration(uploadedStream, chunkIndex.chunks());
+                if (encryption) {
+                    detransformEnum = new DecryptionChunkEnumeration(
+                        detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier);
+                }
+                if (compression) {
+                    detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
+                }
+                final var detransformFinisher = new DetransformFinisher(detransformEnum);
+                try (final var sis = detransformFinisher.toInputStream()) {
+                    final byte[] downloaded = sis.readAllBytes();
+                    assertThat(downloaded).isEqualTo(original);
+                }
+            }
         }
     }
 }
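
The net effect of the RemoteStorageManager changes above is that copyLogSegmentData now performs three separate uploads (segment log, combined indexes, manifest), and any failure triggers a best-effort delete of every object key the segment could have produced — the same key set deleteLogSegmentData uses via tryDeleteSegmentObjects. A minimal, self-contained sketch of that upload-then-clean-up pattern, using hypothetical upload/delete helpers rather than the repository's actual API, looks like this:

import java.util.List;

// Illustrative sketch only; helper names below are hypothetical, not the project's API.
class UploadWithCleanupSketch {

    void copySegment(final List<String> allPossibleObjectKeys) throws Exception {
        try {
            uploadSegmentLog();   // 1. transformed segment file
            uploadIndexes();      // 2. combined indexes object
            uploadManifest();     // 3. manifest with chunk index and optional encryption metadata
        } catch (final Exception e) {
            try {
                deleteAll(allPossibleObjectKeys);  // best effort: remove any orphaned objects
            } catch (final Exception ignored) {
                // cleanup failures are swallowed so the original error is what gets reported
            }
            throw e;  // surface the original upload failure to the caller
        }
    }

    private void uploadSegmentLog() { }
    private void uploadIndexes() { }
    private void uploadManifest() { }
    private void deleteAll(final List<String> keys) { }
}

The deleteObjectsWhenUploadFails test added in core/src/test exercises exactly this behaviour by failing each upload step in turn and asserting that the remote partition directory stays empty.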