- 
                Notifications
    
You must be signed in to change notification settings  - Fork 201
 
[Storage] Refactor indexing transaction error message #8021
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 31 commits
af1c3e6
              d86569f
              af54bfe
              b413203
              a1a371d
              157d4ec
              43ef330
              155220d
              e666a2a
              5277616
              28b4f19
              bbd6ed9
              af6427a
              4eb3b45
              45524d0
              1ee701b
              79751ac
              e2039ac
              c9563fd
              6dddf0a
              346cdae
              f07a652
              e8036e7
              190c46d
              9bcdb06
              147f3cc
              9c36182
              3fc2a9a
              eefd772
              dd6cfa2
              780e20a
              4e11ecf
              cafb925
              ac316dd
              64dfc09
              4ac5c28
              ac296c1
              9d0cbdb
              51a1895
              c20915b
              9a21aea
              127db87
              015edcc
              73c6514
              9d9a747
              6f77ad3
              4854931
              a19f958
              348284d
              7636d0b
              e645c88
              fa97bbb
              94a08b5
              ee7a627
              c51b871
              b7918aa
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 
          
            
          
           | 
    @@ -21,7 +21,8 @@ import ( | |||||||||||
| "github.com/onflow/flow-go/module/irrecoverable" | ||||||||||||
| syncmock "github.com/onflow/flow-go/module/state_synchronization/mock" | ||||||||||||
| protocol "github.com/onflow/flow-go/state/protocol/mock" | ||||||||||||
| storage "github.com/onflow/flow-go/storage/mock" | ||||||||||||
| "github.com/onflow/flow-go/storage" | ||||||||||||
| storagemock "github.com/onflow/flow-go/storage/mock" | ||||||||||||
| "github.com/onflow/flow-go/utils/unittest" | ||||||||||||
| ) | ||||||||||||
| 
     | 
||||||||||||
| 
        
          
        
         | 
    @@ -37,13 +38,14 @@ type TxErrorMessagesCoreSuite struct { | |||||||||||
| params *protocol.Params | ||||||||||||
| } | ||||||||||||
| 
     | 
||||||||||||
| receipts *storage.ExecutionReceipts | ||||||||||||
| txErrorMessages *storage.TransactionResultErrorMessages | ||||||||||||
| lightTxResults *storage.LightTransactionResults | ||||||||||||
| receipts *storagemock.ExecutionReceipts | ||||||||||||
| txErrorMessages *storagemock.TransactionResultErrorMessages | ||||||||||||
| lightTxResults *storagemock.LightTransactionResults | ||||||||||||
| 
     | 
||||||||||||
| reporter *syncmock.IndexReporter | ||||||||||||
| indexReporter *index.Reporter | ||||||||||||
| txResultsIndex *index.TransactionResultsIndex | ||||||||||||
| lockManager storage.LockManager | ||||||||||||
| 
     | 
||||||||||||
| enNodeIDs flow.IdentityList | ||||||||||||
| execClient *accessmock.ExecutionAPIClient | ||||||||||||
| 
          
            
          
           | 
    @@ -79,18 +81,21 @@ func (s *TxErrorMessagesCoreSuite) SetupTest() { | |||||||||||
| s.proto.params = protocol.NewParams(s.T()) | ||||||||||||
| s.execClient = accessmock.NewExecutionAPIClient(s.T()) | ||||||||||||
| s.connFactory = connectionmock.NewConnectionFactory(s.T()) | ||||||||||||
| s.receipts = storage.NewExecutionReceipts(s.T()) | ||||||||||||
| s.txErrorMessages = storage.NewTransactionResultErrorMessages(s.T()) | ||||||||||||
| s.receipts = storagemock.NewExecutionReceipts(s.T()) | ||||||||||||
| s.txErrorMessages = storagemock.NewTransactionResultErrorMessages(s.T()) | ||||||||||||
| s.rootBlock = unittest.Block.Genesis(flow.Emulator) | ||||||||||||
| s.finalizedBlock = unittest.BlockWithParentFixture(s.rootBlock.ToHeader()).ToHeader() | ||||||||||||
| 
     | 
||||||||||||
| s.lightTxResults = storage.NewLightTransactionResults(s.T()) | ||||||||||||
| s.lightTxResults = storagemock.NewLightTransactionResults(s.T()) | ||||||||||||
| s.reporter = syncmock.NewIndexReporter(s.T()) | ||||||||||||
| s.indexReporter = index.NewReporter() | ||||||||||||
| err := s.indexReporter.Initialize(s.reporter) | ||||||||||||
| s.Require().NoError(err) | ||||||||||||
| s.txResultsIndex = index.NewTransactionResultsIndex(s.indexReporter, s.lightTxResults) | ||||||||||||
| 
     | 
||||||||||||
| // Initialize lock manager for tests | ||||||||||||
| s.lockManager = storage.NewTestingLockManager() | ||||||||||||
| 
     | 
||||||||||||
| s.proto.state.On("Params").Return(s.proto.params) | ||||||||||||
| 
     | 
||||||||||||
| // Mock the finalized root block header with height 0. | ||||||||||||
| 
          
            
          
           | 
    @@ -143,7 +148,7 @@ func (s *TxErrorMessagesCoreSuite) TestHandleTransactionResultErrorMessages() { | |||||||||||
| expectedStoreTxErrorMessages := createExpectedTxErrorMessages(resultsByBlockID, s.enNodeIDs.NodeIDs()[0]) | ||||||||||||
| 
     | 
||||||||||||
| // Mock the storage of the fetched error messages into the protocol database. | ||||||||||||
| s.txErrorMessages.On("Store", blockId, expectedStoreTxErrorMessages). | ||||||||||||
| s.txErrorMessages.On("Store", mock.Anything, blockId, expectedStoreTxErrorMessages). | ||||||||||||
| Return(nil).Once() | ||||||||||||
| 
     | 
||||||||||||
| core := s.initCore() | ||||||||||||
| 
          
            
          
           | 
    @@ -228,7 +233,7 @@ func (s *TxErrorMessagesCoreSuite) TestHandleTransactionResultErrorMessages_Erro | |||||||||||
| 
     | 
||||||||||||
| // Simulate an error when attempting to store the fetched transaction error messages in storage. | ||||||||||||
| expectedStoreTxErrorMessages := createExpectedTxErrorMessages(resultsByBlockID, s.enNodeIDs.NodeIDs()[0]) | ||||||||||||
| s.txErrorMessages.On("Store", blockId, expectedStoreTxErrorMessages). | ||||||||||||
| s.txErrorMessages.On("Store", mock.Anything, blockId, expectedStoreTxErrorMessages). | ||||||||||||
| Return(fmt.Errorf("storage error")).Once() | ||||||||||||
                
       | 
||||||||||||
| Return(fmt.Errorf("storage error")).Once() | |
| Return(func(lctx lockctx.Proof, blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error { | |
| require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage)) | |
| return fmt.Errorf("storage error") | |
| }).Once() | 
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 
          
            
          
           | 
    @@ -55,6 +55,7 @@ type TxErrorMessagesEngineSuite struct { | |||||||||||||||||
| reporter *syncmock.IndexReporter | ||||||||||||||||||
| indexReporter *index.Reporter | ||||||||||||||||||
| txResultsIndex *index.TransactionResultsIndex | ||||||||||||||||||
| lockManager storage.LockManager | ||||||||||||||||||
| 
     | 
||||||||||||||||||
| enNodeIDs flow.IdentityList | ||||||||||||||||||
| execClient *accessmock.ExecutionAPIClient | ||||||||||||||||||
| 
          
            
          
           | 
    @@ -106,6 +107,9 @@ func (s *TxErrorMessagesEngineSuite) SetupTest() { | |||||||||||||||||
| s.Require().NoError(err) | ||||||||||||||||||
| s.txResultsIndex = index.NewTransactionResultsIndex(s.indexReporter, s.lightTxResults) | ||||||||||||||||||
| 
     | 
||||||||||||||||||
| // Initialize lock manager for tests | ||||||||||||||||||
| s.lockManager = storage.NewTestingLockManager() | ||||||||||||||||||
| 
     | 
||||||||||||||||||
| blockCount := 5 | ||||||||||||||||||
| s.blockMap = make(map[uint64]*flow.Block, blockCount) | ||||||||||||||||||
| s.rootBlock = unittest.Block.Genesis(flow.Emulator) | ||||||||||||||||||
| 
          
            
          
           | 
    @@ -177,6 +181,7 @@ func (s *TxErrorMessagesEngineSuite) initEngine(ctx irrecoverable.SignalerContex | |||||||||||||||||
| errorMessageProvider, | ||||||||||||||||||
| s.txErrorMessages, | ||||||||||||||||||
| execNodeIdentitiesProvider, | ||||||||||||||||||
| s.lockManager, | ||||||||||||||||||
| ) | ||||||||||||||||||
| 
     | 
||||||||||||||||||
| eng, err := New( | ||||||||||||||||||
| 
          
            
          
           | 
    @@ -245,7 +250,7 @@ func (s *TxErrorMessagesEngineSuite) TestOnFinalizedBlockHandleTxErrorMessages() | |||||||||||||||||
| expectedStoreTxErrorMessages := createExpectedTxErrorMessages(resultsByBlockID, s.enNodeIDs.NodeIDs()[0]) | ||||||||||||||||||
| 
     | 
||||||||||||||||||
| // Mock the storage of the fetched error messages into the protocol database. | ||||||||||||||||||
| s.txErrorMessages.On("Store", blockID, expectedStoreTxErrorMessages).Return(nil). | ||||||||||||||||||
| s.txErrorMessages.On("Store", mock.Anything, blockID, expectedStoreTxErrorMessages).Return(nil). | ||||||||||||||||||
| Run(func(args mock.Arguments) { | ||||||||||||||||||
| // Ensure the test does not complete its work faster than necessary | ||||||||||||||||||
| wg.Done() | ||||||||||||||||||
                
       | 
||||||||||||||||||
| Run(func(args mock.Arguments) { | |
| // Ensure the test does not complete its work faster than necessary | |
| wg.Done() | |
| Run(func(args mock.Arguments) { | |
| lctx, ok := args[0].(lockctx.Proof) | |
| require.True(s.T(), ok, "expecting lock proof, but cast failed") | |
| require.True(s.T(), lctx.HoldsLock(storage.LockInsertTransactionResultErrMessage)) | |
| wg.Done() // Ensure the test does not complete its work faster than necessary | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -446,9 +446,9 @@ func (c *CoreImplSuite) TestCoreImpl_Persist() { | |
| indexerData := core.workingData.indexerData | ||
| c.persistentRegisters.On("Store", flow.RegisterEntries(indexerData.Registers), tf.block.Height).Return(nil) | ||
| c.persistentEvents.On("BatchStore", blockID, []flow.EventsList{indexerData.Events}, mock.Anything).Return(nil) | ||
| c.persistentCollections.On("BatchStoreAndIndexByTransaction", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) | ||
| c.persistentResults.On("BatchStore", blockID, indexerData.Results, mock.Anything).Return(nil) | ||
| c.persistentTxResultErrMsg.On("BatchStore", blockID, core.workingData.txResultErrMsgsData, mock.Anything).Return(nil) | ||
| c.persistentCollections.On("BatchStoreAndIndexByTransaction", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) | ||
| c.persistentResults.On("BatchStore", mock.Anything, mock.Anything, blockID, indexerData.Results).Return(nil) | ||
| c.persistentTxResultErrMsg.On("BatchStore", mock.Anything, mock.Anything, blockID, core.workingData.txResultErrMsgsData).Return(nil) | ||
                
       | 
||
| c.latestPersistedSealedResult.On("BatchSet", tf.exeResult.ID(), tf.block.Height, mock.Anything).Return(nil) | ||
| 
     | 
||
| err = core.Persist() | ||
| 
          
            
          
           | 
    ||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
| 
          
            
          
           | 
    @@ -60,20 +60,19 @@ func (p *BlockPersister) Persist() error { | |||
| p.log.Debug().Msg("started to persist execution data") | ||||
| start := time.Now() | ||||
| 
     | 
||||
| lctx := p.lockManager.NewContext() | ||||
| err := lctx.AcquireLock(storage.LockInsertCollection) | ||||
| if err != nil { | ||||
| return fmt.Errorf("could not acquire lock for inserting light collections: %w", err) | ||||
| } | ||||
| defer lctx.Release() | ||||
| 
     | 
||||
| err = p.protocolDB.WithReaderBatchWriter(func(batch storage.ReaderBatchWriter) error { | ||||
| for _, persister := range p.persisterStores { | ||||
| if err := persister.Persist(lctx, batch); err != nil { | ||||
| return err | ||||
| err := storage.WithLocks(p.lockManager, []string{ | ||||
| storage.LockInsertCollection, | ||||
| storage.LockInsertLightTransactionResult, | ||||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like this usage here, that the block persister has to know what locks needs to be acquired. The block persister is supposed to be ignorant about what db operation the underlying individual persisters running, therefore doesn't know about the locks to acquire, it supposed to only create a batch object, and ensure it's committed. Maybe we can revisit this, when working on #7910. My idea is that we could let the BatchStore functor to return a required lock ID and the functor, so that the locker doesn't need to remember what lock to acquire. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree it's slightly awkward, but I think returning the expected lock alongside the functor could cause other problems: 
 Fundamentally, the layer at which locks are acquired does need to know something about what locks are being acquired. I think just having the upper layer explicitly acquire the needed locks is the simplest way to deal with this pattern. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 
 my 10 cents on this conversation: 
  | 
||||
| storage.LockInsertTransactionResultErrMessage, | ||||
| }, func(lctx lockctx.Context) error { | ||||
| return p.protocolDB.WithReaderBatchWriter(func(batch storage.ReaderBatchWriter) error { | ||||
| for _, persister := range p.persisterStores { | ||||
| if err := persister.Persist(lctx, batch); err != nil { | ||||
| return err | ||||
| } | ||||
| } | ||||
| } | ||||
| return nil | ||||
| return nil | ||||
| }) | ||||
| }) | ||||
| 
     | 
||||
| if err != nil { | ||||
| 
          
            
          
           | 
    ||||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -1,6 +1,7 @@ | ||||||||
| package stores | ||||||||
| 
     | 
||||||||
| import ( | ||||||||
| "errors" | ||||||||
| "fmt" | ||||||||
| 
     | 
||||||||
| "github.com/jordanschalm/lockctx" | ||||||||
| 
          
            
          
           | 
    @@ -34,8 +35,14 @@ func NewEventsStore( | |||||||
| // | ||||||||
| // No error returns are expected during normal operations | ||||||||
| func (e *EventsStore) Persist(_ lockctx.Proof, batch storage.ReaderBatchWriter) error { | ||||||||
| if err := e.persistedEvents.BatchStore(e.blockID, []flow.EventsList{e.data}, batch); err != nil { | ||||||||
| err := e.persistedEvents.BatchStore(e.blockID, []flow.EventsList{e.data}, batch) | ||||||||
| if err != nil { | ||||||||
| if errors.Is(err, storage.ErrAlreadyExists) { | ||||||||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ❓Question I assume this is anticipating future changes? If I am not mistaken, at the moment,  In addition, the storage API is lacking error documentation, so added the following todo: Lines 29 to 31 in 9a21aea 
 Overall, I think we need to come back to the events and add overwrite protection checks. After adding these checks, the code in the  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the refactor of the implementation has been addressed in this PR: So the error handling is added here first.  | 
||||||||
| // we don't overwrite, it's idempotent | ||||||||
                
       | 
||||||||
| return nil | ||||||||
| } | ||||||||
| return fmt.Errorf("could not add events to batch: %w", err) | ||||||||
| } | ||||||||
| 
     | 
||||||||
| return nil | ||||||||
| } | ||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 
          
            
          
           | 
    @@ -31,10 +31,10 @@ func NewResultsStore( | |||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||||||||||||||||
| // Persist adds results to the batch. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // requires [storage.LockInsertLightTransactionResult] to be held | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // No error returns are expected during normal operations | ||||||||||||||||||||||||||||||||||||||||||||||||||
| func (r *ResultsStore) Persist(_ lockctx.Proof, batch storage.ReaderBatchWriter) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if err := r.persistedResults.BatchStore(r.blockID, r.data, batch); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| func (r *ResultsStore) Persist(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if err := r.persistedResults.BatchStore(lctx, rw, r.blockID, r.data); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("could not add transaction results to batch: %w", err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
         
      Comment on lines
    
      37
     to 
      47
    
   
  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 
 | 
||||||||||||||||||||||||||||||||||||||||||||||||||
| err := t.persistedTxResultErrMsg.BatchStore(lctx, rw, t.blockID, t.data) | |
| if err != nil { | |
| if errors.Is(err, storage.ErrAlreadyExists) { | |
| return nil | |
| } | |
| return fmt.Errorf("could not add transaction result error messages to batch: %w", err) | |
| } | 
ResultsStore and TxResultErrMsgStore implement the same interface (stores.PersisterStore) and should therefore behave consistently, which they currently do not.
| // No error returns are expected during normal operations | |
| func (r *ResultsStore) Persist(_ lockctx.Proof, batch storage.ReaderBatchWriter) error { | |
| if err := r.persistedResults.BatchStore(r.blockID, r.data, batch); err != nil { | |
| func (r *ResultsStore) Persist(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { | |
| if err := r.persistedResults.BatchStore(lctx, rw, r.blockID, r.data); err != nil { | |
| return fmt.Errorf("could not add transaction results to batch: %w", err) | |
| // No error returns are expected during normal operations | |
| func (r *ResultsStore) Persist(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { | |
| // CAUTION: here we assume that if something is already stored for our blockID, then the data is identical. | |
| // This only holds true for sealed execution results, whose consistency has previously been verified by | |
| // comparing the data's hash to commitments in the execution result. | |
| err := r.persistedResults.BatchStore(lctx, rw, r.blockID, r.data) | |
| if err != nil { | |
| if errors.Is(err, storage.ErrAlreadyExists) { | |
| return nil | |
| } | |
| return fmt.Errorf("could not add transaction results to batch: %w", err) | 
Please make sure to keep implementation, interface and all callers in sync (not using different conventions).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should check that the required lock is held here for any
TransactionResultErrorMessages.Store