Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions execution_chain/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,11 @@ type
defaultValue: 4'u64
name: "debug-persist-batch-size" .}: uint64

dynamicBatchSize* {.
hidden
defaultValue: false
name: "debug-dynamic-batch-size" .}: bool

rocksdbMaxOpenFiles {.
hidden
defaultValue: defaultMaxOpenFiles
Expand Down
34 changes: 32 additions & 2 deletions execution_chain/core/chain/forked_chain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ proc updateBase(c: ForkedChainRef, base: BlockRef): uint =
# No update, return
return

let startTime = Moment.now()

# State root sanity check is performed to verify, before writing to disk,
# that optimistically checked blocks indeed end up being stored with a
# consistent state root.
Expand All @@ -338,8 +340,7 @@ with --debug-eager-state-root."""

# Cleanup in-memory blocks starting from base backward
# e.g. B2 backward.
var
count = 0'u
var count = 0'u

for it in ancestors(base.parent):
c.removeBlockFromCache(it)
Expand All @@ -352,6 +353,33 @@ with --debug-eager-state-root."""
# Base block always have finalized marker
c.base.finalize()

if c.dynamicBatchSize:
# Dynamically adjust the persistBatchSize based on the recorded run time.
# The goal here is to use the maximum batch size possible without blocking the
# event loop for too long which could negatively impact the p2p networking.
# Increasing the batch size can improve performance because the stateroot
# computation and persist calls are performed less frequently.
const
targetTime = 500.milliseconds
targetTimeDelta = 200.milliseconds
targetTimeLowerBound = (targetTime - targetTimeDelta).milliseconds
targetTimeUpperBound = (targetTime + targetTimeDelta).milliseconds
batchSizeLowerBound = 4
batchSizeUpperBound = 512

let
finishTime = Moment.now()
runTime = (finishTime - startTime).milliseconds

if runTime < targetTimeLowerBound and c.persistBatchSize <= batchSizeUpperBound:
c.persistBatchSize *= 2
info "Increased persistBatchSize", runTime, targetTime,
persistBatchSize = c.persistBatchSize
elif runTime > targetTimeUpperBound and c.persistBatchSize >= batchSizeLowerBound:
c.persistBatchSize = c.persistBatchSize div 2
info "Decreased persistBatchSize", runTime, targetTime,
persistBatchSize = c.persistBatchSize

count

proc processUpdateBase(c: ForkedChainRef): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
Expand Down Expand Up @@ -586,6 +614,7 @@ proc init*(
com: CommonRef;
baseDistance = BaseDistance;
persistBatchSize = PersistBatchSize;
dynamicBatchSize = false;
eagerStateRoot = false;
enableQueue = false;
): T =
Expand Down Expand Up @@ -627,6 +656,7 @@ proc init*(
baseTxFrame: baseTxFrame,
baseDistance: baseDistance,
persistBatchSize: persistBatchSize,
dynamicBatchSize: dynamicBatchSize,
quarantine: Quarantine.init(),
fcuHead: fcuHead,
fcuSafe: fcuSafe,
Expand Down
4 changes: 4 additions & 0 deletions execution_chain/core/chain/forked_chain/chain_desc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ type
# to move the base. And the bulk writing can work
# efficiently.

dynamicBatchSize*: bool
# Enable adjusting the persistBatchSize dynamically based on the
# time it takes to update base.

portal*: HistoryExpiryRef
# History Expiry tracker and portal access entry point

Expand Down
1 change: 1 addition & 0 deletions execution_chain/nimbus_execution_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ proc basicServices(nimbus: NimbusNode, conf: NimbusConf, com: CommonRef) =
let fc = ForkedChainRef.init(com,
eagerStateRoot = conf.eagerStateRootCheck,
persistBatchSize = conf.persistBatchSize,
dynamicBatchSize = conf.dynamicBatchSize,
enableQueue = true)
fc.deserialize().isOkOr:
warn "Loading block DAG from database", msg=error
Expand Down
5 changes: 4 additions & 1 deletion tests/test_forked_chain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,10 @@ suite "ForkedChainRef tests":
test "newBase move forward, auto mode, base finalized marker needed":
const info = "newBase move forward, auto mode, base finalized marker needed"
let com = env.newCom()
var chain = ForkedChainRef.init(com, baseDistance = 2, persistBatchSize = 1)
var chain = ForkedChainRef.init(com,
baseDistance = 2,
persistBatchSize = 1,
dynamicBatchSize = false)
check (waitFor chain.forkChoice(blk8.blockHash, blk8.blockHash)).isErr
check chain.tryUpdatePendingFCU(blk8.blockHash, blk8.header.number)
checkImportBlock(chain, blk1)
Expand Down