56 changes: 44 additions & 12 deletions core/blockchain.go
@@ -951,7 +951,8 @@ func (bc *BlockChain) rewindPathHead(head *types.Header, root common.Hash) (*typ
// Recover the target state if it's not available yet.
if !bc.HasState(head.Root) {
if err := bc.triedb.Recover(head.Root); err != nil {
log.Crit("Failed to rollback state", "err", err)
log.Error("Failed to rollback state, resetting to genesis", "err", err)
return bc.genesisBlock.Header(), rootNumber
}
}
log.Info("Rewound to block with state", "number", head.Number, "hash", head.Hash())
@@ -1113,14 +1114,48 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
return rootNumber, bc.loadLastState()
}

// SnapSyncCommitHead sets the current head block to the one defined by the hash
// irrelevant what the chain contents were prior.
func (bc *BlockChain) SnapSyncCommitHead(hash common.Hash) error {
// SnapSyncStart disables the underlying databases (such as the trie DB and the
// optional state snapshot) to prevent potential concurrent mutations between
// snap sync and other chain operations.
func (bc *BlockChain) SnapSyncStart() error {
if !bc.chainmu.TryLock() {
return errChainStopped
}
defer bc.chainmu.Unlock()

// Snap sync will directly modify the persistent state, making the entire
// trie database unusable until the state is fully synced. To prevent any
// subsequent state reads, explicitly disable the trie database; the state
// syncer is responsible for addressing and correcting any missing state.
if bc.TrieDB().Scheme() == rawdb.PathScheme {
if err := bc.TrieDB().Disable(); err != nil {
return err
}
}
// Snap sync uses the snapshot namespace to store potentially flaky data until
// sync completely heals and finishes. Pause snapshot maintenance in the mean-
// time to prevent access.
if snapshots := bc.Snapshots(); snapshots != nil { // Only nil in tests
snapshots.Disable()
}
return nil
}

// SnapSyncComplete sets the current head block to the block identified by the
// given hash, regardless of the chain contents prior to snap sync. It is
// invoked once snap sync completes and assumes that SnapSyncStart was called
// previously.
func (bc *BlockChain) SnapSyncComplete(hash common.Hash) error {
// Make sure that both the block as well as its state trie exist
block := bc.GetBlockByHash(hash)
if block == nil {
return fmt.Errorf("non existent block [%x..]", hash[:4])
}
if !bc.chainmu.TryLock() {
return errChainStopped
}
defer bc.chainmu.Unlock()

// Reset the trie database with the fresh snap synced state.
root := block.Root()
if bc.triedb.Scheme() == rawdb.PathScheme {
@@ -1131,19 +1166,16 @@ func (bc *BlockChain) SnapSyncCommitHead(hash common.Hash) error {
if !bc.HasState(root) {
return fmt.Errorf("non existent state [%x..]", root[:4])
}
// If all checks out, manually set the head block.
if !bc.chainmu.TryLock() {
return errChainStopped
}
bc.currentBlock.Store(block.Header())
headBlockGauge.Update(int64(block.NumberU64()))
bc.chainmu.Unlock()

// Destroy any existing state snapshot and regenerate it in the background,
// also resuming the normal maintenance of any previously paused snapshot.
if bc.snaps != nil {
bc.snaps.Rebuild(root)
}

// If all checks out, manually set the head block.
bc.currentBlock.Store(block.Header())
headBlockGauge.Update(int64(block.NumberU64()))

log.Info("Committed new head block", "number", block.Number(), "hash", hash)
return nil
}
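The SnapSyncCommitHead removal above splits the old single call into a two-phase contract: SnapSyncStart before the download touches persistent state, SnapSyncComplete once the pivot block and its state are in place. A minimal Go sketch of that call order, assuming a hypothetical driver and interface (the real caller is the downloader, further down in this diff):

```go
package sketch

import "github.com/ethereum/go-ethereum/common"

// snapSyncChain mirrors only the two methods introduced here; the real
// implementation is core.BlockChain.
type snapSyncChain interface {
	SnapSyncStart() error
	SnapSyncComplete(hash common.Hash) error
}

// runSnapSync is a hypothetical driver illustrating the intended call order:
// freeze the trie database and snapshots before mutating state, then promote
// the pivot block to the chain head once its state is fully downloaded.
func runSnapSync(chain snapSyncChain, sync func() (common.Hash, error)) error {
	if err := chain.SnapSyncStart(); err != nil {
		return err
	}
	pivot, err := sync() // assumed to return the pivot block hash on success
	if err != nil {
		return err
	}
	return chain.SnapSyncComplete(pivot)
}
```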
22 changes: 4 additions & 18 deletions eth/catalyst/api.go
@@ -36,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/internal/version"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
@@ -81,20 +80,6 @@ const (
beaconUpdateWarnFrequency = 5 * time.Minute
)

var (
// Number of blobs requested via getBlobsV2
getBlobsRequestedCounter = metrics.NewRegisteredCounter("engine/getblobs/requested", nil)

// Number of blobs requested via getBlobsV2 that are present in the blobpool
getBlobsAvailableCounter = metrics.NewRegisteredCounter("engine/getblobs/available", nil)

// Number of times getBlobsV2 responded with “hit”
getBlobsV2RequestHit = metrics.NewRegisteredCounter("engine/getblobs/hit", nil)

// Number of times getBlobsV2 responded with “miss”
getBlobsV2RequestMiss = metrics.NewRegisteredCounter("engine/getblobs/miss", nil)
)

type ConsensusAPI struct {
eth *eth.Ethereum

@@ -137,6 +122,9 @@ type ConsensusAPI struct {

// NewConsensusAPI creates a new consensus api for the given backend.
// The underlying blockchain needs to have a valid terminal total difficulty set.
//
// This function creates a long-lived object with an attached background thread.
// For testing or other short-term use cases, please use newConsensusAPIWithoutHeartbeat.
func NewConsensusAPI(eth *eth.Ethereum) *ConsensusAPI {
api := newConsensusAPIWithoutHeartbeat(eth)
go api.heartbeat()
@@ -818,7 +806,7 @@ func (api *ConsensusAPI) delayPayloadImport(block *types.Block) engine.PayloadSt
return engine.PayloadStatusV1{Status: engine.SYNCING}
}
// Either no beacon sync was started yet, or it rejected the delivered
// payload as non-integratable on top of the existing sync. We'll just
// payload as non-integrate on top of the existing sync. We'll just
// have to rely on the beacon client to forcefully update the head with
// a forkchoice update request.
if api.eth.Downloader().ConfigSyncMode() == ethconfig.FullSync {
@@ -916,8 +904,6 @@ func (api *ConsensusAPI) invalid(err error, latestValid *types.Header) engine.Pa
// heartbeat loops indefinitely, and checks if there have been beacon client updates
// received in the last while. If not - or if only strange ones arrived - it warns the
// user that something might be off with their consensus node.
//
// TODO(karalabe): Spin this goroutine down somehow
func (api *ConsensusAPI) heartbeat() {
// Sleep a bit on startup since there's obviously no beacon client yet
// attached, so no need to print scary warnings to the user.
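The new NewConsensusAPI doc comment describes a constructor pair: one that attaches a background heartbeat goroutine and one, for tests, that does not. A generic sketch of that pattern (names and fields here are illustrative, not the actual ConsensusAPI internals):

```go
package sketch

import "time"

// api stands in for a long-lived object such as ConsensusAPI.
type api struct {
	quit chan struct{}
}

// newAPI mirrors the NewConsensusAPI shape: build the object, then attach the
// background heartbeat, so it is only suitable for long-lived instances.
func newAPI() *api {
	a := newAPIWithoutHeartbeat()
	go a.heartbeat()
	return a
}

// newAPIWithoutHeartbeat mirrors newConsensusAPIWithoutHeartbeat: no goroutine
// is spawned, which keeps tests and other short-term uses leak-free.
func newAPIWithoutHeartbeat() *api {
	return &api{quit: make(chan struct{})}
}

func (a *api) heartbeat() {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Periodic liveness checks against the beacon client would go here.
		case <-a.quit:
			return
		}
	}
}
```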
33 changes: 33 additions & 0 deletions eth/catalyst/metrics.go
@@ -0,0 +1,33 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package catalyst

import "github.com/ethereum/go-ethereum/metrics"

var (
// Number of blobs requested via getBlobsV2
getBlobsRequestedCounter = metrics.NewRegisteredCounter("engine/getblobs/requested", nil)

// Number of blobs requested via getBlobsV2 that are present in the blobpool
getBlobsAvailableCounter = metrics.NewRegisteredCounter("engine/getblobs/available", nil)

// Number of times getBlobsV2 responded with “hit”
getBlobsV2RequestHit = metrics.NewRegisteredCounter("engine/getblobs/hit", nil)

// Number of times getBlobsV2 responded with “miss”
getBlobsV2RequestMiss = metrics.NewRegisteredCounter("engine/getblobs/miss", nil)
)
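The counters above are the ones previously declared inline in api.go. A small self-contained sketch of how such counters are typically updated when serving a getBlobsV2 request (the helper and the exact hit/miss rule are assumptions, not code from this PR):

```go
package sketch

import "github.com/ethereum/go-ethereum/metrics"

// These mirror the counters defined in eth/catalyst/metrics.go; redeclared
// here only so the sketch is self-contained.
var (
	blobsRequested = metrics.NewRegisteredCounter("sketch/getblobs/requested", nil)
	blobsAvailable = metrics.NewRegisteredCounter("sketch/getblobs/available", nil)
	blobsHit       = metrics.NewRegisteredCounter("sketch/getblobs/hit", nil)
	blobsMiss      = metrics.NewRegisteredCounter("sketch/getblobs/miss", nil)
)

// recordGetBlobs is a hypothetical helper showing how such counters are
// typically bumped by a getBlobsV2 handler: the request counts as a "hit"
// only when every requested blob is present in the pool.
func recordGetBlobs(requested, available int) {
	blobsRequested.Inc(int64(requested))
	blobsAvailable.Inc(int64(available))
	if requested > 0 && available == requested {
		blobsHit.Inc(1)
	} else {
		blobsMiss.Inc(1)
	}
}
```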
5 changes: 5 additions & 0 deletions eth/downloader/beaconsync.go
@@ -61,6 +61,7 @@ func (b *beaconBackfiller) suspend() *types.Header {
b.lock.Unlock()

if !filling {
log.Debug("Backfiller was inactive")
return filled // Return the filled header on the previous sync completion
}
// A previous filling should be running, though it may happen that it hasn't
@@ -73,6 +74,7 @@
// Now that we're sure the downloader successfully started up, we can cancel
// it safely without running the risk of data races.
b.downloader.Cancel()
log.Debug("Backfiller has been suspended")

// Sync cycle was just terminated, retrieve and return the last filled header.
// Can't use `filled` as that contains a stale value from before cancellation.
@@ -86,6 +88,7 @@ func (b *beaconBackfiller) resume() {
// If a previous filling cycle is still running, just ignore this start
// request. // TODO(karalabe): We should make this channel driven
b.lock.Unlock()
log.Debug("Backfiller is running")
return
}
b.filling = true
@@ -114,7 +117,9 @@ func (b *beaconBackfiller) resume() {
if b.success != nil {
b.success()
}
log.Debug("Backfilling completed")
}()
log.Debug("Backfilling started")
}

// SetBadBlockCallback sets the callback to run when a bad block is hit by the
35 changes: 16 additions & 19 deletions eth/downloader/downloader.go
@@ -193,8 +193,12 @@ type BlockChain interface {
// CurrentSnapBlock retrieves the head snap block from the local chain.
CurrentSnapBlock() *types.Header

// SnapSyncCommitHead directly commits the head block to a certain entity.
SnapSyncCommitHead(common.Hash) error
// SnapSyncStart explicitly notifies the chain that snap sync is scheduled and
// marks chain mutations as disallowed.
SnapSyncStart() error

// SnapSyncComplete sets the chain head to the block with the given hash once snap sync finishes.
SnapSyncComplete(common.Hash) error

// InsertHeadersBeforeCutoff inserts a batch of headers before the configured
// chain cutoff into the ancient store.
@@ -361,28 +365,21 @@ func (d *Downloader) synchronise(beaconPing chan struct{}) (err error) {
if d.notified.CompareAndSwap(false, true) {
log.Info("Block synchronisation started")
}
mode := d.moder.get()

// Obtain the sync mode used in this cycle
mode := d.moder.get(true)
defer func() {
if err == nil && mode == ethconfig.SnapSync {
d.moder.disableSnap()
log.Info("Disabled snap-sync after the initial sync cycle")
}
}()

// Disable chain mutations when snap sync is selected, ensuring the
// downloader is the sole mutator.
if mode == ethconfig.SnapSync {
// Snap sync will directly modify the persistent state, making the entire
// trie database unusable until the state is fully synced. To prevent any
// subsequent state reads, explicitly disable the trie database and state
// syncer is responsible to address and correct any state missing.
if d.blockchain.TrieDB().Scheme() == rawdb.PathScheme {
if err := d.blockchain.TrieDB().Disable(); err != nil {
return err
}
}
// Snap sync uses the snapshot namespace to store potentially flaky data until
// sync completely heals and finishes. Pause snapshot maintenance in the mean-
// time to prevent access.
if snapshots := d.blockchain.Snapshots(); snapshots != nil { // Only nil in tests
snapshots.Disable()
if err := d.blockchain.SnapSyncStart(); err != nil {
return err
}
}
// Reset the queue, peer set and wake channels to clean any internal leftover state
Expand Down Expand Up @@ -427,7 +424,7 @@ func (d *Downloader) getMode() SyncMode {
// ConfigSyncMode returns the sync mode configured for the node.
// The actual running sync mode can differ from this.
func (d *Downloader) ConfigSyncMode() SyncMode {
return d.moder.get()
return d.moder.get(false)
}

// syncToHead starts a block synchronization based on the hash chain from
@@ -1086,7 +1083,7 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []rlp.RawValue{result.Receipts}, d.ancientLimit); err != nil {
return err
}
if err := d.blockchain.SnapSyncCommitHead(block.Hash()); err != nil {
if err := d.blockchain.SnapSyncComplete(block.Hash()); err != nil {
return err
}
d.committed.Store(true)
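The synchronise changes rely on an unexported moder value whose implementation is not part of this diff. The following is a guess at its contract, reconstructed only from the call sites above — get(false) returns the configured mode, get(true) the mode actually used in the cycle, and disableSnap downgrades later cycles to full sync after the initial snap cycle succeeds:

```go
package sketch

import "sync"

// SyncMode is a stand-in for ethconfig.SyncMode; only the two values used in
// this diff are modelled.
type SyncMode int

const (
	FullSync SyncMode = iota
	SnapSync
)

// modeHolder is a hypothetical reconstruction of the downloader's moder field,
// based purely on how it is called in this diff.
type modeHolder struct {
	mu           sync.Mutex
	configured   SyncMode
	snapDisabled bool
}

// get returns the configured mode, or the effective mode for the next cycle
// when effective is true (snap sync is only honoured until it is disabled).
func (m *modeHolder) get(effective bool) SyncMode {
	m.mu.Lock()
	defer m.mu.Unlock()
	if effective && m.configured == SnapSync && m.snapDisabled {
		return FullSync
	}
	return m.configured
}

// disableSnap permanently switches future cycles to full sync, invoked after
// the initial snap sync cycle completes successfully.
func (m *modeHolder) disableSnap() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.snapDisabled = true
}
```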
40 changes: 39 additions & 1 deletion eth/downloader/skeleton.go
@@ -64,6 +64,12 @@ var errSyncMerged = errors.New("sync merged")
// should abort and restart with the new state.
var errSyncReorged = errors.New("sync reorged")

// errSyncTrimmed is an internal helper error to signal that the local chain
// has been trimmed (e.g., via an explicit debug_setHead) and the skeleton chain
// is no longer linked with the local chain. In this case, the skeleton sync
// should be rescheduled.
var errSyncTrimmed = errors.New("sync trimmed")

// errTerminated is returned if the sync mechanism was terminated for this run of
// the process. This is usually the case when Geth is shutting down and some events
// might still be propagating.
@@ -296,6 +302,11 @@ func (s *skeleton) startup() {
// head to force a cleanup.
head = newhead

case err == errSyncTrimmed:
// The skeleton chain is not linked with the local chain anymore,
// restart the sync.
head = nil

case err == errTerminated:
// Sync was requested to be terminated from within, stop and
// return (no need to pass a message, was already done internally)
@@ -486,7 +497,22 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
// is still running, it will pick it up. If it already terminated,
// a new cycle needs to be spun up.
if linked {
s.filler.resume()
linked = len(s.progress.Subchains) == 1 &&
rawdb.HasHeader(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.scratchHead)

if linked {
// The skeleton chain has been extended and is still linked with the local
// chain, try to re-schedule the backfiller if it's already terminated.
s.filler.resume()
} else {
// The skeleton chain is no longer linked to the local chain for some reason
// (e.g. debug_setHead was used to trim the local chain). Re-schedule the
// skeleton sync to fill the chain gap.
log.Warn("Local chain has been trimmed", "tailnumber", s.scratchHead, "tailhash", s.progress.Subchains[0].Next)
return nil, errSyncTrimmed
}
}

case req := <-requestFails:
@@ -649,9 +675,19 @@ func (s *skeleton) processNewHead(head *types.Header, final *types.Header) error
// Not a noop / double head announce, abort with a reorg
return fmt.Errorf("%w, tail: %d, head: %d, newHead: %d", errChainReorged, lastchain.Tail, lastchain.Head, number)
}
// Terminate the sync if the chain head is gapped
if lastchain.Head+1 < number {
return fmt.Errorf("%w, head: %d, newHead: %d", errChainGapped, lastchain.Head, number)
}
// Ignore the duplicated beacon header announcement
if lastchain.Head == number {
local := rawdb.ReadSkeletonHeader(s.db, number)
if local != nil && local.Hash() == head.Hash() {
log.Debug("Ignored the identical beacon header", "number", number, "hash", local.Hash())
return nil
}
}
// Terminate the sync if the chain head is forked
if parent := rawdb.ReadSkeletonHeader(s.db, number-1); parent.Hash() != head.ParentHash {
return fmt.Errorf("%w, ancestor: %d, hash: %s, want: %s", errChainForked, number-1, parent.Hash(), head.ParentHash)
}
@@ -669,6 +705,7 @@ func (s *skeleton) processNewHead(head *types.Header, final *types.Header) error
if err := batch.Write(); err != nil {
log.Crit("Failed to write skeleton sync status", "err", err)
}
log.Debug("Extended beacon header chain", "number", head.Number, "hash", head.Hash())
return nil
}

@@ -1206,6 +1243,7 @@ func (s *skeleton) cleanStales(filled *types.Header) error {
if err := batch.Write(); err != nil {
log.Crit("Failed to write beacon trim data", "err", err)
}
log.Debug("Cleaned stale beacon headers", "start", start, "end", end)
return nil
}

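The re-derived linked flag in the sync loop reduces to a database predicate on the tail of the sole remaining subchain. A self-contained sketch of that check using the same rawdb accessors as the diff (the function and parameter names are mine):

```go
package sketch

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// stillLinked reports whether the skeleton chain is still attached to the
// local chain: exactly one subchain must remain, and the block at its tail
// linkage point (tailHash at height tailNumber) must still have its header,
// body and receipts stored locally. If any of these were trimmed away (e.g.
// by debug_setHead), the skeleton sync has to be rescheduled.
func stillLinked(db ethdb.Reader, subchains int, tailHash common.Hash, tailNumber uint64) bool {
	return subchains == 1 &&
		rawdb.HasHeader(db, tailHash, tailNumber) &&
		rawdb.HasBody(db, tailHash, tailNumber) &&
		rawdb.HasReceipts(db, tailHash, tailNumber)
}
```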