Commit 1673c0b

Reduce S3 memory usage by clearing records already sent to S3

Signed-off-by: Aindriu Lavelle <[email protected]>

1 parent f719afd commit 1673c0b

File tree

7 files changed: +174 −38 lines changed
commons/src/main/java/io/aiven/kafka/connect/common/grouper/GrouperSinkRecord.java

Lines changed: 85 additions & 0 deletions

/*
 * Copyright 2024 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.kafka.connect.common.grouper;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.connect.sink.SinkRecord;

public class GrouperSinkRecord {

    // Cumulative count of records added to this file. Deliberately not
    // decremented by removeSinkRecords(): cleared records have already been
    // written out, so file rotation must still account for them.
    private int numberOfRecords;
    private final List<SinkRecord> sinkRecords;
    private final String filename;
    private final long recordCreationDate = System.currentTimeMillis();
    private boolean completeUpload = false;

    public GrouperSinkRecord(final String filename) {
        this.filename = filename;
        sinkRecords = new ArrayList<>();
        numberOfRecords = 0;
    }

    public GrouperSinkRecord(final String filename, final List<SinkRecord> sinkRecords) {
        this.filename = filename;
        this.sinkRecords = new ArrayList<>(sinkRecords);
        numberOfRecords = sinkRecords.size();
    }

    public GrouperSinkRecord(final String filename, final SinkRecord sinkRecord) {
        this.filename = filename;
        this.sinkRecords = new ArrayList<>();
        this.sinkRecords.add(sinkRecord);
        numberOfRecords = 1;
    }

    public void addSinkRecord(final SinkRecord sinkRecord) {
        this.sinkRecords.add(sinkRecord);
        this.numberOfRecords++;
    }

    public List<SinkRecord> getSinkRecords() {
        // Ensure the sink records can only be changed through this class's API
        // and not accidentally by another process.
        return Collections.unmodifiableList(sinkRecords);
    }

    public void removeSinkRecords(final List<SinkRecord> sinkRecords) {
        this.sinkRecords.removeAll(sinkRecords);
    }

    public int getNumberOfRecords() {
        return numberOfRecords;
    }

    public String getFilename() {
        return filename;
    }

    public long getRecordCreationDate() {
        return recordCreationDate;
    }

    public boolean isCompleteUpload() {
        return completeUpload;
    }

    public void setCompleteUpload(final boolean completeUpload) {
        this.completeUpload = completeUpload;
    }
}
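For orientation, here is a minimal caller-side sketch of the intended lifecycle, not part of this commit: records are buffered per file, a batch is uploaded, and the in-memory copies are then released. uploadToS3 is a hypothetical stand-in for the real writer, and the fragment assumes the imports shown above.

// Hypothetical usage sketch; uploadToS3 stands in for the real S3 writer.
void uploadAndTrim(final GrouperSinkRecord buffer) {
    // Copy the batch first: getSinkRecords() returns an unmodifiable view of
    // the live list, so the copy stays stable while the buffer is trimmed.
    final List<SinkRecord> uploaded = new ArrayList<>(buffer.getSinkRecords());
    uploadToS3(buffer.getFilename(), uploaded);
    // Free the memory held by records that are already in S3.
    buffer.removeSinkRecords(uploaded);
    // getNumberOfRecords() is deliberately unchanged, so maxRecordsPerFile
    // rotation still triggers on the cumulative count for this file.
}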

commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java

Lines changed: 6 additions & 0 deletions

@@ -103,6 +103,12 @@ public void clear() {
         fileBuffers.clear();
     }
 
+    @Override
+    public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
+        // Only the latest record is kept per file, so the entire entry can be removed to reduce memory overhead.
+        fileBuffers.remove(identifier);
+    }
+
     @Override
     public Map<String, List<SinkRecord>> records() {
         return Collections.unmodifiableMap(fileBuffers);
commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java

Lines changed: 6 additions & 0 deletions

@@ -90,6 +90,12 @@ public void clear() {
         fileBuffers.clear();
     }
 
+    @Override
+    public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
+        // One record per file, so remove the entire entry to reduce memory.
+        fileBuffers.remove(identifier);
+    }
+
     @Override
     public Map<String, List<SinkRecord>> records() {
         return Collections.unmodifiableMap(fileBuffers);

commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java

Lines changed: 9 additions & 1 deletion

@@ -38,10 +38,18 @@ public interface RecordGrouper {
      */
     void clear();
 
+    /**
+     * Clear processed records from memory.
+     *
+     * @param identifier
+     *            the file/group identifier whose records have been processed
+     * @param records
+     *            all records already processed to the sink
+     */
+    void clearProcessedRecords(String identifier, List<SinkRecord> records);
+
     /**
      * Get all records associated with files, grouped by the file name.
      *
-     * @return map of records assotiated with files
+     * @return map of records associated with files
      */
     Map<String, List<SinkRecord>> records();
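A hedged sketch of how a sink task might drive the new method after each successful upload; writeToS3 is a hypothetical stand-in, and the actual S3 integration lives in a file not shown on this page:

// Snapshot the grouping first; some groupers return a live map view.
final Map<String, List<SinkRecord>> grouped = new HashMap<>(grouper.records());
for (final Map.Entry<String, List<SinkRecord>> entry : grouped.entrySet()) {
    // Take a stable copy: the value lists are views over the live buffers.
    final List<SinkRecord> batch = List.copyOf(entry.getValue());
    writeToS3(entry.getKey(), batch); // hypothetical upload call
    grouper.clearProcessedRecords(entry.getKey(), batch); // drop in-memory copies
}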

commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java

Lines changed: 29 additions & 15 deletions

@@ -17,13 +17,13 @@
 package io.aiven.kafka.connect.common.grouper;
 
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Schema;
@@ -34,6 +34,8 @@
 import io.aiven.kafka.connect.common.templating.Template;
 import io.aiven.kafka.connect.common.templating.VariableTemplatePart.Parameter;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
 
     private static final Map<String, DateTimeFormatter> TIMESTAMP_FORMATTERS = Map.of("yyyy",
@@ -42,13 +44,13 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
 
     private final Template filenameTemplate;
 
-    private final Map<TopicPartitionKey, SinkRecord> currentHeadRecords = new HashMap<>();
+    private final Map<TopicPartitionKey, Pair<Long, Integer>> currentHeadRecords = new HashMap<>();
 
-    private final Map<String, List<SinkRecord>> fileBuffers = new HashMap<>();
+    private final Map<String, GrouperSinkRecord> fileBuffers = new HashMap<>();
 
     private final Function<SinkRecord, Function<Parameter, String>> setTimestampBasedOnRecord;
 
-    private final Rotator<List<SinkRecord>> rotator;
+    private final Rotator<GrouperSinkRecord> rotator;
 
     TopicPartitionKeyRecordGrouper(final Template filenameTemplate, final Integer maxRecordsPerFile,
             final TimestampSource tsSource) {
@@ -64,7 +66,7 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
             if (unlimited) {
                 return false;
             } else {
-                return buffer == null || buffer.size() >= maxRecordsPerFile;
+                return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile;
             }
         };
     }
@@ -73,15 +75,16 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
     public void put(final SinkRecord record) {
         Objects.requireNonNull(record, "record cannot be null");
         final String recordKey = resolveRecordKeyFor(record);
-        fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
+        fileBuffers.computeIfAbsent(recordKey, ignored -> new GrouperSinkRecord(recordKey)).addSinkRecord(record);
     }
 
     protected String resolveRecordKeyFor(final SinkRecord record) {
         final var key = recordKey(record);
 
         final TopicPartitionKey tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()),
                 key);
-        final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, ignored -> record);
+        final Pair<Long, Integer> currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk,
+                ignored -> Pair.of(record.kafkaOffset(), record.kafkaPartition()));
         String objectKey = generateObjectKey(tpk, currentHeadRecord, record);
         if (rotator.rotate(fileBuffers.get(objectKey))) {
             // Create new file using this record as the head record.
@@ -102,14 +105,14 @@ private String recordKey(final SinkRecord record) {
         return key;
     }
 
-    public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord headRecord,
+    public String generateObjectKey(final TopicPartitionKey tpk, final Pair<Long, Integer> headRecord,
             final SinkRecord currentRecord) {
         final Function<Parameter, String> setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean()
-                ? String.format("%020d", headRecord.kafkaOffset())
-                : Long.toString(headRecord.kafkaOffset());
+                ? String.format("%020d", headRecord.getLeft())
+                : Long.toString(headRecord.getLeft());
         final Function<Parameter, String> setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean()
-                ? String.format("%010d", headRecord.kafkaPartition())
-                : Long.toString(headRecord.kafkaPartition());
+                ? String.format("%010d", headRecord.getRight())
+                : Long.toString(headRecord.getRight());
 
         return filenameTemplate.instance()
                 .bindVariable(FilenameTemplateVariable.TOPIC.name, tpk.topicPartition::topic)
@@ -123,8 +126,8 @@ public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord headRecord,
     protected String generateNewRecordKey(final SinkRecord record) {
         final var key = recordKey(record);
         final var tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key);
-        currentHeadRecords.put(tpk, record);
-        return generateObjectKey(tpk, record, record);
+        currentHeadRecords.put(tpk, Pair.of(record.kafkaOffset(), record.kafkaPartition()));
+        return generateObjectKey(tpk, Pair.of(record.kafkaOffset(), record.kafkaPartition()), record);
     }
 
     @Override
@@ -133,9 +136,20 @@ public void clear() {
         fileBuffers.clear();
     }
 
+    @Override
+    public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
+        final GrouperSinkRecord grouperRecord = fileBuffers.getOrDefault(identifier, null);
+        if (Objects.isNull(grouperRecord)) {
+            return;
+        }
+        grouperRecord.removeSinkRecords(records);
+    }
+
     @Override
     public Map<String, List<SinkRecord>> records() {
-        return Collections.unmodifiableMap(fileBuffers);
+        return Collections.unmodifiableMap(fileBuffers.values()
+                .stream()
+                .collect(Collectors.toMap(GrouperSinkRecord::getFilename, GrouperSinkRecord::getSinkRecords)));
     }
 
     public static class TopicPartitionKey {
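The switch from SinkRecord to Pair<Long, Integer> above is a second source of memory savings: only the head record's offset and partition are needed to render the filename, so the grouper no longer pins the whole head record (key, value, headers) in currentHeadRecords. A small sketch of the equivalence, using the same padding shown in the diff:

final Pair<Long, Integer> head = Pair.of(record.kafkaOffset(), record.kafkaPartition());
final String paddedOffset = String.format("%020d", head.getLeft());     // e.g. 00000000000000000042
final String paddedPartition = String.format("%010d", head.getRight()); // e.g. 0000000003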

commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java

Lines changed: 31 additions & 16 deletions

@@ -17,13 +17,13 @@
 package io.aiven.kafka.connect.common.grouper;
 
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -33,6 +33,8 @@
 import io.aiven.kafka.connect.common.templating.Template;
 import io.aiven.kafka.connect.common.templating.VariableTemplatePart.Parameter;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 /**
  * A {@link RecordGrouper} that groups records by topic and partition.
  *
@@ -50,14 +52,14 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
             DateTimeFormatter.ofPattern("dd"), "HH", DateTimeFormatter.ofPattern("HH"));
 
     private final Template filenameTemplate;
+    // Offsets are a Long and partitions are an Integer
+    private final Map<TopicPartition, Pair<Long, Integer>> currentHeadRecords = new HashMap<>();
 
-    private final Map<TopicPartition, SinkRecord> currentHeadRecords = new HashMap<>();
-
-    private final Map<String, List<SinkRecord>> fileBuffers = new HashMap<>();
+    private final Map<String, GrouperSinkRecord> fileBuffers = new HashMap<>();
 
     private final Function<SinkRecord, Function<Parameter, String>> setTimestampBasedOnRecord;
 
-    private final Rotator<List<SinkRecord>> rotator;
+    private final Rotator<GrouperSinkRecord> rotator;
 
     /**
      * A constructor.
@@ -83,7 +85,7 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
             if (unlimited) {
                 return false;
             } else {
-                return buffer == null || buffer.size() >= maxRecordsPerFile;
+                return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile;
             }
         };
     }
@@ -92,28 +94,30 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
     public void put(final SinkRecord record) {
         Objects.requireNonNull(record, "record cannot be null");
         final String recordKey = resolveRecordKeyFor(record);
-        fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
+        fileBuffers.computeIfAbsent(recordKey, ignored -> new GrouperSinkRecord(recordKey)).addSinkRecord(record);
     }
 
     protected String resolveRecordKeyFor(final SinkRecord record) {
         final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
-        final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition, ignored -> record);
+        final Pair<Long, Integer> currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition,
+                ignored -> Pair.of(record.kafkaOffset(), record.kafkaPartition()));
         String recordKey = generateRecordKey(topicPartition, currentHeadRecord, record);
         if (rotator.rotate(fileBuffers.get(recordKey))) {
             // Create new file using this record as the head record.
             recordKey = generateNewRecordKey(record);
         }
+
         return recordKey;
     }
 
-    private String generateRecordKey(final TopicPartition topicPartition, final SinkRecord headRecord,
+    private String generateRecordKey(final TopicPartition topicPartition, final Pair<Long, Integer> headRecord,
             final SinkRecord currentRecord) {
         final Function<Parameter, String> setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean()
-                ? String.format("%020d", headRecord.kafkaOffset())
-                : Long.toString(headRecord.kafkaOffset());
+                ? String.format("%020d", headRecord.getLeft())
+                : Long.toString(headRecord.getLeft());
         final Function<Parameter, String> setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean()
-                ? String.format("%010d", headRecord.kafkaPartition())
-                : Long.toString(headRecord.kafkaPartition());
+                ? String.format("%010d", headRecord.getRight())
+                : Long.toString(headRecord.getRight());
 
         return filenameTemplate.instance()
                 .bindVariable(FilenameTemplateVariable.TOPIC.name, topicPartition::topic)
@@ -125,8 +129,8 @@ private String generateRecordKey(final TopicPartition topicPartition, final SinkRecord headRecord,
 
     protected String generateNewRecordKey(final SinkRecord record) {
         final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
-        currentHeadRecords.put(topicPartition, record);
-        return generateRecordKey(topicPartition, record, record);
+        currentHeadRecords.put(topicPartition, Pair.of(record.kafkaOffset(), record.kafkaPartition()));
+        return generateRecordKey(topicPartition, Pair.of(record.kafkaOffset(), record.kafkaPartition()), record);
     }
 
     @Override
@@ -135,9 +139,20 @@ public void clear() {
        fileBuffers.clear();
     }
 
+    @Override
+    public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
+        final GrouperSinkRecord grouperRecord = fileBuffers.getOrDefault(identifier, null);
+        if (Objects.isNull(grouperRecord)) {
+            return;
+        }
+        grouperRecord.removeSinkRecords(records);
+    }
+
     @Override
     public Map<String, List<SinkRecord>> records() {
-        return Collections.unmodifiableMap(fileBuffers);
+        return Collections.unmodifiableMap(fileBuffers.values()
+                .stream()
+                .collect(Collectors.toMap(GrouperSinkRecord::getFilename, GrouperSinkRecord::getSinkRecords)));
     }
 
 }
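One subtlety of the new records() implementation worth noting: Collectors.toMap materializes a fresh map on each call, but the value lists are unmodifiable views over the live buffers (see GrouperSinkRecord.getSinkRecords()), so a previously returned map reflects later removals. A sketch under that reading, with `filename` standing in for an existing buffer key:

final Map<String, List<SinkRecord>> view = grouper.records();
final List<SinkRecord> live = view.get(filename); // unmodifiable, but live
grouper.clearProcessedRecords(filename, List.copyOf(live));
// `live` is now empty: the view tracks the underlying buffer, which is why
// callers should copy the list (e.g. List.copyOf) before uploading.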
