Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 60 additions & 16 deletions codegenerator/cli/npm/envio/src/Batch.res
Original file line number Diff line number Diff line change
@@ -1,3 +1,28 @@
open Belt

type batchCheckpoint = {
checkpointId: bigint,
chainId: int,
blockNumber: int,
// Might be empty if we are not in reorg threshold
// or rollback on reorg is disabled
blockHash?: string,
// Might be empty if we it's a reorg guard block
items?: array<Internal.item>,
}

type chainBeforeBatch = {
chainId: int,
fetchState: FetchState.t,
blocks: ChainBlocks.t,
totalEventsProcessed: int,
}

type mutChainAcc = {
mutable batchSize: int,
mutable lastCheckpoint: batchCheckpoint,
}

type progressedChain = {
chainId: int,
batchSize: int,
Expand All @@ -6,11 +31,12 @@ type progressedChain = {
}

type t = {
items: array<Internal.item>,
checkpoints: array<batchCheckpoint>,
progressedChains: array<progressedChain>,
updatedFetchStates: ChainMap.t<FetchState.t>,
dcsToStoreByChainId: dict<array<FetchState.indexingContract>>,
creationTimeMs: int,
totalBatchSize: int,
// updatedFetchStates: ChainMap.t<FetchState.t>,
// dcsToStoreByChainId: dict<array<FetchState.indexingContract>>,
// creationTimeMs: int,
}

/**
Expand All @@ -21,7 +47,7 @@ let getOrderedNextChain = (fetchStates: ChainMap.t<FetchState.t>, ~batchSizePerC
let earliestChainTimestamp = ref(0)
let chainKeys = fetchStates->ChainMap.keys
for idx in 0 to chainKeys->Array.length - 1 {
let chain = chainKeys->Array.get(idx)
let chain = chainKeys->Array.getUnsafe(idx)
let fetchState = fetchStates->ChainMap.get(chain)
if fetchState->FetchState.isActivelyIndexing {
let timestamp = fetchState->FetchState.getTimestampAt(
Expand Down Expand Up @@ -120,41 +146,59 @@ let prepareOrderedBatch = (
}

let prepareUnorderedBatch = (
~chains: array<chainBeforeBatch>,
~batchSizeTarget,
~fetchStates: ChainMap.t<FetchState.t>,
~mutBatchSizePerChain: dict<int>,
) => {
~progressCheckpointId,
): t => {
let progressCheckpointId = ref(progressCheckpointId)
let accPerChain = Js.Dict.empty()

let preparedFetchStates =
fetchStates
->ChainMap.values
chains
->Array.map(chain => chain.fetchState)
->FetchState.filterAndSortForUnorderedBatch(~batchSizeTarget)

let chainIdx = ref(0)
let preparedNumber = preparedFetchStates->Array.length
let batchSize = ref(0)
let totalBatchSize = ref(0)

let items = []

// Accumulate items for all actively indexing chains
// the way to group as many items from a single chain as possible
// This way the loaders optimisations will hit more often
while batchSize.contents < batchSizeTarget && chainIdx.contents < preparedNumber {
while totalBatchSize.contents < batchSizeTarget && chainIdx.contents < preparedNumber {
let fetchState = preparedFetchStates->Js.Array2.unsafe_get(chainIdx.contents)
let chainAcc = switch accPerChain->Utils.Dict.dangerouslyGetByIntNonOption(fetchState.chainId) {
| Some(chainAcc) => chainAcc
| None =>
let acc = {
batchSize: 0,
lastCheckpoint: %raw(`null`),
}
accPerChain->Utils.Dict.setByInt(fetchState.chainId, acc)
acc
}

let chainBatchSize =
fetchState->FetchState.getReadyItemsCount(
~targetSize=batchSizeTarget - batchSize.contents,
~targetSize=batchSizeTarget - totalBatchSize.contents,
~fromItem=0,
)
if chainBatchSize > 0 {
for idx in 0 to chainBatchSize - 1 {
items->Js.Array2.push(fetchState.buffer->Belt.Array.getUnsafe(idx))->ignore
}
batchSize := batchSize.contents + chainBatchSize
mutBatchSizePerChain->Utils.Dict.setByInt(fetchState.chainId, chainBatchSize)
totalBatchSize := totalBatchSize.contents + chainBatchSize
chainAcc.batchSize = chainBatchSize
}

chainIdx := chainIdx.contents + 1
}

items
{
totalBatchSize: totalBatchSize.contents,
progressedChains: [],
checkpoints: [],
}
}
49 changes: 49 additions & 0 deletions codegenerator/cli/npm/envio/src/ChainBlocks.res
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
open Belt

type t = {
shouldRollbackOnReorg: bool,
lastBlockScannedHashes: ReorgDetection.LastBlockScannedHashes.t,
}

let make = (
~chainId,
~maxReorgDepth,
~shouldRollbackOnReorg,
~reorgCheckpoints: array<InternalTable.Checkpoints.t>,
) => {
{
shouldRollbackOnReorg,
lastBlockScannedHashes: reorgCheckpoints
->Array.keepMapU(reorgCheckpoint => {
if reorgCheckpoint.chainId === chainId {
Some({
ReorgDetection.blockNumber: reorgCheckpoint.blockNumber,
blockHash: reorgCheckpoint.blockHash,
})
} else {
None
}
})
->ReorgDetection.LastBlockScannedHashes.makeWithData(~maxReorgDepth),
}
}

let registerReorgGuard = (
chainBlocks: t,
~reorgGuard: ReorgDetection.reorgGuard,
~currentBlockHeight: int,
) => {
let (updatedLastBlockScannedHashes, reorgResult: ReorgDetection.reorgResult) =
chainBlocks.lastBlockScannedHashes->ReorgDetection.LastBlockScannedHashes.registerReorgGuard(
~reorgGuard,
~currentBlockHeight,
~shouldRollbackOnReorg=chainBlocks.shouldRollbackOnReorg,
)
(
{
...chainBlocks,
lastBlockScannedHashes: updatedLastBlockScannedHashes,
},
reorgResult,
)
}
2 changes: 1 addition & 1 deletion codegenerator/cli/npm/envio/src/InternalConfig.res
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type chain = {
id: int,
startBlock: int,
endBlock?: int,
confirmedBlockThreshold: int,
maxReorgDepth: int,
contracts: array<contract>,
sources: array<Source.t>,
}
Expand Down
1 change: 1 addition & 0 deletions codegenerator/cli/npm/envio/src/Persistence.res
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type initialState = {
cleanRun: bool,
cache: dict<effectCacheRecord>,
chains: array<InternalTable.Chains.t>,
reorgCheckpoints: array<InternalTable.Checkpoints.t>,
}

type operator = [#">" | #"="]
Expand Down
11 changes: 9 additions & 2 deletions codegenerator/cli/npm/envio/src/PgStorage.res
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ let makeInitializeTransaction = (
let generalTables = [
InternalTable.Chains.table,
InternalTable.PersistedState.table,
InternalTable.EndOfBlockRangeScannedData.table,
InternalTable.Checkpoints.table,
InternalTable.RawEvents.table,
]

Expand Down Expand Up @@ -707,6 +707,7 @@ let make = (
{
cleanRun: true,
cache,
reorgCheckpoints: [],
chains: chainConfigs->Js.Array2.map(InternalTable.Chains.initialFromConfig),
}
}
Expand Down Expand Up @@ -895,17 +896,23 @@ let make = (
}

let resumeInitialState = async (): Persistence.initialState => {
let (cache, chains) = await Promise.all2((
let (cache, chains, reorgCheckpoints) = await Promise.all3((
restoreEffectCache(~withUpload=false),
sql
->Postgres.unsafe(
makeLoadAllQuery(~pgSchema, ~tableName=InternalTable.Chains.table.tableName),
)
->(Utils.magic: promise<array<unknown>> => promise<array<InternalTable.Chains.t>>),
sql
->Postgres.unsafe(
makeLoadAllQuery(~pgSchema, ~tableName=InternalTable.Checkpoints.table.tableName),
)
->(Utils.magic: promise<array<unknown>> => promise<array<InternalTable.Checkpoints.t>>),
))

{
cleanRun: false,
reorgCheckpoints,
cache,
chains,
}
Expand Down
35 changes: 14 additions & 21 deletions codegenerator/cli/npm/envio/src/ReorgDetection.res
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,10 @@ type validBlockResult = result<blockDataWithTimestamp, validBlockError>
module LastBlockScannedHashes: {
type t
/**Instantiat t with existing data*/
let makeWithData: (
array<blockData>,
~confirmedBlockThreshold: int,
~detectedReorgBlock: blockData=?,
) => t
let makeWithData: (array<blockData>, ~maxReorgDepth: int, ~detectedReorgBlock: blockData=?) => t

/**Instantiat empty t with no block data*/
let empty: (~confirmedBlockThreshold: int) => t
let empty: (~maxReorgDepth: int) => t

/** Registers a new reorg guard, prunes unneeded data, and returns the updated state.
* Resets internal state if shouldRollbackOnReorg is false (detect-only mode)
Expand Down Expand Up @@ -82,7 +78,7 @@ module LastBlockScannedHashes: {
// as a threshold for reorgs. If for eg. this is 200,
// it means we are accounting for reorgs up to 200 blocks
// behind the head
confirmedBlockThreshold: int,
maxReorgDepth: int,
// A hash map of recent blockdata by block number to make comparison checks
// for reorgs.
dataByBlockNumber: dict<blockData>,
Expand All @@ -93,33 +89,33 @@ module LastBlockScannedHashes: {
detectedReorgBlock: option<blockData>,
}

let makeWithData = (blocks, ~confirmedBlockThreshold, ~detectedReorgBlock=?) => {
let makeWithData = (blocks, ~maxReorgDepth, ~detectedReorgBlock=?) => {
let dataByBlockNumber = Js.Dict.empty()

blocks->Belt.Array.forEach(block => {
dataByBlockNumber->Js.Dict.set(block.blockNumber->Js.Int.toString, block)
})

{
confirmedBlockThreshold,
maxReorgDepth,
dataByBlockNumber,
detectedReorgBlock,
}
}
//Instantiates empty LastBlockHashes
let empty = (~confirmedBlockThreshold) => {
confirmedBlockThreshold,
let empty = (~maxReorgDepth) => {
maxReorgDepth,
dataByBlockNumber: Js.Dict.empty(),
detectedReorgBlock: None,
}

let getDataByBlockNumberCopyInThreshold = (
{dataByBlockNumber, confirmedBlockThreshold}: t,
{dataByBlockNumber, maxReorgDepth}: t,
~currentBlockHeight,
) => {
// Js engine automatically orders numeric object keys
let ascBlockNumberKeys = dataByBlockNumber->Js.Dict.keys
let thresholdBlockNumber = currentBlockHeight - confirmedBlockThreshold
let thresholdBlockNumber = currentBlockHeight - maxReorgDepth

let copy = Js.Dict.empty()

Expand All @@ -136,7 +132,7 @@ module LastBlockScannedHashes: {
}

let registerReorgGuard = (
{confirmedBlockThreshold} as self: t,
{maxReorgDepth} as self: t,
~reorgGuard: reorgGuard,
~currentBlockHeight,
~shouldRollbackOnReorg,
Expand Down Expand Up @@ -180,7 +176,7 @@ module LastBlockScannedHashes: {
...self,
detectedReorgBlock: Some(reorgDetected.scannedBlock),
}
: empty(~confirmedBlockThreshold),
: empty(~maxReorgDepth),
ReorgDetected(reorgDetected),
)
| None => {
Expand All @@ -199,7 +195,7 @@ module LastBlockScannedHashes: {

(
{
confirmedBlockThreshold,
maxReorgDepth,
dataByBlockNumber: dataByBlockNumberCopyInThreshold,
detectedReorgBlock: None,
},
Expand Down Expand Up @@ -289,10 +285,7 @@ module LastBlockScannedHashes: {
Return a BlockNumbersAndHashes.t rolled back to where blockData is less
than the provided blockNumber
*/
let rollbackToValidBlockNumber = (
{dataByBlockNumber, confirmedBlockThreshold}: t,
~blockNumber: int,
) => {
let rollbackToValidBlockNumber = ({dataByBlockNumber, maxReorgDepth}: t, ~blockNumber: int) => {
// Js engine automatically orders numeric object keys
let ascBlockNumberKeys = dataByBlockNumber->Js.Dict.keys

Expand All @@ -316,7 +309,7 @@ module LastBlockScannedHashes: {
loop(0)

{
confirmedBlockThreshold,
maxReorgDepth,
dataByBlockNumber: newDataByBlockNumber,
detectedReorgBlock: None,
}
Expand Down
Loading
Loading