Commit 0029d14

fix: backtracking when fallback is enabled
1 parent: 40408c3 · commit: 0029d14

3 files changed: +68 -1 lines changed

core/src/main/scala/akka/persistence/dynamodb/internal/QueryDao.scala

Lines changed: 42 additions & 1 deletion
@@ -28,6 +28,7 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue
 import software.amazon.awssdk.services.dynamodb.model.QueryRequest
 import software.amazon.awssdk.services.dynamodb.model.QueryResponse
+import scala.concurrent.Promise

 /**
  * INTERNAL API
@@ -42,6 +43,38 @@ import software.amazon.awssdk.services.dynamodb.model.QueryResponse
   private val serialization = SerializationExtension(system)
   private val fallbackStoreProvider = FallbackStoreProvider(system)

+  private val _backtrackingBreadcrumbSerId = Promise[Int]()
+  def backtrackingBreadcrumbSerId(): Int = {
+    _backtrackingBreadcrumbSerId.future.value match {
+      case Some(v) => v.get
+      case None =>
+        val serializerIds = serialization.bindings.iterator.map(_._2.identifier).toSet
+        // Use an unfolding iterator to find a serializer ID that's not bound. There shouldn't ever be more
+        // than a few thousand serializer IDs bound (and the docs suggest real serializers should only have
+        // positive serializer IDs ("couple of billion")), so this shouldn't have to iterate far.
+        //
+        // Ranges that aren't indexable by an Int (viz. have more than Int.MaxValue elements) are surprisingly
+        // restrictive, thus the unfold
+        Iterator
+          .unfold(Int.MinValue) { s =>
+            if (s != Int.MaxValue) Some(s -> (s + 1))
+            else None
+          }
+          // stop fast if some other thread found this before we do
+          .find { i => !serializerIds(i) || _backtrackingBreadcrumbSerId.isCompleted } match {
+          case None =>
+            // Over 4 billion serializers... really?
+            _backtrackingBreadcrumbSerId.tryFailure(new NoSuchElementException("All serializer IDs used?"))
+
+          case Some(id) =>
+            _backtrackingBreadcrumbSerId.trySuccess(id)
+        }
+        // first get is safe, we just ensured completion
+        // second get will throw if we exhausted serializers, but that's a danger we're prepared to face...
+        _backtrackingBreadcrumbSerId.future.value.get.get
+    }
+  }
+
   private val bySliceProjectionExpression = {
     import JournalAttributes._
     s"$Pid, $SeqNr, $Timestamp, $EventSerId, $EventSerManifest, $Tags, $BreadcrumbSerId, $BreadcrumbSerManifest"
@@ -182,6 +215,13 @@ import software.amazon.awssdk.services.dynamodb.model.QueryResponse
   }

   // implements BySliceQuery.Dao
+  // NB: if backtracking and an event that was saved to the fallback store is encountered,
+  // the payload will be None (as with any backtracking event) and the serId of the returned
+  // item will be one not used by any bound serializer.
+  //
+  // Without a payload, the serializer ID is kind of meaningless (and the events-by-slice
+  // queries in the read journal will ignore the serializer ID unless it is the filtered
+  // payload serializer).
   override def itemsBySlice(
       entityType: String,
       slice: Int,
@@ -277,7 +317,8 @@ import software.amazon.awssdk.services.dynamodb.model.QueryResponse
             writeTimestamp = getTimestamp(item),
             readTimestamp = InstantFactory.now(),
             payload = None, // lazy loaded for backtracking
-            serId = item.get(EventSerId).n().toInt,
+            serId =
+              if (item.containsKey(EventSerId)) item.get(EventSerId).n().toInt else backtrackingBreadcrumbSerId(),
             serManifest = "",
             writerUuid = "", // not need in this query
             tags = if (item.containsKey(Tags)) item.get(Tags).ss().asScala.toSet else Set.empty,
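To make the NB above concrete: because the substituted serId is, by construction, one that no bound serializer uses, code inspecting these backtracking items can tell a fallback-store event apart from an ordinary one with a simple set lookup. A minimal sketch, not part of the commit; the BacktrackingItem record and isFallbackBreadcrumb helper are illustrative names, and boundSerializerIds is assumed to be built the same way the DAO builds its set:

object FallbackBreadcrumbs {
  // Sketch: a payload-less backtracking item whose serId no bound serializer claims is a
  // breadcrumb for an event whose body lives in the fallback store.
  final case class BacktrackingItem(serId: Int, payload: Option[Array[Byte]])

  def isFallbackBreadcrumb(item: BacktrackingItem, boundSerializerIds: Set[Int]): Boolean =
    item.payload.isEmpty && !boundSerializerIds(item.serId)
}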

s3-fallback-store/src/test/resources/application-test.conf

Lines changed: 4 additions & 0 deletions
@@ -20,3 +20,7 @@ akka.persistence.dynamodb.journal.fallback-store {
 }

 akka.persistence.dynamodb.snapshot.fallback-store = ${akka.persistence.dynamodb.journal.fallback-store}
+
+akka.persistence.dynamodb.query.refresh-interval = 500ms
+akka.persistence.dynamodb.query.backtracking.window = 500ms
+akka.persistence.dynamodb.query.backtracking.behind-current-time = 1s
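These shortened intervals make the live events-by-slices query refresh, and backtrack over recent events, quickly enough for the new assertions in the spec below to observe a backtracking envelope within a short test run. A minimal sketch, not part of the commit, of reading the new settings with the Typesafe Config API; it assumes the file above is available on the test classpath under the basename application-test:

import com.typesafe.config.ConfigFactory

object TestQueryConfig {
  // Sketch: load the test configuration and read the new query/backtracking durations.
  private val config = ConfigFactory.load("application-test")
  val refreshInterval = config.getDuration("akka.persistence.dynamodb.query.refresh-interval") // PT0.5S
  val backtrackingWindow = config.getDuration("akka.persistence.dynamodb.query.backtracking.window") // PT0.5S
  val behindCurrentTime = config.getDuration("akka.persistence.dynamodb.query.backtracking.behind-current-time") // PT1S
}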

s3-fallback-store/src/test/scala/akka/persistence/s3fallback/LargeSnapshotAndEventSpec.scala

Lines changed: 22 additions & 0 deletions
@@ -10,6 +10,7 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import akka.actor.typed.ActorSystem
 import akka.persistence.dynamodb.TestActors
 import akka.persistence.dynamodb.TestData
+import akka.persistence.dynamodb.internal.EnvelopeOrigin
 import akka.persistence.dynamodb.internal.JournalAttributes
 import akka.persistence.dynamodb.internal.SnapshotAttributes
 import akka.persistence.dynamodb.query.scaladsl.DynamoDBReadJournal
@@ -199,6 +200,27 @@ class LargeSnapshotAndEventSpec
       cebsEvent.getEvent() shouldBe padding

       cebsProbe.cancel()
+
+      // We want to force backtracking to see the first event, so persist another
+      //reincarnation ! PersistWithAck("ignore-me", probe.ref)
+      //probe.expectMessage(Done)
+
+      val liveEventsBySlicesSubscriber = query
+        .eventsBySlices[String](entityType, slice, slice, TimestampOffset.Zero)
+        .filter(_.persistenceId == persistenceId.id)
+        .runWith(sinkProbe)
+
+      val lebsProbe = liveEventsBySlicesSubscriber.request(2)
+      val forwardLebsEvent = lebsProbe.expectNext()
+
+      forwardLebsEvent.event shouldBe padding
+
+      val backtrackLebsEvent = lebsProbe.expectNext()
+
+      backtrackLebsEvent.eventOption shouldBe empty
+      backtrackLebsEvent.source shouldBe EnvelopeOrigin.SourceBacktracking
+
+      lebsProbe.cancel()
     }
   }

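The new assertions show what a consumer of eventsBySlices sees when backtracking revisits the large event: an envelope with no payload whose source marks it as a backtracking envelope. A minimal sketch, not part of the commit, of how consuming code might branch on that; the object name is illustrative and loadPayload stands in for whatever lazy-loading mechanism the consumer uses (Akka Projection, for example, loads such payloads on demand):

import scala.concurrent.Future
import akka.persistence.dynamodb.internal.EnvelopeOrigin
import akka.persistence.query.typed.EventEnvelope

object BacktrackingHandling {
  // Sketch: deliver the payload directly when present; for backtracking envelopes
  // (payload-less by design) defer to a lazy loader supplied by the caller.
  def resolveEvent[A](env: EventEnvelope[A], loadPayload: EventEnvelope[A] => Future[A]): Future[A] =
    env.eventOption match {
      case Some(event)                                              => Future.successful(event)
      case None if env.source == EnvelopeOrigin.SourceBacktracking => loadPayload(env)
      case None =>
        Future.failed(new IllegalStateException(s"Unexpected empty envelope for ${env.persistenceId}"))
    }
}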
