
Commit b6cebcc

This update initiates the S3 multipart upload as soon as records for a file start arriving, and completes it by closing the file on flush.
Signed-off-by: Aindriu Lavelle <[email protected]>
1 parent 939796f commit b6cebcc
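
The change, in short: instead of buffering grouped records and writing each file in one shot during flush(), the task now keeps one open OutputWriter per output file, appends each incoming batch to it from put() (which is when the S3 multipart upload is started), and closes the writers in flush(), which uploads any remaining buffered bytes and completes the upload. The following is a minimal, JDK-only sketch of that lifecycle, not the connector's code: BufferedWriter stands in for OutputWriter and a local file stands in for the S3 multipart upload.

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Stand-in for the new writer lifecycle: a writer is opened lazily per output file,
    // kept open across put() calls, and closed on flush().
    final class FilePerKeySink {

        private final Map<String, BufferedWriter> writers = new HashMap<>();

        // put(): lazily open a writer for the target file (the analogue of initiating the
        // multipart upload) and append only the new batch of records.
        void put(final String filename, final List<String> records) {
            final BufferedWriter writer = writers.computeIfAbsent(filename, name -> {
                try {
                    return Files.newBufferedWriter(Path.of(name));
                } catch (final IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            try {
                for (final String record : records) {
                    writer.write(record);
                    writer.newLine();
                }
            } catch (final IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        // flush(): close every open writer (the analogue of completing the multipart upload)
        // so the next put() starts a fresh file.
        void flush() {
            for (final BufferedWriter writer : writers.values()) {
                try {
                    writer.close();
                } catch (final IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
            writers.clear();
        }
    }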

2 files changed: +154 -30 lines changed

s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java

Lines changed: 72 additions & 21 deletions
@@ -25,9 +25,11 @@
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -53,17 +55,19 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings("PMD.ExcessiveImports")
+@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.TooManyMethods" })
 public final class S3SinkTask extends SinkTask {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(AivenKafkaConnectS3SinkConnector.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3SinkTask.class);
 
     private RecordGrouper recordGrouper;
 
     private S3SinkConfig config;
 
     private AmazonS3 s3Client;
 
+    private Map<String, OutputWriter> writers;
+
     AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory();
 
     @SuppressWarnings("PMD.UnnecessaryConstructor") // required by Connect
@@ -76,6 +80,7 @@ public void start(final Map<String, String> props) {
         Objects.requireNonNull(props, "props hasn't been set");
         config = new S3SinkConfig(props);
         s3Client = createAmazonS3Client(config);
+        writers = new HashMap<>();
         try {
             recordGrouper = RecordGrouperFactory.newRecordGrouper(config);
         } catch (final Exception e) { // NOPMD AvoidCatchingGenericException
@@ -110,39 +115,81 @@ public void put(final Collection<SinkRecord> records) {
         Objects.requireNonNull(records, "records cannot be null");
         LOGGER.info("Processing {} records", records.size());
         records.forEach(recordGrouper::put);
+
+        recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records));
+
     }
 
+    // Flush is used to roll over the file and complete the S3 multipart upload.
     @Override
     public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        try {
-            recordGrouper.records().forEach(this::flushFile);
-        } finally {
-            recordGrouper.clear();
-        }
+        // On flush, get the active writers.
+        final Collection<OutputWriter> activeWriters = writers.values();
+        // Clear recordGrouper so it restarts offset heads etc.; on the next put, new writers will be created.
+        recordGrouper.clear();
+        // Close
+        activeWriters.forEach(writer -> {
+            try {
+                // Close the active writer and remove it from the writers map.
+                // Calling close writes anything left in the buffer and completes the S3 multipart upload.
+                writer.close();
+                // Remove once closed
+                writers.remove(writer);
+            } catch (IOException e) {
+                throw new ConnectException(e);
+            }
+        });
+
     }
 
-    private void flushFile(final String filename, final List<SinkRecord> records) {
-        Objects.requireNonNull(records, "records cannot be null");
-        if (records.isEmpty()) {
-            return;
+    private OutputWriter getOutputWriter(final String filename, final SinkRecord sinkRecord) {
+        final String fileNameTemplate = getFileNameTemplate(filename, sinkRecord);
+
+        if (writers.get(fileNameTemplate) == null) {
+            final var out = newStreamFor(filename, sinkRecord);
+            try {
+                writers.put(fileNameTemplate,
+                        OutputWriter.builder()
+                                .withCompressionType(config.getCompressionType())
+                                .withExternalProperties(config.originalsStrings())
+                                .withOutputFields(config.getOutputFields())
+                                .withEnvelopeEnabled(config.envelopeEnabled())
+                                .build(out, config.getFormatType()));
+            } catch (IOException e) {
+                throw new ConnectException(e);
+            }
         }
+        return writers.get(fileNameTemplate);
+    }
+
+    private void writeToS3(final String filename, final List<SinkRecord> records,
+            final Collection<SinkRecord> recordToBeWritten) {
+
         final SinkRecord sinkRecord = records.get(0);
-        try (var out = newStreamFor(filename, sinkRecord);
-                var outputWriter = OutputWriter.builder()
-                        .withCompressionType(config.getCompressionType())
-                        .withExternalProperties(config.originalsStrings())
-                        .withOutputFields(config.getOutputFields())
-                        .withEnvelopeEnabled(config.envelopeEnabled())
-                        .build(out, config.getFormatType())) {
-            outputWriter.writeRecords(records);
-        } catch (final IOException e) {
+        // This writer is left open until a flush occurs.
+        final OutputWriter writer; // NOPMD CloseResource
+        try {
+            writer = getOutputWriter(filename, sinkRecord);
+            // The record grouper returns all records for that filename; only the new batch of records should be
+            // added to the multipart upload.
+            writer.writeRecords(records.stream().filter(recordToBeWritten::contains).collect(Collectors.toList()));
+        } catch (IOException e) {
             throw new ConnectException(e);
         }
+
     }
 
     @Override
     public void stop() {
+        writers.forEach((k, v) -> {
+            try {
+                v.close();
+            } catch (IOException e) {
+                throw new ConnectException(e);
+            }
+        });
         s3Client.shutdown();
+
         LOGGER.info("Stop S3 Sink Task");
     }
 
@@ -152,11 +199,15 @@ public String version() {
     }
 
     private OutputStream newStreamFor(final String filename, final SinkRecord record) {
-        final var fullKey = config.usesFileNameTemplate() ? filename : oldFullKey(record);
+        final var fullKey = getFileNameTemplate(filename, record);
         return new S3OutputStream(config.getAwsS3BucketName(), fullKey, config.getAwsS3PartSize(), s3Client,
                 config.getServerSideEncryptionAlgorithmName());
     }
 
+    private String getFileNameTemplate(final String filename, final SinkRecord record) {
+        return config.usesFileNameTemplate() ? filename : oldFullKey(record);
+    }
+
     private EndpointConfiguration newEndpointConfiguration(final S3SinkConfig config) {
         if (Objects.isNull(config.getAwsS3EndPoint())) {
             return null;
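
For context, the test added below drives the task through this new lifecycle. Condensed, the interaction looks roughly like the following sketch; the topic, offsets, and record values are illustrative only, and the bucket, credential, and output-format properties a real configuration needs are omitted.

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.sink.SinkRecord;

    import io.aiven.kafka.connect.s3.S3SinkTask;

    final class S3SinkTaskLifecycleSketch {
        public static void main(final String[] args) {
            final Map<String, String> props = new HashMap<>();
            // Bucket, credential, and output-format settings go here, as in the test's setup.

            final S3SinkTask task = new S3SinkTask();
            task.start(props); // creates the S3 client and the new writers map

            final TopicPartition tp = new TopicPartition("topic0", 0);
            task.open(List.of(tp));

            // Each put() appends the new batch to the open OutputWriter for that file,
            // i.e. to the in-progress S3 multipart upload.
            task.put(List.of(new SinkRecord("topic0", 0, Schema.STRING_SCHEMA, "key0",
                    Schema.BYTES_SCHEMA, "value0".getBytes(StandardCharsets.UTF_8), 10)));

            // flush() closes the active writers, which writes any buffered bytes and
            // completes the multipart upload; the next put() then starts a new file.
            task.flush(Map.of(tp, new OffsetAndMetadata(11)));

            task.stop(); // closes any writers still open, then shuts down the S3 client
        }
    }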

s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java

Lines changed: 82 additions & 9 deletions
@@ -19,8 +19,10 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -431,9 +433,7 @@ void failedForStringValuesByDefault() {
 
         );
 
-        task.put(records);
-
-        assertThatThrownBy(() -> task.flush(null)).isInstanceOf(ConnectException.class)
+        assertThatThrownBy(() -> task.put(records)).isInstanceOf(ConnectException.class)
                 .hasMessage("Record value schema type must be BYTES, STRING given");
     }
 
@@ -501,9 +501,7 @@ void failedForStructValuesByDefault() {
                 createRecordWithStructValueSchema("topic0", 1, "key1", "name1", 20, 1001),
                 createRecordWithStructValueSchema("topic1", 0, "key2", "name2", 30, 1002));
 
-        task.put(records);
-
-        assertThatThrownBy(() -> task.flush(null)).isInstanceOf(ConnectException.class)
+        assertThatThrownBy(() -> task.put(records)).isInstanceOf(ConnectException.class)
                 .hasMessage("Record value schema type must be BYTES, STRUCT given");
     }
 
@@ -689,17 +687,92 @@ void supportUnwrappedJsonEnvelopeForStructAndClassicJson() throws IOException {
     void requestCredentialProviderFromFactoryOnStart() {
         final S3SinkTask task = new S3SinkTask();
 
-        final AwsCredentialProviderFactory mockedFactory = Mockito.mock(AwsCredentialProviderFactory.class);
-        final AWSCredentialsProvider provider = Mockito.mock(AWSCredentialsProvider.class);
+        final AwsCredentialProviderFactory mockedFactory = mock(AwsCredentialProviderFactory.class);
+        final AWSCredentialsProvider provider = mock(AWSCredentialsProvider.class);
 
         task.credentialFactory = mockedFactory;
-        Mockito.when(mockedFactory.getProvider(any(S3SinkConfig.class))).thenReturn(provider);
+        when(mockedFactory.getProvider(any(S3SinkConfig.class))).thenReturn(provider);
 
         task.start(properties);
 
         verify(mockedFactory, Mockito.times(1)).getProvider(any(S3SinkConfig.class));
     }
 
+    @Test
+    void mutliPartUploadWriteOnlyExpectedRecordsAndFilesToS3() throws IOException {
+        final String compression = "none";
+        properties.put(S3SinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
+        properties.put(S3SinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
+        properties.put(S3SinkConfig.FORMAT_OUTPUT_ENVELOPE_CONFIG, "false");
+        properties.put(S3SinkConfig.FORMAT_OUTPUT_TYPE_CONFIG, "json");
+        properties.put(S3SinkConfig.AWS_S3_PREFIX_CONFIG, "prefix-");
+
+        final S3SinkTask task = new S3SinkTask();
+        task.start(properties);
+        int timestamp = 1000;
+        int offset1 = 10;
+        int offset2 = 20;
+        int offset3 = 30;
+        final List<List<SinkRecord>> allRecords = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            allRecords.add(
+                    List.of(createRecordWithStructValueSchema("topic0", 0, "key0", "name0", offset1++, timestamp++),
+                            createRecordWithStructValueSchema("topic0", 1, "key1", "name1", offset2++, timestamp++),
+                            createRecordWithStructValueSchema("topic1", 0, "key2", "name2", offset3++, timestamp++)));
+        }
+        final TopicPartition tp00 = new TopicPartition("topic0", 0);
+        final TopicPartition tp01 = new TopicPartition("topic0", 1);
+        final TopicPartition tp10 = new TopicPartition("topic1", 0);
+        final Collection<TopicPartition> tps = List.of(tp00, tp01, tp10);
+        task.open(tps);
+
+        allRecords.forEach(task::put);
+
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp00, new OffsetAndMetadata(offset1));
+        offsets.put(tp01, new OffsetAndMetadata(offset2));
+        offsets.put(tp10, new OffsetAndMetadata(offset3));
+        task.flush(offsets);
+
+        final CompressionType compressionType = CompressionType.forName(compression);
+
+        List<String> expectedBlobs = Lists.newArrayList(
+                "prefix-topic0-0-00000000000000000010" + compressionType.extension(),
+                "prefix-topic0-1-00000000000000000020" + compressionType.extension(),
+                "prefix-topic1-0-00000000000000000030" + compressionType.extension());
+        assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName));
+
+        assertThat(testBucketAccessor.readLines("prefix-topic0-0-00000000000000000010", compression))
+                .containsExactly("[", "{\"name\":\"name0\"},", "{\"name\":\"name0\"},", "{\"name\":\"name0\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic0-1-00000000000000000020", compression))
+                .containsExactly("[", "{\"name\":\"name1\"},", "{\"name\":\"name1\"},", "{\"name\":\"name1\"}", "]");
+        assertThat(testBucketAccessor.readLines("prefix-topic1-0-00000000000000000030", compression))
+                .containsExactly("[", "{\"name\":\"name2\"},", "{\"name\":\"name2\"},", "{\"name\":\"name2\"}", "]");
+        // Reset and send another batch of records to S3
+        allRecords.clear();
+        for (int i = 0; i < 3; i++) {
+            allRecords.add(
+                    List.of(createRecordWithStructValueSchema("topic0", 0, "key0", "name0", offset1++, timestamp++),
+                            createRecordWithStructValueSchema("topic0", 1, "key1", "name1", offset2++, timestamp++),
+                            createRecordWithStructValueSchema("topic1", 0, "key2", "name2", offset3++, timestamp++)));
+        }
+        allRecords.forEach(task::put);
+        offsets.clear();
+        offsets.put(tp00, new OffsetAndMetadata(offset1));
+        offsets.put(tp01, new OffsetAndMetadata(offset2));
+        offsets.put(tp10, new OffsetAndMetadata(offset3));
+        task.flush(offsets);
+        expectedBlobs.clear();
+        expectedBlobs = Lists.newArrayList("prefix-topic0-0-00000000000000000010" + compressionType.extension(),
+                "prefix-topic0-1-00000000000000000020" + compressionType.extension(),
+                "prefix-topic1-0-00000000000000000030" + compressionType.extension(),
+                "prefix-topic0-0-00000000000000000013" + compressionType.extension(),
+                "prefix-topic0-1-00000000000000000023" + compressionType.extension(),
+                "prefix-topic1-0-00000000000000000033" + compressionType.extension());
+        assertThat(expectedBlobs).allMatch(blobName -> testBucketAccessor.doesObjectExist(blobName));
+
+    }
+
     private SinkRecord createRecordWithStringValueSchema(final String topic, final int partition, final String key,
             final String value, final int offset, final long timestamp) {
         return new SinkRecord(topic, partition, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, value, offset,

0 commit comments