From 2ab07c836e5914dbd3ec27cd4fd8b13cd17882fa Mon Sep 17 00:00:00 2001 From: Raymond Roestenburg Date: Mon, 4 Aug 2025 15:28:26 +0000 Subject: [PATCH 1/4] Started on a test that just creates / exercises / archives with dummy txs. Signed-off-by: Raymond Roestenburg --- .../splice/store/db/IngestionPerfTest.scala | 130 ++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/IngestionPerfTest.scala diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/IngestionPerfTest.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/IngestionPerfTest.scala new file mode 100644 index 0000000000..a448a45b73 --- /dev/null +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/IngestionPerfTest.scala @@ -0,0 +1,130 @@ +package org.lfdecentralizedtrust.splice.store.db + +import com.daml.metrics.api.noop.NoOpMetricsFactory +import com.digitalasset.canton.HasActorSystem +import com.digitalasset.canton.concurrent.FutureSupervisor +import com.digitalasset.canton.resource.DbStorage +import com.digitalasset.canton.topology.ParticipantId +import com.digitalasset.canton.tracing.TraceContext +import org.lfdecentralizedtrust.splice.environment.{DarResources, RetryProvider} +import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo +import org.lfdecentralizedtrust.splice.store.StoreTest.testTxLogConfig +import org.lfdecentralizedtrust.splice.store.{ + MultiDomainAcsStore, + MultiDomainAcsStoreTest, + TestTxLogEntry, +} +import org.lfdecentralizedtrust.splice.util.{ResourceTemplateDecoder, TemplateJsonDecoder} +import slick.jdbc.JdbcProfile + +import scala.concurrent.Future + +class IngestionPerfTest + extends MultiDomainAcsStoreTest[DbMultiDomainAcsStore[TestTxLogEntry]] + with SplicePostgresTest + with HasActorSystem + with AcsJdbcTypes { + + override lazy val profile: JdbcProfile = storage.api.jdbcProfile + + "IngestionPerfTest" should { + + "Create and Archive many events and measure performance" in { + implicit val store = + mkStore(acsId = 1, txLogId = Some(1), 1L, ParticipantId("IngestPerfTest")) + + for { + _ <- initWithAcs()(store) + // TODO use more realistic data, and ingest different kinds of events and combinations of creates, exercises and archives + // Read from CILR, generate a dump of a reasonable size, and the use that as a consistent input instead. + nrEvents = 1000 + coupons = (0 to nrEvents).map(i => c(i)) + start = System.currentTimeMillis() + _ <- Future.sequence(coupons.map(c => d1.create(c)(store))) + _ <- Future.sequence(coupons.map(c => d1.archive(c)(store))) + end = System.currentTimeMillis() + duration = end - start + _ = println(s"Ingestion of $nrEvents took $duration ms") + // verify with sql that the data did actually get ingested + } yield succeed + } + } + + override def mkStore( + acsId: Int, + txLogId: Option[Int], + migrationId: Long, + participantId: ParticipantId, + filter: MultiDomainAcsStore.ContractFilter[ + GenericAcsRowData, + GenericInterfaceRowData, + ], + ) = { + mkStoreWithAcsRowDataF( + acsId, + txLogId, + migrationId, + participantId, + filter, + "acs_store_template", + txLogId.map(_ => "txlog_store_template"), + Some("interface_views_template"), + ) + } + + def mkStoreWithAcsRowDataF[R <: AcsRowData]( + acsId: Int, + txLogId: Option[Int], + migrationId: Long, + participantId: ParticipantId, + filter: MultiDomainAcsStore.ContractFilter[R, GenericInterfaceRowData], + acsTableName: String, + txLogTableName: Option[String], + interfaceViewsTableNameOpt: Option[String], + ) = { + val packageSignatures = + ResourceTemplateDecoder.loadPackageSignaturesFromResources( + DarResources.amulet.all ++ DarResources.TokenStandard.allPackageResources.flatMap(_.all) + ) + implicit val templateJsonDecoder: TemplateJsonDecoder = + new ResourceTemplateDecoder(packageSignatures, loggerFactory) + + new DbMultiDomainAcsStore( + storage, + acsTableName, + txLogTableName, + interfaceViewsTableNameOpt, + storeDescriptor(acsId, participantId), + txLogId.map(storeDescriptor(_, participantId)), + loggerFactory, + filter, + testTxLogConfig, + DomainMigrationInfo( + migrationId, + None, + ), + participantId, + RetryProvider(loggerFactory, timeouts, FutureSupervisor.Noop, NoOpMetricsFactory), + ) + } + + private def storeDescriptor(id: Int, participantId: ParticipantId) = + DbMultiDomainAcsStore.StoreDescriptor( + version = 1, + name = "IngestionPerfTest", + party = dsoParty, + participant = participantId, + key = Map( + "id" -> id.toString + ), + ) + + /** Hook for cleaning database before running next test. */ + override protected def cleanDb( + storage: DbStorage + )(implicit traceContext: TraceContext) = { + for { + _ <- resetAllAppTables(storage) + } yield () + } +} From 1765db7d324f5e7514b8fa9abab2b41ff5ba8b0b Mon Sep 17 00:00:00 2001 From: Stephen Compall Date: Wed, 27 Aug 2025 20:10:02 +0000 Subject: [PATCH 2/4] allow DisableDbStorageIdempotency for DbTest [skip ci] Signed-off-by: Stephen Compall --- CANTON_CODE_CHANGES.md | 1 + ...lIndexInitializationTriggerStoreTest.scala | 14 ++++++------- .../splice/store/db/IngestionPerfTest.scala | 2 ++ .../store/db/AcsSnapshotStoreTest.scala | 4 ++-- .../store/db/DatabaseDeadlockTest.scala | 5 ++--- .../store/db/DatabaseLimitNbParamTest.scala | 5 ++--- .../digitalasset/canton/store/db/DbTest.scala | 20 ++++++++++++++----- 7 files changed, 31 insertions(+), 20 deletions(-) diff --git a/CANTON_CODE_CHANGES.md b/CANTON_CODE_CHANGES.md index 939601f76e..9fc881acc3 100644 --- a/CANTON_CODE_CHANGES.md +++ b/CANTON_CODE_CHANGES.md @@ -63,6 +63,7 @@ to know which and/or what changes we'll need to upstream before the switch. * Changed `metrics.filterByNodeAndAttribute` in `InstanceReference ` to filter by `node_name` instead of `node` to match the Splice metrics * Split `CommandFailure` into `InteractiveCommandFailure` and `CommandFailureWithDetails`. * `Cli.logLastErrors` default changed from `true` to `false`. +* Added `rawStorage`, `setupStorage` and `DisableDbStorageIdempotency` to `DbTest` to allow better access to non-doubled writes. * Added better logging of setup and cleanup failures in `DbTest` ## Build system * Added refs to GH issues in project/DamlPlugin.sbt for two bugs diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/automation/SqlIndexInitializationTriggerStoreTest.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/automation/SqlIndexInitializationTriggerStoreTest.scala index 1bbdfb5863..add3811af9 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/automation/SqlIndexInitializationTriggerStoreTest.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/automation/SqlIndexInitializationTriggerStoreTest.scala @@ -89,7 +89,7 @@ class SqlIndexInitializationTriggerStoreTest ), ) for { - _ <- storage.underlying + _ <- rawStorage .update( sqlu"create index test_index on update_history_creates (record_time)", "create test index", @@ -116,7 +116,7 @@ class SqlIndexInitializationTriggerStoreTest ) for { - _ <- storage.underlying + _ <- rawStorage .update( sqlu"create index test_index on update_history_creates (record_time)", "create test index", @@ -163,7 +163,7 @@ class SqlIndexInitializationTriggerStoreTest ) for { _ <- Future.unit - _ <- storage.underlying + _ <- rawStorage .update( DBIOAction .seq( @@ -179,7 +179,7 @@ class SqlIndexInitializationTriggerStoreTest "insert test data", ) .failOnShutdown - _ <- storage.underlying + _ <- rawStorage .update( sqlu""" create or replace function slow_function(text) returns text as $$$$ @@ -192,7 +192,7 @@ class SqlIndexInitializationTriggerStoreTest "insert test data", ) .failOnShutdown - _ <- storage.underlying + _ <- rawStorage .update( DBIOAction .seq( @@ -249,7 +249,7 @@ class SqlIndexInitializationTriggerStoreTest } private def listIndexNames(): Future[Seq[String]] = { - storage.underlying + rawStorage .query( sql"select indexname from pg_indexes where schemaname = 'public'".as[String], "listIndexes", @@ -277,7 +277,7 @@ class SqlIndexInitializationTriggerStoreTest // Dumps information about all indexes in the database to the log. // Used during development to verify that the indexes are created correctly. private def dumpIndexes(): Future[Unit] = { - storage.underlying + rawStorage .query( sql""" select diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/IngestionPerfTest.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/IngestionPerfTest.scala index a448a45b73..e7359a0ce4 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/IngestionPerfTest.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/IngestionPerfTest.scala @@ -4,6 +4,7 @@ import com.daml.metrics.api.noop.NoOpMetricsFactory import com.digitalasset.canton.HasActorSystem import com.digitalasset.canton.concurrent.FutureSupervisor import com.digitalasset.canton.resource.DbStorage +import com.digitalasset.canton.store.db.DbTest import com.digitalasset.canton.topology.ParticipantId import com.digitalasset.canton.tracing.TraceContext import org.lfdecentralizedtrust.splice.environment.{DarResources, RetryProvider} @@ -22,6 +23,7 @@ import scala.concurrent.Future class IngestionPerfTest extends MultiDomainAcsStoreTest[DbMultiDomainAcsStore[TestTxLogEntry]] with SplicePostgresTest + with DbTest.DisableDbStorageIdempotency with HasActorSystem with AcsJdbcTypes { diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala index 87235d15ac..101494d385 100644 --- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala +++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala @@ -852,7 +852,7 @@ class AcsSnapshotStoreTest backfillingRequired: BackfillingRequirement = BackfillingRequirement.BackfillingNotRequired, ): Future[UpdateHistory] = { val updateHistory = new UpdateHistory( - storage.underlying, // not under test + rawStorage, // not under test new DomainMigrationInfo(migrationId, None), "update_history_acs_snapshot_test", mkParticipantId(participantId), @@ -872,7 +872,7 @@ class AcsSnapshotStoreTest new AcsSnapshotStore( // we're guaranteed to only execute the insert once (in the context of AcsSnapshotTrigger), // and the insert query is already complicated enough as-is, so I'm not gonna make it worse just for tests. - storage.underlying, + rawStorage, updateHistory, migrationId, loggerFactory, diff --git a/canton/community/common/src/test/scala/com/digitalasset/canton/store/db/DatabaseDeadlockTest.scala b/canton/community/common/src/test/scala/com/digitalasset/canton/store/db/DatabaseDeadlockTest.scala index 57b3f87c7e..9ac1430d72 100644 --- a/canton/community/common/src/test/scala/com/digitalasset/canton/store/db/DatabaseDeadlockTest.scala +++ b/canton/community/common/src/test/scala/com/digitalasset/canton/store/db/DatabaseDeadlockTest.scala @@ -22,10 +22,9 @@ import scala.util.{Failure, Random, Success, Try} trait DatabaseDeadlockTest extends BaseTestWordSpec with BeforeAndAfterAll - with HasExecutionContext { - this: DbTest => + with HasExecutionContext + with DbTest.DisableDbStorageIdempotency { - lazy val rawStorage: DbStorage = storage.underlying import rawStorage.api.* val batchSize = 100 diff --git a/canton/community/common/src/test/scala/com/digitalasset/canton/store/db/DatabaseLimitNbParamTest.scala b/canton/community/common/src/test/scala/com/digitalasset/canton/store/db/DatabaseLimitNbParamTest.scala index 06c5846931..380dc1f247 100644 --- a/canton/community/common/src/test/scala/com/digitalasset/canton/store/db/DatabaseLimitNbParamTest.scala +++ b/canton/community/common/src/test/scala/com/digitalasset/canton/store/db/DatabaseLimitNbParamTest.scala @@ -16,10 +16,9 @@ import slick.sql.SqlAction trait DatabaseLimitNbParamTest extends BaseTestWordSpec with BeforeAndAfterAll - with HasExecutionContext { - this: DbTest => + with HasExecutionContext + with DbTest.DisableDbStorageIdempotency { - lazy val rawStorage: DbStorage = storage.underlying import rawStorage.api.* def createTableAction: SqlAction[Int, NoStream, Effect.Write] diff --git a/canton/community/common/src/test/scala/com/digitalasset/canton/store/db/DbTest.scala b/canton/community/common/src/test/scala/com/digitalasset/canton/store/db/DbTest.scala index dadadd5223..8532190177 100644 --- a/canton/community/common/src/test/scala/com/digitalasset/canton/store/db/DbTest.scala +++ b/canton/community/common/src/test/scala/com/digitalasset/canton/store/db/DbTest.scala @@ -47,11 +47,14 @@ trait DbTest @SuppressWarnings(Array("org.wartremover.warts.Var", "org.wartremover.warts.Null")) private var setup: DbStorageSetup = _ + protected[this] lazy val rawStorage: DbStorage = + Option(setup).map(_.storage).getOrElse(sys.error("Test has not started")) + + protected[this] def setupStorage(underlying: DbStorage): DbStorage = + new DbStorageIdempotency(underlying, timeouts, loggerFactory) + /** Stores the db storage implementation. Will throw if accessed before the test has started */ - protected lazy val storage: DbStorageIdempotency = { - val s = Option(setup).map(_.storage).getOrElse(sys.error("Test has not started")) - new DbStorageIdempotency(s, timeouts, loggerFactory) - } + protected final lazy val storage: DbStorage = setupStorage(rawStorage) override def beforeAll(): Unit = TraceContext.withNewTraceContext { implicit tc => // Non-standard order. Setup needs to be created first, because super can be MyDbTest and therefore super.beforeAll @@ -105,7 +108,14 @@ trait DbTest // Use the underlying storage for clean-up operations, so we don't run clean-ups twice // cleanDB is usually implemented by a TRUNCATE statement, which can be very slow, // we therefore use a long timeout. - Await.result(cleanDb(storage.underlying), 120.seconds) + Await.result(cleanDb(rawStorage), 120.seconds) + } +} + +object DbTest { + trait DisableDbStorageIdempotency extends DbTest { this: Suite => + override final protected[this] def setupStorage(underlying: DbStorage): DbStorage = + underlying } } From 6b93e54c0ee7fd211e5906ff8d135f91ab5f2c7f Mon Sep 17 00:00:00 2001 From: Stephen Compall Date: Wed, 27 Aug 2025 20:54:48 +0000 Subject: [PATCH 3/4] support longer contract IDs [skip ci] Signed-off-by: Stephen Compall --- .../org/lfdecentralizedtrust/splice/store/StoreTest.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/StoreTest.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/StoreTest.scala index c2e125d48f..1fda2fdfd9 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/StoreTest.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/StoreTest.scala @@ -123,10 +123,14 @@ abstract class StoreTest extends AsyncWordSpec with BaseTest { protected def providerParty(i: Int) = mkPartyId(s"provider-$i") - /** @param n must 0-9 + /** @param n must 0-999999 * @param suffix must be a hex string */ - protected def validContractId(n: Int, suffix: String = "00"): String = "00" + s"0$n" * 31 + suffix + protected def validContractId(n: Int, suffix: String = "00"): String = "00" + (() match { + case _ if n < 0 => fail(s"negative validContractId: $n") + case _ if n <= 999999 => ("%06d" format n) * 10 + "00" + case _ => fail(s"too large validContractId: $n") + }) + suffix private var cIdCounter = 0 From 27a18f77e593cfa23d64d3c59badd4becdddb815 Mon Sep 17 00:00:00 2001 From: Stephen Compall Date: Wed, 27 Aug 2025 20:59:13 +0000 Subject: [PATCH 4/4] traverse has same performance [skip ci] Signed-off-by: Stephen Compall --- .../splice/store/db/IngestionPerfTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/IngestionPerfTest.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/IngestionPerfTest.scala index e7359a0ce4..ac526279ab 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/IngestionPerfTest.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/IngestionPerfTest.scala @@ -42,8 +42,8 @@ class IngestionPerfTest nrEvents = 1000 coupons = (0 to nrEvents).map(i => c(i)) start = System.currentTimeMillis() - _ <- Future.sequence(coupons.map(c => d1.create(c)(store))) - _ <- Future.sequence(coupons.map(c => d1.archive(c)(store))) + _ <- Future.traverse(coupons)(c => d1.create(c)(store)) + _ <- Future.traverse(coupons)(c => d1.archive(c)(store)) end = System.currentTimeMillis() duration = end - start _ = println(s"Ingestion of $nrEvents took $duration ms")