Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ packages:
config:
dir: "consensus/hotstuff/mocks"
outpkg: "mocks"
github.com/onflow/flow-go/engine/access/ingestion/collections:
github.com/onflow/flow-go/engine/access/ingestion/tx_error_messages:
github.com/onflow/flow-go/engine/access/rest/common/models:
github.com/onflow/flow-go/engine/access/rest/websockets:
Expand Down
99 changes: 64 additions & 35 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/index"
"github.com/onflow/flow-go/engine/access/ingestion"
"github.com/onflow/flow-go/engine/access/ingestion/collections"
"github.com/onflow/flow-go/engine/access/ingestion/tx_error_messages"
pingeng "github.com/onflow/flow-go/engine/access/ping"
"github.com/onflow/flow-go/engine/access/rest"
Expand Down Expand Up @@ -332,6 +333,8 @@ type FlowAccessNodeBuilder struct {
ExecutionDataCache *execdatacache.ExecutionDataCache
ExecutionIndexer *indexer.Indexer
ExecutionIndexerCore *indexer.IndexerCore
CollectionIndexer *collections.Indexer
CollectionSyncer *collections.Syncer
ScriptExecutor *backend.ScriptExecutor
RegistersAsyncStore *execution.RegistersAsyncStore
Reporter *index.Reporter
Expand Down Expand Up @@ -567,7 +570,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
var bsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData
var executionDataStoreCache *execdatacache.ExecutionDataCache

// setup dependency chain to ensure indexer starts after the requester
requesterDependable := module.NewProxiedReadyDoneAware()
Expand Down Expand Up @@ -635,7 +637,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
// Execution Data cache that uses a blobstore as the backend (instead of a downloader)
// This ensures that it simply returns a not found error if the blob doesn't exist
// instead of attempting to download it from the network.
executionDataStoreCache = execdatacache.NewExecutionDataCache(
builder.ExecutionDataCache = execdatacache.NewExecutionDataCache(
builder.ExecutionDataStore,
builder.Storage.Headers,
builder.Storage.Seals,
Expand Down Expand Up @@ -955,6 +957,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
notNil(builder.scheduledTransactions),
builder.RootChainID,
indexerDerivedChainData,
notNil(builder.CollectionIndexer),
notNil(builder.collectionExecutedMetric),
node.StorageLockMgr,
)
Expand All @@ -965,7 +968,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
registers.FirstHeight(),
registers,
builder.ExecutionIndexerCore,
executionDataStoreCache,
notNil(builder.ExecutionDataCache),
builder.ExecutionDataRequester.HighestConsecutiveHeight,
indexedBlockHeight,
)
Expand Down Expand Up @@ -1067,7 +1070,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
node.Storage.Seals,
node.Storage.Results,
builder.ExecutionDataStore,
executionDataStoreCache,
notNil(builder.ExecutionDataCache),
builder.RegistersAsyncStore,
builder.EventsIndex,
useIndex,
Expand All @@ -1088,7 +1091,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
stateStreamEng, err := statestreambackend.NewEng(
node.Logger,
builder.stateStreamConf,
executionDataStoreCache,
notNil(builder.ExecutionDataCache),
node.Storage.Headers,
node.RootChainID,
builder.stateStreamGrpcServer,
Expand Down Expand Up @@ -2194,10 +2197,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

return builder.RpcEng, nil
}).
Component("ingestion engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
var err error

builder.RequestEng, err = requester.New(
Component("requester engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
requestEng, err := requester.New(
node.Logger.With().Str("entity", "collection").Logger(),
node.Metrics.Engine,
node.EngineRegistry,
Expand All @@ -2210,31 +2211,62 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
if err != nil {
return nil, fmt.Errorf("could not create requester engine: %w", err)
}
builder.RequestEng = requestEng

if builder.storeTxResultErrorMessages {
builder.TxResultErrorMessagesCore = tx_error_messages.NewTxErrorMessagesCore(
collectionIndexer, err := collections.NewIndexer(
node.Logger,
builder.ProtocolDB,
notNil(builder.collectionExecutedMetric),
node.State,
node.Storage.Blocks,
notNil(builder.collections),
lastFullBlockHeight,
node.StorageLockMgr,
)
if err != nil {
return nil, fmt.Errorf("could not create collection indexer: %w", err)
}
builder.CollectionIndexer = collectionIndexer

// the collection syncer has support for indexing collections from execution data if the
// syncer falls behind. This is only needed if the execution state indexing is disabled,
// since it will also index collections.
var executionDataSyncer *collections.ExecutionDataSyncer
if builder.executionDataSyncEnabled && !builder.executionDataIndexingEnabled {
executionDataSyncer = collections.NewExecutionDataSyncer(
node.Logger,
notNil(builder.txResultErrorMessageProvider),
builder.transactionResultErrorMessages,
notNil(builder.ExecNodeIdentitiesProvider),
node.StorageLockMgr,
notNil(builder.ExecutionDataCache),
collectionIndexer,
)
}

collectionSyncer := ingestion.NewCollectionSyncer(
collectionSyncer := collections.NewSyncer(
node.Logger,
notNil(builder.collectionExecutedMetric),
builder.RequestEng,
node.State,
node.Storage.Blocks,
notNil(builder.collections),
notNil(builder.transactions),
lastFullBlockHeight,
node.StorageLockMgr,
collectionIndexer,
executionDataSyncer,
)
builder.CollectionSyncer = collectionSyncer

builder.RequestEng.WithHandle(collectionSyncer.OnCollectionDownloaded)

builder.IngestEng, err = ingestion.New(
return builder.RequestEng, nil
}).
Component("ingestion engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if builder.storeTxResultErrorMessages {
builder.TxResultErrorMessagesCore = tx_error_messages.NewTxErrorMessagesCore(
node.Logger,
notNil(builder.txResultErrorMessageProvider),
builder.transactionResultErrorMessages,
notNil(builder.ExecNodeIdentitiesProvider),
node.StorageLockMgr,
)
}

ingestEng, err := ingestion.New(
node.Logger,
node.EngineRegistry,
node.State,
Expand All @@ -2243,34 +2275,31 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
node.Storage.Results,
node.Storage.Receipts,
processedFinalizedBlockHeight,
notNil(collectionSyncer),
notNil(builder.CollectionSyncer),
notNil(builder.CollectionIndexer),
notNil(builder.collectionExecutedMetric),
notNil(builder.TxResultErrorMessagesCore),
)
if err != nil {
return nil, err
}
builder.IngestEng = ingestEng

ingestionDependable.Init(builder.IngestEng)
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.IngestEng.OnFinalizedBlock)

return builder.IngestEng, nil
}).
Component("requester engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// We initialize the requester engine inside the ingestion engine due to the mutual dependency. However, in
// order for it to properly start and shut down, we should still return it as its own engine here, so it can
// be handled by the scaffold.
return builder.RequestEng, nil
}).
AdminCommand("backfill-tx-error-messages", func(config *cmd.NodeConfig) commands.AdminCommand {
return storageCommands.NewBackfillTxErrorMessagesCommand(
builder.Logger,
builder.State,
builder.TxResultErrorMessagesCore,
)
})

if builder.storeTxResultErrorMessages {
builder.
AdminCommand("backfill-tx-error-messages", func(config *cmd.NodeConfig) commands.AdminCommand {
return storageCommands.NewBackfillTxErrorMessagesCommand(
builder.Logger,
builder.State,
builder.TxResultErrorMessagesCore,
)
}).
Module("transaction result error messages storage", func(node *cmd.NodeConfig) error {
builder.transactionResultErrorMessages = store.NewTransactionResultErrorMessages(
node.Metrics.Cache,
Expand Down
28 changes: 28 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/apiproxy"
"github.com/onflow/flow-go/engine/access/index"
"github.com/onflow/flow-go/engine/access/ingestion/collections"
"github.com/onflow/flow-go/engine/access/rest"
restapiproxy "github.com/onflow/flow-go/engine/access/rest/apiproxy"
"github.com/onflow/flow-go/engine/access/rest/router"
Expand Down Expand Up @@ -69,6 +70,7 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/blobs"
"github.com/onflow/flow-go/module/chainsync"
"github.com/onflow/flow-go/module/counters"
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache"
Expand Down Expand Up @@ -1438,7 +1440,32 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return nil, fmt.Errorf("could not create derived chain data: %w", err)
}

rootBlockHeight := node.State.Params().FinalizedRoot().Height
progress, err := store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressLastFullBlockHeight).Initialize(rootBlockHeight)
if err != nil {
return nil, fmt.Errorf("could not create last full block height consumer progress: %w", err)
}

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(progress)
if err != nil {
return nil, fmt.Errorf("could not create last full block height counter: %w", err)
}

var collectionExecutedMetric module.CollectionExecutedMetric = metrics.NewNoopCollector()
collectionIndexer, err := collections.NewIndexer(
builder.Logger,
builder.ProtocolDB,
collectionExecutedMetric,
builder.State,
builder.Storage.Blocks,
builder.Storage.Collections,
lastFullBlockHeight,
builder.StorageLockMgr,
)
if err != nil {
return nil, fmt.Errorf("could not create collection indexer: %w", err)
}

builder.ExecutionIndexerCore = indexer.New(
builder.Logger,
metrics.NewExecutionStateIndexerCollector(),
Expand All @@ -1452,6 +1479,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
builder.scheduledTransactions,
builder.RootChainID,
indexerDerivedChainData,
collectionIndexer,
collectionExecutedMetric,
node.StorageLockMgr,
)
Expand Down
Loading
Loading