Commit 7d74aab

Add compatibility for compact records to update only on flush, while allowing changelog records to initiate multipart uploads.
Signed-off-by: Aindriu Lavelle <[email protected]>
1 parent f3bea17 commit 7d74aab
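
For context, a minimal sketch (not part of this commit) of how the filename template selects between the two behaviours described in the commit message. The two template values are the ones used in the new unit test; the import path for S3SinkConfig and the surrounding class are assumptions, and the remaining required connector settings (bucket, credentials, output format) are omitted.

import java.util.HashMap;
import java.util.Map;

import io.aiven.kafka.connect.s3.config.S3SinkConfig; // package assumed

public final class FilenameTemplateModeSketch {
    public static void main(final String[] args) {
        final Map<String, String> props = new HashMap<>();
        // ... bucket, credentials and output-format settings omitted ...

        // Compact/key-based template: resolves to a key record grouper, so each
        // object is (re)written in full only when flush() is called.
        props.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{key}}-{{topic}}");

        // Changelog-style template: put() initiates the S3 multipart upload and
        // flush() completes it and rolls the file over.
        // props.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}");

        System.out.println("Configured template: " + props.get(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG));
    }
}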

6 files changed: 170 additions, 32 deletions

s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroIntegrationTest.java

Lines changed: 1 addition & 0 deletions

@@ -281,6 +281,7 @@ private KafkaProducer<String, GenericRecord> newProducer() {
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "io.confluent.kafka.serializers.KafkaAvroSerializer");
         producerProps.put("schema.registry.url", SCHEMA_REGISTRY.getSchemaRegistryUrl());
+        producerProps.put("linger.ms", 1000);
         return new KafkaProducer<>(producerProps);
     }


s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/AvroParquetIntegrationTest.java

Lines changed: 1 addition & 0 deletions

@@ -357,6 +357,7 @@ private KafkaProducer<String, GenericRecord> newProducer() {
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "io.confluent.kafka.serializers.KafkaAvroSerializer");
         producerProps.put("schema.registry.url", SCHEMA_REGISTRY.getSchemaRegistryUrl());
+        producerProps.put("linger.ms", 1000);
         return new KafkaProducer<>(producerProps);
     }


s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/IntegrationTest.java

Lines changed: 1 addition & 0 deletions

@@ -530,6 +530,7 @@ private KafkaProducer<byte[], byte[]> newProducer() {
                 "org.apache.kafka.common.serialization.ByteArraySerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put("linger.ms", 1000);
         return new KafkaProducer<>(producerProps);
     }


s3-sink-connector/src/integration-test/java/io/aiven/kafka/connect/ParquetIntegrationTest.java

Lines changed: 1 addition & 0 deletions

@@ -125,6 +125,7 @@ void tearDown() {
     private KafkaProducer<byte[], byte[]> newProducer() {
        final Map<String, Object> producerProps = new HashMap<>();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        producerProps.put("linger.ms", 1000);
         return new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
     }


s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java

Lines changed: 83 additions & 32 deletions

@@ -43,6 +43,7 @@
 import io.aiven.kafka.connect.common.grouper.RecordGrouper;
 import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
 import io.aiven.kafka.connect.common.output.OutputWriter;
+import io.aiven.kafka.connect.common.templating.Template;
 import io.aiven.kafka.connect.common.templating.VariableTemplatePart;
 import io.aiven.kafka.connect.config.s3.S3ConfigFragment;
 import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory;
@@ -70,6 +71,8 @@ public final class S3SinkTask extends SinkTask {
 
     private Map<String, OutputWriter> writers;
 
+    private boolean isKeyRecordGrouper;
+
     AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory();
 
     @SuppressWarnings("PMD.UnnecessaryConstructor") // required by Connect
@@ -83,6 +86,7 @@ public void start(final Map<String, String> props) {
         config = new S3SinkConfig(props);
         s3Client = createAmazonS3Client(config);
         writers = new HashMap<>();
+        isKeyRecordGrouper = isOfTypeKeyRecordGrouper(config.getFilenameTemplate());
         try {
             recordGrouper = RecordGrouperFactory.newRecordGrouper(config);
         } catch (final Exception e) { // NOPMD AvoidCatchingGenericException
@@ -93,6 +97,20 @@ public void start(final Map<String, String> props) {
         }
     }
 
+    /**
+     * Determines whether the file is key based, meaning a single file may change multiple times per flush, or whether
+     * it is a roll-over file which is reset at each flush.
+     *
+     * @param fileNameTemplate
+     *            the filename template supplied in the configuration
+     * @return true if the template is of type RecordGrouperFactory.KEY_RECORD or
+     *         RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD
+     */
+    private boolean isOfTypeKeyRecordGrouper(final Template fileNameTemplate) {
+        return RecordGrouperFactory.KEY_RECORD.equals(RecordGrouperFactory.resolveRecordGrouperType(fileNameTemplate))
+                || RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD
+                        .equals(RecordGrouperFactory.resolveRecordGrouperType(fileNameTemplate));
+    }
+
     private AmazonS3 createAmazonS3Client(final S3SinkConfig config) {
         final var awsEndpointConfig = newEndpointConfiguration(this.config);
         final var clientConfig = PredefinedClientConfigurations.defaultConfig()
@@ -117,34 +135,46 @@ public void put(final Collection<SinkRecord> records) {
         Objects.requireNonNull(records, "records cannot be null");
         LOGGER.info("Processing {} records", records.size());
         records.forEach(recordGrouper::put);
-
-        recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records));
-
+        if (!isKeyRecordGrouper) {
+            recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records));
+        }
     }
 
     /**
-     * Flush is used to roll over file and complete the S3 Mutli part upload.
+     * Flush is used alongside the KeyRecordGroupers to initiate and complete file writes to S3. When not using a key
+     * record grouper, the S3 upload will be initiated by the put command and flush will be used to write the remaining
+     * records and roll over the files.
      *
      * @param offsets
+     *            the latest offsets sent to put that are now ready to be flushed.
      */
     @Override
     public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        // On Flush Get Active writers
-        final Collection<OutputWriter> activeWriters = writers.values();
-        // Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created.
-        recordGrouper.clear();
-        // Close
-        activeWriters.forEach(writer -> {
+        if (isKeyRecordGrouper) {
             try {
-                // Close active writers && remove from writers Map
-                // Calling close will write anything in the buffer before closing and complete the S3 multi part upload
-                writer.close();
-                // Remove once closed
-                writers.remove(writer);
-            } catch (IOException e) {
-                throw new ConnectException(e);
+                recordGrouper.records().forEach(this::flushToS3);
+            } finally {
+                recordGrouper.clear();
             }
-        });
+        } else {
+            // On flush, get the active writers
+            final Collection<OutputWriter> activeWriters = writers.values();
+            // Clear recordGrouper so it restarts OFFSET HEADS etc and on next put new writers will be created.
+            recordGrouper.clear();
+            // Close
+            activeWriters.forEach(writer -> {
+                try {
+                    // Close active writers && remove from writers Map
+                    // Calling close will write anything in the buffer before closing and complete the S3 multi part
+                    // upload
+                    writer.close();
+                    // Remove once closed
+                    writers.remove(writer);
+                } catch (IOException e) {
+                    throw new ConnectException(e);
                }
+            });
+        }
 
     }
 
@@ -159,12 +189,11 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
      * @return correct OutputWriter for writing a particular record to S3
      */
     private OutputWriter getOutputWriter(final String filename, final SinkRecord sinkRecord) {
-        final String fileNameTemplate = getFileNameTemplate(filename, sinkRecord);
 
-        if (writers.get(fileNameTemplate) == null) {
+        if (writers.get(filename) == null) {
             final var out = newStreamFor(filename, sinkRecord);
             try {
-                writers.put(fileNameTemplate,
+                writers.put(filename,
                         OutputWriter.builder()
                                 .withCompressionType(config.getCompressionType())
                                 .withExternalProperties(config.originalsStrings())
@@ -175,7 +204,7 @@ private OutputWriter getOutputWriter(final String filename, final SinkRecord sinkRecord) {
                 throw new ConnectException(e);
             }
         }
-        return writers.get(fileNameTemplate);
+        return writers.get(filename);
     }
 
     /**
@@ -184,12 +213,9 @@ private OutputWriter getOutputWriter(final String filename, final SinkRecord sinkRecord) {
      *            the name of the file in S3 to be written to
      * @param records
     *            all records in this record grouping, including those already written to S3
-     * @param recordToBeWritten
-     *            new records from put() which are to be written to S3
      */
     private void writeToS3(final String filename, final List<SinkRecord> records,
             final Collection<SinkRecord> recordToBeWritten) {
-
         final SinkRecord sinkRecord = records.get(0);
         // This writer is being left open until a flush occurs.
         final OutputWriter writer; // NOPMD CloseResource
@@ -198,6 +224,29 @@ private void writeToS3(final String filename, final List<SinkRecord> records,
             // Record Grouper returns all records for that filename, all we want is the new batch of records to be added
             // to the multi part upload.
             writer.writeRecords(records.stream().filter(recordToBeWritten::contains).collect(Collectors.toList()));
+
+        } catch (IOException e) {
+            throw new ConnectException(e);
+        }
+
+    }
+
+    /**
+     * For the key record grouper the file is written just once, to keep the number of calls to S3 to a minimum. Each
+     * file contains one record and is written once, with the latest record, when flush is called.
+     *
+     * @param filename
+     *            the name of the file in S3 to be written to
+     * @param records
+     *            all records in this record grouping, including those already written to S3
+     */
+    private void flushToS3(final String filename, final List<SinkRecord> records) {
+        final SinkRecord sinkRecord = records.get(0);
+        try (var writer = getOutputWriter(filename, sinkRecord)) {
+            // For key-based files the Record Grouper returns only one record for that filename.
+            writer.writeRecords(records);
+            writers.remove(filename, writer);
         } catch (IOException e) {
             throw new ConnectException(e);
         }
@@ -206,13 +255,15 @@ private void writeToS3(final String filename, final List<SinkRecord> records,
 
     @Override
     public void stop() {
-        writers.forEach((k, v) -> {
-            try {
-                v.close();
-            } catch (IOException e) {
-                throw new ConnectException(e);
-            }
-        });
+        if (!isKeyRecordGrouper) {
+            writers.forEach((k, v) -> {
+                try {
+                    v.close();
+                } catch (IOException e) {
+                    throw new ConnectException(e);
+                }
+            });
+        }
         s3Client.shutdown();
 
         LOGGER.info("Stop S3 Sink Task");
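
For orientation, a condensed, hypothetical sketch of the task lifecycle this key-based path implies and that the new unit test below exercises. The props map is assumed to be a complete S3 sink configuration whose filename template resolves to a key record grouper (for example "{{key}}-{{topic}}"); topic names, records, and offsets are illustrative only.

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.kafka.connect.s3.S3SinkTask;

final class KeyGrouperLifecycleSketch {

    // props must be a complete connector configuration whose filename template
    // resolves to a key record grouper, e.g. "{{key}}-{{topic}}" (assumption).
    static void runOnce(final Map<String, String> props) {
        final TopicPartition tp = new TopicPartition("topic0", 0);
        final S3SinkTask task = new S3SinkTask();
        task.start(props);
        task.open(List.of(tp));

        // With a key record grouper, put() only buffers records in the grouper;
        // no multipart upload is started yet.
        task.put(List.of(new SinkRecord("topic0", 0, Schema.STRING_SCHEMA, "key0",
                Schema.STRING_SCHEMA, "value0", 10)));

        // flush() writes each key's object exactly once with the latest record,
        // then clears the grouper for the next batch.
        task.flush(Map.of(tp, new OffsetAndMetadata(11)));

        task.stop();
    }
}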

s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java

Lines changed: 83 additions & 0 deletions

@@ -719,6 +719,7 @@ void mutliPartUploadWriteOnlyExpectedRecordsAndFilesToS3() throws IOException {
         properties.put(OutputFormatArgs.FORMAT_OUTPUT_ENVELOPE_CONFIG.key(), "false");
         properties.put(OutputFormatArgs.FORMAT_OUTPUT_TYPE_CONFIG.key(), "json");
         properties.put(S3ConfigFragment.AWS_S3_PREFIX_CONFIG, "prefix-");
+        properties.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}");
 
         final S3SinkTask task = new S3SinkTask();
         task.start(properties);
@@ -786,6 +787,88 @@ void mutliPartUploadWriteOnlyExpectedRecordsAndFilesToS3() throws IOException {
 
     }
 
+    @Test
+    void mutliPartUploadUsingKeyPartitioning() throws IOException {
+        final String compression = "none";
+        properties.put(S3SinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
+        properties.put(OutputFormatArgs.FORMAT_OUTPUT_FIELDS_CONFIG.key(), "value");
+        properties.put(OutputFormatArgs.FORMAT_OUTPUT_ENVELOPE_CONFIG.key(), "false");
+        properties.put(OutputFormatArgs.FORMAT_OUTPUT_TYPE_CONFIG.key(), "json");
+        properties.put(S3ConfigFragment.AWS_S3_PREFIX_CONFIG, "prefix-");
+        // Compact/key 'mode' value only updated
+        properties.put(S3SinkConfig.FILE_NAME_TEMPLATE_CONFIG, "{{key}}-{{topic}}");
+
+        final S3SinkTask task = new S3SinkTask();
+        task.start(properties);
+        int timestamp = 1000;
+        int offset1 = 10;
+        int offset2 = 20;
+        int offset3 = 30;
+        final List<List<SinkRecord>> allRecords = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            allRecords.add(List.of(
+                    createRecordWithStructValueSchema("topic0", 0, "key0", "name0" + i, offset1++, timestamp++),
+                    createRecordWithStructValueSchema("topic0", 1, "key1", "name1" + i, offset2++, timestamp++),
+                    createRecordWithStructValueSchema("topic1", 0, "key2", "name2" + i, offset3++, timestamp++)));
+        }
+        final TopicPartition tp00 = new TopicPartition("topic0", 0);
+        final TopicPartition tp01 = new TopicPartition("topic0", 1);
+        final TopicPartition tp10 = new TopicPartition("topic1", 0);
+        final Collection<TopicPartition> tps = List.of(tp00, tp01, tp10);
+        task.open(tps);
+
+        allRecords.forEach(task::put);
+
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp00, new OffsetAndMetadata(offset1));
+        offsets.put(tp01, new OffsetAndMetadata(offset2));
+        offsets.put(tp10, new OffsetAndMetadata(offset3));
+        task.flush(offsets);
+
+        final CompressionType compressionType = CompressionType.forName(compression);
+
+        List<String> expectedBlobs = Lists.newArrayList(
+                "prefix-topic0-0-00000000000000000012" + compressionType.extension(),
+                "prefix-topic0-1-00000000000000000022" + compressionType.extension(),
+                "prefix-topic1-0-00000000000000000032" + compressionType.extension());
+
+        assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName));
+
+        assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000012", compression))
+                .containsExactly("[", "{\"name\":\"name02\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic0-1-00000000000000000022", compression))
+                .containsExactly("[", "{\"name\":\"name12\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic1-0-00000000000000000032", compression))
+                .containsExactly("[", "{\"name\":\"name22\"}", "]");
+        // Reset and send another batch of records to S3
+        allRecords.clear();
+        for (int i = 0; i < 3; i++) {
+            allRecords.add(List.of(
+                    createRecordWithStructValueSchema("topic0", 0, "key0", "name01" + i, offset1++, timestamp++),
+                    createRecordWithStructValueSchema("topic0", 1, "key1", "name11" + i, offset2++, timestamp++),
+                    createRecordWithStructValueSchema("topic1", 0, "key2", "name21" + i, offset3++, timestamp++)));
+        }
+        allRecords.forEach(task::put);
+        offsets.clear();
+        offsets.put(tp00, new OffsetAndMetadata(offset1));
+        offsets.put(tp01, new OffsetAndMetadata(offset2));
+        offsets.put(tp10, new OffsetAndMetadata(offset3));
+        task.flush(offsets);
+        expectedBlobs.clear();
+
+        expectedBlobs = Lists.newArrayList("prefix-topic0-0-00000000000000000015" + compressionType.extension(),
+                "prefix-topic0-1-00000000000000000025" + compressionType.extension(),
+                "prefix-topic1-0-00000000000000000035" + compressionType.extension());
+        assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName));
+        assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000015", compression))
+                .containsExactly("[", "{\"name\":\"name012\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic0-1-00000000000000000025", compression))
+                .containsExactly("[", "{\"name\":\"name112\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic1-0-00000000000000000035", compression))
+                .containsExactly("[", "{\"name\":\"name212\"}", "]");
+
+    }
+
     private SinkRecord createRecordWithStringValueSchema(final String topic, final int partition, final String key,
             final String value, final int offset, final long timestamp) {
         return new SinkRecord(topic, partition, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, value, offset,