From 6d78a0b19718febe5383a68b7758192dbce2810a Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 18 Jul 2025 14:05:30 +0100 Subject: [PATCH 1/3] Document read-after-write semantics for `getRegister` Clarifies in its documentation that `BlobContainer#getRegister` offers only read-after-write semantics rather than full linearizability, and adds comments to its callers justifying why this is still safe. --- .../repositories/s3/S3BlobContainer.java | 4 +++- .../common/blobstore/BlobContainer.java | 13 ++++++++++++- .../blobstore/support/BlobContainerUtils.java | 6 +++--- .../analyze/ContendedRegisterAnalyzeAction.java | 2 ++ .../testkit/analyze/RepositoryAnalyzeAction.java | 1 + 5 files changed, 21 insertions(+), 5 deletions(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 910789b5e6d83..947760064eecb 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -830,7 +830,9 @@ void run(BytesReference expected, BytesReference updated, ActionListenernewForked(l -> ensureOtherUploadsComplete(uploadId, uploadIndex, currentUploads, l)) - // Step 4: Read the current register value. + // Step 4: Read the current register value. Calling getRegister is safe here because all earlier uploads are complete at + // this point, our upload is not completing yet, and later uploads can only be completing if they have already aborted ours, + // so either this read is linearizable or its result does not matter. .andThen(l -> getRegister(purpose, rawKey, l)) diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index c5e9089f93900..2a88bf8070b60 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -305,6 +305,9 @@ default void copyBlob( /** * Atomically sets the value stored at the given key to {@code updated} if the {@code current value == expected}. * Keys not yet used start at initial value 0. Returns the current value (before it was updated). + *

+ * This operation, together with {@link #compareAndSetRegister}, must have linearizable semantics: a collection of such operations must + * act as if they operate serially, with each operation taking place at some instant in between its invocation and its completion. * * @param purpose The purpose of the operation * @param key key of the value to update @@ -324,8 +327,11 @@ void compareAndExchangeRegister( /** * Atomically sets the value stored at the given key to {@code updated} if the {@code current value == expected}. * Keys not yet used start at initial value 0. + *

+ * This operation, together with {@link #compareAndExchangeRegister}, must have linearizable semantics: a collection of such operations + * must act as if they operate serially, with each operation taking place at some instant in between its invocation and its completion. * - * @param purpose + * @param purpose The purpose of the operation * @param key key of the value to update * @param expected the expected value * @param updated the new value @@ -351,6 +357,11 @@ default void compareAndSetRegister( /** * Gets the value set by {@link #compareAndSetRegister} or {@link #compareAndExchangeRegister} for a given key. * If a key has not yet been used, the initial value is an empty {@link BytesReference}. + *

+ * This operation has read-after-write consistency with respect to writes performed using {@link #compareAndExchangeRegister} and + * {@link #compareAndSetRegister}, but does not guarantee full linearizability. In particular, a {@code getRegister} performed during + * one of these write operations may return either the old or the new value, and a caller may therefore observe the old value + * after observing the new value, as long as both such read operations take place before the write operation completes. * * @param purpose The purpose of the operation * @param key key of the value to get diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/support/BlobContainerUtils.java b/server/src/main/java/org/elasticsearch/common/blobstore/support/BlobContainerUtils.java index 5019f41a01a4f..32e6852febf8c 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/support/BlobContainerUtils.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/support/BlobContainerUtils.java @@ -33,9 +33,9 @@ public static void ensureValidRegisterContent(BytesReference bytesReference) { } /** - * Many blob stores have consistent (linearizable/atomic) read semantics and in these casees it is safe to implement {@link - * BlobContainer#getRegister} by simply reading the blob using this utility. - * + * Many blob stores have consistent read-after-write semantics and in these cases it is safe to implement + * {@link BlobContainer#getRegister} by simply reading the blob using this utility. + *

* NB it is not safe for the supplied stream to resume a partial downloads, because the resumed stream may see a different state from * the original. */ diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/ContendedRegisterAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/ContendedRegisterAnalyzeAction.java index 13ffea8943f3b..ea865a7204615 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/ContendedRegisterAnalyzeAction.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/ContendedRegisterAnalyzeAction.java @@ -156,6 +156,8 @@ public void onFailure(Exception e) { }; if (request.getInitialRead() > request.getRequestCount()) { + // This is just the initial read, so we can use getRegister() despite its weaker read-after-write semantics because all + // subsequent operations of this action use compareAndExchangeRegister() and do not rely on this value being accurate. blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, initialValueListener.delegateFailure((l, r) -> { if (r.isPresent()) { l.onResponse(r); diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java index 35e11ae40d51a..ededd78ab4ec6 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java @@ -651,6 +651,7 @@ private Runnable finalRegisterValueVerifier(String registerName, int expectedFin case 0 -> new CheckedConsumer, Exception>() { @Override public void accept(ActionListener listener) { + // all register operations have completed by this point so getRegister is safe getBlobContainer().getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, listener); } From 6aa6f69e99e020856d1dfd80848a530939480b2a Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 21 Jul 2025 08:11:07 +0100 Subject: [PATCH 2/3] Minor wording improvements --- .../elasticsearch/repositories/s3/S3BlobContainer.java | 8 +++++--- .../org/elasticsearch/common/blobstore/BlobContainer.java | 6 +++--- .../testkit/analyze/ContendedRegisterAnalyzeAction.java | 4 ++-- .../testkit/analyze/RepositoryAnalyzeAction.java | 2 +- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 947760064eecb..1c27187f98bbb 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -830,9 +830,11 @@ void run(BytesReference expected, BytesReference updated, ActionListenernewForked(l -> ensureOtherUploadsComplete(uploadId, uploadIndex, currentUploads, l)) - // Step 4: Read the current register value. Calling getRegister is safe here because all earlier uploads are complete at - // this point, our upload is not completing yet, and later uploads can only be completing if they have already aborted ours, - // so either this read is linearizable or its result does not matter. + // Step 4: Read the current register value. Note that getRegister only has read-after-write semantics but that's ok here as: + // - all earlier uploads are now complete, + // - our upload is not completing yet, and + // - later uploads can only be completing if they have already aborted ours. + // Thus either this read is linearizable or its result does not matter because our write is not going to succeed anyway. .andThen(l -> getRegister(purpose, rawKey, l)) diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 2a88bf8070b60..340123456435f 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -304,7 +304,7 @@ default void copyBlob( /** * Atomically sets the value stored at the given key to {@code updated} if the {@code current value == expected}. - * Keys not yet used start at initial value 0. Returns the current value (before it was updated). + * If a key has not yet been used as a register, its initial value is an empty {@link BytesReference}. *

* This operation, together with {@link #compareAndSetRegister}, must have linearizable semantics: a collection of such operations must * act as if they operate serially, with each operation taking place at some instant in between its invocation and its completion. @@ -326,7 +326,7 @@ void compareAndExchangeRegister( /** * Atomically sets the value stored at the given key to {@code updated} if the {@code current value == expected}. - * Keys not yet used start at initial value 0. + * If a key has not yet been used as a register, its initial value is an empty {@link BytesReference}. *

* This operation, together with {@link #compareAndExchangeRegister}, must have linearizable semantics: a collection of such operations * must act as if they operate serially, with each operation taking place at some instant in between its invocation and its completion. @@ -356,7 +356,7 @@ default void compareAndSetRegister( /** * Gets the value set by {@link #compareAndSetRegister} or {@link #compareAndExchangeRegister} for a given key. - * If a key has not yet been used, the initial value is an empty {@link BytesReference}. + * If a key has not yet been used as a register, its initial value is an empty {@link BytesReference}. *

* This operation has read-after-write consistency with respect to writes performed using {@link #compareAndExchangeRegister} and * {@link #compareAndSetRegister}, but does not guarantee full linearizability. In particular, a {@code getRegister} performed during diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/ContendedRegisterAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/ContendedRegisterAnalyzeAction.java index ea865a7204615..fc03be7853cfa 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/ContendedRegisterAnalyzeAction.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/ContendedRegisterAnalyzeAction.java @@ -156,8 +156,8 @@ public void onFailure(Exception e) { }; if (request.getInitialRead() > request.getRequestCount()) { - // This is just the initial read, so we can use getRegister() despite its weaker read-after-write semantics because all - // subsequent operations of this action use compareAndExchangeRegister() and do not rely on this value being accurate. + // This is just the initial read, so we can use getRegister() despite its weaker read-after-write semantics: all subsequent + // operations of this action use compareAndExchangeRegister() and do not rely on this value being accurate. blobContainer.getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, initialValueListener.delegateFailure((l, r) -> { if (r.isPresent()) { l.onResponse(r); diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java index ededd78ab4ec6..1623546b1c494 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java @@ -651,7 +651,7 @@ private Runnable finalRegisterValueVerifier(String registerName, int expectedFin case 0 -> new CheckedConsumer, Exception>() { @Override public void accept(ActionListener listener) { - // all register operations have completed by this point so getRegister is safe + // All register operations have completed by this point so getRegister is safe getBlobContainer().getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, listener); } From d55c70e5c8e5a65d35ab2e62ee7f60e1d3e01321 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 21 Jul 2025 09:25:25 +0100 Subject: [PATCH 3/3] More wording improvements --- .../org/elasticsearch/repositories/s3/S3BlobContainer.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 1c27187f98bbb..f7b910bfb2a32 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -834,7 +834,9 @@ void run(BytesReference expected, BytesReference updated, ActionListenerandThen(l -> getRegister(purpose, rawKey, l))