Skip to content

Commit 69a091f

Browse files
committed
refactor: segment manifest builder
Bunch of optional fields can be passed via builder
1 parent cdffc7f commit 69a091f

File tree

10 files changed

+218
-39
lines changed

10 files changed

+218
-39
lines changed

core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
import io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache;
5858
import io.aiven.kafka.tieredstorage.fetch.index.SegmentIndexesCache;
5959
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
60-
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
6160
import io.aiven.kafka.tieredstorage.manifest.SegmentIndex;
6261
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1;
6362
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder;
@@ -481,19 +480,13 @@ void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
481480
final DataKeyAndAAD maybeEncryptionKey,
482481
final SegmentCustomMetadataBuilder customMetadataBuilder
483482
) throws StorageBackendException, IOException {
484-
final SegmentEncryptionMetadataV1 maybeEncryptionMetadata;
483+
final var segmentManifestBuilder = SegmentManifestV1.newBuilder(chunkIndex, segmentIndexes)
484+
.withRlsm(remoteLogSegmentMetadata)
485+
.withCompressionEnabled(requiresCompression);
485486
if (maybeEncryptionKey != null) {
486-
maybeEncryptionMetadata = new SegmentEncryptionMetadataV1(maybeEncryptionKey);
487-
} else {
488-
maybeEncryptionMetadata = null;
487+
segmentManifestBuilder.withEncryptionKey(maybeEncryptionKey);
489488
}
490-
final SegmentManifest segmentManifest = new SegmentManifestV1(
491-
chunkIndex,
492-
segmentIndexes,
493-
requiresCompression,
494-
maybeEncryptionMetadata,
495-
remoteLogSegmentMetadata
496-
);
489+
final SegmentManifest segmentManifest = segmentManifestBuilder.build();
497490
final String manifest = mapper.writeValueAsString(segmentManifest);
498491
final ObjectKey manifestObjectKey =
499492
objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);

core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
2323

2424
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
25+
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
2526

2627
import com.fasterxml.jackson.annotation.JsonCreator;
2728
import com.fasterxml.jackson.annotation.JsonInclude;
@@ -44,7 +45,7 @@ public SegmentManifestV1(
4445
this(chunkIndex, segmentIndexes, compression, encryption, null);
4546
}
4647

47-
public SegmentManifestV1(final ChunkIndex chunkIndex,
48+
private SegmentManifestV1(final ChunkIndex chunkIndex,
4849
final SegmentIndexesV1 segmentIndexes,
4950
final boolean compression,
5051
final SegmentEncryptionMetadataV1 encryption,
@@ -58,6 +59,13 @@ public SegmentManifestV1(final ChunkIndex chunkIndex,
5859
this.remoteLogSegmentMetadata = remoteLogSegmentMetadata;
5960
}
6061

62+
public static Builder newBuilder(
63+
final ChunkIndex chunkIndex,
64+
final SegmentIndexesV1 segmentIndexes
65+
) {
66+
return new Builder(chunkIndex, segmentIndexes);
67+
}
68+
6169
@Override
6270
@JsonProperty("chunkIndex")
6371
public ChunkIndex chunkIndex() {
@@ -129,4 +137,46 @@ public String toString() {
129137
+ ", encryption=" + encryption
130138
+ ")";
131139
}
140+
141+
public static class Builder {
142+
final ChunkIndex chunkIndex;
143+
final SegmentIndexesV1 segmentIndexes;
144+
boolean compression = false;
145+
SegmentEncryptionMetadataV1 encryptionMetadata = null;
146+
RemoteLogSegmentMetadata rlsm = null;
147+
148+
public Builder(
149+
final ChunkIndex chunkIndex,
150+
final SegmentIndexesV1 segmentIndexes
151+
) {
152+
this.chunkIndex = chunkIndex;
153+
this.segmentIndexes = segmentIndexes;
154+
}
155+
156+
public Builder withCompressionEnabled(final boolean requiresCompression) {
157+
this.compression = requiresCompression;
158+
return this;
159+
}
160+
161+
public Builder withEncryptionMetadata(final SegmentEncryptionMetadataV1 encryptionMetadata) {
162+
this.encryptionMetadata = Objects.requireNonNull(encryptionMetadata, "encryptionMetadata cannot be null");
163+
return this;
164+
}
165+
166+
public Builder withEncryptionKey(final DataKeyAndAAD dataKeyAndAAD) {
167+
this.encryptionMetadata = new SegmentEncryptionMetadataV1(
168+
Objects.requireNonNull(dataKeyAndAAD, "dataKeyAndAAD cannot be null")
169+
);
170+
return this;
171+
}
172+
173+
public Builder withRlsm(final RemoteLogSegmentMetadata rlsm) {
174+
this.rlsm = Objects.requireNonNull(rlsm, "remoteLogSegmentMetadata cannot be null");
175+
return this;
176+
}
177+
178+
public SegmentManifestV1 build() {
179+
return new SegmentManifestV1(chunkIndex, segmentIndexes, compression, encryptionMetadata, rlsm);
180+
}
181+
}
132182
}

core/src/test/java/io/aiven/kafka/tieredstorage/fetch/DefaultChunkManagerTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ class DefaultChunkManagerTest extends AesKeyAwareTest {
6262
void testGetChunk() throws Exception {
6363
final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, 10, 10);
6464

65-
final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, null, null);
65+
final SegmentManifest manifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES)
66+
.build();
6667
final ChunkManager chunkManager = new DefaultChunkManager(storage, null);
6768
when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()))
6869
.thenReturn(new ByteArrayInputStream("0123456789".getBytes()));
@@ -88,7 +89,9 @@ void testGetChunkWithEncryption() throws Exception {
8889
new ByteArrayInputStream(encrypted));
8990

9091
final var encryption = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
91-
final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, encryption, null);
92+
final var manifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES)
93+
.withEncryptionMetadata(encryption)
94+
.build();
9295
final ChunkManager chunkManager = new DefaultChunkManager(storage, aesEncryptionProvider);
9396

9497
assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT);
@@ -108,7 +111,9 @@ void testGetChunkWithCompression() throws Exception {
108111
when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()))
109112
.thenReturn(new ByteArrayInputStream(compressed));
110113

111-
final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, true, null, null);
114+
final var manifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES)
115+
.withCompressionEnabled(true)
116+
.build();
112117
final ChunkManager chunkManager = new DefaultChunkManager(storage, null);
113118

114119
assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT);

core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchChunkEnumerationSourceInputStreamClosingTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ class FetchChunkEnumerationSourceInputStreamClosingTest {
7070
.add(IndexType.LEADER_EPOCH, 1)
7171
.add(IndexType.TRANSACTION, 1)
7272
.build();
73-
static final SegmentManifest SEGMENT_MANIFEST = new SegmentManifestV1(
74-
CHUNK_INDEX, SEGMENT_INDEXES, false, null, null);
73+
static final SegmentManifest SEGMENT_MANIFEST = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES)
74+
.build();
7575

7676
TestObjectFetcher fetcher;
7777

core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchChunkEnumerationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class FetchChunkEnumerationTest {
5353
.add(IndexType.LEADER_EPOCH, 1)
5454
.add(IndexType.TRANSACTION, 1)
5555
.build();
56-
final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, segmentIndexesV1, false, null, null);
56+
final SegmentManifest manifest = SegmentManifestV1.newBuilder(chunkIndex, segmentIndexesV1).build();
5757

5858
static final byte[] CHUNK_CONTENT = "0123456789".getBytes();
5959
static final ObjectKey SEGMENT_KEY = new TestObjectKey("topic/segment");
@@ -64,7 +64,7 @@ class FetchChunkEnumerationTest {
6464
@Test
6565
void failsWhenLargerStartPosition() {
6666
// Given
67-
final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, segmentIndexesV1, false, null, null);
67+
final SegmentManifest manifest = SegmentManifestV1.newBuilder(chunkIndex, segmentIndexesV1).build();
6868
// When
6969
final int from = 1000;
7070
final int to = from + 1;

core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/ChunkCacheTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,11 @@ class ChunkCacheTest {
8686
.add(IndexType.TRANSACTION, 1)
8787
.build();
8888

89-
private static final SegmentManifest SEGMENT_MANIFEST =
90-
new SegmentManifestV1(FIXED_SIZE_CHUNK_INDEX, SEGMENT_INDEXES, false, null, null);
89+
private static final SegmentManifest SEGMENT_MANIFEST = SegmentManifestV1.newBuilder(
90+
FIXED_SIZE_CHUNK_INDEX,
91+
SEGMENT_INDEXES
92+
)
93+
.build();
9194
private static final String TEST_EXCEPTION_MESSAGE = "test_message";
9295
private static final String SEGMENT_KEY = "topic/segment";
9396
private static final ObjectKey SEGMENT_OBJECT_KEY = () -> SEGMENT_KEY;

core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/DiskChunkCacheMetricsTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,16 @@ class DiskChunkCacheMetricsTest {
5656
TimeUnit.SECONDS.convert(new MetricConfig().timeWindowMs(), TimeUnit.MILLISECONDS);
5757

5858
static final SegmentManifest SEGMENT_MANIFEST =
59-
new SegmentManifestV1(
60-
new FixedSizeChunkIndex(10, 30, 10, 10),
61-
SegmentIndexesV1.builder()
62-
.add(RemoteStorageManager.IndexType.OFFSET, 1)
63-
.add(RemoteStorageManager.IndexType.TIMESTAMP, 1)
64-
.add(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, 1)
65-
.add(RemoteStorageManager.IndexType.LEADER_EPOCH, 1)
66-
.add(RemoteStorageManager.IndexType.TRANSACTION, 1)
67-
.build(),
68-
false, null, null);
59+
SegmentManifestV1.newBuilder(
60+
new FixedSizeChunkIndex(10, 30, 10, 10),
61+
SegmentIndexesV1.builder()
62+
.add(RemoteStorageManager.IndexType.OFFSET, 1)
63+
.add(RemoteStorageManager.IndexType.TIMESTAMP, 1)
64+
.add(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, 1)
65+
.add(RemoteStorageManager.IndexType.LEADER_EPOCH, 1)
66+
.add(RemoteStorageManager.IndexType.TRANSACTION, 1)
67+
.build())
68+
.build();
6969

7070
static final ObjectKey OBJECT_KEY_PATH = () -> "topic/segment";
7171

core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ void shouldReturnAndCache() throws StorageBackendException, IOException {
115115
when(storage.fetch(key))
116116
.thenReturn(new ByteArrayInputStream(MANIFEST.getBytes()));
117117
final var chunkIndex = new FixedSizeChunkIndex(100, 1000, 110, 110);
118-
final var expectedManifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, null, null);
118+
final var expectedManifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES).build();
119119
assertThat(provider.get(key)).isEqualTo(expectedManifest);
120120
verify(storage).fetch(key);
121121
assertThat(provider.get(key)).isEqualTo(expectedManifest);
@@ -155,7 +155,7 @@ void shouldNotPoisonCacheWithFailedFutures()
155155
.hasMessage("test");
156156

157157
final var chunkIndex = new FixedSizeChunkIndex(100, 1000, 110, 110);
158-
final var expectedManifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, null, null);
158+
final var expectedManifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES).build();
159159

160160
await().atMost(Duration.ofMillis(50))
161161
.pollInterval(Duration.ofMillis(5))
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package io.aiven.kafka.tieredstorage.manifest;
2+
3+
import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex;
4+
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
5+
import org.apache.kafka.common.TopicIdPartition;
6+
import org.apache.kafka.common.Uuid;
7+
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
8+
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
9+
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
10+
import org.junit.jupiter.api.Test;
11+
12+
import javax.crypto.SecretKey;
13+
import javax.crypto.spec.SecretKeySpec;
14+
import java.util.Map;
15+
16+
import static org.assertj.core.api.Assertions.assertThat;
17+
18+
class SegmentManifestV1BuilderTest {
19+
static final FixedSizeChunkIndex CHUNK_INDEX =
20+
new FixedSizeChunkIndex(100, 1000, 110, 110);
21+
static final SecretKey DATA_KEY = new SecretKeySpec(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, "AES");
22+
static final byte[] AAD = {10, 11, 12, 13};
23+
static final SegmentIndexesV1 SEGMENT_INDEXES = new SegmentIndexesV1Builder()
24+
.add(RemoteStorageManager.IndexType.OFFSET, 1)
25+
.add(RemoteStorageManager.IndexType.TIMESTAMP, 1)
26+
.add(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, 1)
27+
.add(RemoteStorageManager.IndexType.LEADER_EPOCH, 1)
28+
.add(RemoteStorageManager.IndexType.TRANSACTION, 1)
29+
.build();
30+
static final RemoteLogSegmentMetadata REMOTE_LOG_SEGMENT_METADATA = new RemoteLogSegmentMetadata(
31+
new RemoteLogSegmentId(
32+
new TopicIdPartition(Uuid.fromString("lZ6vvmajTWKDBUTV6SQAtQ"), 42, "topic1"),
33+
Uuid.fromString("adh9f8BMS4anaUnD8KWfWg")
34+
),
35+
0,
36+
1000L,
37+
1000000000L,
38+
2,
39+
2000000000L,
40+
100500,
41+
Map.of(0, 100L, 1, 200L, 2, 300L)
42+
);
43+
44+
@Test
45+
void minimal() {
46+
final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES).build();
47+
assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX);
48+
assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES);
49+
assertThat(manifest.compression()).isFalse();
50+
assertThat(manifest.encryption()).isEmpty();
51+
assertThat(manifest.remoteLogSegmentMetadata()).isNull();
52+
}
53+
54+
@Test
55+
void withRlsm() {
56+
final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES)
57+
.withRlsm(REMOTE_LOG_SEGMENT_METADATA)
58+
.build();
59+
assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX);
60+
assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES);
61+
assertThat(manifest.compression()).isFalse();
62+
assertThat(manifest.encryption()).isEmpty();
63+
assertThat(manifest.remoteLogSegmentMetadata()).isEqualTo(REMOTE_LOG_SEGMENT_METADATA);
64+
}
65+
66+
@Test
67+
void withCompressionEnabled() {
68+
final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES)
69+
.withCompressionEnabled(true)
70+
.build();
71+
assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX);
72+
assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES);
73+
assertThat(manifest.compression()).isTrue();
74+
assertThat(manifest.encryption()).isEmpty();
75+
assertThat(manifest.remoteLogSegmentMetadata()).isNull();
76+
}
77+
78+
@Test
79+
void withCompressionDisabled() {
80+
final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES)
81+
.withCompressionEnabled(false)
82+
.build();
83+
assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX);
84+
assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES);
85+
assertThat(manifest.compression()).isFalse();
86+
assertThat(manifest.encryption()).isEmpty();
87+
assertThat(manifest.remoteLogSegmentMetadata()).isNull();
88+
}
89+
90+
@Test
91+
void withEncryptionKey() {
92+
final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES)
93+
.withEncryptionKey(new DataKeyAndAAD(DATA_KEY, AAD))
94+
.build();
95+
assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX);
96+
assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES);
97+
assertThat(manifest.compression()).isFalse();
98+
assertThat(manifest.encryption()).isPresent();
99+
manifest.encryption().ifPresent(segmentEncryptionMetadata -> {
100+
assertThat(segmentEncryptionMetadata.dataKey()).isEqualTo(DATA_KEY);
101+
assertThat(segmentEncryptionMetadata.aad()).isEqualTo(AAD);
102+
});
103+
assertThat(manifest.remoteLogSegmentMetadata()).isNull();
104+
}
105+
106+
@Test
107+
void full() {
108+
final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES)
109+
.withCompressionEnabled(true)
110+
.withEncryptionKey(new DataKeyAndAAD(DATA_KEY, AAD))
111+
.withRlsm(REMOTE_LOG_SEGMENT_METADATA)
112+
.build();
113+
assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX);
114+
assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES);
115+
assertThat(manifest.compression()).isTrue();
116+
assertThat(manifest.encryption()).isPresent();
117+
manifest.encryption().ifPresent(segmentEncryptionMetadata -> {
118+
assertThat(segmentEncryptionMetadata.dataKey()).isEqualTo(DATA_KEY);
119+
assertThat(segmentEncryptionMetadata.aad()).isEqualTo(AAD);
120+
});
121+
assertThat(manifest.remoteLogSegmentMetadata()).isEqualTo(REMOTE_LOG_SEGMENT_METADATA);
122+
}
123+
}

core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1SerdeTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,10 @@ void init() {
147147

148148
@Test
149149
void withEncryption() throws JsonProcessingException {
150-
final SegmentManifest manifest = new SegmentManifestV1(INDEX, SEGMENT_INDEXES, false,
151-
new SegmentEncryptionMetadataV1(DATA_KEY, AAD), REMOTE_LOG_SEGMENT_METADATA);
150+
final SegmentManifest manifest = SegmentManifestV1.newBuilder(INDEX, SEGMENT_INDEXES)
151+
.withRlsm(REMOTE_LOG_SEGMENT_METADATA)
152+
.withEncryptionMetadata(new SegmentEncryptionMetadataV1(DATA_KEY, AAD))
153+
.build();
152154

153155
final String jsonStr = mapper.writeValueAsString(manifest);
154156

@@ -174,7 +176,9 @@ void withEncryption() throws JsonProcessingException {
174176

175177
@Test
176178
void withoutEncryption() throws JsonProcessingException {
177-
final var manifest = new SegmentManifestV1(INDEX, SEGMENT_INDEXES, false, null, REMOTE_LOG_SEGMENT_METADATA);
179+
final var manifest = SegmentManifestV1.newBuilder(INDEX, SEGMENT_INDEXES)
180+
.withRlsm(REMOTE_LOG_SEGMENT_METADATA)
181+
.build();
178182

179183
final String jsonStr = mapper.writeValueAsString(manifest);
180184

@@ -188,8 +192,9 @@ void withoutEncryption() throws JsonProcessingException {
188192

189193
@Test
190194
void withoutTxnIndex() throws JsonProcessingException {
191-
final var manifest = new SegmentManifestV1(INDEX, SEGMENT_INDEXES_WITHOUT_TXN_INDEX,
192-
false, null, REMOTE_LOG_SEGMENT_METADATA);
195+
final var manifest = SegmentManifestV1.newBuilder(INDEX, SEGMENT_INDEXES_WITHOUT_TXN_INDEX)
196+
.withRlsm(REMOTE_LOG_SEGMENT_METADATA)
197+
.build();
193198

194199
final String jsonStr = mapper.writeValueAsString(manifest);
195200

0 commit comments

Comments
 (0)