Skip to content

Commit 05ef2ed

Browse files
committed
Allocate direct buffers for multipart upload
Allow to allocate multipart upload buffers as direct buffer rather than on the heap. We try to set a pretty large multipart upload part size on cluster to optimize throughput and reduce S3 requests. At the same time, we try to keep kafka JVM heap size contained on most kafka installation in order to leave as much memory as possible for the page cache. As a matter of example, we will use 4GB heap size on machines with 64GB available memory. The consequence of using pretty large multipart upload size on contained JVM heap size is that we can pretty easily run out of heap size if we suddenly have to upload many segments to tiered storage. The strategy we propose is to allocate multipart buffer in direct memory so that we can more easily configure direct buffer budget.
1 parent 1bd82c3 commit 05ef2ed

File tree

5 files changed

+36
-12
lines changed

5 files changed

+36
-12
lines changed

storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ protected StorageBackend storage() {
8181
"aws.access.key.id", LOCALSTACK.getAccessKey(),
8282
"aws.secret.access.key", LOCALSTACK.getSecretKey(),
8383
"s3.path.style.access.enabled", true,
84+
"s3.multipart.upload.direct.buffers", false,
8485
"s3.multipart.upload.part.size", PART_SIZE
8586
);
8687
s3Storage.configure(configs);

storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,17 @@ public class S3MultiPartOutputStream extends OutputStream {
6666
public S3MultiPartOutputStream(final String bucketName,
6767
final ObjectKey key,
6868
final int partSize,
69-
final S3Client client) {
69+
final S3Client client,
70+
final boolean directAllocation) {
7071
this.bucketName = bucketName;
7172
this.key = key;
7273
this.client = client;
7374
this.partSize = partSize;
74-
this.partBuffer = ByteBuffer.allocate(partSize);
75+
if (directAllocation) {
76+
this.partBuffer = ByteBuffer.allocateDirect(partSize);
77+
} else {
78+
this.partBuffer = ByteBuffer.allocate(partSize);
79+
}
7580
final CreateMultipartUploadRequest initialRequest = CreateMultipartUploadRequest.builder().bucket(bucketName)
7681
.key(key.value()).build();
7782
final CreateMultipartUploadResponse initiateResult = client.createMultipartUpload(initialRequest);

storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,15 @@ public class S3Storage implements StorageBackend {
4444
private String bucketName;
4545
private int partSize;
4646

47+
private boolean multipartDirectBuffers;
48+
4749
@Override
4850
public void configure(final Map<String, ?> configs) {
4951
final S3StorageConfig config = new S3StorageConfig(configs);
5052
this.s3Client = S3ClientBuilder.build(config);
5153
this.bucketName = config.bucketName();
5254
this.partSize = config.uploadPartSize();
55+
this.multipartDirectBuffers = config.multipartDirectBuffers();
5356
}
5457

5558
@Override
@@ -63,7 +66,7 @@ public long upload(final InputStream inputStream, final ObjectKey key) throws St
6366
}
6467

6568
S3MultiPartOutputStream s3OutputStream(final ObjectKey key) {
66-
return new S3MultiPartOutputStream(bucketName, key, partSize, s3Client);
69+
return new S3MultiPartOutputStream(bucketName, key, partSize, s3Client, multipartDirectBuffers);
6770
}
6871

6972
@Override

storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,15 @@ public class S3StorageConfig extends AbstractConfig {
5858
private static final String S3_MULTIPART_UPLOAD_PART_SIZE_DOC = "Size of parts in bytes to use when uploading. "
5959
+ "All parts but the last one will have this size. "
6060
+ "Valid values: between 5MiB and 2GiB";
61+
62+
public static final String S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_CONFIG = "s3.multipart.upload.direct.buffers";
63+
public static final String S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_DOC =
64+
"Allocate multipart upload buffers as direct buffers (off-heap)";
65+
6166
static final int S3_MULTIPART_UPLOAD_PART_SIZE_MIN = 5 * 1024 * 1024; // 5MiB
6267
static final int S3_MULTIPART_UPLOAD_PART_SIZE_MAX = Integer.MAX_VALUE;
6368
static final int S3_MULTIPART_UPLOAD_PART_SIZE_DEFAULT = S3_MULTIPART_UPLOAD_PART_SIZE_MIN;
64-
69+
6570
private static final String S3_API_CALL_TIMEOUT_CONFIG = "s3.api.call.timeout";
6671
private static final String S3_API_CALL_TIMEOUT_DOC = "AWS S3 API call timeout in milliseconds";
6772
private static final String S3_API_CALL_ATTEMPT_TIMEOUT_CONFIG = "s3.api.call.attempt.timeout";
@@ -120,6 +125,12 @@ public class S3StorageConfig extends AbstractConfig {
120125
null,
121126
ConfigDef.Importance.LOW,
122127
S3_PATH_STYLE_ENABLED_DOC)
128+
.define(
129+
S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_CONFIG,
130+
ConfigDef.Type.BOOLEAN,
131+
null,
132+
ConfigDef.Importance.LOW,
133+
S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_DOC)
123134
.define(
124135
S3_MULTIPART_UPLOAD_PART_SIZE_CONFIG,
125136
ConfigDef.Type.INT,
@@ -261,6 +272,10 @@ public Boolean pathStyleAccessEnabled() {
261272
return getBoolean(S3_PATH_STYLE_ENABLED_CONFIG);
262273
}
263274

275+
public Boolean multipartDirectBuffers() {
276+
return getBoolean(S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_CONFIG);
277+
}
278+
264279
public int uploadPartSize() {
265280
return getInt(S3_MULTIPART_UPLOAD_PART_SIZE_CONFIG);
266281
}

storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ void sendAbortForAnyExceptionWhileWriting() {
8383
when(mockedS3.uploadPart(any(UploadPartRequest.class), any(RequestBody.class)))
8484
.thenThrow(testException);
8585

86-
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 1, mockedS3);
86+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 1, mockedS3, false);
8787
assertThatThrownBy(() -> out.write(new byte[] {1, 2, 3}))
8888
.isInstanceOf(IOException.class)
8989
.hasRootCause(testException);
@@ -105,7 +105,7 @@ void sendAbortForAnyExceptionWhenClosingUpload() throws Exception {
105105
when(mockedS3.uploadPart(any(UploadPartRequest.class), any(RequestBody.class)))
106106
.thenThrow(RuntimeException.class);
107107

108-
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3);
108+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3, false);
109109

110110
final byte[] buffer = new byte[5];
111111
random.nextBytes(buffer);
@@ -132,7 +132,7 @@ void sendAbortForAnyExceptionWhenClosingComplete() throws Exception {
132132
when(mockedS3.completeMultipartUpload(any(CompleteMultipartUploadRequest.class)))
133133
.thenThrow(RuntimeException.class);
134134

135-
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3);
135+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3, false);
136136

137137
final byte[] buffer = new byte[5];
138138
random.nextBytes(buffer);
@@ -159,7 +159,7 @@ void writesOneByte() throws Exception {
159159
when(mockedS3.completeMultipartUpload(any(CompleteMultipartUploadRequest.class)))
160160
.thenReturn(CompleteMultipartUploadResponse.builder().eTag("SOME_ETAG").build());
161161

162-
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3);
162+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3, false);
163163
out.write(1);
164164
out.close();
165165

@@ -197,7 +197,7 @@ void writesMultipleMessages() throws Exception {
197197
.thenReturn(CompleteMultipartUploadResponse.builder().build());
198198

199199
final List<byte[]> expectedMessagesList = new ArrayList<>();
200-
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, bufferSize, mockedS3);
200+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, bufferSize, mockedS3, false);
201201
for (int i = 0; i < 3; i++) {
202202
random.nextBytes(message);
203203
out.write(message, 0, message.length);
@@ -257,7 +257,7 @@ void writesTailMessages() throws Exception {
257257
final byte[] expectedFullMessage = new byte[messageSize + 10];
258258
final byte[] expectedTailMessage = new byte[10];
259259

260-
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, messageSize + 10, mockedS3);
260+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, messageSize + 10, mockedS3, false);
261261
final byte[] message = new byte[messageSize];
262262
random.nextBytes(message);
263263
out.write(message);
@@ -288,7 +288,7 @@ void writesTailMessages() throws Exception {
288288

289289
@Test
290290
void sendAbortIfNoWritingHappened() throws IOException {
291-
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3);
291+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3, false);
292292
out.close();
293293

294294
verify(mockedS3).abortMultipartUpload(abortMultipartUploadRequestCaptor.capture());
@@ -299,7 +299,7 @@ void sendAbortIfNoWritingHappened() throws IOException {
299299

300300
@Test
301301
void failWhenUploadingPartAfterStreamIsClosed() throws IOException {
302-
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3);
302+
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3, false);
303303
out.close();
304304

305305
verify(mockedS3).abortMultipartUpload(abortMultipartUploadRequestCaptor.capture());

0 commit comments

Comments
 (0)