Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CANTON_CODE_CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ to know which and/or what changes we'll need to upstream before the switch.
* Changed `metrics.filterByNodeAndAttribute` in `InstanceReference ` to filter by `node_name` instead of `node` to match the Splice metrics
* Split `CommandFailure` into `InteractiveCommandFailure` and `CommandFailureWithDetails`.
* `Cli.logLastErrors` default changed from `true` to `false`.
* Added `rawStorage`, `setupStorage` and `DisableDbStorageIdempotency` to `DbTest` to allow better access to non-doubled writes.
* Added better logging of setup and cleanup failures in `DbTest`
## Build system
* Added refs to GH issues in project/DamlPlugin.sbt for two bugs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class SqlIndexInitializationTriggerStoreTest
),
)
for {
_ <- storage.underlying
_ <- rawStorage
.update(
sqlu"create index test_index on update_history_creates (record_time)",
"create test index",
Expand All @@ -116,7 +116,7 @@ class SqlIndexInitializationTriggerStoreTest
)

for {
_ <- storage.underlying
_ <- rawStorage
.update(
sqlu"create index test_index on update_history_creates (record_time)",
"create test index",
Expand Down Expand Up @@ -163,7 +163,7 @@ class SqlIndexInitializationTriggerStoreTest
)
for {
_ <- Future.unit
_ <- storage.underlying
_ <- rawStorage
.update(
DBIOAction
.seq(
Expand All @@ -179,7 +179,7 @@ class SqlIndexInitializationTriggerStoreTest
"insert test data",
)
.failOnShutdown
_ <- storage.underlying
_ <- rawStorage
.update(
sqlu"""
create or replace function slow_function(text) returns text as $$$$
Expand All @@ -192,7 +192,7 @@ class SqlIndexInitializationTriggerStoreTest
"insert test data",
)
.failOnShutdown
_ <- storage.underlying
_ <- rawStorage
.update(
DBIOAction
.seq(
Expand Down Expand Up @@ -249,7 +249,7 @@ class SqlIndexInitializationTriggerStoreTest
}

private def listIndexNames(): Future[Seq[String]] = {
storage.underlying
rawStorage
.query(
sql"select indexname from pg_indexes where schemaname = 'public'".as[String],
"listIndexes",
Expand Down Expand Up @@ -277,7 +277,7 @@ class SqlIndexInitializationTriggerStoreTest
// Dumps information about all indexes in the database to the log.
// Used during development to verify that the indexes are created correctly.
private def dumpIndexes(): Future[Unit] = {
storage.underlying
rawStorage
.query(
sql"""
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,14 @@ abstract class StoreTest extends AsyncWordSpec with BaseTest {

protected def providerParty(i: Int) = mkPartyId(s"provider-$i")

/** @param n must 0-9
/** @param n must 0-999999
* @param suffix must be a hex string
*/
protected def validContractId(n: Int, suffix: String = "00"): String = "00" + s"0$n" * 31 + suffix
protected def validContractId(n: Int, suffix: String = "00"): String = "00" + (() match {
case _ if n < 0 => fail(s"negative validContractId: $n")
case _ if n <= 999999 => ("%06d" format n) * 10 + "00"
case _ => fail(s"too large validContractId: $n")
}) + suffix

private var cIdCounter = 0

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package org.lfdecentralizedtrust.splice.store.db

import com.daml.metrics.api.noop.NoOpMetricsFactory
import com.digitalasset.canton.HasActorSystem
import com.digitalasset.canton.concurrent.FutureSupervisor
import com.digitalasset.canton.resource.DbStorage
import com.digitalasset.canton.store.db.DbTest
import com.digitalasset.canton.topology.ParticipantId
import com.digitalasset.canton.tracing.TraceContext
import org.lfdecentralizedtrust.splice.environment.{DarResources, RetryProvider}
import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
import org.lfdecentralizedtrust.splice.store.StoreTest.testTxLogConfig
import org.lfdecentralizedtrust.splice.store.{
MultiDomainAcsStore,
MultiDomainAcsStoreTest,
TestTxLogEntry,
}
import org.lfdecentralizedtrust.splice.util.{ResourceTemplateDecoder, TemplateJsonDecoder}
import slick.jdbc.JdbcProfile

import scala.concurrent.Future

class IngestionPerfTest
extends MultiDomainAcsStoreTest[DbMultiDomainAcsStore[TestTxLogEntry]]
with SplicePostgresTest
with DbTest.DisableDbStorageIdempotency
with HasActorSystem
with AcsJdbcTypes {

override lazy val profile: JdbcProfile = storage.api.jdbcProfile

"IngestionPerfTest" should {

"Create and Archive many events and measure performance" in {
implicit val store =
mkStore(acsId = 1, txLogId = Some(1), 1L, ParticipantId("IngestPerfTest"))

for {
_ <- initWithAcs()(store)
// TODO use more realistic data, and ingest different kinds of events and combinations of creates, exercises and archives
// Read from CILR, generate a dump of a reasonable size, and the use that as a consistent input instead.
nrEvents = 1000
coupons = (0 to nrEvents).map(i => c(i))
start = System.currentTimeMillis()
_ <- Future.traverse(coupons)(c => d1.create(c)(store))
_ <- Future.traverse(coupons)(c => d1.archive(c)(store))
end = System.currentTimeMillis()
duration = end - start
_ = println(s"Ingestion of $nrEvents took $duration ms")
// verify with sql that the data did actually get ingested
} yield succeed
}
}

override def mkStore(
acsId: Int,
txLogId: Option[Int],
migrationId: Long,
participantId: ParticipantId,
filter: MultiDomainAcsStore.ContractFilter[
GenericAcsRowData,
GenericInterfaceRowData,
],
) = {
mkStoreWithAcsRowDataF(
acsId,
txLogId,
migrationId,
participantId,
filter,
"acs_store_template",
txLogId.map(_ => "txlog_store_template"),
Some("interface_views_template"),
)
}

def mkStoreWithAcsRowDataF[R <: AcsRowData](
acsId: Int,
txLogId: Option[Int],
migrationId: Long,
participantId: ParticipantId,
filter: MultiDomainAcsStore.ContractFilter[R, GenericInterfaceRowData],
acsTableName: String,
txLogTableName: Option[String],
interfaceViewsTableNameOpt: Option[String],
) = {
val packageSignatures =
ResourceTemplateDecoder.loadPackageSignaturesFromResources(
DarResources.amulet.all ++ DarResources.TokenStandard.allPackageResources.flatMap(_.all)
)
implicit val templateJsonDecoder: TemplateJsonDecoder =
new ResourceTemplateDecoder(packageSignatures, loggerFactory)

new DbMultiDomainAcsStore(
storage,
acsTableName,
txLogTableName,
interfaceViewsTableNameOpt,
storeDescriptor(acsId, participantId),
txLogId.map(storeDescriptor(_, participantId)),
loggerFactory,
filter,
testTxLogConfig,
DomainMigrationInfo(
migrationId,
None,
),
participantId,
RetryProvider(loggerFactory, timeouts, FutureSupervisor.Noop, NoOpMetricsFactory),
)
}

private def storeDescriptor(id: Int, participantId: ParticipantId) =
DbMultiDomainAcsStore.StoreDescriptor(
version = 1,
name = "IngestionPerfTest",
party = dsoParty,
participant = participantId,
key = Map(
"id" -> id.toString
),
)

/** Hook for cleaning database before running next test. */
override protected def cleanDb(
storage: DbStorage
)(implicit traceContext: TraceContext) = {
for {
_ <- resetAllAppTables(storage)
} yield ()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ class AcsSnapshotStoreTest
backfillingRequired: BackfillingRequirement = BackfillingRequirement.BackfillingNotRequired,
): Future[UpdateHistory] = {
val updateHistory = new UpdateHistory(
storage.underlying, // not under test
rawStorage, // not under test
new DomainMigrationInfo(migrationId, None),
"update_history_acs_snapshot_test",
mkParticipantId(participantId),
Expand All @@ -872,7 +872,7 @@ class AcsSnapshotStoreTest
new AcsSnapshotStore(
// we're guaranteed to only execute the insert once (in the context of AcsSnapshotTrigger),
// and the insert query is already complicated enough as-is, so I'm not gonna make it worse just for tests.
storage.underlying,
rawStorage,
updateHistory,
migrationId,
loggerFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ import scala.util.{Failure, Random, Success, Try}
trait DatabaseDeadlockTest
extends BaseTestWordSpec
with BeforeAndAfterAll
with HasExecutionContext {
this: DbTest =>
with HasExecutionContext
with DbTest.DisableDbStorageIdempotency {

lazy val rawStorage: DbStorage = storage.underlying
import rawStorage.api.*

val batchSize = 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import slick.sql.SqlAction
trait DatabaseLimitNbParamTest
extends BaseTestWordSpec
with BeforeAndAfterAll
with HasExecutionContext {
this: DbTest =>
with HasExecutionContext
with DbTest.DisableDbStorageIdempotency {

lazy val rawStorage: DbStorage = storage.underlying
import rawStorage.api.*

def createTableAction: SqlAction[Int, NoStream, Effect.Write]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@ trait DbTest
@SuppressWarnings(Array("org.wartremover.warts.Var", "org.wartremover.warts.Null"))
private var setup: DbStorageSetup = _

protected[this] lazy val rawStorage: DbStorage =
Option(setup).map(_.storage).getOrElse(sys.error("Test has not started"))

protected[this] def setupStorage(underlying: DbStorage): DbStorage =
new DbStorageIdempotency(underlying, timeouts, loggerFactory)

/** Stores the db storage implementation. Will throw if accessed before the test has started */
protected lazy val storage: DbStorageIdempotency = {
val s = Option(setup).map(_.storage).getOrElse(sys.error("Test has not started"))
new DbStorageIdempotency(s, timeouts, loggerFactory)
}
protected final lazy val storage: DbStorage = setupStorage(rawStorage)

override def beforeAll(): Unit = TraceContext.withNewTraceContext { implicit tc =>
// Non-standard order. Setup needs to be created first, because super can be MyDbTest and therefore super.beforeAll
Expand Down Expand Up @@ -105,7 +108,14 @@ trait DbTest
// Use the underlying storage for clean-up operations, so we don't run clean-ups twice
// cleanDB is usually implemented by a TRUNCATE statement, which can be very slow,
// we therefore use a long timeout.
Await.result(cleanDb(storage.underlying), 120.seconds)
Await.result(cleanDb(rawStorage), 120.seconds)
}
}

object DbTest {
trait DisableDbStorageIdempotency extends DbTest { this: Suite =>
override final protected[this] def setupStorage(underlying: DbStorage): DbStorage =
underlying
}
}

Expand Down