Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
141 commits
Select commit Hold shift + click to select a range
d14ed10
add missing collection queue
zhangchiqing Nov 6, 2025
12339f5
add ingestion2 collection syncer
zhangchiqing Nov 7, 2025
a7a0253
update ingestion2 engine
zhangchiqing Nov 7, 2025
6a9ccd5
fix tests
zhangchiqing Nov 7, 2025
4277a72
update ingestion2 and indexer
zhangchiqing Nov 7, 2025
59684ee
update access node builder
zhangchiqing Nov 7, 2025
3353177
simplify the job queue
zhangchiqing Nov 10, 2025
92fe356
add execution data processor
zhangchiqing Nov 10, 2025
79076c4
update exectuion data processor
zhangchiqing Nov 10, 2025
c4ffce4
update exectuion data processor
zhangchiqing Nov 10, 2025
aefe0f2
update exectuion data processor factory
zhangchiqing Nov 10, 2025
7ff6772
refactor collection sync
zhangchiqing Nov 13, 2025
c723fc9
refactor with fetcher
zhangchiqing Nov 13, 2025
69f2da9
simplify the finalizer
zhangchiqing Nov 13, 2025
c9f1ffa
add component to finalized indexer processor
zhangchiqing Nov 13, 2025
1c43552
rename syncer to fetcher
zhangchiqing Nov 13, 2025
66e639b
add comment
zhangchiqing Nov 13, 2025
28d459a
refactor last full block height
zhangchiqing Nov 13, 2025
96caf34
make job consumer LastProcessedIndex and Size to be non blocking
zhangchiqing Nov 13, 2025
31889f8
fix for observer
zhangchiqing Nov 13, 2025
eb4f9f5
fix backend test
zhangchiqing Nov 13, 2025
3b5fc1d
fix execution script test
zhangchiqing Nov 13, 2025
06b83b0
fix indexer tests
zhangchiqing Nov 13, 2025
743d56d
fix lint
zhangchiqing Nov 13, 2025
91fe006
add metrics
zhangchiqing Nov 13, 2025
52149f6
update transaction and collections storage
zhangchiqing Nov 13, 2025
ef5635b
fix benchmark tool
zhangchiqing Nov 13, 2025
11f32a2
add logs
zhangchiqing Nov 13, 2025
9ee10b7
fix lint
zhangchiqing Nov 13, 2025
c4a867c
add logs
zhangchiqing Nov 13, 2025
8664c48
update metrics
zhangchiqing Nov 13, 2025
7f25f25
update comments for collection sync metrics
zhangchiqing Nov 14, 2025
ee66ca9
update comments
zhangchiqing Nov 14, 2025
075e7cd
rename job processor to block processor
zhangchiqing Nov 14, 2025
4770fb7
move indexer
zhangchiqing Nov 14, 2025
8feb65e
update comments
zhangchiqing Nov 14, 2025
2feb170
update metrics
zhangchiqing Nov 14, 2025
232a602
refactor missing collections queue
zhangchiqing Nov 14, 2025
f5bda62
add hotstuff distributor interface
zhangchiqing Nov 14, 2025
25e54a8
access node builder should subscribe for finalization events
zhangchiqing Nov 14, 2025
75c7ed3
use flag to config collectoin fetcher configs
zhangchiqing Nov 16, 2025
a9348ee
add missing collection queue metrics
zhangchiqing Nov 17, 2025
cc66243
fix lint
zhangchiqing Nov 17, 2025
3ed17bb
remove unused var
zhangchiqing Nov 17, 2025
6a44481
add todo
zhangchiqing Nov 17, 2025
1acbab1
Merge branch 'master' into leo/collection-syncing
zhangchiqing Nov 17, 2025
e99a2c0
Merge branch 'master' into leo/collection-syncing
zhangchiqing Nov 17, 2025
b68267e
Merge branch 'master' into leo/collection-syncing
zhangchiqing Nov 18, 2025
389273f
fix tests
zhangchiqing Nov 18, 2025
b65768a
fix linter
zhangchiqing Nov 18, 2025
1ae6355
update ProgressReader
zhangchiqing Nov 18, 2025
a5ba7b0
fix wait for executed to allow tx to be sealed
zhangchiqing Nov 18, 2025
a9de92a
disable ingest receipt engine
zhangchiqing Nov 18, 2025
232e6e0
comment out unused ingest receipt engine
zhangchiqing Nov 18, 2025
d1ffe91
Merge branch 'leo/integration-debugging-readme' into leo/collection-s…
zhangchiqing Nov 18, 2025
271c271
always enable collection fetcher to pass integration tests
zhangchiqing Nov 19, 2025
3b813eb
fix lint
zhangchiqing Nov 19, 2025
16f1c1f
fix cohort3
zhangchiqing Nov 19, 2025
e1d1480
remove ghost verification node
zhangchiqing Nov 19, 2025
0943ee9
Merge branch 'master' into leo/collection-syncing
zhangchiqing Nov 19, 2025
84072c0
fixed
zhangchiqing Nov 19, 2025
e1ebb8c
add logs
zhangchiqing Nov 19, 2025
594ee94
Merge remote-tracking branch 'origin/leo/collection-syncing' into leo…
zhangchiqing Nov 19, 2025
cde5198
remove scripts
zhangchiqing Nov 19, 2025
c49da01
Merge branch 'master' into leo/collection-syncing
zhangchiqing Nov 19, 2025
029e8e0
fix indexer core tests
zhangchiqing Nov 19, 2025
cee0e8f
add collection_fetch flag
zhangchiqing Nov 20, 2025
58228be
improve getMissingCollections
zhangchiqing Nov 20, 2025
5bd3c88
simplify block_processor
zhangchiqing Nov 20, 2025
9d5fdbf
fix lint
zhangchiqing Nov 20, 2025
fea0f65
add comments
zhangchiqing Nov 20, 2025
d0b2fc5
optimize batch store same collection
zhangchiqing Nov 7, 2025
ca8695c
add benchmark
zhangchiqing Nov 20, 2025
43c0402
change flags default to execution_and_collection
zhangchiqing Nov 20, 2025
b2268c7
Merge branch 'master' into leo/collection-syncing
zhangchiqing Nov 20, 2025
1ea00c2
refactor execution data requester's HighestConsecutiveHeight
zhangchiqing Nov 21, 2025
835e793
update mocks
zhangchiqing Nov 21, 2025
1cf295c
add todo
zhangchiqing Nov 21, 2025
a4dd3b3
fetcher to use different consumer progress to avoid contention
zhangchiqing Nov 21, 2025
84b37af
move component initialization functions from access_node_builder to
zhangchiqing Nov 21, 2025
4198d32
fix lint
zhangchiqing Nov 21, 2025
c53a0e3
add collection sync mode tests
zhangchiqing Nov 21, 2025
00f5c1a
add indexer tests
zhangchiqing Nov 21, 2025
8c72128
update finalized indexer comments
zhangchiqing Nov 21, 2025
d405241
use debug level log
zhangchiqing Nov 21, 2025
8d8740e
fix lint
zhangchiqing Nov 21, 2025
743e7b2
fix tests
zhangchiqing Nov 21, 2025
c29aabe
Merge branch 'leo/integration-debugging-readme' into leo/collection-s…
zhangchiqing Nov 21, 2025
af7ec61
refactor using engine.Notifier
zhangchiqing Nov 21, 2025
237973d
address review comments
zhangchiqing Nov 22, 2025
a31eb4f
use sealed root height
zhangchiqing Nov 22, 2025
a6d6c08
update progress reader
zhangchiqing Nov 22, 2025
aedc1db
update comments
zhangchiqing Nov 22, 2025
6578186
use verification node in execution integration tests
zhangchiqing Nov 22, 2025
8f08af3
fix execution integration tests
zhangchiqing Nov 22, 2025
a3e5814
Merge branch 'master' into leo/collection-syncing
zhangchiqing Nov 22, 2025
f555c73
refactor missing collections queue
zhangchiqing Nov 22, 2025
768f401
add pruner for missing collections queue
zhangchiqing Nov 22, 2025
afbf8bc
add test cases for missing collection queue
zhangchiqing Nov 22, 2025
3dcb692
refactor pruner
zhangchiqing Nov 22, 2025
17190cb
refactor with onIndexedCallback
zhangchiqing Nov 23, 2025
964ee3c
fix lint
zhangchiqing Nov 24, 2025
262c755
Merge remote-tracking branch 'origin/leo/collection-syncing' into leo…
zhangchiqing Nov 24, 2025
ec6cdc9
refactor with RequestCollectionsByGuarantees
zhangchiqing Nov 24, 2025
298234c
Merge branch 'leo/optimize-collection-index-metrics' into leo/collect…
zhangchiqing Nov 24, 2025
1436a6b
initilaize metrics to avoid spikes
zhangchiqing Nov 24, 2025
c7d5b50
use sealed root height
zhangchiqing Nov 25, 2025
1fe2607
add back last full height metrics
zhangchiqing Nov 26, 2025
5df74e8
add pebble compaction listener
zhangchiqing Nov 26, 2025
079a2aa
log with seconds
zhangchiqing Nov 26, 2025
50293fd
log missing collection
zhangchiqing Dec 2, 2025
d9b4c20
improve logging
zhangchiqing Dec 3, 2025
65245e9
add retry
zhangchiqing Dec 3, 2025
0728e39
check storage before retry
zhangchiqing Dec 3, 2025
ee346e2
simplify RetryFetchingMissingCollections
zhangchiqing Dec 3, 2025
f6dacb4
Bug fix: failed Unicast calls no longer count as attempts
zhangchiqing Dec 3, 2025
49876a9
add retry interval config
zhangchiqing Dec 3, 2025
75a28bb
report correct height
zhangchiqing Dec 3, 2025
5b17ae2
add retry interval config
zhangchiqing Dec 3, 2025
c8f71b9
log indexer look up
zhangchiqing Dec 4, 2025
8de563a
indexer remove redundant header reads
zhangchiqing Dec 4, 2025
91299c2
add flag to disable bitswap bloom cache
zhangchiqing Dec 4, 2025
199a84a
improve logging
zhangchiqing Dec 4, 2025
9c3c5d9
add flag to disable bitswap bloom cache
zhangchiqing Dec 4, 2025
8e70d15
Merge branch 'master' into leo/disable-bitswap-bloom-cache
zhangchiqing Dec 4, 2025
15450bf
fix lint
zhangchiqing Dec 4, 2025
c4e8228
log block height
zhangchiqing Dec 4, 2025
d71309e
warn about large iteration
zhangchiqing Dec 4, 2025
38ef51d
requester to return the cached store
zhangchiqing Dec 4, 2025
d1250a3
fix lint
zhangchiqing Dec 4, 2025
8e51aae
fix mock
zhangchiqing Dec 4, 2025
e5e3421
Merge branch 'master' into leo/collection-syncing
zhangchiqing Dec 4, 2025
484ff18
fix tests
zhangchiqing Dec 5, 2025
75d04b9
revert to use execution data cache
zhangchiqing Dec 5, 2025
5b07e92
add retry
zhangchiqing Dec 5, 2025
f08edf4
remove notification data
zhangchiqing Dec 5, 2025
e2d29da
remove notification consumer
zhangchiqing Dec 5, 2025
bbc700f
revert blob change
zhangchiqing Dec 8, 2025
01324d6
adjustable bloom cache
zhangchiqing Dec 8, 2025
27f6a59
disable access node builder bloom cache
zhangchiqing Dec 8, 2025
da0c021
Merge branch 'leo/disable-bitswap-bloom-cache' into leo/collection-sy…
zhangchiqing Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
444 changes: 264 additions & 180 deletions cmd/access/node_builder/access_node_builder.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ func (exeNode *ExecutionNode) LoadBlobService(
opts = append(opts, blob.WithReprovideInterval(-1))
}

if !node.BitswapBloomCacheEnabled {
opts = append(opts, blob.WithSkipBloomCache(true))
}

if exeNode.exeConf.blobstoreRateLimit > 0 && exeNode.exeConf.blobstoreBurstLimit > 0 {
opts = append(opts, blob.WithRateLimit(float64(exeNode.exeConf.blobstoreRateLimit), exeNode.exeConf.blobstoreBurstLimit))
}
Expand Down
20 changes: 14 additions & 6 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ type BaseConfig struct {
// This is only meaningful to Access and Execution nodes.
BitswapReprovideEnabled bool

// BitswapBloomCacheEnabled configures whether the Bitswap bloom cache is enabled.
// When disabled, uses a plain blockstore instead of cached blockstore, avoiding
// the CPU cost of building the bloom filter on startup. Pebble's built-in bloom
// filters (persisted in SSTables) are still used for efficient lookups.
// This is only meaningful to Access and Execution nodes.
BitswapBloomCacheEnabled bool

TransactionFeesDisabled bool
}

Expand Down Expand Up @@ -297,12 +304,13 @@ func DefaultBaseConfig() *BaseConfig {
Duration: 10 * time.Second,
},

HeroCacheMetricsEnable: false,
SyncCoreConfig: chainsync.DefaultConfig(),
CodecFactory: codecFactory,
ComplianceConfig: compliance.DefaultConfig(),
DhtSystemEnabled: true,
BitswapReprovideEnabled: true,
HeroCacheMetricsEnable: false,
SyncCoreConfig: chainsync.DefaultConfig(),
CodecFactory: codecFactory,
ComplianceConfig: compliance.DefaultConfig(),
DhtSystemEnabled: true,
BitswapReprovideEnabled: true,
BitswapBloomCacheEnabled: true, // default: use cached blockstore TODO leo: change default to false
}
}

Expand Down
47 changes: 5 additions & 42 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/apiproxy"
"github.com/onflow/flow-go/engine/access/index"
"github.com/onflow/flow-go/engine/access/ingestion/collections"
"github.com/onflow/flow-go/engine/access/rest"
restapiproxy "github.com/onflow/flow-go/engine/access/rest/apiproxy"
"github.com/onflow/flow-go/engine/access/rest/router"
Expand Down Expand Up @@ -70,7 +69,6 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/blobs"
"github.com/onflow/flow-go/module/chainsync"
"github.com/onflow/flow-go/module/counters"
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache"
Expand Down Expand Up @@ -1104,7 +1102,6 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
var ds datastore.Batching
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgressInitializer
var processedNotifications storage.ConsumerProgressInitializer
var publicBsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData
Expand Down Expand Up @@ -1152,14 +1149,6 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
db := builder.ExecutionDatastoreManager.DB()

processedNotifications = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
return nil
}).
Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
publicBsDependable = module.NewProxiedReadyDoneAware()
builder.PeerManagerDependencies.Add(publicBsDependable)
Expand Down Expand Up @@ -1200,6 +1189,10 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
),
}

if !builder.BitswapBloomCacheEnabled {
opts = append(opts, blob.WithSkipBloomCache(true))
}

var err error
bs, err = node.EngineRegistry.RegisterBlobService(channels.PublicExecutionDataService, ds, opts...)
if err != nil {
Expand Down Expand Up @@ -1292,7 +1285,6 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
builder.ExecutionDataDownloader,
executionDataCache,
processedBlockHeight,
processedNotifications,
builder.State,
builder.Storage.Headers,
builder.executionDataConfig,
Expand Down Expand Up @@ -1438,31 +1430,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return nil, fmt.Errorf("could not create derived chain data: %w", err)
}

rootBlockHeight := node.State.Params().FinalizedRoot().Height
progress, err := store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressLastFullBlockHeight).Initialize(rootBlockHeight)
if err != nil {
return nil, fmt.Errorf("could not create last full block height consumer progress: %w", err)
}

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(progress)
if err != nil {
return nil, fmt.Errorf("could not create last full block height counter: %w", err)
}

var collectionExecutedMetric module.CollectionExecutedMetric = metrics.NewNoopCollector()
collectionIndexer, err := collections.NewIndexer(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

collection indexer is replaced by the collection_sync fetcher (fetch from LN) or block processor (syncing from EN via execution data syncing)

builder.Logger,
builder.ProtocolDB,
collectionExecutedMetric,
builder.State,
builder.Storage.Blocks,
builder.Storage.Collections,
lastFullBlockHeight,
builder.StorageLockMgr,
)
if err != nil {
return nil, fmt.Errorf("could not create collection indexer: %w", err)
}

builder.ExecutionIndexerCore = indexer.New(
builder.Logger,
Expand All @@ -1477,7 +1445,6 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
builder.scheduledTransactions,
builder.RootChainID,
indexerDerivedChainData,
collectionIndexer,
collectionExecutedMetric,
node.StorageLockMgr,
)
Expand Down Expand Up @@ -1556,10 +1523,6 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
}
builder.stateStreamConf.RpcMetricsEnabled = builder.rpcMetricsEnabled

highestAvailableHeight, err := builder.ExecutionDataRequester.HighestConsecutiveHeight()
if err != nil {
return nil, fmt.Errorf("could not get highest consecutive height: %w", err)
}
broadcaster := engine.NewBroadcaster()

eventQueryMode, err := query_mode.ParseIndexQueryMode(builder.rpcConf.BackendConfig.EventQueryMode)
Expand All @@ -1578,7 +1541,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
builder.executionDataConfig.InitialBlockHeight,
node.Storage.Headers,
broadcaster,
highestAvailableHeight,
builder.ExecutionDataRequester,
builder.EventsIndex,
useIndex,
)
Expand Down
4 changes: 4 additions & 0 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ func (fnb *FlowNodeBuilder) BaseFlags() {
"bitswap-reprovide-enabled",
defaultConfig.BitswapReprovideEnabled,
"[experimental] whether to enable bitswap reproviding. This is an experimental feature. Use with caution.")
fnb.flags.BoolVar(&fnb.BaseConfig.BitswapBloomCacheEnabled,
"bitswap-bloom-cache-enabled",
defaultConfig.BitswapBloomCacheEnabled,
"[experimental] whether to enable bitswap bloom cache. When disabled, uses a plain blockstore instead of cached blockstore, avoiding the CPU cost of building the bloom filter on startup. Pebble's built-in bloom filters (persisted in SSTables) are still used. This is an experimental feature. Use with caution.")

// dynamic node startup flags
fnb.flags.StringVar(&fnb.BaseConfig.DynamicStartupANPubkey,
Expand Down
4 changes: 4 additions & 0 deletions cmd/util/cmd/read-protocol-state/cmd/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ func runE(*cobra.Command, []string) error {
return fmt.Errorf("could not get root block: %w", err)
}

log.Info().Msgf("searching executed block between heights %v and %v", root.Height, sealed.Height)

// find the last executed and sealed block
for h := sealed.Height; h >= root.Height; h-- {
block, err := reader.GetBlockByHeight(h)
Expand All @@ -238,6 +240,8 @@ func runE(*cobra.Command, []string) error {
common.PrettyPrintEntity(block)
return nil
}

log.Info().Msgf("block at height %v is not executed yet", h)
}

return fmt.Errorf("could not find executed block")
Expand Down
8 changes: 8 additions & 0 deletions consensus/hotstuff/distributor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package hotstuff

import "github.com/onflow/flow-go/consensus/hotstuff/model"

type Distributor interface {
AddOnBlockFinalizedConsumer(consumer func(block *model.Block))
AddOnBlockIncorporatedConsumer(consumer func(block *model.Block))
}
37 changes: 37 additions & 0 deletions consensus/hotstuff/mocks/distributor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

98 changes: 98 additions & 0 deletions engine/access/collection_sync/collection_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package collection_sync
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you think about structuring this like

  • engine/access/ingestion/blocks
  • engine/access/ingestion/collections
  • engine/access/ingestion/collections/executiondata
  • engine/access/ingestion/collections/factory
  • engine/access/ingestion/collections/fetcher
  • engine/access/ingestion/receipts

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m open to reorganizing things if needed or make more sense in the optimistic syncing PR.

For now, I’m keeping the structure relatively flat while grouping related logic together.

Receipts, blocks, and collections each have different syncing approaches. Block syncing is handled entirely by the follower engine across all node types, so I’m not sure it even needs to be part of ingestion.

Receipt ingestion is currently disabled because block syncing already ingests receipts.

So at the moment, only collection syncing is enabled.


import (
"context"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
)

// Tracks missing collections per height and invokes job callbacks when complete.
type MissingCollectionQueue interface {
// EnqueueMissingCollections tracks the given missing collection IDs for the given block height.
EnqueueMissingCollections(blockHeight uint64, ids []flow.Identifier, callback func()) error

// OnIndexedForBlock returns the callback function for the given block height
OnIndexedForBlock(blockHeight uint64) (func(), bool)

// On receipt of a collection, MCQ updates internal state and, if a block
// just became complete, returns: (collections, height, missingCollectionID, true).
// Otherwise, returns (nil, height, missingCollectionID, false).
// missingCollectionID is an arbitrary ID from the remaining missing collections, or ZeroID if none.
OnReceivedCollection(collection *flow.Collection) ([]*flow.Collection, uint64, flow.Identifier, bool)

// IsHeightQueued returns true if the given height is still being tracked (has not been indexed yet).
IsHeightQueued(height uint64) bool

// Size returns the number of missing heights currently in the queue.
Size() uint

// GetMissingCollections returns all collection IDs that are currently missing across all block heights.
GetMissingCollections() []flow.Identifier

// GetMissingCollectionsByHeight returns a map of block height to collection IDs that are missing for that height.
GetMissingCollectionsByHeight() map[uint64][]flow.Identifier
}

// Requests collections by their guarantees.
type CollectionRequester interface {
RequestCollectionsByGuarantees(guarantees []*flow.CollectionGuarantee) error
}

// BlockCollectionIndexer stores and indexes collections for a given block height.
type BlockCollectionIndexer interface {
// IndexCollectionsForBlock stores and indexes collections for a given block height.
// No error is exepcted during normal operation.
IndexCollectionsForBlock(blockHeight uint64, cols []*flow.Collection) error

// GetMissingCollections retrieves the block and returns collection guarantees that whose collections
// are missing in storage.
// Only garantees whose collections that are not already in storage are returned.
GetMissingCollections(block *flow.Block) ([]*flow.CollectionGuarantee, error)
}

// BlockProcessor processes blocks to fetch and index their collections.
type BlockProcessor interface {
// RequestCollectionsForBlock requests all missing collections for the given block.
FetchCollections(ctx irrecoverable.SignalerContext, block *flow.Block, done func()) error
// MissingCollectionQueueSize returns the number of missing collections currently in the queue.
MissingCollectionQueueSize() uint
// RetryFetchingMissingCollections retries fetching all missing collections currently in the queue.
RetryFetchingMissingCollections() error
}

// Fetcher is a component that consumes finalized block jobs and processes them
// to index collections. It uses a job consumer with windowed throttling to prevent node overload.
type Fetcher interface {
component.Component
ProgressReader
// OnFinalizedBlock notifies the fetcher that a new block has been finalized.
OnFinalizedBlock()
// MissingCollectionQueueSize returns the number of missing height currently in the queue.
Size() uint
}

// ExecutionDataProvider provides the latest height for which execution data indexer has collections.
// This can be nil if execution data indexing is disabled.
type ExecutionDataProvider interface {
// HighestIndexedHeight returns the highest block height for which execution data is available.
// It garautnees that all heights below it also have execution data available to be called
// with GetExecutionDataByHeight.
HighestIndexedHeight() uint64

// It might return [execution_data.BlobNotFoundError] error, if some CID in the blob tree could not be found from the blobstore.
GetExecutionDataByHeight(ctx context.Context, height uint64) ([]*flow.Collection, error)
}

// ExecutionDataProcessor processes execution data when new execution data is available.
type ExecutionDataProcessor interface {
// OnNewExectuionData notifies the processor that new execution data is available for processing.
OnNewExectuionData()
}

// ProgressReader provides the current progress of collection fetching/indexing.
type ProgressReader interface {
// ProcessedHeight returns the highest block height for which collections have been processed.
ProcessedHeight() uint64
}
Loading
Loading