Skip to content

Commit 28a3fae

Browse files
authored
Merge branch 'master' into leo/refactor-stored-chunk-data-pack
2 parents 52b05be + 4e9b1c5 commit 28a3fae

File tree

30 files changed

+1464
-557
lines changed

30 files changed

+1464
-557
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ type FlowAccessNodeBuilder struct {
351351
transactionResultErrorMessages storage.TransactionResultErrorMessages
352352
transactions storage.Transactions
353353
collections storage.Collections
354+
scheduledTransactions storage.ScheduledTransactions
354355

355356
// The sync engine participants provider is the libp2p peer store for the access node
356357
// which is not available until after the network has started.
@@ -852,6 +853,10 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
852853
builder.lightTransactionResults = store.NewLightTransactionResults(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
853854
return nil
854855
}).
856+
Module("scheduled transactions storage", func(node *cmd.NodeConfig) error {
857+
builder.scheduledTransactions = store.NewScheduledTransactions(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
858+
return nil
859+
}).
855860
DependableComponent("execution data indexer", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
856861
// Note: using a DependableComponent here to ensure that the indexer does not block
857862
// other components from starting while bootstrapping the register db since it may
@@ -937,7 +942,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
937942
return nil, fmt.Errorf("could not create derived chain data: %w", err)
938943
}
939944

940-
indexerCore, err := indexer.New(
945+
builder.ExecutionIndexerCore = indexer.New(
941946
builder.Logger,
942947
metrics.NewExecutionStateIndexerCollector(),
943948
notNil(builder.ProtocolDB),
@@ -947,22 +952,19 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
947952
notNil(builder.collections),
948953
notNil(builder.transactions),
949954
notNil(builder.lightTransactionResults),
950-
builder.RootChainID.Chain(),
955+
notNil(builder.scheduledTransactions),
956+
builder.RootChainID,
951957
indexerDerivedChainData,
952958
notNil(builder.collectionExecutedMetric),
953959
node.StorageLockMgr,
954960
)
955-
if err != nil {
956-
return nil, err
957-
}
958-
builder.ExecutionIndexerCore = indexerCore
959961

960962
// execution state worker uses a jobqueue to process new execution data and indexes it by using the indexer.
961963
builder.ExecutionIndexer, err = indexer.NewIndexer(
962964
builder.Logger,
963965
registers.FirstHeight(),
964966
registers,
965-
indexerCore,
967+
builder.ExecutionIndexerCore,
966968
executionDataStoreCache,
967969
builder.ExecutionDataRequester.HighestConsecutiveHeight,
968970
indexedBlockHeight,

cmd/observer/node_builder/observer_builder.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ type ObserverServiceBuilder struct {
299299
// storage
300300
events storage.Events
301301
lightTransactionResults storage.LightTransactionResults
302+
scheduledTransactions storage.ScheduledTransactions
302303

303304
// available until after the network has started. Hence, a factory function that needs to be called just before
304305
// creating the sync engine
@@ -1349,6 +1350,9 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
13491350
}).Module("transaction results storage", func(node *cmd.NodeConfig) error {
13501351
builder.lightTransactionResults = store.NewLightTransactionResults(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
13511352
return nil
1353+
}).Module("scheduled transactions storage", func(node *cmd.NodeConfig) error {
1354+
builder.scheduledTransactions = store.NewScheduledTransactions(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
1355+
return nil
13521356
}).DependableComponent("execution data indexer", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
13531357
// Note: using a DependableComponent here to ensure that the indexer does not block
13541358
// other components from starting while bootstrapping the register db since it may
@@ -1435,7 +1439,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
14351439
}
14361440

14371441
var collectionExecutedMetric module.CollectionExecutedMetric = metrics.NewNoopCollector()
1438-
indexerCore, err := indexer.New(
1442+
builder.ExecutionIndexerCore = indexer.New(
14391443
builder.Logger,
14401444
metrics.NewExecutionStateIndexerCollector(),
14411445
builder.ProtocolDB,
@@ -1445,22 +1449,19 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
14451449
builder.Storage.Collections,
14461450
builder.Storage.Transactions,
14471451
builder.lightTransactionResults,
1448-
builder.RootChainID.Chain(),
1452+
builder.scheduledTransactions,
1453+
builder.RootChainID,
14491454
indexerDerivedChainData,
14501455
collectionExecutedMetric,
14511456
node.StorageLockMgr,
14521457
)
1453-
if err != nil {
1454-
return nil, err
1455-
}
1456-
builder.ExecutionIndexerCore = indexerCore
14571458

14581459
// execution state worker uses a jobqueue to process new execution data and indexes it by using the indexer.
14591460
builder.ExecutionIndexer, err = indexer.NewIndexer(
14601461
builder.Logger,
14611462
registers.FirstHeight(),
14621463
registers,
1463-
indexerCore,
1464+
builder.ExecutionIndexerCore,
14641465
executionDataStoreCache,
14651466
builder.ExecutionDataRequester.HighestConsecutiveHeight,
14661467
indexedBlockHeight,

engine/access/rpc/backend/script_executor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func (s *ScriptExecutorSuite) SetupTest() {
129129
derivedChainData, err := derived.NewDerivedChainData(derived.DefaultDerivedDataCacheSize)
130130
s.Require().NoError(err)
131131

132-
indexerCore, err := indexer.New(
132+
indexerCore := indexer.New(
133133
s.log,
134134
module.ExecutionStateIndexerMetrics(metrics.NewNoopCollector()),
135135
nil,
@@ -139,12 +139,12 @@ func (s *ScriptExecutorSuite) SetupTest() {
139139
nil,
140140
nil,
141141
nil,
142-
s.chain,
142+
nil,
143+
s.chain.ChainID(),
143144
derivedChainData,
144145
module.CollectionExecutedMetric(metrics.NewNoopCollector()),
145146
lockManager,
146147
)
147-
s.Require().NoError(err)
148148

149149
s.scripts = execution.NewScripts(
150150
s.log,

fvm/blueprints/scheduled_callback.go

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func ExecuteCallbacksTransactions(chainID flow.Chain, processEvents flow.EventsL
7373
// event.EventIndex
7474

7575
// skip any fee events or other events that are not pending execution events
76-
if !isPendingExecutionEvent(env, event) {
76+
if !IsPendingExecutionEvent(env, event) {
7777
continue
7878
}
7979

@@ -114,19 +114,42 @@ func executeCallbackTransaction(
114114
// callback scheduler contract and has the following signature:
115115
// event PendingExecution(id: UInt64, priority: UInt8, executionEffort: UInt64, fees: UFix64, callbackOwner: Address)
116116
func callbackArgsFromEvent(event flow.Event) ([]byte, uint64, error) {
117+
cadenceId, cadenceEffort, err := ParsePendingExecutionEvent(event)
118+
if err != nil {
119+
return nil, 0, err
120+
}
121+
122+
effort := uint64(cadenceEffort)
123+
124+
if effort > flow.DefaultMaxTransactionGasLimit {
125+
log.Warn().Uint64("effort", effort).Msg("effort is greater than max transaction gas limit, setting to max")
126+
effort = flow.DefaultMaxTransactionGasLimit
127+
}
128+
129+
encID, err := jsoncdc.Encode(cadenceId)
130+
if err != nil {
131+
return nil, 0, fmt.Errorf("failed to encode id: %w", err)
132+
}
133+
134+
return encID, uint64(effort), nil
135+
}
136+
137+
// ParsePendingExecutionEvent decodes the PendingExecution event payload and returns the scheduled
138+
// transaction's id and effort.
139+
func ParsePendingExecutionEvent(event flow.Event) (cadence.UInt64, cadence.UInt64, error) {
117140
const (
118141
processedCallbackIDFieldName = "id"
119142
processedCallbackEffortFieldName = "executionEffort"
120143
)
121144

122145
eventData, err := ccf.Decode(nil, event.Payload)
123146
if err != nil {
124-
return nil, 0, fmt.Errorf("failed to decode event: %w", err)
147+
return 0, 0, fmt.Errorf("failed to decode event: %w", err)
125148
}
126149

127150
cadenceEvent, ok := eventData.(cadence.Event)
128151
if !ok {
129-
return nil, 0, fmt.Errorf("event data is not a cadence event")
152+
return 0, 0, fmt.Errorf("event data is not a cadence event")
130153
}
131154

132155
idValue := cadence.SearchFieldByName(
@@ -139,32 +162,21 @@ func callbackArgsFromEvent(event flow.Event) ([]byte, uint64, error) {
139162
processedCallbackEffortFieldName,
140163
)
141164

142-
id, ok := idValue.(cadence.UInt64)
165+
cadenceId, ok := idValue.(cadence.UInt64)
143166
if !ok {
144-
return nil, 0, fmt.Errorf("id is not uint64")
167+
return 0, 0, fmt.Errorf("id is not uint64")
145168
}
146169

147170
cadenceEffort, ok := effortValue.(cadence.UInt64)
148171
if !ok {
149-
return nil, 0, fmt.Errorf("effort is not uint64")
150-
}
151-
152-
effort := uint64(cadenceEffort)
153-
154-
if effort > flow.DefaultMaxTransactionGasLimit {
155-
log.Warn().Uint64("effort", effort).Msg("effort is greater than max transaction gas limit, setting to max")
156-
effort = flow.DefaultMaxTransactionGasLimit
172+
return 0, 0, fmt.Errorf("effort is not uint64")
157173
}
158174

159-
encID, err := jsoncdc.Encode(id)
160-
if err != nil {
161-
return nil, 0, fmt.Errorf("failed to encode id: %w", err)
162-
}
163-
164-
return encID, uint64(effort), nil
175+
return cadenceId, cadenceEffort, nil
165176
}
166177

167-
func isPendingExecutionEvent(env templates.Environment, event flow.Event) bool {
178+
// IsPendingExecutionEvent returns true if the event is a pending execution event.
179+
func IsPendingExecutionEvent(env templates.Environment, event flow.Event) bool {
168180
processedEventType := PendingExecutionEventType(env)
169181
return event.Type == processedEventType
170182
}

fvm/blueprints/scheduled_callback_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,27 @@ func TestExecuteCallbackTransaction(t *testing.T) {
183183
assert.Equal(t, tx.GasLimit, uint64(effort))
184184
}
185185

186+
func TestIsPendingExecutionEvent(t *testing.T) {
187+
t.Parallel()
188+
189+
chain := flow.Mainnet.Chain()
190+
env := systemcontracts.SystemContractsForChain(chain.ChainID()).AsTemplateEnv()
191+
assert.True(t, blueprints.IsPendingExecutionEvent(env, createValidCallbackEvent(t, 1, 100)))
192+
}
193+
194+
func TestParsePendingExecutionEvent(t *testing.T) {
195+
t.Parallel()
196+
197+
expectedID := uint64(123)
198+
expectedEffort := uint64(456)
199+
event := createValidCallbackEvent(t, expectedID, expectedEffort)
200+
201+
actualID, actualEffort, err := blueprints.ParsePendingExecutionEvent(event)
202+
require.NoError(t, err)
203+
assert.Equal(t, expectedID, uint64(actualID))
204+
assert.Equal(t, expectedEffort, uint64(actualEffort))
205+
}
206+
186207
func createValidCallbackEvent(t *testing.T, id uint64, effort uint64) flow.Event {
187208
const processedEventTypeTemplate = "A.%v.FlowTransactionScheduler.PendingExecution"
188209
env := systemcontracts.SystemContractsForChain(flow.Mainnet.Chain().ChainID()).AsTemplateEnv()

module/builder/collection/builder.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.HeaderBody
191191
// STEP 2: build a payload of valid transactions, while at the same
192192
// time figuring out the correct reference block ID for the collection.
193193
span, _ = b.tracer.StartSpanFromContext(ctx, trace.COLBuildOnCreatePayload)
194-
payload, err := b.buildPayload(buildCtx)
194+
payload, priorityTransactionsCount, err := b.buildPayload(buildCtx)
195195
span.End()
196196
if err != nil {
197197
return nil, fmt.Errorf("could not build payload: %w", err)
@@ -216,6 +216,8 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.HeaderBody
216216
return nil, fmt.Errorf("could not build cluster block: %w", err)
217217
}
218218

219+
b.metrics.ClusterBlockCreated(block, priorityTransactionsCount)
220+
219221
blockProposal, err := cluster.NewProposal(
220222
cluster.UntrustedProposal{
221223
Block: *block,
@@ -403,8 +405,13 @@ func (b *Builder) populateFinalizedAncestryLookup(lctx lockctx.Proof, ctx *block
403405

404406
// buildPayload constructs a valid payload based on transactions available in the mempool.
405407
// If the mempool is empty, an empty payload will be returned.
408+
// Return values:
409+
// - *cluster.Payload: the payload that has been built.
410+
// - uint: number of prioritized transactions included in the payload.
411+
// - error: exception if failed to build the payload.
412+
//
406413
// No errors are expected during normal operation.
407-
func (b *Builder) buildPayload(buildCtx *blockBuildContext) (*cluster.Payload, error) {
414+
func (b *Builder) buildPayload(buildCtx *blockBuildContext) (*cluster.Payload, uint, error) {
408415
lookup := buildCtx.lookup
409416
limiter := buildCtx.limiter
410417
config := buildCtx.config
@@ -469,7 +476,7 @@ func (b *Builder) buildPayload(buildCtx *blockBuildContext) (*cluster.Payload, e
469476
continue // in case we are configured with liberal transaction ingest rules
470477
}
471478
if err != nil {
472-
return nil, fmt.Errorf("could not retrieve reference header: %w", err)
479+
return nil, 0, fmt.Errorf("could not retrieve reference header: %w", err)
473480
}
474481

475482
// disallow un-finalized reference blocks, and reference blocks beyond the cluster's operating epoch
@@ -480,7 +487,7 @@ func (b *Builder) buildPayload(buildCtx *blockBuildContext) (*cluster.Payload, e
480487
// make sure the reference block is finalized and not orphaned
481488
blockIDFinalizedAtRefHeight, err := b.mainHeaders.BlockIDByHeight(refHeader.Height)
482489
if err != nil {
483-
return nil, fmt.Errorf("could not check that reference block (id=%x) for transaction (id=%x) is finalized: %w", tx.ReferenceBlockID, txID, err)
490+
return nil, 0, fmt.Errorf("could not check that reference block (id=%x) for transaction (id=%x) is finalized: %w", tx.ReferenceBlockID, txID, err)
484491
}
485492
if blockIDFinalizedAtRefHeight != tx.ReferenceBlockID {
486493
// the transaction references an orphaned block - it will never be valid
@@ -544,7 +551,7 @@ func (b *Builder) buildPayload(buildCtx *blockBuildContext) (*cluster.Payload, e
544551
// build the payload from the transactions
545552
collection, err := flow.NewCollection(flow.UntrustedCollection{Transactions: transactions})
546553
if err != nil {
547-
return nil, fmt.Errorf("could not build the collection from the transactions: %w", err)
554+
return nil, 0, fmt.Errorf("could not build the collection from the transactions: %w", err)
548555
}
549556

550557
payload, err := cluster.NewPayload(
@@ -554,9 +561,9 @@ func (b *Builder) buildPayload(buildCtx *blockBuildContext) (*cluster.Payload, e
554561
},
555562
)
556563
if err != nil {
557-
return nil, fmt.Errorf("could not build a payload: %w", err)
564+
return nil, 0, fmt.Errorf("could not build a payload: %w", err)
558565
}
559-
return payload, nil
566+
return payload, uint(len(priorityTransactions)), nil
560567
}
561568

562569
// buildHeader constructs the header for the cluster block being built.

module/execution/scripts_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func (s *scriptTestSuite) SetupTest() {
178178
derivedChainData, err := derived.NewDerivedChainData(derived.DefaultDerivedDataCacheSize)
179179
s.Require().NoError(err)
180180

181-
index, err := indexer.New(
181+
index := indexer.New(
182182
logger,
183183
metrics.NewNoopCollector(),
184184
nil,
@@ -188,12 +188,12 @@ func (s *scriptTestSuite) SetupTest() {
188188
nil,
189189
nil,
190190
nil,
191-
flow.Testnet.Chain(),
191+
nil,
192+
flow.Testnet,
192193
derivedChainData,
193194
nil,
194195
lockManager,
195196
)
196-
s.Require().NoError(err)
197197

198198
s.scripts = NewScripts(
199199
logger,

0 commit comments

Comments
 (0)