Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,8 @@ class DecentralizedSynchronizerMigrationIntegrationTest

withClueAndLog("Backfilled history includes ACS import") {
eventually() {
sv1ScanLocalBackend.appState.store.updateHistory.sourceHistory
sv1ScanLocalBackend.appState.store.updateHistory
.sourceHistory(excludeAcsImportUpdates = false)
.migrationInfo(1L)
.futureValue
.exists(_.complete) should be(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ final class TxLogBackfilling(
) extends NamedLogging {

private val currentMigrationId = updateHistory.domainMigrationInfo.currentMigrationId
private val sourceHistory = updateHistory.sourceHistory
// ACS import updates should not be included in txlog
private val sourceHistory = updateHistory.sourceHistory(excludeAcsImportUpdates = true)
private val destinationHistory = store.destinationHistory
private val backfilling =
new HistoryBackfilling(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1959,7 +1959,9 @@ class UpdateHistory(
storage.query(readOffsetAction(), "readOffset")
}

lazy val sourceHistory: HistoryBackfilling.SourceHistory[UpdateHistoryResponse] =
def sourceHistory(
excludeAcsImportUpdates: Boolean
): HistoryBackfilling.SourceHistory[UpdateHistoryResponse] =
new HistoryBackfilling.SourceHistory[UpdateHistoryResponse] {
override def isReady: Boolean = state
.get()
Expand Down Expand Up @@ -2019,7 +2021,8 @@ class UpdateHistory(
migrationId = migrationId,
synchronizerId = synchronizerId,
beforeRecordTime = before,
atOrAfterRecordTime = None,
atOrAfterRecordTime =
Option.when(excludeAcsImportUpdates)(CantonTimestamp.MinValue.plusSeconds(1L)),
limit = PageLimit.tryCreate(count),
).map(_.map(_.update))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class UpdateHistoryBackfillingTest extends UpdateHistoryTestBase {
None,
PageLimit.tryCreate(1000),
)
infoB2 <- storeB2.sourceHistory.migrationInfo(0)
infoB2 <- storeB2.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(0)
} yield {
infoB2.value.complete shouldBe true
updatesA.map(_.update.update.updateId) should contain theSameElementsAs updatesB.map(
Expand All @@ -88,7 +88,7 @@ class UpdateHistoryBackfillingTest extends UpdateHistoryTestBase {
_ <- create(domain2, validContractId(2), validOffset(2), party1, storeA0, time(2))
// If the store doesn't need backfilling, it should return the correct info
// without explicit initialization of backfilling
infoS <- storeA0.sourceHistory.migrationInfo(13)
infoS <- storeA0.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(13)
} yield {
infoS.value.complete shouldBe true
infoS.value.recordTimeRange shouldBe Map(
Expand All @@ -107,11 +107,11 @@ class UpdateHistoryBackfillingTest extends UpdateHistoryTestBase {
tx1 <- create(domain1, validContractId(1), validOffset(1), party1, storeA0, time(1))
_ <- create(domain2, validContractId(2), validOffset(2), party1, storeA0, time(2))
// Before initializing backfilling, it should not return any data
infoS1 <- storeA0.sourceHistory.migrationInfo(13)
infoS1 <- storeA0.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(13)
infoD1 <- storeA0.destinationHistory.backfillingInfo
// After initializing backfilling, it should return the correct data
_ <- storeA0.initializeBackfilling(13, domain1, tx1.getUpdateId, complete = true)
infoS2 <- storeA0.sourceHistory.migrationInfo(13)
infoS2 <- storeA0.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(13)
infoD2 <- storeA0.destinationHistory.backfillingInfo
} yield {
infoS1 shouldBe None
Expand All @@ -128,9 +128,9 @@ class UpdateHistoryBackfillingTest extends UpdateHistoryTestBase {
tx1 <- create(domain1, validContractId(1), validOffset(1), party1, storeA0, time(1))
_ <- create(domain2, validContractId(2), validOffset(2), party1, storeA0, time(2))
_ <- storeA0.initializeBackfilling(13, domain1, tx1.getUpdateId, complete = true)
info12 <- storeA0.sourceHistory.migrationInfo(12)
info13 <- storeA0.sourceHistory.migrationInfo(13)
info14 <- storeA0.sourceHistory.migrationInfo(14)
info12 <- storeA0.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(12)
info13 <- storeA0.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(13)
info14 <- storeA0.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(14)
} yield {
info12 shouldBe None
inside(info13) { case Some(s: SourceMigrationInfo) =>
Expand Down Expand Up @@ -341,7 +341,7 @@ class UpdateHistoryBackfillingTest extends UpdateHistoryTestBase {
) =
new HistoryBackfilling[UpdateHistoryResponse](
destination.destinationHistory,
source.sourceHistory,
source.sourceHistory(excludeAcsImportUpdates = false),
latestMigrationId,
batchSize = 10,
loggerFactory = loggerFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1816,7 +1816,7 @@ class HttpScanHandler(
)(extracted: TraceContext): Future[ScanResource.GetMigrationInfoResponse] = {
implicit val tc = extracted
withSpan(s"$workflowId.getMigrationInfo") { _ => _ =>
val sourceHistory = store.updateHistory.sourceHistory
val sourceHistory = store.updateHistory.sourceHistory(excludeAcsImportUpdates = false)
for {
infoO <- sourceHistory.migrationInfo(body.migrationId)
} yield infoO match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class AcsSnapshotTrigger(
* And also for past migrations, whether the SV was present in them or not.
*/
private def isHistoryBackfilled(migrationId: Long)(implicit tc: TraceContext) = {
updateHistory.sourceHistory
updateHistory
.sourceHistory(excludeAcsImportUpdates = false)
.migrationInfo(migrationId)
.map(_.exists(i => i.complete && i.importUpdatesComplete))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ class AcsSnapshotTriggerTest
)
)
)
when(updateHistory.sourceHistory).thenReturn(sourceHistory)
when(updateHistory.sourceHistory(anyBoolean)).thenReturn(sourceHistory)
when(updateHistory.getPreviousMigrationId(anyLong)(any[TraceContext])).thenAnswer { (n: Long) =>
Future.successful(n match {
case 0L => None
Expand Down Expand Up @@ -667,7 +667,9 @@ class AcsSnapshotTriggerTest

def historyBackfilled(migrationId: Long, complete: Boolean): Unit = {
when(
updateHistory.sourceHistory.migrationInfo(eqTo(migrationId))(any[TraceContext])
updateHistory
.sourceHistory(excludeAcsImportUpdates = false)
.migrationInfo(eqTo(migrationId))(any[TraceContext])
)
.thenReturn(
Future.successful(
Expand All @@ -690,7 +692,9 @@ class AcsSnapshotTriggerTest
importUpdatesComplete: Boolean,
): Unit = {
when(
updateHistory.sourceHistory.migrationInfo(eqTo(migrationId))(any[TraceContext])
updateHistory
.sourceHistory(excludeAcsImportUpdates = false)
.migrationInfo(eqTo(migrationId))(any[TraceContext])
)
.thenReturn(
Future.successful(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase {
testData.destinationHistory,
Map(domain1 -> time(0), domain2 -> time(0)),
)
backfillingComplete <- testData.destinationHistory.sourceHistory
backfillingComplete <- testData.destinationHistory
.sourceHistory(excludeAcsImportUpdates = false)
.migrationInfo(0)
.map(_.value.complete)
// Check that the updates are the same
Expand Down Expand Up @@ -101,7 +102,8 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase {
testData.destinationHistory,
Map(domain1 -> time(5), domain2 -> time(5)),
)
migrationInfo0 <- testData.destinationHistory.sourceHistory
migrationInfo0 <- testData.destinationHistory
.sourceHistory(excludeAcsImportUpdates = false)
.migrationInfo(0)
updatesB1 <- testData.destinationHistory.getAllUpdates(
None,
Expand All @@ -114,7 +116,8 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase {
testData.destinationHistory,
Map(domain1 -> time(0), domain2 -> time(0)),
)
backfillingComplete2 <- testData.destinationHistory.sourceHistory
backfillingComplete2 <- testData.destinationHistory
.sourceHistory(excludeAcsImportUpdates = false)
.migrationInfo(0)
.map(_.value.complete)

Expand Down Expand Up @@ -306,7 +309,9 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase {
migrationId: Long
)(implicit tc: TraceContext): Future[Option[SourceMigrationInfo]] =
for {
original <- history.sourceHistory.migrationInfo(migrationId)(tc)
original <- history
.sourceHistory(excludeAcsImportUpdates = false)
.migrationInfo(migrationId)(tc)
filteredRange = original.map(
_.recordTimeRange.toList
.flatMap { case (k, v) =>
Expand Down
Loading