Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
462 changes: 434 additions & 28 deletions codegenerator/cli/npm/envio/src/Batch.res

Large diffs are not rendered by default.

31 changes: 18 additions & 13 deletions codegenerator/cli/npm/envio/src/FetchState.res
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,15 @@ let mergeIntoPartition = (p: partition, ~target: partition, ~maxAddrInPartition)

let allowedAddressesNumber = ref(maxAddrInPartition)

target.addressesByContractName->Utils.Dict.forEachWithKey((contractName, addresses) => {
target.addressesByContractName->Utils.Dict.forEachWithKey((addresses, contractName) => {
allowedAddressesNumber := allowedAddressesNumber.contents - addresses->Array.length
mergedAddresses->Js.Dict.set(contractName, addresses)
})

// Start with putting all addresses to the merging dict
// And if they exceed the limit, start removing from the merging dict
// and putting into the rest dict
p.addressesByContractName->Utils.Dict.forEachWithKey((contractName, addresses) => {
p.addressesByContractName->Utils.Dict.forEachWithKey((addresses, contractName) => {
allowedAddressesNumber := allowedAddressesNumber.contents - addresses->Array.length
switch mergedAddresses->Utils.Dict.dangerouslyGetNonOption(contractName) {
| Some(targetAddresses) =>
Expand All @@ -112,7 +112,7 @@ let mergeIntoPartition = (p: partition, ~target: partition, ~maxAddrInPartition)
let rest = if allowedAddressesNumber.contents < 0 {
let restAddresses = Js.Dict.empty()

mergedAddresses->Utils.Dict.forEachWithKey((contractName, addresses) => {
mergedAddresses->Utils.Dict.forEachWithKey((addresses, contractName) => {
if allowedAddressesNumber.contents === 0 {
()
} else if addresses->Array.length <= -allowedAddressesNumber.contents {
Expand Down Expand Up @@ -1153,7 +1153,7 @@ let rollbackPartition = (
})
| {addressesByContractName} =>
let rollbackedAddressesByContractName = Js.Dict.empty()
addressesByContractName->Utils.Dict.forEachWithKey((contractName, addresses) => {
addressesByContractName->Utils.Dict.forEachWithKey((addresses, contractName) => {
let keptAddresses =
addresses->Array.keep(address => !(addressesToRemove->Utils.Set.has(address)))
if keptAddresses->Array.length > 0 {
Expand Down Expand Up @@ -1252,7 +1252,7 @@ let isReadyToEnterReorgThreshold = (
buffer->Utils.Array.isEmpty
}

let filterAndSortForUnorderedBatch = {
let sortForUnorderedBatch = {
let hasFullBatch = ({buffer} as fetchState: t, ~batchSizeTarget) => {
switch buffer->Belt.Array.get(batchSizeTarget - 1) {
| Some(item) => item->Internal.getItemBlockNumber <= fetchState->bufferBlockNumber
Expand All @@ -1262,20 +1262,24 @@ let filterAndSortForUnorderedBatch = {

(fetchStates: array<t>, ~batchSizeTarget: int) => {
fetchStates
->Array.keepU(hasReadyItem)
->Array.copy
->Js.Array2.sortInPlaceWith((a: t, b: t) => {
switch (a->hasFullBatch(~batchSizeTarget), b->hasFullBatch(~batchSizeTarget)) {
| (true, true)
| (false, false) =>
// Use unsafe since we filtered out all queues without batch items
switch (a.buffer->Belt.Array.getUnsafe(0), b.buffer->Belt.Array.getUnsafe(0)) {
| (Event({timestamp: aTimestamp}), Event({timestamp: bTimestamp})) =>
switch (a.buffer->Belt.Array.get(0), b.buffer->Belt.Array.get(0)) {
| (Some(Event({timestamp: aTimestamp})), Some(Event({timestamp: bTimestamp}))) =>
aTimestamp - bTimestamp
| (Block(_), _)
| (_, Block(_)) =>
| (Some(Block(_)), _)
| (_, Some(Block(_))) =>
// Currently block items don't have a timestamp,
// so we sort chains with them in a random order
Js.Math.random_int(-1, 1)
// We don't care about the order of chains with no items
// Just keep them to increase the progress block number when relevant
| (Some(_), None) => -1
| (None, Some(_)) => 1
| (None, None) => 0
}
| (true, false) => -1
| (false, true) => 1
Expand All @@ -1284,9 +1288,10 @@ let filterAndSortForUnorderedBatch = {
}
}

let getProgressBlockNumber = ({buffer} as fetchState: t) => {
// Ordered multichain mode can't skip blocks, even if there are no items.
let getUnorderedMultichainProgressBlockNumberAt = ({buffer} as fetchState: t, ~index) => {
let bufferBlockNumber = fetchState->bufferBlockNumber
switch buffer->Belt.Array.get(0) {
switch buffer->Belt.Array.get(index) {
| Some(item) if bufferBlockNumber >= item->Internal.getItemBlockNumber =>
item->Internal.getItemBlockNumber - 1
| _ => bufferBlockNumber
Expand Down
11 changes: 11 additions & 0 deletions codegenerator/cli/npm/envio/src/Internal.res
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,14 @@ let makeCacheTable = (~effectName) => {

@genType.import(("./Types.ts", "Invalid"))
type noEventFilters

type reorgCheckpoint = {
@as("id")
checkpointId: int,
@as("chain_id")
chainId: int,
@as("block_number")
blockNumber: int,
@as("block_hash")
blockHash: string,
}
2 changes: 1 addition & 1 deletion codegenerator/cli/npm/envio/src/InternalConfig.res
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type chain = {
id: int,
startBlock: int,
endBlock?: int,
confirmedBlockThreshold: int,
maxReorgDepth: int,
contracts: array<contract>,
sources: array<Source.t>,
}
Expand Down
3 changes: 3 additions & 0 deletions codegenerator/cli/npm/envio/src/Persistence.res
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type initialState = {
cleanRun: bool,
cache: dict<effectCacheRecord>,
chains: array<InternalTable.Chains.t>,
checkpointId: int,
// Needed to keep reorg detection logic between restarts
reorgCheckpoints: array<Internal.reorgCheckpoint>,
}

type operator = [#">" | #"="]
Expand Down
16 changes: 13 additions & 3 deletions codegenerator/cli/npm/envio/src/PgStorage.res
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ let makeInitializeTransaction = (
let generalTables = [
InternalTable.Chains.table,
InternalTable.PersistedState.table,
InternalTable.EndOfBlockRangeScannedData.table,
InternalTable.Checkpoints.table,
InternalTable.RawEvents.table,
]

Expand Down Expand Up @@ -325,7 +325,7 @@ let chunkArray = (arr: array<'a>, ~chunkSize) => {
let removeInvalidUtf8InPlace = entities =>
entities->Js.Array2.forEach(item => {
let dict = item->(Utils.magic: 'a => dict<unknown>)
dict->Utils.Dict.forEachWithKey((key, value) => {
dict->Utils.Dict.forEachWithKey((value, key) => {
if value->Js.typeof === "string" {
let value = value->(Utils.magic: unknown => string)
// We mutate here, since we don't care
Expand Down Expand Up @@ -707,7 +707,9 @@ let make = (
{
cleanRun: true,
cache,
reorgCheckpoints: [],
chains: chainConfigs->Js.Array2.map(InternalTable.Chains.initialFromConfig),
checkpointId: InternalTable.Checkpoints.initialCheckpointId,
}
}

Expand Down Expand Up @@ -895,19 +897,27 @@ let make = (
}

let resumeInitialState = async (): Persistence.initialState => {
let (cache, chains) = await Promise.all2((
let (cache, chains, checkpointIdResult, reorgCheckpoints) = await Promise.all4((
restoreEffectCache(~withUpload=false),
sql
->Postgres.unsafe(
makeLoadAllQuery(~pgSchema, ~tableName=InternalTable.Chains.table.tableName),
)
->(Utils.magic: promise<array<unknown>> => promise<array<InternalTable.Chains.t>>),
sql
->Postgres.unsafe(InternalTable.Checkpoints.makeCommitedCheckpointIdQuery(~pgSchema))
->(Utils.magic: promise<array<unknown>> => promise<array<{"id": int}>>),
sql
->Postgres.unsafe(InternalTable.Checkpoints.makeGetReorgCheckpointsQuery(~pgSchema))
->(Utils.magic: promise<array<unknown>> => promise<array<Internal.reorgCheckpoint>>),
))

{
cleanRun: false,
reorgCheckpoints,
cache,
chains,
checkpointId: (checkpointIdResult->Belt.Array.getUnsafe(0))["id"],
}
}

Expand Down
Loading