Skip to content

Commit d2f7250

Browse files
authored
feat: fallback to S3 for large snapshots and events (#155)
1 parent 45199ed commit d2f7250

31 files changed

+3355
-126
lines changed

.editorconfig

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
root = true
2+
3+
[*]
4+
charset = utf-8
5+
insert_final_newline = true
6+
7+
# No need to enforce an editor config for Java/Scala files, as the build automatically styles
8+
[*.conf]
9+
indent_style = space
10+
indent_size = 2

.github/workflows/build-test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ jobs:
7979

8080
- name: Start DB
8181
run: |-
82-
docker compose -f docker/docker-compose.yml up --wait
82+
docker compose -f docker/docker-compose-with-s3.yml up --wait
8383
8484
- name: sbt test
8585
run: |-

CONTRIBUTING.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ It can be started with the docker-compose file in the docker folder:
1010
docker-compose -f docker/docker-compose.yml up --wait
1111
```
1212

13+
Or if running tests of the S3 fallback functionality (`s3Fallback / test` in sbt)
14+
15+
```
16+
docker-compose -f docker/docker-compose-with-s3.yml up --wait
17+
```
18+
1319
## General Workflow
1420

1521
This is the process for committing code into main.

build.sbt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ lazy val root = (project in file("."))
115115
publishTo := Some(Resolver.file("Unused transient repository", file("target/unusedrepo"))))
116116
.enablePlugins(ScalaUnidocPlugin)
117117
.disablePlugins(SitePlugin, MimaPlugin, CiReleasePlugin)
118-
.aggregate(core, docs)
118+
.aggregate(core, s3Fallback, docs)
119119

120120
def suffixFileFilter(suffix: String): FileFilter = new SimpleFileFilter(f => f.getAbsolutePath.endsWith(suffix))
121121

@@ -125,6 +125,14 @@ lazy val core = (project in file("core"))
125125
.enablePlugins(AutomateHeaderPlugin)
126126
.disablePlugins(CiReleasePlugin)
127127

128+
lazy val s3Fallback = (project in file("s3-fallback-store"))
129+
.settings(common)
130+
.settings(name := "akka-persistence-dynamodb-s3-fallback-store")
131+
.settings(libraryDependencies ++= Dependencies.s3Fallback)
132+
.dependsOn(core % "compile->compile;test->test")
133+
.enablePlugins(AutomateHeaderPlugin)
134+
.disablePlugins(CiReleasePlugin)
135+
128136
lazy val docs = project
129137
.in(file("docs"))
130138
.enablePlugins(AkkaParadoxPlugin, ParadoxSitePlugin, PreprocessPlugin, PublishRsyncPlugin)

core/src/main/resources/reference.conf

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,34 @@ akka.persistence.dynamodb {
5252
# replay filter not needed for this plugin
5353
replay-filter.mode = off
5454

55+
# Configure the fallback store which will be used to store a batch of events when
56+
# the write's size exceeds the threshold herein
57+
fallback-store {
58+
# If enabling the fallback store, set to the config path of the fallback store plugin to use
59+
#
60+
# The referred-to config path must at least contain a value named 'class' set to the FQCN of
61+
# the fallback store plugin. For example if `plugin = "fallback-store"`, your config will need
62+
#
63+
# fallback-store {
64+
# class = "package.containing.my.FallbackStore"
65+
#
66+
# # Additional fallback store specific configuration would go here
67+
# }
68+
#
69+
# If not set, no fallback store will be used
70+
plugin = ""
71+
72+
# The journal will estimate the size of the write (including attributes): if the estimate exceeds this
73+
# value and the fallback store is enabled (by setting `plugin`, above), then the events
74+
# will be written to the fallback store and a "breadcrumb" describing how to
75+
# retrieve the event from the fallback store will be written.
76+
threshold = 300 KiB
77+
78+
# Write at most this many events from one batch (e.g. `persistAll`) concurrently to the fallback store.
79+
# Additionally, when reading events, at most this many events (for one query) will have in-flight
80+
# retrievals from the fallback store
81+
batch-size = 16
82+
}
5583
}
5684
}
5785
// #journal-settings
@@ -73,7 +101,35 @@ akka.persistence.dynamodb {
73101
by-slice-idx = ""
74102

75103
# Enables an optimization in Akka for avoiding snapshot deletes in retention.
104+
#
105+
# Note that the data model for this plugin can only store one snapshot per
106+
# persistence ID: disabling this will not allow multiple snapshots per
107+
# persistence ID
76108
only-one-snapshot = true
109+
110+
# Configure the fallback store which will be used to store a snapshot when the
111+
# write's size exceeds the threshold herein
112+
fallback-store {
113+
# If enabling the fallback store, set to the config path of the fallback store plugin to use
114+
#
115+
# The referred-to config path must at least contain a value named 'class' set to the FQCN of
116+
# the fallback store plugin. For example if `plugin = "fallback-store"`, your config will need
117+
#
118+
# fallback-store {
119+
# class = "package.containing.my.FallbackStore"
120+
#
121+
# # Additional fallback store specific configuration would go here
122+
# }
123+
#
124+
# If not set, no fallback store will be used
125+
plugin = ""
126+
127+
# The snapshot store will estimate the size of the write (including attributes): if
128+
# the estimate exceeds this value and the fallback store is enabled (by setting `plugin`
129+
# above), then the snapshot will be written to the fallback store and a "breadcrumb"
130+
# describing how to retrieve the snapshot from the fallback store will be written.
131+
threshold = 300 KiB
132+
}
77133
}
78134
}
79135
// #snapshot-settings

core/src/main/scala/akka/persistence/dynamodb/DynamoDBSettings.scala

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import scala.jdk.CollectionConverters._
1010
import scala.jdk.DurationConverters._
1111

1212
import akka.actor.typed.ActorSystem
13+
import akka.annotation.ApiMayChange
14+
import akka.annotation.DoNotInherit
1315
import akka.annotation.InternalStableApi
1416
import akka.util.Helpers
1517
import com.typesafe.config.Config
@@ -61,6 +63,10 @@ object DynamoDBSettings {
6163

6264
val clockSkewSettings = new ClockSkewSettings(config)
6365

66+
val journalFallbackSettings = JournalFallbackSettings(config.getConfig("journal.fallback-store"))
67+
68+
val snapshotFallbackSettings = SnapshotFallbackSettings(config.getConfig("snapshot.fallback-store"))
69+
6470
new DynamoDBSettings(
6571
journalTable,
6672
journalPublishEvents,
@@ -70,7 +76,9 @@ object DynamoDBSettings {
7076
timeToLiveSettings,
7177
journalBySliceGsi,
7278
snapshotBySliceGsi,
73-
clockSkewSettings)
79+
clockSkewSettings,
80+
journalFallbackSettings,
81+
snapshotFallbackSettings)
7482
}
7583

7684
/**
@@ -93,7 +101,16 @@ final class DynamoDBSettings private (
93101
val timeToLiveSettings: TimeToLiveSettings,
94102
val journalBySliceGsi: String,
95103
val snapshotBySliceGsi: String,
96-
val clockSkewSettings: ClockSkewSettings)
104+
val clockSkewSettings: ClockSkewSettings,
105+
val journalFallbackSettings: JournalFallbackSettings,
106+
val snapshotFallbackSettings: SnapshotFallbackSettings) {
107+
override def toString: String =
108+
s"DynamoDBSettings(journalTable=$journalTable, journalPublishEvents=$journalPublishEvents, " +
109+
s"snapshotTable=$snapshotTable, querySettings=$querySettings, cleanupSettings=$cleanupSettings, " +
110+
s"timeToLiveSettings=$timeToLiveSettings, journalBySliceGsi=$journalBySliceGsi, " +
111+
s"snapshotBySliceGsi=$snapshotBySliceGsi, clockSkewSettings=$clockSkewSettings, " +
112+
s"journalFallbackSettings=$journalFallbackSettings, snapshotFallbackSettings=$snapshotFallbackSettings)"
113+
}
97114

98115
final class QuerySettings(config: Config) {
99116
val refreshInterval: FiniteDuration = config.getDuration("refresh-interval").toScala
@@ -320,6 +337,55 @@ final class ClockSkewSettings(config: Config) {
320337
override def toString: String = s"ClockSkewSettings($warningTolerance)"
321338
}
322339

340+
/** Not for user extension */
341+
@DoNotInherit
342+
sealed abstract class FallbackSettings(val plugin: String, val threshold: Int) {
343+
require(threshold > 0, "threshold must be positive")
344+
require(threshold <= (400 * 1000), "threshold must be at most 400 KB")
345+
346+
// Must be overridden in subclasses
347+
override def toString: String = throw new scala.NotImplementedError
348+
349+
def isEnabled: Boolean = plugin.nonEmpty
350+
}
351+
352+
@ApiMayChange
353+
final class SnapshotFallbackSettings(plugin: String, threshold: Int) extends FallbackSettings(plugin, threshold) {
354+
override def toString: String =
355+
if (isEnabled)
356+
s"SnapshotFallbackSettings(plugin=$plugin, threshold=${threshold}B)"
357+
else "disabled"
358+
}
359+
360+
object SnapshotFallbackSettings {
361+
def apply(config: Config): SnapshotFallbackSettings = {
362+
val plugin = config.getString("plugin")
363+
val threshold = java.lang.Long.min(config.getBytes("threshold"), 400 * 1000).toInt
364+
365+
new SnapshotFallbackSettings(plugin, threshold)
366+
}
367+
}
368+
369+
@ApiMayChange
370+
final class JournalFallbackSettings(commonSettings: SnapshotFallbackSettings, val batchSize: Int)
371+
extends FallbackSettings(commonSettings.plugin, commonSettings.threshold) {
372+
require(!commonSettings.isEnabled || batchSize > 0, "batch size must be positive")
373+
374+
override def toString: String =
375+
if (isEnabled)
376+
s"JournalFallbackSettings(plugin=$plugin, threshold=${threshold}B, batchSize=$batchSize)"
377+
else "disabled"
378+
}
379+
380+
object JournalFallbackSettings {
381+
def apply(config: Config): JournalFallbackSettings = {
382+
val commonSettings = SnapshotFallbackSettings(config)
383+
val batchSize = config.getInt("batch-size")
384+
385+
new JournalFallbackSettings(commonSettings, batchSize)
386+
}
387+
}
388+
323389
private[akka] object ConfigHelpers {
324390
def optString(config: Config, path: String): Option[String] = {
325391
if (config.hasPath(path)) {

0 commit comments

Comments
 (0)