Skip to content

Commit 7c0acdf

Browse files
authored
fix: backtracking when fallback is enabled (#166)
1 parent 554d11f commit 7c0acdf

File tree

3 files changed

+36
-1
lines changed

3 files changed

+36
-1
lines changed

core/src/main/scala/akka/persistence/dynamodb/internal/QueryDao.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,13 @@ import software.amazon.awssdk.services.dynamodb.model.QueryResponse
182182
}
183183

184184
// implements BySliceQuery.Dao
185+
// NB: if backtracking and an event that was saved to the fallback store is encountered,
186+
// the payload will be None (as with any backtracking event) and the serId of the returned
187+
// item will be one not used by any bound serializer.
188+
//
189+
// Without a payload, the serializer ID is kind of meaningless (and the events-by-slice
190+
// queries in the read journal will ignore the serializer ID unless it is the filtered
191+
// payload serializer).
185192
override def itemsBySlice(
186193
entityType: String,
187194
slice: Int,
@@ -277,7 +284,9 @@ import software.amazon.awssdk.services.dynamodb.model.QueryResponse
277284
writeTimestamp = getTimestamp(item),
278285
readTimestamp = InstantFactory.now(),
279286
payload = None, // lazy loaded for backtracking
280-
serId = item.get(EventSerId).n().toInt,
287+
serId =
288+
if (item.containsKey(EventSerId)) item.get(EventSerId).n().toInt
289+
else 0, // absent or loaded later from S3
281290
serManifest = "",
282291
writerUuid = "", // not need in this query
283292
tags = if (item.containsKey(Tags)) item.get(Tags).ss().asScala.toSet else Set.empty,

s3-fallback-store/src/test/resources/application-test.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,7 @@ akka.persistence.dynamodb.journal.fallback-store {
2020
}
2121

2222
akka.persistence.dynamodb.snapshot.fallback-store = ${akka.persistence.dynamodb.journal.fallback-store}
23+
24+
akka.persistence.dynamodb.query.refresh-interval = 500ms
25+
akka.persistence.dynamodb.query.backtracking.window = 500ms
26+
akka.persistence.dynamodb.query.backtracking.behind-current-time = 1s

s3-fallback-store/src/test/scala/akka/persistence/s3fallback/LargeSnapshotAndEventSpec.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
1010
import akka.actor.typed.ActorSystem
1111
import akka.persistence.dynamodb.TestActors
1212
import akka.persistence.dynamodb.TestData
13+
import akka.persistence.dynamodb.internal.EnvelopeOrigin
1314
import akka.persistence.dynamodb.internal.JournalAttributes
1415
import akka.persistence.dynamodb.internal.SnapshotAttributes
1516
import akka.persistence.dynamodb.query.scaladsl.DynamoDBReadJournal
@@ -199,6 +200,27 @@ class LargeSnapshotAndEventSpec
199200
cebsEvent.getEvent() shouldBe padding
200201

201202
cebsProbe.cancel()
203+
204+
// We want to force backtracking to see the first event, so persist another
205+
//reincarnation ! PersistWithAck("ignore-me", probe.ref)
206+
//probe.expectMessage(Done)
207+
208+
val liveEventsBySlicesSubscriber = query
209+
.eventsBySlices[String](entityType, slice, slice, TimestampOffset.Zero)
210+
.filter(_.persistenceId == persistenceId.id)
211+
.runWith(sinkProbe)
212+
213+
val lebsProbe = liveEventsBySlicesSubscriber.request(2)
214+
val forwardLebsEvent = lebsProbe.expectNext()
215+
216+
forwardLebsEvent.event shouldBe padding
217+
218+
val backtrackLebsEvent = lebsProbe.expectNext()
219+
220+
backtrackLebsEvent.eventOption shouldBe empty
221+
backtrackLebsEvent.source shouldBe EnvelopeOrigin.SourceBacktracking
222+
223+
lebsProbe.cancel()
202224
}
203225
}
204226

0 commit comments

Comments
 (0)