Commit 1623b36

Reduce S3 memory usage by clearing records already sent to S3
Signed-off-by: Aindriu Lavelle <[email protected]>
1 parent 7d74aab commit 1623b36

File tree

9 files changed: +219 -73 lines changed

commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java

Lines changed: 6 additions & 0 deletions

@@ -101,6 +101,12 @@ public void clear() {
         fileBuffers.clear();
     }
 
+    @Override
+    public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
+        // One entry per file, so the entire file can be removed to reduce memory overhead.
+        fileBuffers.remove(identifier);
+    }
+
     @Override
     public Map<String, List<SinkRecord>> records() {
         return Collections.unmodifiableMap(fileBuffers);

commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java

Lines changed: 6 additions & 0 deletions

@@ -90,6 +90,12 @@ public void clear() {
         fileBuffers.clear();
     }
 
+    @Override
+    public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
+        // One record per file, so remove the entry to reduce memory
+        fileBuffers.remove(identifier);
+    }
+
     @Override
     public Map<String, List<SinkRecord>> records() {
         return Collections.unmodifiableMap(fileBuffers);

commons/src/main/java/io/aiven/kafka/connect/common/grouper/PartitionOffset.java

Lines changed: 44 additions & 0 deletions

@@ -0,0 +1,44 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.grouper;
+
+public class PartitionOffset {
+
+    private Long offset;
+    private int partition;
+
+    public PartitionOffset(final int partition, final Long offset) {
+        this.offset = offset;
+        this.partition = partition;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public void setPartition(final int partition) {
+        this.partition = partition;
+    }
+
+    public Long getOffset() {
+        return offset;
+    }
+
+    public void setOffset(final Long offset) {
+        this.offset = offset;
+    }
+}

commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java

Lines changed: 8 additions & 0 deletions

@@ -38,6 +38,14 @@ public interface RecordGrouper {
      */
     void clear();
 
+    /**
+     * Clear processed records from memory
+     *
+     * @param records
+     *            all records already processed to Sink
+     */
+    void clearProcessedRecords(String identifier, List<SinkRecord> records);
+
     /**
      * Get all records associated with files, grouped by the file name.
      *
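
The caller side of this interface change — the sink task that uploads each grouped file and then releases its records — is among the changed files not shown in this excerpt. The following is a minimal sketch of the intended call pattern under that assumption; FlushLoopSketch and uploadToS3 are hypothetical stand-ins (assumed to live in io.aiven.kafka.connect.common.grouper so RecordGrouper is visible), not code from this commit.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;

public class FlushLoopSketch {
    // Hypothetical flush loop: upload each grouped file, then tell the grouper
    // to drop the records that are now safely in S3.
    static void flush(final RecordGrouper grouper) {
        // Copy first: some grouper implementations return a live view of their
        // buffers, and clearProcessedRecords mutates those buffers.
        final Map<String, List<SinkRecord>> snapshot = new HashMap<>(grouper.records());
        for (final Map.Entry<String, List<SinkRecord>> entry : snapshot.entrySet()) {
            uploadToS3(entry.getKey(), entry.getValue()); // hypothetical upload call
            grouper.clearProcessedRecords(entry.getKey(), entry.getValue());
        }
    }

    static void uploadToS3(final String filename, final List<SinkRecord> records) {
        // Stand-in for the connector's real S3 writer.
    }
}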

commons/src/main/java/io/aiven/kafka/connect/common/grouper/SinkRecordsBatch.java

Lines changed: 77 additions & 0 deletions

@@ -0,0 +1,77 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.grouper;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class SinkRecordsBatch {
+
+    private int numberOfRecords;
+    final private List<SinkRecord> sinkRecords;
+    final private String filename;
+    final private long recordCreationDate = System.currentTimeMillis();
+
+    public SinkRecordsBatch(final String filename) {
+        this.filename = filename;
+        sinkRecords = new ArrayList<>();
+        numberOfRecords = 0;
+    }
+
+    public SinkRecordsBatch(final String filename, final List<SinkRecord> sinkRecords) {
+        this.filename = filename;
+        this.sinkRecords = new ArrayList<>(sinkRecords);
+        numberOfRecords = sinkRecords.size();
+    }
+    public SinkRecordsBatch(final String filename, final SinkRecord sinkRecord) {
+        this.filename = filename;
+        this.sinkRecords = new ArrayList<>();
+        this.sinkRecords.add(sinkRecord);
+        numberOfRecords = 1;
+    }
+
+    public void addSinkRecord(final SinkRecord sinkRecord) {
+        this.sinkRecords.add(sinkRecord);
+        this.numberOfRecords++;
+    }
+
+    public List<SinkRecord> getSinkRecords() {
+        // Ensure access to the Sink Records can only be changed through the apis and not accidentally by another
+        // process.
+        return Collections.unmodifiableList(sinkRecords);
+    }
+
+    public void removeSinkRecords(final List<SinkRecord> sinkRecords) {
+        this.sinkRecords.removeAll(sinkRecords);
+    }
+
+    public int getNumberOfRecords() {
+        return numberOfRecords;
+    }
+
+    public String getFilename() {
+        return filename;
+    }
+
+    public long getRecordCreationDate() {
+        return recordCreationDate;
+    }
+
+}
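
A behavioral note on the class above: getSinkRecords() hands out an unmodifiable view, removeSinkRecords(...) releases individual records, and numberOfRecords is cumulative — it is not decremented on removal, so rotation driven by getNumberOfRecords() keys off the total ever added to the file rather than what is still buffered. A minimal sketch, assuming it runs in the same package as SinkRecordsBatch; the topic, key, and values are illustrative.

import java.util.List;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public class SinkRecordsBatchSketch {
    public static void main(final String[] args) {
        final SinkRecord first = new SinkRecord("topic", 0, null, "key", Schema.STRING_SCHEMA, "v0", 0L);
        final SinkRecord second = new SinkRecord("topic", 0, null, "key", Schema.STRING_SCHEMA, "v1", 1L);

        final SinkRecordsBatch batch = new SinkRecordsBatch("topic-0-00000000000000000000", first);
        batch.addSinkRecord(second);

        // Release a record that has already been written to storage.
        batch.removeSinkRecords(List.of(first));

        System.out.println(batch.getSinkRecords().size());  // 1 record still buffered
        System.out.println(batch.getNumberOfRecords());     // 2: the cumulative count is unchanged
    }
}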

commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java

Lines changed: 27 additions & 15 deletions

@@ -16,13 +16,13 @@
 
 package io.aiven.kafka.connect.common.grouper;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Schema;
@@ -38,13 +38,13 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
 
     private final Template filenameTemplate;
 
-    private final Map<TopicPartitionKey, SinkRecord> currentHeadRecords = new HashMap<>();
+    private final Map<TopicPartitionKey, PartitionOffset> currentHeadRecords = new HashMap<>();
 
-    private final Map<String, List<SinkRecord>> fileBuffers = new HashMap<>();
+    private final Map<String, SinkRecordsBatch> fileBuffers = new HashMap<>();
 
     private final StableTimeFormatter timeFormatter;
 
-    private final Rotator<List<SinkRecord>> rotator;
+    private final Rotator<SinkRecordsBatch> rotator;
 
     TopicPartitionKeyRecordGrouper(final Template filenameTemplate, final Integer maxRecordsPerFile,
             final TimestampSource tsSource) {
@@ -59,7 +59,7 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
             if (unlimited) {
                 return false;
             } else {
-                return buffer == null || buffer.size() >= maxRecordsPerFile;
+                return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile;
             }
         };
     }
@@ -68,15 +68,16 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
     public void put(final SinkRecord record) {
         Objects.requireNonNull(record, "record cannot be null");
        final String recordKey = resolveRecordKeyFor(record);
-        fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
+        fileBuffers.computeIfAbsent(recordKey, ignored -> new SinkRecordsBatch(recordKey)).addSinkRecord(record);
     }
 
     protected String resolveRecordKeyFor(final SinkRecord record) {
         final var key = recordKey(record);
 
         final TopicPartitionKey tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()),
                 key);
-        final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, ignored -> record);
+        final PartitionOffset currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk,
+                ignored -> new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
         String objectKey = generateObjectKey(tpk, currentHeadRecord, record);
         if (rotator.rotate(fileBuffers.get(objectKey))) {
             // Create new file using this record as the head record.
@@ -97,14 +98,14 @@ private String recordKey(final SinkRecord record) {
         return key;
     }
 
-    public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord headRecord,
+    public String generateObjectKey(final TopicPartitionKey tpk, final PartitionOffset headRecord,
             final SinkRecord currentRecord) {
         final Function<Parameter, String> setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean()
-                ? String.format("%020d", headRecord.kafkaOffset())
-                : Long.toString(headRecord.kafkaOffset());
+                ? String.format("%020d", headRecord.getOffset())
+                : Long.toString(headRecord.getOffset());
         final Function<Parameter, String> setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean()
-                ? String.format("%010d", headRecord.kafkaPartition())
-                : Long.toString(headRecord.kafkaPartition());
+                ? String.format("%010d", headRecord.getPartition())
+                : Long.toString(headRecord.getPartition());
 
         return filenameTemplate.instance()
                 .bindVariable(FilenameTemplateVariable.TOPIC.name, tpk.topicPartition::topic)
@@ -118,8 +119,8 @@ public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord headRecord,
     protected String generateNewRecordKey(final SinkRecord record) {
         final var key = recordKey(record);
         final var tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key);
-        currentHeadRecords.put(tpk, record);
-        return generateObjectKey(tpk, record, record);
+        currentHeadRecords.put(tpk, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
+        return generateObjectKey(tpk, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()), record);
     }
 
     @Override
@@ -128,9 +129,20 @@ public void clear() {
         fileBuffers.clear();
     }
 
+    @Override
+    public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
+        final SinkRecordsBatch grouperRecord = fileBuffers.getOrDefault(identifier, null);
+        if (Objects.isNull(grouperRecord)) {
+            return;
+        }
+        grouperRecord.removeSinkRecords(records);
+    }
+
     @Override
     public Map<String, List<SinkRecord>> records() {
-        return Collections.unmodifiableMap(fileBuffers);
+        return Collections.unmodifiableMap(fileBuffers.values()
+                .stream()
+                .collect(Collectors.toMap(SinkRecordsBatch::getFilename, SinkRecordsBatch::getSinkRecords)));
     }
 
     public static class TopicPartitionKey {
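
The head record's offset and partition are the only two values generateObjectKey reads, which is what makes the PartitionOffset substitution safe: the object key for a file is derived entirely from the first (head) record buffered into it. A worked example of the padded formatting above, assuming it runs in the same package as PartitionOffset (the class is from this commit; the values are illustrative).

public class ObjectKeyPaddingSketch {
    public static void main(final String[] args) {
        // Head record at partition 3, offset 1042, as tracked by PartitionOffset.
        final PartitionOffset head = new PartitionOffset(3, 1042L);

        // Mirrors setKafkaOffset / setKafkaPartition when the filename template
        // requests zero padding.
        System.out.println(String.format("%020d", head.getOffset()));    // 00000000000000001042
        System.out.println(String.format("%010d", head.getPartition())); // 0000000003
    }
}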

commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java

Lines changed: 30 additions & 16 deletions

@@ -16,13 +16,13 @@
 
 package io.aiven.kafka.connect.common.grouper;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -46,14 +46,14 @@
 class TopicPartitionRecordGrouper implements RecordGrouper {
 
     private final Template filenameTemplate;
+    // Offsets are a Long and Partitions are an Integer
+    private final Map<TopicPartition, PartitionOffset> currentHeadRecords = new HashMap<>();
 
-    private final Map<TopicPartition, SinkRecord> currentHeadRecords = new HashMap<>();
-
-    private final Map<String, List<SinkRecord>> fileBuffers = new HashMap<>();
+    private final Map<String, SinkRecordsBatch> fileBuffers = new HashMap<>();
 
     private final StableTimeFormatter timeFormatter;
 
-    private final Rotator<List<SinkRecord>> rotator;
+    private final Rotator<SinkRecordsBatch> rotator;
 
     /**
      * A constructor.
@@ -78,7 +78,7 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
             if (unlimited) {
                 return false;
             } else {
-                return buffer == null || buffer.size() >= maxRecordsPerFile;
+                return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile;
             }
         };
     }
@@ -87,28 +87,30 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
     public void put(final SinkRecord record) {
         Objects.requireNonNull(record, "record cannot be null");
         final String recordKey = resolveRecordKeyFor(record);
-        fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
+        fileBuffers.computeIfAbsent(recordKey, ignored -> new SinkRecordsBatch(recordKey)).addSinkRecord(record);
     }
 
     protected String resolveRecordKeyFor(final SinkRecord record) {
         final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
-        final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition, ignored -> record);
+        final PartitionOffset currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition,
+                ignored -> new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
         String recordKey = generateRecordKey(topicPartition, currentHeadRecord, record);
         if (rotator.rotate(fileBuffers.get(recordKey))) {
             // Create new file using this record as the head record.
             recordKey = generateNewRecordKey(record);
         }
+
         return recordKey;
     }
 
-    private String generateRecordKey(final TopicPartition topicPartition, final SinkRecord headRecord,
+    private String generateRecordKey(final TopicPartition topicPartition, final PartitionOffset headRecord,
             final SinkRecord currentRecord) {
         final Function<Parameter, String> setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean()
-                ? String.format("%020d", headRecord.kafkaOffset())
-                : Long.toString(headRecord.kafkaOffset());
+                ? String.format("%020d", headRecord.getOffset())
+                : Long.toString(headRecord.getOffset());
         final Function<Parameter, String> setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean()
-                ? String.format("%010d", headRecord.kafkaPartition())
-                : Long.toString(headRecord.kafkaPartition());
+                ? String.format("%010d", headRecord.getPartition())
+                : Long.toString(headRecord.getPartition());
 
         return filenameTemplate.instance()
                 .bindVariable(FilenameTemplateVariable.TOPIC.name, topicPartition::topic)
@@ -120,8 +122,9 @@ private String generateRecordKey(final TopicPartition topicPartition, final SinkRecord headRecord,
 
     protected String generateNewRecordKey(final SinkRecord record) {
         final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
-        currentHeadRecords.put(topicPartition, record);
-        return generateRecordKey(topicPartition, record, record);
+        currentHeadRecords.put(topicPartition, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
+        return generateRecordKey(topicPartition, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()),
+                record);
     }
 
     @Override
@@ -130,9 +133,20 @@ public void clear() {
         fileBuffers.clear();
     }
 
+    @Override
+    public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
+        final SinkRecordsBatch grouperRecord = fileBuffers.getOrDefault(identifier, null);
+        if (Objects.isNull(grouperRecord)) {
+            return;
+        }
+        grouperRecord.removeSinkRecords(records);
+    }
+
     @Override
     public Map<String, List<SinkRecord>> records() {
-        return Collections.unmodifiableMap(fileBuffers);
+        return Collections.unmodifiableMap(fileBuffers.values()
+                .stream()
+                .collect(Collectors.toMap(SinkRecordsBatch::getFilename, SinkRecordsBatch::getSinkRecords)));
     }
 
 }
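
One nuance of the rewritten records() in both groupers above: the old version wrapped fileBuffers directly, so callers saw a live (if unmodifiable) view, while the new version materializes a fresh map through Collectors.toMap on every call, giving callers a point-in-time snapshot of the file names. A standalone sketch of that difference; the names and values here are illustrative, not from the connector.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ViewVersusSnapshotSketch {
    public static void main(final String[] args) {
        final Map<String, List<String>> buffers = new HashMap<>();
        buffers.put("file-a", new ArrayList<>(List.of("r0")));

        // Old style: an unmodifiable *view*; later buffer changes show through.
        final Map<String, List<String>> view = Collections.unmodifiableMap(buffers);
        // New style: a stream-collected *snapshot*; files added later do not appear.
        final Map<String, List<String>> snapshot = Collections.unmodifiableMap(buffers.entrySet()
                .stream()
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

        buffers.put("file-b", new ArrayList<>(List.of("r1")));

        System.out.println(view.containsKey("file-b"));     // true
        System.out.println(snapshot.containsKey("file-b")); // false
    }
}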
