Skip to content

fix: backtracking when fallback is enabled #166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,13 @@ import software.amazon.awssdk.services.dynamodb.model.QueryResponse
}

// implements BySliceQuery.Dao
// NB: if backtracking and an event that was saved to the fallback store is encountered,
// the payload will be None (as with any backtracking event) and the serId of the returned
// item will be one not used by any bound serializer.
//
// Without a payload, the serializer ID is kind of meaningless (and the events-by-slice
// queries in the read journal will ignore the serializer ID unless it is the filtered
// payload serializer).
override def itemsBySlice(
entityType: String,
slice: Int,
Expand Down Expand Up @@ -277,7 +284,9 @@ import software.amazon.awssdk.services.dynamodb.model.QueryResponse
writeTimestamp = getTimestamp(item),
readTimestamp = InstantFactory.now(),
payload = None, // lazy loaded for backtracking
serId = item.get(EventSerId).n().toInt,
serId =
if (item.containsKey(EventSerId)) item.get(EventSerId).n().toInt
else 0, // absent or loaded later from S3
serManifest = "",
writerUuid = "", // not need in this query
tags = if (item.containsKey(Tags)) item.get(Tags).ss().asScala.toSet else Set.empty,
Expand Down
4 changes: 4 additions & 0 deletions s3-fallback-store/src/test/resources/application-test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ akka.persistence.dynamodb.journal.fallback-store {
}

akka.persistence.dynamodb.snapshot.fallback-store = ${akka.persistence.dynamodb.journal.fallback-store}

akka.persistence.dynamodb.query.refresh-interval = 500ms
akka.persistence.dynamodb.query.backtracking.window = 500ms
akka.persistence.dynamodb.query.backtracking.behind-current-time = 1s
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
import akka.persistence.dynamodb.TestActors
import akka.persistence.dynamodb.TestData
import akka.persistence.dynamodb.internal.EnvelopeOrigin
import akka.persistence.dynamodb.internal.JournalAttributes
import akka.persistence.dynamodb.internal.SnapshotAttributes
import akka.persistence.dynamodb.query.scaladsl.DynamoDBReadJournal
Expand Down Expand Up @@ -199,6 +200,27 @@ class LargeSnapshotAndEventSpec
cebsEvent.getEvent() shouldBe padding

cebsProbe.cancel()

// We want to force backtracking to see the first event, so persist another
//reincarnation ! PersistWithAck("ignore-me", probe.ref)
//probe.expectMessage(Done)

val liveEventsBySlicesSubscriber = query
.eventsBySlices[String](entityType, slice, slice, TimestampOffset.Zero)
.filter(_.persistenceId == persistenceId.id)
.runWith(sinkProbe)

val lebsProbe = liveEventsBySlicesSubscriber.request(2)
val forwardLebsEvent = lebsProbe.expectNext()

forwardLebsEvent.event shouldBe padding

val backtrackLebsEvent = lebsProbe.expectNext()

backtrackLebsEvent.eventOption shouldBe empty
backtrackLebsEvent.source shouldBe EnvelopeOrigin.SourceBacktracking

lebsProbe.cancel()
}
}

Expand Down
Loading