66 */
77package io .streamthoughts .kafka .connect .filepulse .fs .clean ;
88
9+ import static io .streamthoughts .kafka .connect .filepulse .fs .clean .AmazonS3MoveCleanupPolicy .Config .EXCLUDE_SOURCE_PREFIX_PATH_CONFIG ;
910import static io .streamthoughts .kafka .connect .filepulse .fs .clean .AmazonS3MoveCleanupPolicy .Config .FAILURES_AWS_BUCKET_NAME_CONFIG ;
11+ import static io .streamthoughts .kafka .connect .filepulse .fs .clean .AmazonS3MoveCleanupPolicy .Config .FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH ;
1012import static io .streamthoughts .kafka .connect .filepulse .fs .clean .AmazonS3MoveCleanupPolicy .Config .FAILURES_AWS_PREFIX_PATH_CONFIG ;
1113import static io .streamthoughts .kafka .connect .filepulse .fs .clean .AmazonS3MoveCleanupPolicy .Config .SUCCESS_AWS_BUCKET_NAME_CONFIG ;
14+ import static io .streamthoughts .kafka .connect .filepulse .fs .clean .AmazonS3MoveCleanupPolicy .Config .SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH ;
1215import static io .streamthoughts .kafka .connect .filepulse .fs .clean .AmazonS3MoveCleanupPolicy .Config .SUCCESS_AWS_PREFIX_PATH_CONFIG ;
1316
1417import io .streamthoughts .kafka .connect .filepulse .clean .FileCleanupPolicy ;
@@ -29,7 +32,10 @@ public class AmazonS3MoveCleanupPolicy implements FileCleanupPolicy {
2932 private static final Logger LOG = LoggerFactory .getLogger (AmazonS3MoveCleanupPolicy .class );
3033
3134 private AmazonS3Storage storage ;
32-
35+
36+ private boolean includeSuccessSourcePrefixPath ;
37+ private boolean includeFailuresSourcePrefixPath ;
38+
3339 private Config config ;
3440
3541 /**
@@ -38,27 +44,38 @@ public class AmazonS3MoveCleanupPolicy implements FileCleanupPolicy {
3844 @ Override
3945 public void configure (final Map <String , ?> configs ) {
4046 this .config = new Config (configs );
47+ this .includeSuccessSourcePrefixPath = this .config .getBoolean (SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH );
48+ this .includeFailuresSourcePrefixPath = this .config .getBoolean (FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH );
4149 }
4250
4351 /**
4452 * {@inheritDoc}
4553 */
4654 @ Override
4755 public boolean onSuccess (final FileObject source ) {
48- return move (source , SUCCESS_AWS_BUCKET_NAME_CONFIG , SUCCESS_AWS_PREFIX_PATH_CONFIG );
56+ return move (
57+ source ,
58+ SUCCESS_AWS_BUCKET_NAME_CONFIG ,
59+ SUCCESS_AWS_PREFIX_PATH_CONFIG ,
60+ includeSuccessSourcePrefixPath );
4961 }
5062
5163 /**
5264 * {@inheritDoc}
5365 */
5466 @ Override
5567 public boolean onFailure (final FileObject source ) {
56- return move (source , FAILURES_AWS_BUCKET_NAME_CONFIG , FAILURES_AWS_PREFIX_PATH_CONFIG );
68+ return move (
69+ source ,
70+ FAILURES_AWS_BUCKET_NAME_CONFIG ,
71+ FAILURES_AWS_PREFIX_PATH_CONFIG ,
72+ includeFailuresSourcePrefixPath );
5773 }
5874
5975 private boolean move (final FileObject source ,
6076 final String destinationS3BucketConfig ,
61- final String destinationS3PrefixConfig ) {
77+ final String destinationS3PrefixConfig ,
78+ final boolean includeSourcePrefixPath ) {
6279 checkState ();
6380 URI sourceURI = source .metadata ().uri ();
6481 if (!storage .exists (sourceURI )) {
@@ -67,18 +84,35 @@ private boolean move(final FileObject source,
6784 }
6885 S3BucketKey sourceBucketKey = S3BucketKey .fromURI (sourceURI );
6986
87+ String relativeSourcePrefix = extractPrefix (
88+ sourceBucketKey .key ().replaceAll (sourceBucketKey .objectName (), "" ));
89+ String newObjectKey = includeSourcePrefixPath ?
90+ relativeSourcePrefix + sourceBucketKey .objectName () : sourceBucketKey .objectName ();
91+
7092 var destS3BucketName = Optional
7193 .ofNullable (config .getString (destinationS3BucketConfig ))
7294 .orElse (sourceBucketKey .bucketName ());
7395
7496 var destBucketKey = new S3BucketKey (
7597 destS3BucketName ,
7698 config .getString (destinationS3PrefixConfig ),
77- sourceBucketKey . objectName ()
99+ newObjectKey
78100 );
79101 return storage .move (sourceURI , destBucketKey .toURI ());
80102 }
81103
104+ private String extractPrefix (final String p ) {
105+ String excludeSourcePrefixPath = Optional
106+ .ofNullable (config .getString (EXCLUDE_SOURCE_PREFIX_PATH_CONFIG ))
107+ .orElse ("" );
108+ String prefix = p .replaceAll (excludeSourcePrefixPath , "" );
109+ prefix = prefix .replaceAll ("^/+" , "" );
110+ // if there are no subdirectories, return an empty string
111+ if (prefix .length () == 0 ) {
112+ return "" ;
113+ }
114+ return prefix .endsWith ("/" ) ? prefix : prefix + "/" ;
115+ }
82116 /**
83117 * {@inheritDoc}
84118 */
@@ -110,6 +144,14 @@ public static class Config extends AbstractConfig {
110144 private static final String SUCCESS_AWS_PREFIX_PATH_DOC =
111145 "The prefix to be used for defining the key of an S3 object to move into the destination bucket." ;
112146
147+ public static final String SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH =
148+ CONFIG_PREFIX + "success.aws.include.source.prefix.path" ;
149+ private static final String SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC =
150+ "Indicates whether to include the source prefix path in the destination key." ;
151+ public static final String FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH =
152+ CONFIG_PREFIX + "failure.aws.include.source.prefix.path" ;
153+ private static final String FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC =
154+ "Indicates whether to include the source prefix path in the destination key." ;
113155 public static final String FAILURES_AWS_BUCKET_NAME_CONFIG =
114156 CONFIG_PREFIX + "failure.aws.bucket.name" ;
115157 private static final String FAILURES_AWS_BUCKET_NAME_DOC =
@@ -120,6 +162,11 @@ public static class Config extends AbstractConfig {
120162 private static final String FAILURES_AWS_PREFIX_PATH_DOC =
121163 "The prefix to be used for defining the key of S3 object to move into the destination bucket." ;
122164
165+ public static final String EXCLUDE_SOURCE_PREFIX_PATH_CONFIG =
166+ CONFIG_PREFIX + "exclude.source.prefix.path" ;
167+ private static final String EXCLUDE_SOURCE_PREFIX_PATH_DOC =
168+ "Indicates whether to exclude the source prefix path from the destination key." ;
169+
123170 /**
124171 * Creates a new {@link Config} instance.
125172 */
@@ -152,6 +199,17 @@ static ConfigDef configDef() {
152199 ConfigDef .Width .NONE ,
153200 SUCCESS_AWS_PREFIX_PATH_CONFIG
154201 )
202+ .define (
203+ SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH ,
204+ ConfigDef .Type .BOOLEAN ,
205+ false ,
206+ ConfigDef .Importance .LOW ,
207+ SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC ,
208+ CONFIG_GROUP ,
209+ groupCounter ++,
210+ ConfigDef .Width .NONE ,
211+ SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH
212+ )
155213 .define (
156214 FAILURES_AWS_BUCKET_NAME_CONFIG ,
157215 ConfigDef .Type .STRING ,
@@ -173,6 +231,28 @@ static ConfigDef configDef() {
173231 groupCounter ++,
174232 ConfigDef .Width .NONE ,
175233 FAILURES_AWS_PREFIX_PATH_CONFIG
234+ )
235+ .define (
236+ FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH ,
237+ ConfigDef .Type .BOOLEAN ,
238+ false ,
239+ ConfigDef .Importance .LOW ,
240+ FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC ,
241+ CONFIG_GROUP ,
242+ groupCounter ++,
243+ ConfigDef .Width .NONE ,
244+ FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH
245+ )
246+ .define (
247+ EXCLUDE_SOURCE_PREFIX_PATH_CONFIG ,
248+ ConfigDef .Type .STRING ,
249+ null ,
250+ ConfigDef .Importance .LOW ,
251+ EXCLUDE_SOURCE_PREFIX_PATH_DOC ,
252+ CONFIG_GROUP ,
253+ groupCounter ++,
254+ ConfigDef .Width .NONE ,
255+ EXCLUDE_SOURCE_PREFIX_PATH_CONFIG
176256 );
177257 }
178258 }
0 commit comments