From 296f3c5f95a4e5bd7934c4ac8aa4f715017ca84c Mon Sep 17 00:00:00 2001 From: Isaac Cheng Date: Wed, 16 Jul 2025 16:15:59 -0500 Subject: [PATCH] feat(filesystems): keep subfolders path for AmazonS3MovePolicy After processing files and moving to the success/failure folders, keep source prefix path for these processed files --- .../fs/clean/AmazonS3MoveCleanupPolicy.java | 90 +++++++++++++++++-- .../clean/AmazonS3MoveCleanupPolicyTest.java | 61 +++++++++++++ 2 files changed, 146 insertions(+), 5 deletions(-) diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicy.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicy.java index e74ecd00b..ff261d874 100644 --- a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicy.java +++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicy.java @@ -6,9 +6,12 @@ */ package io.streamthoughts.kafka.connect.filepulse.fs.clean; +import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.EXCLUDE_SOURCE_PREFIX_PATH_CONFIG; import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_BUCKET_NAME_CONFIG; +import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH; import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_PREFIX_PATH_CONFIG; import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_BUCKET_NAME_CONFIG; +import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH; import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_PREFIX_PATH_CONFIG; import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy; @@ -29,7 +32,10 @@ public class AmazonS3MoveCleanupPolicy implements FileCleanupPolicy { private static final Logger LOG = LoggerFactory.getLogger(AmazonS3MoveCleanupPolicy.class); private AmazonS3Storage storage; - + + private boolean includeSuccessSourcePrefixPath; + private boolean includeFailuresSourcePrefixPath; + private Config config; /** @@ -38,6 +44,8 @@ public class AmazonS3MoveCleanupPolicy implements FileCleanupPolicy { @Override public void configure(final Map configs) { this.config = new Config(configs); + this.includeSuccessSourcePrefixPath = this.config.getBoolean(SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH); + this.includeFailuresSourcePrefixPath = this.config.getBoolean(FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH); } /** @@ -45,7 +53,11 @@ public void configure(final Map configs) { */ @Override public boolean onSuccess(final FileObject source) { - return move(source, SUCCESS_AWS_BUCKET_NAME_CONFIG, SUCCESS_AWS_PREFIX_PATH_CONFIG); + return move( + source, + SUCCESS_AWS_BUCKET_NAME_CONFIG, + SUCCESS_AWS_PREFIX_PATH_CONFIG, + includeSuccessSourcePrefixPath); } /** @@ -53,12 +65,17 @@ public boolean onSuccess(final FileObject source) { */ @Override public boolean onFailure(final FileObject source) { - return move(source, FAILURES_AWS_BUCKET_NAME_CONFIG, FAILURES_AWS_PREFIX_PATH_CONFIG); + return move( + source, + FAILURES_AWS_BUCKET_NAME_CONFIG, + FAILURES_AWS_PREFIX_PATH_CONFIG, + includeFailuresSourcePrefixPath); } private boolean move(final FileObject source, final String destinationS3BucketConfig, - final String destinationS3PrefixConfig) { + final String destinationS3PrefixConfig, + final boolean includeSourcePrefixPath) { checkState(); URI sourceURI = source.metadata().uri(); if (!storage.exists(sourceURI)) { @@ -67,6 +84,11 @@ private boolean move(final FileObject source, } S3BucketKey sourceBucketKey = S3BucketKey.fromURI(sourceURI); + String relativeSourcePrefix = extractPrefix( + sourceBucketKey.key().replaceAll(sourceBucketKey.objectName(), "")); + String newObjectKey = includeSourcePrefixPath ? + relativeSourcePrefix + sourceBucketKey.objectName() : sourceBucketKey.objectName(); + var destS3BucketName = Optional .ofNullable(config.getString(destinationS3BucketConfig)) .orElse(sourceBucketKey.bucketName()); @@ -74,11 +96,23 @@ private boolean move(final FileObject source, var destBucketKey = new S3BucketKey( destS3BucketName, config.getString(destinationS3PrefixConfig), - sourceBucketKey.objectName() + newObjectKey ); return storage.move(sourceURI, destBucketKey.toURI()); } + private String extractPrefix(final String p) { + String excludeSourcePrefixPath = Optional + .ofNullable(config.getString(EXCLUDE_SOURCE_PREFIX_PATH_CONFIG)) + .orElse(""); + String prefix = p.replaceAll(excludeSourcePrefixPath, ""); + prefix = prefix.replaceAll("^/+", ""); + // if there are no subdirectories, return an empty string + if (prefix.length() == 0) { + return ""; + } + return prefix.endsWith("/") ? prefix : prefix + "/"; + } /** * {@inheritDoc} */ @@ -110,6 +144,14 @@ public static class Config extends AbstractConfig { private static final String SUCCESS_AWS_PREFIX_PATH_DOC = "The prefix to be used for defining the key of an S3 object to move into the destination bucket."; + public static final String SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH = + CONFIG_PREFIX + "success.aws.include.source.prefix.path"; + private static final String SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC = + "Indicates whether to include the source prefix path in the destination key."; + public static final String FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH = + CONFIG_PREFIX + "failure.aws.include.source.prefix.path"; + private static final String FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC = + "Indicates whether to include the source prefix path in the destination key."; public static final String FAILURES_AWS_BUCKET_NAME_CONFIG = CONFIG_PREFIX + "failure.aws.bucket.name"; private static final String FAILURES_AWS_BUCKET_NAME_DOC = @@ -120,6 +162,11 @@ public static class Config extends AbstractConfig { private static final String FAILURES_AWS_PREFIX_PATH_DOC = "The prefix to be used for defining the key of S3 object to move into the destination bucket."; + public static final String EXCLUDE_SOURCE_PREFIX_PATH_CONFIG = + CONFIG_PREFIX + "exclude.source.prefix.path"; + private static final String EXCLUDE_SOURCE_PREFIX_PATH_DOC = + "Indicates whether to exclude the source prefix path from the destination key."; + /** * Creates a new {@link Config} instance. */ @@ -152,6 +199,17 @@ static ConfigDef configDef() { ConfigDef.Width.NONE, SUCCESS_AWS_PREFIX_PATH_CONFIG ) + .define( + SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.LOW, + SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC, + CONFIG_GROUP, + groupCounter++, + ConfigDef.Width.NONE, + SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH + ) .define( FAILURES_AWS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, @@ -173,6 +231,28 @@ static ConfigDef configDef() { groupCounter++, ConfigDef.Width.NONE, FAILURES_AWS_PREFIX_PATH_CONFIG + ) + .define( + FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.LOW, + FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC, + CONFIG_GROUP, + groupCounter++, + ConfigDef.Width.NONE, + FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH + ) + .define( + EXCLUDE_SOURCE_PREFIX_PATH_CONFIG, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW, + EXCLUDE_SOURCE_PREFIX_PATH_DOC, + CONFIG_GROUP, + groupCounter++, + ConfigDef.Width.NONE, + EXCLUDE_SOURCE_PREFIX_PATH_CONFIG ); } } diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicyTest.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicyTest.java index c768c35a7..462a85f52 100644 --- a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicyTest.java +++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicyTest.java @@ -22,6 +22,8 @@ public class AmazonS3MoveCleanupPolicyTest extends BaseAmazonS3Test { public static final String S3_TEST_BUCKET = "bucket"; public static final String OBJECT_NAME = "object"; public static final String S3_OBJECT_KEY = "input/" + OBJECT_NAME; + public static final String S3_OBJECT_KEY_WITH_PREFIX = "input/prefix/" + OBJECT_NAME; + public static final String EXCLUDE_SOURCE_PREFIX_PATH = "input"; private AmazonS3Storage storage; @@ -58,7 +60,36 @@ public void should_move_object_on_success() { Assert.assertTrue(storage.exists(new S3BucketKey(S3_TEST_BUCKET, "/success/" + OBJECT_NAME).toURI())); } + @Test + public void should_move_object_on_success_with_prefix() { + // GIVEN + client.createBucket(S3_TEST_BUCKET); + client.putObject(S3_TEST_BUCKET, S3_OBJECT_KEY_WITH_PREFIX, "contents"); + var cleaner = new AmazonS3MoveCleanupPolicy(); + cleaner.setStorage(storage); + cleaner.configure(Map.of( + AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_PREFIX_PATH_CONFIG, "/success/", + AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_PREFIX_PATH_CONFIG, "/failure/", + AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH, true, + AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH, true, + AmazonS3MoveCleanupPolicy.Config.EXCLUDE_SOURCE_PREFIX_PATH_CONFIG, EXCLUDE_SOURCE_PREFIX_PATH + )); + + // WHEN + FileObjectMeta objectMetadata = storage.getObjectMetadata(new S3BucketKey(S3_TEST_BUCKET, S3_OBJECT_KEY_WITH_PREFIX)); + cleaner.onSuccess(new FileObject( + objectMetadata, + FileObjectOffset.empty(), + FileObjectStatus.COMPLETED + ) + ); + + // THEN + Assert.assertFalse(storage.exists(objectMetadata.uri())); + Assert.assertTrue(storage.exists(new S3BucketKey(S3_TEST_BUCKET, "/success/prefix/" + OBJECT_NAME).toURI())); + } + @Test public void should_move_object_on_failure() { // GIVEN @@ -85,4 +116,34 @@ public void should_move_object_on_failure() { Assert.assertFalse(storage.exists(objectMetadata.uri())); Assert.assertTrue(storage.exists(new S3BucketKey(S3_TEST_BUCKET, "/failure/" + OBJECT_NAME).toURI())); } + + @Test + public void should_move_object_on_failure_with_prefix() { + // GIVEN + client.createBucket(S3_TEST_BUCKET); + client.putObject(S3_TEST_BUCKET, S3_OBJECT_KEY_WITH_PREFIX, "contents"); + + var cleaner = new AmazonS3MoveCleanupPolicy(); + cleaner.setStorage(storage); + cleaner.configure(Map.of( + AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_PREFIX_PATH_CONFIG, "/success/", + AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_PREFIX_PATH_CONFIG, "/failure/", + AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH, true, + AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH, true, + AmazonS3MoveCleanupPolicy.Config.EXCLUDE_SOURCE_PREFIX_PATH_CONFIG, EXCLUDE_SOURCE_PREFIX_PATH + )); + + // WHEN + FileObjectMeta objectMetadata = storage.getObjectMetadata(new S3BucketKey(S3_TEST_BUCKET, S3_OBJECT_KEY_WITH_PREFIX)); + cleaner.onFailure(new FileObject( + objectMetadata, + FileObjectOffset.empty(), + FileObjectStatus.COMPLETED + ) + ); + + // THEN + Assert.assertFalse(storage.exists(objectMetadata.uri())); + Assert.assertTrue(storage.exists(new S3BucketKey(S3_TEST_BUCKET, "/failure/prefix/" + OBJECT_NAME).toURI())); + } } \ No newline at end of file