5 changes: 4 additions & 1 deletion Makefile
@@ -28,14 +28,17 @@ clean:
checkstyle:
./gradlew checkstyleMain checkstyleTest checkstyleIntegrationTest

build: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz storage/s3/build/distributions/s3-$(VERSION).tgz storage/gcs/build/distributions/gcs-$(VERSION).tgz storage/azure/build/distributions/azure-$(VERSION).tgz
build: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz storage/s3/build/distributions/s3-$(VERSION).tgz storage/oci/build/distributions/oci-$(VERSION).tgz storage/gcs/build/distributions/gcs-$(VERSION).tgz storage/azure/build/distributions/azure-$(VERSION).tgz

build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz:
./gradlew build distTar -x test -x integrationTest

storage/s3/build/distributions/s3-$(VERSION).tgz:
./gradlew build :storage:s3:distTar -x test -x integrationTest

storage/oci/build/distributions/oci-$(VERSION).tgz:
./gradlew build :storage:oci:distTar -x test -x integrationTest

storage/gcs/build/distributions/gcs-$(VERSION).tgz:
./gradlew build :storage:gcs:distTar -x test -x integrationTest

3 changes: 3 additions & 0 deletions build.gradle
@@ -129,6 +129,8 @@ subprojects {

awsSdkVersion = "2.34.7"

oracleSdkVersion = "3.74.1"

gcpSdkVersion = "2.58.0"

azureSdkVersion = "1.2.28"
@@ -396,6 +398,7 @@ def modulesToPublish = [
':storage:core',
':storage:filesystem',
':storage:s3',
':storage:oci',
':storage:gcs',
':storage:azure',
]
1 change: 1 addition & 0 deletions docs/build.gradle
@@ -21,6 +21,7 @@ dependencies {
implementation "org.apache.kafka:kafka-clients:$kafkaVersion"
implementation project(":core")
implementation project(":storage:s3")
implementation project(":storage:oci")
implementation project(":storage:gcs")
implementation project(":storage:azure")
implementation project(":storage:filesystem")
41 changes: 41 additions & 0 deletions docs/configs.rst
@@ -541,6 +541,47 @@ S3StorageConfig



-----------------
OciStorageConfig
-----------------
``oci.bucket.name``
OCI bucket to store log segments

* Type: string
* Valid Values: non-empty string
* Importance: high

``oci.namespace.name``
OCI namespace to which the bucket belongs

* Type: string
* Valid Values: non-empty string
* Importance: high

``oci.region``
OCI region where the bucket is placed

* Type: string
* Importance: high

``oci.multipart.upload.part.size``
Size of parts in bytes to use when uploading. All parts but the last one will have this size. The smaller the part size, the more calls to OCI are needed to upload a file, increasing costs; the larger the part size, the more memory is needed to buffer each part. For example, with the default 25 MiB part size, a 1 GiB segment is uploaded in 41 parts. Valid values: between 5 MiB and 2 GiB

* Type: int
* Default: 26214400
* Valid Values: [5242880,...,2147483647]
* Importance: medium

``oci.storage.tier``
Defines which storage tier to use when uploading objects

* Type: string
* Default: UnknownEnumValue
* Valid Values: [Standard, InfrequentAccess, Archive, UnknownEnumValue]
* Importance: medium
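
For reference, a minimal sketch of how these settings could be wired together, assuming the ``rsm.config.storage.`` prefix used by the plugin's other backends and an ``OciStorage`` backend class named analogously to them (both the prefix and the class name are illustrative, not confirmed by this changeset)::

    rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.oci.OciStorage
    rsm.config.storage.oci.bucket.name=example-bucket
    rsm.config.storage.oci.namespace.name=example-namespace
    rsm.config.storage.oci.region=eu-frankfurt-1
    # Optional tuning: 25 MiB parts (the default) and the Standard tier.
    rsm.config.storage.oci.multipart.upload.part.size=26214400
    rsm.config.storage.oci.storage.tier=Standard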



-----------------------
FilesystemStorageConfig
-----------------------
Expand Down
@@ -27,6 +27,7 @@
import io.aiven.kafka.tieredstorage.storage.azure.AzureBlobStorageConfig;
import io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorageConfig;
import io.aiven.kafka.tieredstorage.storage.gcs.GcsStorageConfig;
import io.aiven.kafka.tieredstorage.storage.oci.OciStorageConfig;
import io.aiven.kafka.tieredstorage.storage.s3.S3StorageConfig;

import static io.aiven.kafka.tieredstorage.config.ChunkManagerFactoryConfig.FETCH_CHUNK_CACHE_PREFIX;
@@ -96,6 +97,11 @@ public static void main(final String[] args) {
out.println(s3StorageConfigDef.toEnrichedRst());
out.println();

printSubsectionTitle("OciStorageConfig");
final var ociStorageConfigDef = OciStorageConfig.configDef();
out.println(ociStorageConfigDef.toEnrichedRst());
out.println();

printSubsectionTitle("FilesystemStorageConfig");
out.println(".. Only for development/testing purposes");
final var fsStorageConfigDef = FileSystemStorageConfig.configDef();
1 change: 1 addition & 0 deletions settings.gradle
@@ -22,6 +22,7 @@ include 'storage:filesystem'
include 'storage:azure'
include 'storage:gcs'
include 'storage:s3'
include 'storage:oci'
include 'e2e'
include 'commons'
include 'docs'
@@ -0,0 +1,214 @@
/*
* Copyright 2021 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.storage.upload;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Enables uploads to object storage (such as S3) when the total size is unknown upfront.
* Feeds input bytes into multiple upload parts, or uploads them as a single file when everything fits in one part.
*
* <p>Requires an object storage client and starts a multipart transaction once the bytes written exceed the
* upload part size. Instances must not be reused.
*
* <p>{@link AbstractUploadOutputStream} is not thread-safe.
*/
public abstract class AbstractUploadOutputStream<T> extends OutputStream {

private static final Logger log = LoggerFactory.getLogger(AbstractUploadOutputStream.class);

private final ByteBuffer partBuffer;
private final String bucketName;
private final String key;
private final int partSize;

private String uploadId;
private final List<T> completedParts = new ArrayList<>();

private boolean closed;
private long processedBytes;

protected AbstractUploadOutputStream(final String bucketName,
final String key,
final int partSize) {
this.bucketName = bucketName;
this.key = key;
this.partSize = partSize;
this.partBuffer = ByteBuffer.allocate(partSize);
}

@Override
public void write(final int b) throws IOException {
write(new byte[] {(byte) b}, 0, 1);
}

@Override
public void write(final byte[] b, final int off, final int len) throws IOException {
if (isClosed()) {
throw new IllegalStateException("Already closed");
}
if (len == 0) {
return;
}
try {
final ByteBuffer inputBuffer = ByteBuffer.wrap(b, off, len);
while (inputBuffer.hasRemaining()) {
// copy batch to part buffer
final int inputLimit = inputBuffer.limit();
final int toCopy = Math.min(partBuffer.remaining(), inputBuffer.remaining());
final int positionAfterCopying = inputBuffer.position() + toCopy;
inputBuffer.limit(positionAfterCopying);
partBuffer.put(inputBuffer.slice());

// prepare current batch for next part
inputBuffer.limit(inputLimit);
inputBuffer.position(positionAfterCopying);

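// the part buffer is full: lazily start the multipart transaction, then flush this part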
if (!partBuffer.hasRemaining()) {
if (uploadId == null) {
uploadId = createMultipartUploadRequest(this.bucketName, this.key);
// not expected (the storage client should throw its own exception on failure), but checked for completeness
if (uploadId == null || uploadId.isEmpty()) {
throw new IOException("Failed to create multipart upload, uploadId is empty");
}
}
partBuffer.position(0);
partBuffer.limit(partSize);
flushBuffer(partBuffer.slice(), partSize, true);
}
}
} catch (final RuntimeException e) {
closed = true;
if (multiPartUploadStarted()) {
log.error("Failed to write to stream on upload {}, aborting transaction", uploadId, e);
abortUpload(this.bucketName, this.key, this.uploadId);
}
throw new IOException(e);
}
}

protected abstract String createMultipartUploadRequest(String bucketName, String key);

private boolean multiPartUploadStarted() {
return uploadId != null;
}

@Override
public void close() throws IOException {
if (!isClosed()) {
closed = true;
final int lastPosition = partBuffer.position();
if (lastPosition > 0) {
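// if no part was ever flushed, the buffered bytes go out as a single file; otherwise this is the final part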
try {
partBuffer.position(0);
partBuffer.limit(lastPosition);
flushBuffer(partBuffer.slice(), lastPosition, multiPartUploadStarted());
} catch (final RuntimeException e) {
if (multiPartUploadStarted()) {
log.error("Failed to upload last part {}, aborting transaction", uploadId, e);
abortUpload(this.bucketName, this.key, this.uploadId);
} else {
log.error("Failed to upload the file {}", key, e);
}
throw new IOException(e);
}
}
if (multiPartUploadStarted()) {
completeOrAbortMultiPartUpload();
}
}
}

private void completeOrAbortMultiPartUpload() throws IOException {
if (!completedParts.isEmpty()) {
try {
completeUpload(this.completedParts, this.bucketName, this.key, this.uploadId);
log.debug("Completed multipart upload {}", uploadId);
} catch (final RuntimeException e) {
log.error("Failed to complete multipart upload {}, aborting transaction", uploadId, e);
abortUpload(this.bucketName, this.key, this.uploadId);
throw new IOException(e);
}
} else {
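// defensive: the transaction started but no part was recorded, so there is nothing to complete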
abortUpload(this.bucketName, this.key, this.uploadId);
}
}

/**
* Uploads {@code size} bytes of {@code inputStream} as one single file to object storage.
* The caller is responsible for closing the {@code inputStream}.
*/
protected abstract void uploadAsSingleFile(final String bucketName,
final String key,
final InputStream inputStream,
final int size);

public boolean isClosed() {
return closed;
}

protected abstract void completeUpload(List<T> completedParts,
String bucketName,
String key,
String uploadId);

protected abstract void abortUpload(String bucketName, String key, String uploadId);

private void flushBuffer(final ByteBuffer buffer,
final int actualPartSize,
final boolean multiPartUpload) {
try (final InputStream in = new ByteBufferMarkableInputStream(buffer)) {
processedBytes += actualPartSize;
if (multiPartUpload) {
uploadPart(in, actualPartSize);
} else {
uploadAsSingleFile(this.bucketName, this.key, in, actualPartSize);
}
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

private void uploadPart(final InputStream in, final int actualPartSize) {
final int partNumber = completedParts.size() + 1;
final T completedPart = _uploadPart(this.bucketName, this.key, this.uploadId, partNumber, in, actualPartSize);
completedParts.add(completedPart);
}

protected abstract T _uploadPart(final String bucketName,
final String key,
final String uploadId,
final int partNumber,
final InputStream in,
final int actualPartSize);

public long processedBytes() {
return processedBytes;
}

public int partSize() {
return partSize;
}
}
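
For concreteness, a minimal sketch of a concrete subclass follows, written against a hypothetical ObjectStorageClient interface (every client method below is an illustrative assumption, not a real SDK API; the actual S3 and OCI subclasses implement these hooks against their respective SDKs):

package io.aiven.kafka.tieredstorage.storage.upload;

import java.io.InputStream;
import java.util.List;

// Hypothetical client abstraction, for illustration only.
interface ObjectStorageClient {
    String startMultipartUpload(String bucket, String key);
    void putObject(String bucket, String key, InputStream in, int size);
    String uploadPart(String bucket, String key, String uploadId, int partNumber, InputStream in, int size);
    void completeMultipartUpload(String bucket, String key, String uploadId, List<String> etags);
    void abortMultipartUpload(String bucket, String key, String uploadId);
}

class SketchUploadOutputStream extends AbstractUploadOutputStream<String> {
    private final ObjectStorageClient client;

    SketchUploadOutputStream(final ObjectStorageClient client, final String bucketName,
                             final String key, final int partSize) {
        super(bucketName, key, partSize);
        this.client = client;
    }

    @Override
    protected String createMultipartUploadRequest(final String bucketName, final String key) {
        // Called lazily, when the part buffer first fills up; returns the transaction's upload ID.
        return client.startMultipartUpload(bucketName, key);
    }

    @Override
    protected void uploadAsSingleFile(final String bucketName, final String key,
                                      final InputStream inputStream, final int size) {
        // Payloads smaller than one part never start a multipart transaction.
        client.putObject(bucketName, key, inputStream, size);
    }

    @Override
    protected String _uploadPart(final String bucketName, final String key, final String uploadId,
                                 final int partNumber, final InputStream in, final int actualPartSize) {
        // The returned token (an ETag here) is collected into completedParts by the base class.
        return client.uploadPart(bucketName, key, uploadId, partNumber, in, actualPartSize);
    }

    @Override
    protected void completeUpload(final List<String> completedParts, final String bucketName,
                                  final String key, final String uploadId) {
        client.completeMultipartUpload(bucketName, key, uploadId, completedParts);
    }

    @Override
    protected void abortUpload(final String bucketName, final String key, final String uploadId) {
        client.abortMultipartUpload(bucketName, key, uploadId);
    }
}

A caller simply writes to the stream: bytes up to partSize are buffered, the first overflow starts the multipart transaction, and close() flushes the tail as either the final part or a single-file upload.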
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.storage.s3;
package io.aiven.kafka.tieredstorage.storage.upload;

import java.io.InputStream;
import java.nio.ByteBuffer;
49 changes: 49 additions & 0 deletions storage/oci/build.gradle
@@ -0,0 +1,49 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

archivesBaseName = "storage-oci"

ext {
wireMockVersion = "3.13.1"
}

dependencies {
implementation project(":storage:core")

def excludeFromOracleDeps = { ModuleDependency dep ->
dep.exclude group: "org.slf4j"
//Exclude these to avoid conflicts with Kafka, which also depends on these jars
dep.exclude group: "org.glassfish.jersey.core"
dep.exclude group: "org.glassfish.jersey.media"
dep.exclude group: "org.glassfish.jersey.inject"
dep.exclude group: "org.glassfish.jersey.connectors"
dep.exclude group: "jakarta.ws.rs"
}
implementation ("com.oracle.oci.sdk:oci-java-sdk-objectstorage:$oracleSdkVersion") {excludeFromOracleDeps(it)}
implementation ("com.oracle.oci.sdk:oci-java-sdk-common-httpclient-jersey:$oracleSdkVersion") {excludeFromOracleDeps(it)}
implementation ("org.glassfish.jersey.media:jersey-media-json-jackson:2.39.1") {excludeFromOracleDeps(it)}

implementation project(':commons')

testImplementation(testFixtures(project(":storage:core")))

testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
testImplementation "org.testcontainers:localstack:$testcontainersVersion"

integrationTestImplementation("org.wiremock:wiremock:$wireMockVersion") {
exclude group: "org.slf4j"
}
}