Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions state/cluster/badger/mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (m *MutableState) checkPayloadTransactions(lctx lockctx.Proof, ctx extendCo
// follow the same fork as the one we are extending here. Hence, we might apply the transaction de-duplication logic
// against blocks that do not belong to our fork. If we erroneously find a duplicated transaction, based on a block
// that is not part of our fork, we would be raising an invalid slashing challenge, which would get this node slashed.
duplicateTxIDs, err = m.checkDupeTransactionsInFinalizedAncestry(lctx, txLookup, minRefHeight, maxRefHeight)
duplicateTxIDs, err = m.checkDupeTransactionsInFinalizedAncestry(lctx, txLookup, minRefHeight, maxRefHeight, ctx.finalizedClusterBlock.Height)
if err != nil {
return fmt.Errorf("could not check for duplicate txs in finalized ancestry: %w", err)
}
Expand All @@ -357,6 +357,7 @@ func (m *MutableState) checkPayloadTransactions(lctx lockctx.Proof, ctx extendCo

// checkDupeTransactionsInUnfinalizedAncestry checks for duplicate transactions in the un-finalized
// ancestry of the given block, and returns a list of all duplicates if there are any.
// No errors are expected during normal operation.
func (m *MutableState) checkDupeTransactionsInUnfinalizedAncestry(block *cluster.Block, includedTransactions map[flow.Identifier]struct{}, finalHeight uint64) ([]flow.Identifier, error) {
var duplicateTxIDs []flow.Identifier
err := fork.TraverseBackward(m.headers, block.ParentID, func(ancestor *flow.Header) error {
Expand All @@ -380,8 +381,13 @@ func (m *MutableState) checkDupeTransactionsInUnfinalizedAncestry(block *cluster

// checkDupeTransactionsInFinalizedAncestry checks for duplicate transactions in the finalized
// ancestry, and returns a list of all duplicates if there are any.
func (m *MutableState) checkDupeTransactionsInFinalizedAncestry(lctx lockctx.Proof, includedTransactions map[flow.Identifier]struct{}, minRefHeight, maxRefHeight uint64) ([]flow.Identifier, error) {
var duplicatedTxIDs []flow.Identifier
// No errors are expected during normal operation.
func (m *MutableState) checkDupeTransactionsInFinalizedAncestry(
lctx lockctx.Proof,
includedTransactions map[flow.Identifier]struct{},
minRefHeight, maxRefHeight, finalClusterHeight uint64,
) ([]flow.Identifier, error) {
var dupeTxIDs []flow.Identifier

// Let E be the global transaction expiry constant, measured in blocks. For each
// T ∈ `includedTransactions`, we have to decide whether the transaction
Expand Down Expand Up @@ -420,14 +426,42 @@ func (m *MutableState) checkDupeTransactionsInFinalizedAncestry(lctx lockctx.Pro
if err != nil {
return nil, fmt.Errorf("could not retrieve cluster payload (block_id=%x) to de-duplicate: %w", blockID, err)
}

// capture any transactions in the finalized block duplicating transactions in the candidate block
var dupeTxIDsForBlock []flow.Identifier
for _, tx := range payload.Collection.Transactions {
txID := tx.ID()
_, duplicated := includedTransactions[txID]
if duplicated {
duplicatedTxIDs = append(duplicatedTxIDs, txID)
dupeTxIDsForBlock = append(dupeTxIDsForBlock, txID)
}
}
}

return duplicatedTxIDs, nil
// if no duplicates were found, continue to the next block
if len(dupeTxIDsForBlock) == 0 {
continue
}

// otherwise, if any duplicates were found, confirm that the block is an ancestor of the candidate block
// We checked that the candidate block extends the finalized state as of the beginning of the Extend process.
// That state was captured as `extendCtx.finalizedClusterBlock`.
// Although not currently allowed, we assume that finalization may occur concurrently with extension,
// so a new block may have been finalized in the interim. Here we verify that, in case such a block
// was finalized, it is still an ancestor of the candidate block.
header, err := m.headers.ByBlockID(blockID)
if err != nil {
return nil, fmt.Errorf("could not retrieve header by block_id=%x: %w", blockID, err)
}
// Since we already checked for duplicate transactions in the unfinalized ancestry above, we
// know that if we found a duplicate here ABOVE our view of the finalized height, it must be on a different fork
// (otherwise we would have found it before). So, we only consider blocks at or below our view of the finalized height.
if header.Height <= finalClusterHeight {
dupeTxIDs = append(dupeTxIDs, dupeTxIDsForBlock...)
// TODO: We could stop at this point since we know the candidate is invalid.
// We likely SHOULD stop here when permissionless LNs are available, for performance reasons.
// For now, we continue and obtain a complete list of duplicates for debugging purposes, since we don't expect this case to occur.
continue
}
}
return dupeTxIDs, nil
}
36 changes: 36 additions & 0 deletions state/cluster/badger/mutator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,42 @@ func (suite *MutatorSuite) TestExtend_FinalizedBlockWithDupeTx() {
suite.Assert().True(state.IsInvalidExtensionError(err))
}

// TestExtend_RaceCondition_FinalizedForkWithDupeTx tests the case where an extending
// block conflicts with a block on another fork, and that fork is finalized.
// Usually the extending block would be rejected because it is orphaned, however
// concurrent finalization and extension can allow this scenario to occur.
//
// ↙ B(tx1) [tentatively withheld by byzantine proposer to force a fork]
// A
// ↖ C(tx1) ← D ← E [E finalizes C]
func (suite *MutatorSuite) TestExtend_RaceCondition_FinalizedForkWithDupeTx() {
tx1 := suite.Tx()

// create a block extending genesis containing tx1
B := suite.ProposalWithParentAndPayload(suite.genesis, suite.Payload(&tx1))
// create a conflicting block C, which will be finalized concurrently with inserting B
C := suite.ProposalWithParentAndPayload(suite.genesis, suite.Payload(&tx1))

// should be able to extend block C
err := suite.state.Extend(&C)
suite.Assert().NoError(err)

// We want to replicate a race condition where block C is finalized concurrently with
// block B being inserted. To accomplish this, we skip actually inserting D/E (although
// they would be need in practice). Instead, we manually insert the reference block to
// transaction lookup for C which would be inserted during finalization.
err = unittest.WithLock(suite.T(), suite.lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
return suite.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return operation.IndexClusterBlockByReferenceHeight(lctx, rw.Writer(), suite.genesis.Height, C.Block.ID())
})
})
require.NoError(suite.T(), err)

// we should be able to extend B, because C is on a fork.
err = suite.state.Extend(&B)
suite.Assert().NoError(err)
}

func (suite *MutatorSuite) TestExtend_ConflictingForkWithDupeTx() {
tx1 := suite.Tx()

Expand Down
Loading