
Conversation

zhangchiqing
Member

@zhangchiqing zhangchiqing commented Oct 8, 2025

Work towards #7912

  • Made BatchInsertTransactionResultErrorMessage and BatchIndexTransactionResultErrorMessage private and merged them into one function, BatchInsertAndIndexTransactionResultErrorMessage, so that the existence check can be included there. A sketch of the resulting shape follows below.
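For illustration, a rough sketch of the merged function's shape. The existence-check helper (keyExists) and the exact parameter list are assumptions made for this sketch, not the verified code in this PR; the insert/index helper names match the ones quoted later in the review.

// Sketch only: keyExists is a hypothetical existence lookup; signatures are illustrative.
func BatchInsertAndIndexTransactionResultErrorMessage(
	r storage.Reader,
	w storage.Writer,
	blockID flow.Identifier,
	result *flow.TransactionResultErrorMessage,
) error {
	exists, err := keyExists(r, blockID, result.TransactionID) // hypothetical helper
	if err != nil {
		return fmt.Errorf("could not check for existing error message: %w", err)
	}
	if exists {
		// callers can treat this as a no-op via storage.SkipAlreadyExistsError
		return storage.ErrAlreadyExists
	}
	if err := insertTransactionResultErrorMessageByTxID(w, blockID, result); err != nil {
		return fmt.Errorf("cannot batch insert tx result error message: %w", err)
	}
	return indexTransactionResultErrorMessageBlockIDTxIndex(w, blockID, result)
}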


if len(results) > 0 {
if err := r.persistedResults.BatchStore(r.blockID, results, batch); err != nil {
// Use storage.WithLock to acquire the necessary lock and store the results
Member Author

@peterargue could you confirm this behavior?

I'm not sure whether, in the past, we let the AN overwrite any data during recovery or backfilling.

But the current behavior is that it will never overwrite. Is that OK?

Contributor

we generally do not allow rewriting, but we should make attempts to index the most recently indexed block idempotent. I think that is/should be handled by the indexer though, not storage.

if err := t.persistedTxResultErrMsg.BatchStore(t.blockID, txResultErrMsgs, batch); err != nil {
return fmt.Errorf("could not add transaction result error messages to batch: %w", err)
}
err = storage.SkipAlreadyExistsError( // Note: if the data already exists, we will not overwrite
Member Author

@peterargue could you confirm this behavior?

I'm not sure whether, in the past, we let the AN overwrite any data during recovery or backfilling.

But the current behavior is that it will never overwrite. Is that OK?

Contributor

we explicitly check before looking up the data, so the intention is that we will skip existing entries.
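For readers unfamiliar with the wrapper in the snippet above: presumably storage.SkipAlreadyExistsError simply turns storage.ErrAlreadyExists into a no-op. A minimal sketch of that assumption (not the verified implementation):

// Assumed behavior: swallow ErrAlreadyExists, pass every other error through unchanged.
func skipAlreadyExistsError(err error) error {
	if errors.Is(err, storage.ErrAlreadyExists) {
		return nil
	}
	return err
}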

if len(results) > 0 {
if err := r.persistedResults.BatchStore(r.blockID, results, batch); err != nil {
// Use storage.WithLock to acquire the necessary lock and store the results
err := storage.WithLock(r.lockManager, storage.LockInsertLightTransactionResult, func(lctx lockctx.Context) error {
Contributor

won't this release the lock after it returns? shouldn't it hold the lock for the entire batch?
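To make the concern concrete, a sketch of the alternative being suggested. It assumes the batch can be committed inside the locked section and that BatchStore accepts the lock proof as its first argument; both are assumptions for illustration, not verified against this PR.

// Illustrative only: hold the lock until the batch commit returns, so the existence
// check, the staged writes, and the commit are atomic w.r.t. other writers of this table.
err := storage.WithLock(r.lockManager, storage.LockInsertLightTransactionResult, func(lctx lockctx.Context) error {
	if err := r.persistedResults.BatchStore(lctx, r.blockID, results, batch); err != nil {
		return fmt.Errorf("could not stage results: %w", err)
	}
	return batch.Commit() // hypothetical: commit while the lock is still held
})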

return fmt.Errorf("could not add transaction result error messages to batch: %w", err)
}
err = storage.SkipAlreadyExistsError( // Note: if the data already exists, we will not overwrite
storage.WithLock(t.lockManager, storage.LockInsertTransactionResultErrMessage, func(lctx lockctx.Context) error {
Contributor

same here.


return err
err := storage.WithLocks(p.lockManager, []string{
storage.LockInsertCollection,
storage.LockInsertLightTransactionResult,
Member Author

I don't like this usage here: the block persister has to know which locks need to be acquired. The block persister is supposed to be ignorant of which DB operations the underlying individual persisters run, and therefore of which locks to acquire; it is only supposed to create a batch object and ensure it is committed.

Maybe we can revisit this when working on #7910. My idea is that we could let BatchStore return the required lock ID together with the functor, so that the caller doesn't need to remember which lock to acquire.
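A purely hypothetical sketch of that idea, just to make the proposal concrete (none of these types exist in the codebase):

// Hypothetical shape: each persister returns the lock it needs alongside the
// functor, so the block persister never hard-codes concrete lock IDs.
type lockedBatchStore struct {
	lockID string // e.g. storage.LockInsertLightTransactionResult
	store  func(lctx lockctx.Context, batch storage.ReaderBatchWriter) error
}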

Member

I agree it's slightly awkward, but I think returning the expected lock alongside the functor could cause other problems:

  • what if many functors return the same lock to the ignorant caller? The caller would need to be able to ensure it acquires each lock only once
  • if there are many different locks to acquire, we still need to carefully order persisterStores to make sure the locks are acquired in the right order

Fundamentally, the layer at which locks are acquired does need to know something about what locks are being acquired. I think just having the upper layer explicitly acquire the needed locks is the simplest way to deal with this pattern.
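For concreteness, a sketch of what the "ignorant" caller would still have to do if persisters returned their locks, which is roughly the two problems listed above (hypothetical code, not part of this PR):

// Hypothetical: dedupe and order the returned lock IDs before acquiring them.
seen := make(map[string]struct{})
var lockIDs []string
for _, ps := range persisterStores { // assumed slice of lock-returning persisters
	if _, ok := seen[ps.lockID]; ok {
		continue // each lock may only be acquired once per context
	}
	seen[ps.lockID] = struct{}{}
	lockIDs = append(lockIDs, ps.lockID)
}
sort.Strings(lockIDs) // a fixed global order is still needed to avoid deadlocks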

Member

@AlexHentschel AlexHentschel Oct 11, 2025

[Leo] I don't like this usage here, that the block persister has to know what locks needs to be acquired.

my 10 cents on this conversation:

  • On the one hand, acquiring the locks one by one looks verbose. But I don't think this is a problem of the lock-proof pattern. Here is my reasoning:

    • The current BlockPersister implementation already very precisely documents that it is for persisting an execution result

      // BlockPersister stores execution data for a single execution result into the database.
      Therefore, it is not surprising that BlockPersister also has to know which locks to acquire (everything that persisting a result requires)

    • With the move to Pebble, the lower-level storage layer no longer has the ability to self-sufficiently protect against illegal data changes. Now the business logic has to be involved by acquiring and holding locks.

      Components that require different locks simply don't satisfy the same API anymore. Higher-level business logic has to be aware of which locks to acquire; that's simply a consequence of the storage layer no longer having snapshot isolation for reads + writes.

  • From my perspective, you can try to hide the reality that the type of lock that must be held is conceptually part of the interface (even though the compiler only enforces that some lock proof is given and is oblivious to which lock it is). I think you thereby inadvertently create problems in other parts that then no longer have the information they need (a sketch follows after this list):

    • order of locks cannot be guaranteed if no single party is responsible for acquiring the locks in one place
    • more complicated implementation of checks, when locks are no longer required to be held at the time of call, but only at the time the batch is committed
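A sketch of the pattern being described: the compiler only sees that some proof was passed, so the storage method itself has to verify the right lock is held. The receiver and signature below are assumed shapes for illustration, not the exact flow-go code:

// Sketch: verify the specific lock before staging any writes.
func (t *TransactionResultErrorMessages) BatchStore(
	lctx lockctx.Proof,
	blockID flow.Identifier,
	msgs []flow.TransactionResultErrorMessage,
	batch storage.ReaderBatchWriter,
) error {
	if !lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage) {
		return fmt.Errorf("missing required lock %q", storage.LockInsertTransactionResultErrMessage)
	}
	// ... existence check and staged writes go here ...
	return nil
}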

@zhangchiqing zhangchiqing marked this pull request as ready for review October 8, 2025 20:04
@zhangchiqing zhangchiqing requested a review from a team as a code owner October 8, 2025 20:04
Comment on lines 46 to 47
// No errors are expected during normal operation, but it may return generic error
// if badger fails to process request
Member

still have "badger" in comment

Suggested change
// No errors are expected during normal operation, but it may return generic error
// if badger fails to process request
// No error returns are expected during normal operations.

Comment on lines 87 to 95
err := insertLightTransactionResult(w, blockID, &result)
if err != nil {
	return fmt.Errorf("cannot batch insert light tx result: %w", err)
}

err = indexLightTransactionResultByBlockIDAndTxIndex(w, blockID, uint32(i), &result)
if err != nil {
	return fmt.Errorf("cannot batch index light tx result: %w", err)
}
Member

I would suggest just inlining the UpsertByKey calls and the respective lines of documentation here.

Comment on lines 161 to 169
err := insertTransactionResultErrorMessageByTxID(w, blockID, &result)
if err != nil {
	return fmt.Errorf("cannot batch insert tx result error message: %w", err)
}

err = indexTransactionResultErrorMessageBlockIDTxIndex(w, blockID, &result)
if err != nil {
	return fmt.Errorf("cannot batch index tx result error message: %w", err)
}
Member

Also here, I would suggest inlining insertTransactionResultErrorMessageByTxID and indexTransactionResultErrorMessageBlockIDTxIndex (including the GoDoc, unless it is trivially apparent).

LockBootstrapping,
LockInsertChunkDataPack,
LockInsertTransactionResultErrMessage,
LockInsertLightTransactionResult,
Member

Whether we persist the transaction result in its light representation or some other representation is, in my opinion, irrelevant to the name of the lock. I would consider "light" an implementation detail of the storage layer and hence suggest removing this word from the lock name:

Suggested change
LockInsertLightTransactionResult,
LockInsertTransactionResult,

Member Author

I'd like to keep it as is. IMO, the lock is table-based: each table should have its own lock in order to synchronize writes to that table. Different tables technically don't use the same lock; if they do, the synchronization might be done in the application layer instead of the storage layer.

Comment on lines +34 to +35
// LockInsertLightTransactionResult protects the insertion of light transaction results
LockInsertLightTransactionResult = "lock_insert_light_transaction_result"
Member

Follow-up on removing the word "light":

Suggested change
// LockInsertLightTransactionResult protects the insertion of light transaction results
LockInsertLightTransactionResult = "lock_insert_light_transaction_result"
// LockInsertTransactionResult protects the insertion of transaction results
LockInsertTransactionResult = "lock_insert_transaction_result"

if err := e.persistedEvents.BatchStore(e.blockID, []flow.EventsList{e.data}, batch); err != nil {
err := e.persistedEvents.BatchStore(e.blockID, []flow.EventsList{e.data}, batch)
if err != nil {
if errors.Is(err, storage.ErrAlreadyExists) {
Member

Question

I assume this is anticipating future changes? If I am not mistaken, at the moment, Events.BatchStore does not error. It just ignorantly overwrites the data.

In addition, the storage API is lacking error documentation, so I added the following TODO:

// BatchStore will store events for the given block ID in a given batch
// TODO: error documentation
BatchStore(blockID flow.Identifier, events []flow.EventsList, batch ReaderBatchWriter) error

Overall, I think we need to come back to the events and add overwrite protection checks. After adding these checks, the code in the EventsStore here will probably be correct. Created issue #8034 ... maybe worthwhile to link the issue here in the code.
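A rough sketch of what such an overwrite-protection check could look like once #8034 is addressed. The helper names existsEventsForBlock and storeEvents are assumptions for this sketch only:

// Hypothetical guard, mirroring the existence checks added for transaction results.
func batchStoreEventsGuarded(r storage.Reader, w storage.Writer, blockID flow.Identifier, events []flow.EventsList) error {
	exists, err := existsEventsForBlock(r, blockID) // assumed existence lookup
	if err != nil {
		return fmt.Errorf("could not check for existing events: %w", err)
	}
	if exists {
		return storage.ErrAlreadyExists // lets EventsStore (above) skip instead of overwriting
	}
	return storeEvents(w, blockID, events) // assumed write helper
}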

Member Author

Yes, the refactor of the implementation has been addressed in this PR:
https://github.com/onflow/flow-go/pull/8005/files#diff-db1f3e68113391168d7f6bf516cfc5b9c2ac5c0d0f106751171f48b4da7d7678R19

So the error handling is added here first.

// Mock the storage of the fetched error messages into the protocol database.
s.txErrorMessages.On("Store", blockId, expectedStoreTxErrorMessages).
s.txErrorMessages.On("Store", mock.Anything, blockId, expectedStoreTxErrorMessages).
Return(nil).Once()
Member

should check that the required lock is held here for any TransactionResultErrorMessages.Store

Suggested change
Return(nil).Once()
Return(func(lctx lockctx.Proof, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error {
	require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage))
	return nil
}).Once()

expectedStoreTxErrorMessages := createExpectedTxErrorMessages(resultsByBlockID, s.enNodeIDs.NodeIDs()[0])
s.txErrorMessages.On("Store", blockId, expectedStoreTxErrorMessages).
s.txErrorMessages.On("Store", mock.Anything, blockId, expectedStoreTxErrorMessages).
Return(fmt.Errorf("storage error")).Once()
Member

Suggested change
Return(fmt.Errorf("storage error")).Once()
Return(func(lctx lockctx.Proof, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error {
	require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage))
	return fmt.Errorf("storage error")
}).Once()

Comment on lines 255 to 257
Run(func(args mock.Arguments) {
// Ensure the test does not complete its work faster than necessary
wg.Done()
Member

let's check that the correct lock is held:

Suggested change
Run(func(args mock.Arguments) {
	// Ensure the test does not complete its work faster than necessary
	wg.Done()
Run(func(args mock.Arguments) {
	lctx, ok := args[0].(lockctx.Proof)
	require.True(s.T(), ok, "expecting lock proof, but cast failed")
	require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage))
	wg.Done() // Ensure the test does not complete its work faster than necessary

Comment on lines 449 to 451
c.persistentCollections.On("BatchStoreAndIndexByTransaction", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
c.persistentResults.On("BatchStore", mock.Anything, mock.Anything, blockID, indexerData.Results).Return(nil)
c.persistentTxResultErrMsg.On("BatchStore", mock.Anything, mock.Anything, blockID, core.workingData.txResultErrMsgsData).Return(nil)
Member

I assume most of the newly added mock.Anything arguments here are for lockctx.Proof(?). Could you add checks that the respective locks are held, please? Thanks.
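One way to do that, mirroring the Run-based checks suggested earlier. This is a sketch only: it assumes the lock proof is the first argument of the new BatchStore signature and that a *testing.T is in scope as t.

// Sketch: verify the proof passed to the mocked BatchStore holds the expected lock.
c.persistentResults.On("BatchStore", mock.Anything, mock.Anything, blockID, indexerData.Results).
	Run(func(args mock.Arguments) {
		lctx, ok := args[0].(lockctx.Proof)
		require.True(t, ok, "expecting lock proof as first argument")
		require.True(t, lctx.HoldsLock(storage.LockInsertLightTransactionResult))
	}).
	Return(nil)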

@zhangchiqing zhangchiqing added this pull request to the merge queue Oct 15, 2025
Merged via the queue into master with commit a745d4e Oct 15, 2025
108 of 114 checks passed
@zhangchiqing zhangchiqing deleted the leo/refactor-index-tx-err-msg branch October 15, 2025 17:32