From 4dce30632178742772809d3bd566225f64990b5f Mon Sep 17 00:00:00 2001 From: Draco Date: Fri, 17 Oct 2025 16:48:23 -0400 Subject: [PATCH] feat(blockdb): add block database --- plugin/evm/database/blockdb/database.go | 464 +++++++++++++ plugin/evm/database/blockdb/database_test.go | 681 +++++++++++++++++++ plugin/evm/database/blockdb/migrator.go | 494 ++++++++++++++ plugin/evm/database/blockdb/migrator_test.go | 359 ++++++++++ 4 files changed, 1998 insertions(+) create mode 100644 plugin/evm/database/blockdb/database.go create mode 100644 plugin/evm/database/blockdb/database_test.go create mode 100644 plugin/evm/database/blockdb/migrator.go create mode 100644 plugin/evm/database/blockdb/migrator_test.go diff --git a/plugin/evm/database/blockdb/database.go b/plugin/evm/database/blockdb/database.go new file mode 100644 index 0000000000..ca40bd9955 --- /dev/null +++ b/plugin/evm/database/blockdb/database.go @@ -0,0 +1,464 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "path/filepath" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/heightindexdb/meterdb" + "github.com/ava-labs/avalanchego/database/prefixdb" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/x/blockdb" + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/log" + "github.com/ava-labs/libevm/rlp" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + _ ethdb.Database = (*Database)(nil) + + // Key prefixes for block data in the ethdb. + // REVIEW: I opted to just copy these from libevm since these should never change + // and we can avoid libevm changes by not needing to export them. + // Alternatively, we can update libevm to export these or create reader functions. + chainDBHeaderPrefix = []byte("h") + chainDBBlockBodyPrefix = []byte("b") + chainDBReceiptsPrefix = []byte("r") + + // Database prefix for storing block database migrator's internal state and progress tracking + migratorDBPrefix = []byte("migrator") + + // Key for storing the minimum height configuration of the block databases. + // This value determines the lowest block height that will be stored in the + // height-indexed block databases. Once set during initialization, the block + // database min height cannot be changed without recreating the databases. + blockDBMinHeightKey = []byte("blockdb_min_height") +) + +const ( + // Number of elements in the RLP encoded receipt/header/body data (hash + data) + hashDataElements = 2 + blockNumberSize = 8 + blockHashSize = 32 +) + +// Database is a wrapper around an ethdb.Database that stores block header +// body, and receipts in separate height-indexed block databases. +// All other keys are stored in the underlying ethdb.Database (chainDB). 
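+//
+// Keys are routed by shape: only keys of the form prefix + 8-byte big-endian
+// block number + 32-byte block hash (the rawdb prefixes "h", "b" and "r") are
+// served by the height-indexed databases; all other keys, and block-data keys
+// below the configured minimum height, fall through to chainDB unchanged.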
+type Database struct { + ethdb.Database + // DB for storing block database state data (ie min height) + stateDB database.Database + headerDB database.HeightIndex + bodyDB database.HeightIndex + receiptsDB database.HeightIndex + + config blockdb.DatabaseConfig + blockDBPath string + minHeight uint64 + + migrator *migrator + + reg prometheus.Registerer + // todo: use logger for blockdb instances + //nolint:unused + logger logging.Logger + + initialized bool +} + +type databaseBatch struct { + database *Database + ethdb.Batch +} + +func New( + stateDB database.Database, + chainDB ethdb.Database, + config blockdb.DatabaseConfig, + blockDBPath string, + logger logging.Logger, + reg prometheus.Registerer, +) *Database { + return &Database{ + stateDB: stateDB, + Database: chainDB, + blockDBPath: blockDBPath, + config: config, + reg: reg, + logger: logger, + } +} + +// InitWithStateSync initializes the height-indexed databases with the +// appropriate minimum height based on existing configuration and state sync settings. +// +// Initialization cases (in order of precedence): +// 1. Databases already exist → loads with existing min height +// 2. Data to migrate exists → initializes with min block height to migrate +// 3. No data to migrate + state sync enabled → defers initialization +// 4. No data to migrate + state sync disabled → initializes with min height 1 +// +// Returns true if databases were initialized, false otherwise. +func (db *Database) InitWithStateSync(stateSyncEnabled bool) (bool, error) { + minHeight, err := getDatabaseMinHeight(db.stateDB) + if err != nil { + return false, err + } + + // Databases already exist, load with existing min height + if minHeight != nil { + if err := db.InitWithMinHeight(*minHeight); err != nil { + return false, err + } + return true, nil + } + + // Data to migrate exists, initialize with min block height to migrate + minMigrateHeight := minBlockHeightToMigrate(db.Database) + if minMigrateHeight != nil { + if err := db.InitWithMinHeight(*minMigrateHeight); err != nil { + return false, err + } + return true, nil + } + + // No data to migrate and state sync disabled, initialize with min height 1 + if !stateSyncEnabled { + // Genesis block is not stored in height-indexed databases to avoid + // min height complexity with different node types (pruned vs archive). + if err := db.InitWithMinHeight(1); err != nil { + return false, err + } + return true, nil + } + return false, nil +} + +// InitWithMinHeight initializes the height-indexed databases with the provided minimum height. 
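+//
+// The minimum height is persisted under blockDBMinHeightKey, the three
+// height-indexed stores (headerdb, bodydb, receiptsdb) are created under
+// blockDBPath and wrapped with meterdb, and the migrator is wired up.
+// Calling this on an already-initialized Database logs a warning and is a
+// no-op.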
+func (db *Database) InitWithMinHeight(minHeight uint64) error { + if db.initialized { + log.Warn("InitWithMinHeight called on a block database that is already initialized") + return nil + } + log.Info("Initializing block database with min height", "minHeight", minHeight) + + if err := db.stateDB.Put(blockDBMinHeightKey, encodeBlockNumber(minHeight)); err != nil { + return err + } + headerDB, err := db.newMeteredDatabase("headerdb", minHeight) + if err != nil { + return err + } + bodyDB, err := db.newMeteredDatabase("bodydb", minHeight) + if err != nil { + return err + } + receiptsDB, err := db.newMeteredDatabase("receiptsdb", minHeight) + if err != nil { + return err + } + db.headerDB = headerDB + db.bodyDB = bodyDB + db.receiptsDB = receiptsDB + + if err := db.initMigrator(); err != nil { + return fmt.Errorf("failed to init migrator: %w", err) + } + + db.initialized = true + db.minHeight = minHeight + return nil +} + +// Migrate migrates block headers, bodies, and receipts from ethDB to the block databases. +func (db *Database) Migrate() error { + if !db.initialized { + return errors.New("block database must be initialized before migrating") + } + return db.migrator.Migrate() +} + +func (db *Database) Put(key []byte, value []byte) error { + if !db.shouldWriteToBlockDatabase(key) { + return db.Database.Put(key, value) + } + + blockNumber, blockHash := blockNumberAndHashFromKey(key) + + if isReceiptKey(key) { + return writeHashAndData(db.receiptsDB, blockNumber, blockHash, value) + } + if isHeaderKey(key) { + return writeHashAndData(db.headerDB, blockNumber, blockHash, value) + } + if isBodyKey(key) { + return writeHashAndData(db.bodyDB, blockNumber, blockHash, value) + } + return nil +} + +func (db *Database) Get(key []byte) ([]byte, error) { + if !db.shouldWriteToBlockDatabase(key) { + return db.Database.Get(key) + } + + blockNumber, blockHash := blockNumberAndHashFromKey(key) + var heightDB database.HeightIndex + switch { + case isReceiptKey(key): + heightDB = db.receiptsDB + case isHeaderKey(key): + heightDB = db.headerDB + case isBodyKey(key): + heightDB = db.bodyDB + default: + return nil, fmt.Errorf("unexpected key: %x", key) + } + + return readHashAndData(heightDB, db.Database, key, blockNumber, blockHash, db.migrator) +} + +func (db *Database) Has(key []byte) (bool, error) { + if !db.shouldWriteToBlockDatabase(key) { + return db.Database.Has(key) + } + data, err := db.Get(key) + if err != nil { + return false, err + } + return data != nil, nil +} + +func (db *Database) Delete(key []byte) error { + if !db.shouldWriteToBlockDatabase(key) { + return db.Database.Delete(key) + } + + // no-op since deleting written blocks is not supported + return nil +} + +func (db *Database) Close() error { + if db.migrator != nil { + db.migrator.Stop() + } + if db.headerDB != nil { + if err := db.headerDB.Close(); err != nil { + return err + } + } + if db.bodyDB != nil { + if err := db.bodyDB.Close(); err != nil { + return err + } + } + if db.receiptsDB != nil { + err := db.receiptsDB.Close() + if err != nil { + return err + } + } + return db.Database.Close() +} + +func (db *Database) initMigrator() error { + if db.migrator != nil { + return nil + } + migratorStateDB := prefixdb.New(migratorDBPrefix, db.stateDB) + migrator, err := NewMigrator(migratorStateDB, db.headerDB, db.bodyDB, db.receiptsDB, db.Database) + if err != nil { + return err + } + db.migrator = migrator + return nil +} + +func (db *Database) newMeteredDatabase(namespace string, minHeight uint64) (database.HeightIndex, error) { + path 
:= filepath.Join(db.blockDBPath, namespace) + config := db.config.WithDir(path).WithMinimumHeight(minHeight) + newDB, err := blockdb.New(config, logging.NoLog{}) + if err != nil { + return nil, fmt.Errorf("failed to create block database at %s: %w", path, err) + } + + meteredDB, err := meterdb.New(db.reg, namespace, newDB) + if err != nil { + if err := newDB.Close(); err != nil { + return nil, fmt.Errorf("failed to close block database: %w", err) + } + return nil, fmt.Errorf("failed to create metered %s database: %w", namespace, err) + } + + return meteredDB, nil +} + +func (db *Database) shouldWriteToBlockDatabase(key []byte) bool { + if !db.initialized { + return false + } + + var prefixLen int + switch { + case isBodyKey(key): + prefixLen = len(chainDBBlockBodyPrefix) + case isHeaderKey(key): + prefixLen = len(chainDBHeaderPrefix) + case isReceiptKey(key): + prefixLen = len(chainDBReceiptsPrefix) + default: + return false + } + blockNumber := binary.BigEndian.Uint64(key[prefixLen : prefixLen+8]) + return blockNumber >= db.minHeight +} + +func (db *Database) NewBatch() ethdb.Batch { + return databaseBatch{ + database: db, + Batch: db.Database.NewBatch(), + } +} + +func (db *Database) NewBatchWithSize(size int) ethdb.Batch { + return databaseBatch{ + database: db, + Batch: db.Database.NewBatchWithSize(size), + } +} + +func (batch databaseBatch) Put(key []byte, value []byte) error { + if !batch.database.shouldWriteToBlockDatabase(key) { + return batch.Batch.Put(key, value) + } + return batch.database.Put(key, value) +} + +func (batch databaseBatch) Delete(key []byte) error { + if !batch.database.shouldWriteToBlockDatabase(key) { + return batch.Batch.Delete(key) + } + return batch.database.Delete(key) +} + +func blockNumberAndHashFromKey(key []byte) (uint64, common.Hash) { + var prefixLen int + switch { + case isBodyKey(key): + prefixLen = len(chainDBBlockBodyPrefix) + case isHeaderKey(key): + prefixLen = len(chainDBHeaderPrefix) + case isReceiptKey(key): + prefixLen = len(chainDBReceiptsPrefix) + } + blockNumber := binary.BigEndian.Uint64(key[prefixLen : prefixLen+8]) + blockHash := common.BytesToHash(key[prefixLen+blockNumberSize:]) + return blockNumber, blockHash +} + +func encodeBlockNumber(number uint64) []byte { + enc := make([]byte, blockNumberSize) + binary.BigEndian.PutUint64(enc, number) + return enc +} + +func isBodyKey(key []byte) bool { + if len(key) != len(chainDBBlockBodyPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, chainDBBlockBodyPrefix) +} + +func isHeaderKey(key []byte) bool { + if len(key) != len(chainDBHeaderPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, chainDBHeaderPrefix) +} + +func isReceiptKey(key []byte) bool { + if len(key) != len(chainDBReceiptsPrefix)+blockNumberSize+blockHashSize { + return false + } + return bytes.HasPrefix(key, chainDBReceiptsPrefix) +} + +func getDatabaseMinHeight(db database.Database) (*uint64, error) { + has, err := db.Has(blockDBMinHeightKey) + if err != nil { + return nil, err + } + if !has { + return nil, nil + } + minHeightBytes, err := db.Get(blockDBMinHeightKey) + if err != nil { + return nil, err + } + minHeight := binary.BigEndian.Uint64(minHeightBytes) + return &minHeight, nil +} + +// writeHashAndData encodes [hash, data] and writes to the provided height-index db at the given height. 
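+//
+// The stored value is an RLP list of two byte strings, [blockHash, data], so
+// reads can verify that the hash recorded at a height matches the requested
+// block (readHashAndData returns nil data when it does not).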
+func writeHashAndData(db database.HeightIndex, height uint64, blockHash common.Hash, data []byte) error { + encoded, err := rlp.EncodeToBytes([][]byte{blockHash.Bytes(), data}) + if err != nil { + return err + } + return db.Put(height, encoded) +} + +// readHashAndData reads data from the height-indexed database and falls back +// to the chain database if the data is not found and migration is still needed. +func readHashAndData( + heightDB database.HeightIndex, + chainDB ethdb.Database, + key []byte, + blockNumber uint64, + blockHash common.Hash, + migrator *migrator, +) ([]byte, error) { + encodedData, err := heightDB.Get(blockNumber) + if err != nil { + if errors.Is(err, database.ErrNotFound) && migrator != nil && migrator.Status() != migrationCompleted { + return chainDB.Get(key) + } + return nil, err + } + + var elems [][]byte + if err := rlp.DecodeBytes(encodedData, &elems); err != nil { + return nil, err + } + if len(elems) != hashDataElements { + return nil, fmt.Errorf( + "invalid hash+data format: expected %d elements, got %d", + hashDataElements, + len(elems), + ) + } + decodedHash := common.BytesToHash(elems[0]) + if decodedHash != blockHash { + return nil, nil + } + + return elems[1], nil +} + +// IsEnabled checks if block databases have already been enabled +func IsEnabled(db database.Database) (bool, error) { + has, err := db.Has(blockDBMinHeightKey) + if err != nil { + return false, err + } + return has, nil +} diff --git a/plugin/evm/database/blockdb/database_test.go b/plugin/evm/database/blockdb/database_test.go new file mode 100644 index 0000000000..72b8f1f624 --- /dev/null +++ b/plugin/evm/database/blockdb/database_test.go @@ -0,0 +1,681 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package blockdb + +import ( + "math/big" + "os" + "path/filepath" + "testing" + "time" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/x/blockdb" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/crypto" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/rlp" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/coreth/consensus/dummy" + "github.com/ava-labs/coreth/core" + "github.com/ava-labs/coreth/params" + "github.com/ava-labs/coreth/plugin/evm/customtypes" + evmdb "github.com/ava-labs/coreth/plugin/evm/database" +) + +func TestMain(m *testing.M) { + customtypes.Register() + params.RegisterExtras() + os.Exit(m.Run()) +} + +// slowDatabase wraps a Database to add artificial delays +type slowDatabase struct { + database.HeightIndex + shouldSlow func() bool +} + +func (s *slowDatabase) Put(blockNumber uint64, encodedBlock []byte) error { + // Sleep to make migration hang for a bit + if s.shouldSlow == nil || s.shouldSlow() { + time.Sleep(100 * time.Millisecond) + } + return s.HeightIndex.Put(blockNumber, encodedBlock) +} + +var ( + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + key2, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = crypto.PubkeyToAddress(key2.PublicKey) +) + +func newDatabasesFromDir(t *testing.T, dataDir string) (*Database, ethdb.Database) { + t.Helper() + + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + chainDB := rawdb.NewDatabase(evmdb.WrapDatabase(base)) + + // Create wrapped block database + blockDBPath := filepath.Join(dataDir, "dbs") + wrapper := New(base, chainDB, blockdb.DefaultConfig(), blockDBPath, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, wrapper.InitWithMinHeight(1)) + + return wrapper, chainDB +} + +func createBlocks(t *testing.T, numBlocks int) ([]*types.Block, []types.Receipts) { + t.Helper() + + gspec := &core.Genesis{ + Config: params.TestChainConfig, + Number: 0, + Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(params.Ether)}}, + } + engine := dummy.NewFaker() + signer := types.LatestSigner(params.TestChainConfig) + db, blocks, receipts, err := core.GenerateChainWithGenesis(gspec, engine, numBlocks-1, 10, func(_ int, gen *core.BlockGen) { + tx, _ := types.SignTx(types.NewTx(&types.DynamicFeeTx{ + ChainID: params.TestChainConfig.ChainID, + Nonce: gen.TxNonce(addr1), + To: &addr2, + Gas: 500000, + GasTipCap: big.NewInt(1), + GasFeeCap: big.NewInt(1), + }), signer, key1) + gen.AddTx(tx) + }) + require.NoError(t, err) + + // add genesis block from db to blocks + genesisHash := rawdb.ReadCanonicalHash(db, 0) + genesisBlock := rawdb.ReadBlock(db, genesisHash, 0) + genesisReceipts := rawdb.ReadReceipts(db, genesisHash, 0, 0, params.TestChainConfig) + blocks = append([]*types.Block{genesisBlock}, blocks...) + receipts = append([]types.Receipts{genesisReceipts}, receipts...) 
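+	// blocks and receipts are now indexed by height: blocks[i].NumberU64() == i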
+ + return blocks, receipts +} + +func writeBlocks(db ethdb.Database, blocks []*types.Block, receipts []types.Receipts) { + for i, block := range blocks { + rawdb.WriteBlock(db, block) + if len(receipts) > 0 { + rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i]) + } + + // ensure written blocks are canonical + rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()) + } +} + +// todo: break this down into read blocks and test everything +// make sure to include reading genesis block +func TestDatabase_Read(t *testing.T) { + // Test that Database reads block data correctly + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 10) + writeBlocks(wrapper, blocks, receipts) + + testCases := []struct { + name string + test func(t *testing.T, block *types.Block, blockReceipts types.Receipts) + }{ + { + name: "header", + test: func(t *testing.T, block *types.Block, _ types.Receipts) { + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + header := rawdb.ReadHeader(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block.Header(), header) + }, + }, + { + name: "body", + test: func(t *testing.T, block *types.Block, _ types.Receipts) { + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + body := rawdb.ReadBody(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block.Body(), body) + }, + }, + { + name: "block", + test: func(t *testing.T, block *types.Block, _ types.Receipts) { + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block, actualBlock) + }, + }, + { + name: "receipts_and_logs", + test: func(t *testing.T, block *types.Block, blockReceipts types.Receipts) { + require.True(t, rawdb.HasReceipts(wrapper, block.Hash(), block.NumberU64())) + recs := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, blockReceipts, recs) + + logs := rawdb.ReadLogs(wrapper, block.Hash(), block.NumberU64()) + expectedLogs := make([][]*types.Log, len(blockReceipts)) + for j, receipt := range blockReceipts { + expectedLogs[j] = receipt.Logs + } + assertRLPEqual(t, expectedLogs, logs) + }, + }, + { + name: "block_number", + test: func(t *testing.T, block *types.Block, _ types.Receipts) { + blockNumber := rawdb.ReadHeaderNumber(wrapper, block.Hash()) + require.Equal(t, block.NumberU64(), *blockNumber) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + for i, block := range blocks { + blockReceipts := receipts[i] + tc.test(t, block, blockReceipts) + } + }) + } +} + +func TestDatabaseDelete(t *testing.T) { + // Test Database delete operations. + // We are verifying that block header, body and receipts cannot be deleted, + // but hash to height mapping should be deleted. 
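+	// Deletes of block data are intentionally a no-op in the wrapper (the
+	// height-indexed stores do not support deleting written blocks), while the
+	// hash-to-number mapping still lives in chainDB and is removed as usual.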
+ + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 4) + targetBlocks := blocks[1:] + targetReceipts := receipts[1:] + writeBlocks(wrapper, targetBlocks, targetReceipts) + + // delete block data + for i, block := range targetBlocks { + rawdb.DeleteBlock(wrapper, block.Hash(), block.NumberU64()) + + // we cannot delete header or body + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + header := rawdb.ReadHeader(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block.Header(), header) + body := rawdb.ReadBody(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block.Body(), body) + + // hash to height mapping should be deleted + blockNumber := rawdb.ReadHeaderNumber(wrapper, block.Hash()) + require.Nil(t, blockNumber) + + // Receipts and logs should not be deleted + expectedReceipts := targetReceipts[i] + recs := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, expectedReceipts, recs) + logs := rawdb.ReadLogs(wrapper, block.Hash(), block.NumberU64()) + expectedLogs := make([][]*types.Log, len(expectedReceipts)) + for j, receipt := range expectedReceipts { + expectedLogs[j] = receipt.Logs + } + assertRLPEqual(t, expectedLogs, logs) + } +} + +func TestDatabaseWrite(t *testing.T) { + // Test that header and body are stored separately and block can be read + // after both are written. + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + blocks, _ := createBlocks(t, 2) + block := blocks[1] + + rawdb.WriteHeader(wrapper, block.Header()) + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + header := rawdb.ReadHeader(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block.Header(), header) + require.False(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + require.Nil(t, rawdb.ReadBody(wrapper, block.Hash(), block.NumberU64())) + + // Verify underlying chainDB also has no header + require.Nil(t, rawdb.ReadHeader(chainDB, block.Hash(), block.NumberU64())) + + // Write body - should persist body now + rawdb.WriteBody(wrapper, block.Hash(), block.NumberU64(), block.Body()) + + // Verify block is available + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block, actualBlock) + + // Verify underlying chainDB has no header/body + require.Nil(t, rawdb.ReadHeader(chainDB, block.Hash(), block.NumberU64())) + require.Nil(t, rawdb.ReadBody(chainDB, block.Hash(), block.NumberU64())) +} + +func TestDatabase_Batch(t *testing.T) { + // Test that batch operations work correctly for both writing and reading block data and receipts + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + block := blocks[1] + blockReceipts := receipts[1] + + batch := wrapper.NewBatch() + + // Write header, body, and receipts to batch + rawdb.WriteBlock(batch, block) + rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), blockReceipts) + + // After writing both header and body to batch, both should be available immediately + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), 
block.NumberU64())) + + // Receipts should also be available immediately since they're stored separately + require.True(t, rawdb.HasReceipts(wrapper, block.Hash(), block.NumberU64())) + + // Before write, header number should not be available + require.Nil(t, rawdb.ReadHeaderNumber(wrapper, block.Hash())) + + // After Write(), verify header number is available + require.NoError(t, batch.Write()) + blockNumber := rawdb.ReadHeaderNumber(wrapper, block.Hash()) + require.Equal(t, block.NumberU64(), *blockNumber) +} + +func TestDatabase_SameBlockWrites(t *testing.T) { + // Test that writing the same block twice via rawdb.WriteBlock doesn't cause issues + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, _ := createBlocks(t, 1) + block := blocks[0] + + // Write block twice + rawdb.WriteBlock(wrapper, block) + rawdb.WriteBlock(wrapper, block) + + // Verify block data is still correct after duplicate writes + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block, actualBlock) +} + +func TestDatabase_DifferentBlocksSameHeight(t *testing.T) { + // Test that writing different blocks to the same height overwrites the first block + // and reading by the first block's hash returns nothing. + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 2) + block1 := blocks[1] + receipt1 := receipts[1] + + // Manually create a second block with the same height but different content + gspec := &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(params.Ether)}}, + } + engine := dummy.NewFaker() + signer := types.LatestSigner(params.TestChainConfig) + _, blocks2, receipts2, err := core.GenerateChainWithGenesis(gspec, engine, 1, 10, func(_ int, gen *core.BlockGen) { + gen.OffsetTime(int64(5000)) + tx, _ := types.SignTx(types.NewTx(&types.DynamicFeeTx{ + ChainID: params.TestChainConfig.ChainID, + Nonce: gen.TxNonce(addr1), + To: &addr2, + Gas: 450000, + GasTipCap: big.NewInt(5), + GasFeeCap: big.NewInt(5), + }), signer, key1) + gen.AddTx(tx) + }) + require.NoError(t, err) + block2 := blocks2[0] + receipt2 := receipts2[0] + + // Ensure both blocks have the same height but different hashes + require.Equal(t, block1.NumberU64(), block2.NumberU64()) + require.NotEqual(t, block1.Hash(), block2.Hash()) + + // Write two blocks with the same height + writeBlocks(wrapper, []*types.Block{block1, block2}, []types.Receipts{receipt1, receipt2}) + + // Reading by the first block's hash does not return anything + require.False(t, rawdb.HasHeader(wrapper, block1.Hash(), block1.NumberU64())) + require.False(t, rawdb.HasBody(wrapper, block1.Hash(), block1.NumberU64())) + firstHeader := rawdb.ReadHeader(wrapper, block1.Hash(), block1.NumberU64()) + require.Nil(t, firstHeader) + firstBody := rawdb.ReadBody(wrapper, block1.Hash(), block1.NumberU64()) + require.Nil(t, firstBody) + require.False(t, rawdb.HasReceipts(wrapper, block1.Hash(), block1.NumberU64())) + firstReceipts := rawdb.ReadReceipts(wrapper, block1.Hash(), block1.NumberU64(), block1.Time(), params.TestChainConfig) + require.Nil(t, firstReceipts) + + // Reading by the second block's hash returns second block data + require.True(t, rawdb.HasHeader(wrapper, block2.Hash(), block2.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block2.Hash(), 
block2.NumberU64())) + secondBlock := rawdb.ReadBlock(wrapper, block2.Hash(), block2.NumberU64()) + assertRLPEqual(t, block2, secondBlock) + require.True(t, rawdb.HasReceipts(wrapper, block2.Hash(), block2.NumberU64())) + secondReceipts := rawdb.ReadReceipts(wrapper, block2.Hash(), block2.NumberU64(), block2.Time(), params.TestChainConfig) + assertRLPEqual(t, receipt2, secondReceipts) +} + +func TestDatabase_EmptyReceipts(t *testing.T) { + // Test that blocks with no transactions (empty receipts) are handled correctly + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + + // Create blocks without any transactions (empty receipts) + gspec := &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(params.Ether)}}, + } + engine := dummy.NewFaker() + _, blocks, receipts, err := core.GenerateChainWithGenesis(gspec, engine, 3, 10, func(_ int, _ *core.BlockGen) { + // Don't add any transactions - this will create blocks with empty receipts + }) + require.NoError(t, err) + + // Verify all blocks have empty receipts + for i := range blocks { + require.Empty(t, receipts[i], "Block %d should have empty receipts", i) + } + + // write some blocks to chain db and some to wrapper and trigger migration + writeBlocks(chainDB, blocks[:2], receipts[:2]) + writeBlocks(wrapper, blocks[2:], receipts[2:]) + require.NoError(t, wrapper.migrator.Migrate()) + waitForMigratorCompletion(t, wrapper.migrator, 10*time.Second) + + // Verify that blocks with empty receipts are handled correctly + for _, block := range blocks { + blockNum := block.NumberU64() + + // Block data should be accessible + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), blockNum)) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), blockNum)) + + // Receipts should only be stored in the receiptsDB + require.False(t, rawdb.HasReceipts(chainDB, block.Hash(), blockNum)) + _, err := wrapper.receiptsDB.Get(blockNum) + require.NoError(t, err) + + // to be consistent with ethdb behavior, empty receipts should return true for HasReceipts + require.True(t, rawdb.HasReceipts(wrapper, block.Hash(), blockNum)) + recs := rawdb.ReadReceipts(wrapper, block.Hash(), blockNum, block.Time(), params.TestChainConfig) + require.Empty(t, recs) + logs := rawdb.ReadLogs(wrapper, block.Hash(), blockNum) + require.Empty(t, logs) + } +} + +func TestDatabase_Close_PersistsData(t *testing.T) { + // Test that Close() properly closes both databases and data persists + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 1) + block := blocks[0] + blockReceipts := receipts[0] + + // Write block and receipts + writeBlocks(wrapper, blocks, receipts) + + // Verify data is present + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + recs := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, blockReceipts, recs) + + // Close the wrapper db + require.NoError(t, wrapper.Close()) + + // Test we should no longer be able to read the block + require.False(t, rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64())) + require.False(t, rawdb.HasBody(wrapper, block.Hash(), block.NumberU64())) + _, err := wrapper.bodyDB.Get(block.NumberU64()) + require.Error(t, err) + require.ErrorIs(t, err, database.ErrClosed) + + // Reopen the database and verify data is still present + wrapper, _ = 
newDatabasesFromDir(t, dataDir) + persistedBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64()) + assertRLPEqual(t, block, persistedBlock) + persistedRecs := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + assertRLPEqual(t, blockReceipts, persistedRecs) + require.NoError(t, wrapper.Close()) +} + +func TestDatabase_ReadDuringMigration(t *testing.T) { + // Test that blocks are readable during migration for both migrated and un-migrated blocks. + // This test: + // 1. Generates 21 blocks with receipts + // 2. Adds first 20 blocks and their receipts to chainDB + // 3. Creates wrapper database with migration disabled + // 4. Start migration with slow block database to control migration speed + // 5. Waits for at least 5 blocks to be migrated + // 6. Writes block 21 during migration (this is fast) + // 7. Verifies all 21 blocks and their receipts are readable via rawdb using wrapper + + dataDir := t.TempDir() + // Create initial databases using newDatabasesFromDir + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 21) + + // Add first 20 blocks to KVDB (chainDB) - these will be migrated + writeBlocks(chainDB, blocks[:20], receipts[:20]) + + // Create a slow block database to control migration speed + blockCount := 0 + slowDB := &slowDatabase{ + HeightIndex: wrapper.bodyDB, + shouldSlow: func() bool { + blockCount++ + return blockCount > 5 // Slow down after 5 blocks + }, + } + + // Create and start migrator manually with slow block database + wrapper.migrator.bodyDB = slowDB + require.NoError(t, wrapper.migrator.Migrate()) + + // Wait for at least 5 blocks to be migrated + require.Eventually(t, func() bool { + return wrapper.migrator.blocksProcessed() >= 5 + }, 15*time.Second, 100*time.Millisecond) + + // Write block 21 to the wrapper database (this simulates a new block being added during migration) + writeBlocks(wrapper, blocks[20:21], receipts[20:21]) + + // Verify all 21 blocks are readable via rawdb using the wrapper + for i, block := range blocks { + blockNum := block.NumberU64() + expectedReceipts := receipts[i] + + // Test reading block header and body + require.True(t, rawdb.HasHeader(wrapper, block.Hash(), blockNum)) + require.True(t, rawdb.HasBody(wrapper, block.Hash(), blockNum)) + actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), blockNum) + require.NotNil(t, actualBlock, "Block %d should be readable", blockNum) + assertRLPEqual(t, block, actualBlock) + actualHeader := rawdb.ReadHeader(wrapper, block.Hash(), blockNum) + require.NotNil(t, actualHeader, "Block %d header should be readable", blockNum) + assertRLPEqual(t, block.Header(), actualHeader) + actualBody := rawdb.ReadBody(wrapper, block.Hash(), blockNum) + require.NotNil(t, actualBody, "Block %d body should be readable", blockNum) + assertRLPEqual(t, block.Body(), actualBody) + + // Test reading receipts and logs + actualReceipts := rawdb.ReadReceipts(wrapper, block.Hash(), blockNum, block.Time(), params.TestChainConfig) + assertRLPEqual(t, expectedReceipts, actualReceipts) + actualLogs := rawdb.ReadLogs(wrapper, block.Hash(), blockNum) + expectedLogs := make([][]*types.Log, len(expectedReceipts)) + for j, receipt := range expectedReceipts { + expectedLogs[j] = receipt.Logs + } + assertRLPEqual(t, expectedLogs, actualLogs) + + // Header number should be readable + actualBlockNumber := rawdb.ReadHeaderNumber(wrapper, block.Hash()) + require.NotNil(t, actualBlockNumber, "Block %d number mapping should be readable", 
blockNum) + require.Equal(t, blockNum, *actualBlockNumber) + } + + require.NoError(t, wrapper.Close()) +} + +func TestDatabase_Initialization(t *testing.T) { + blocks, _ := createBlocks(t, 10) + + testCases := []struct { + name string + stateSyncEnabled bool + chainDBBlocks []*types.Block + existingDBMinHeight *uint64 + expInitialized bool + expMinHeight uint64 + expMinHeightSet bool + }{ + { + name: "empty_chainDB_no_state_sync", + stateSyncEnabled: false, + expInitialized: true, + expMinHeight: 1, + expMinHeightSet: true, + }, + { + name: "empty_chainDB_state_sync_no_init", + stateSyncEnabled: true, + expInitialized: false, + expMinHeight: 0, + expMinHeightSet: false, + }, + { + name: "migration_needed", + stateSyncEnabled: false, + chainDBBlocks: blocks[5:10], + expInitialized: true, + expMinHeight: 5, + expMinHeightSet: true, + }, + { + name: "migration_needed_with_genesis", + stateSyncEnabled: false, + chainDBBlocks: append([]*types.Block{blocks[0]}, blocks[5:10]...), + expInitialized: true, + expMinHeight: 5, + expMinHeightSet: true, + }, + { + name: "migration_needed_state_sync", + stateSyncEnabled: true, + chainDBBlocks: blocks[5:10], + expInitialized: true, + expMinHeight: 5, + expMinHeightSet: true, + }, + { + name: "existing_db_created_with_min_height", + existingDBMinHeight: func() *uint64 { v := uint64(2); return &v }(), + stateSyncEnabled: false, + chainDBBlocks: blocks[5:8], + expInitialized: true, + expMinHeight: 2, + expMinHeightSet: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + chainDB := rawdb.NewDatabase(evmdb.WrapDatabase(base)) + blockDBPath := filepath.Join(dataDir, "blockdb") + + // Create the block database with an existing min height if needed + if tc.existingDBMinHeight != nil { + minHeight, err := getDatabaseMinHeight(base) + require.NoError(t, err) + require.Nil(t, minHeight) + wrapper := New( + base, + chainDB, + blockdb.DefaultConfig(), + blockDBPath, + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, wrapper.InitWithMinHeight(*tc.existingDBMinHeight)) + require.NoError(t, wrapper.bodyDB.Close()) + require.NoError(t, wrapper.receiptsDB.Close()) + minHeight, err = getDatabaseMinHeight(base) + require.NoError(t, err) + require.Equal(t, *tc.existingDBMinHeight, *minHeight) + } + + // write chainDB blocks if needed + if len(tc.chainDBBlocks) > 0 { + writeBlocks(chainDB, tc.chainDBBlocks, []types.Receipts{}) + } + + // Create wrapper database + wrapper := New( + base, + chainDB, + blockdb.DefaultConfig(), + blockDBPath, + logging.NoLog{}, + prometheus.NewRegistry(), + ) + initialized, err := wrapper.InitWithStateSync(tc.stateSyncEnabled) + require.NoError(t, err) + require.Equal(t, tc.expInitialized, initialized) + require.Equal(t, tc.expInitialized, wrapper.initialized) + + // Verify initialization state and min height + if tc.expMinHeightSet { + require.Equal(t, tc.expMinHeight, wrapper.minHeight) + } else { + require.Equal(t, uint64(0), wrapper.minHeight) + } + + require.NoError(t, wrapper.Close()) + }) + } +} + +// Test that genesis block (block number 0) behavior works correctly. +// Genesis blocks should only exist in chainDB and not in the wrapper's block database. 
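+// Because the databases are initialized with a minimum height of 1, writes at
+// height 0 fall through to chainDB and never reach the height-indexed stores.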
+func TestDatabase_Genesis(t *testing.T) { + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + require.True(t, wrapper.initialized) + require.Equal(t, uint64(1), wrapper.minHeight) + blocks, receipts := createBlocks(t, 10) + writeBlocks(wrapper, blocks, receipts) + + // validate genesis block can be retrieved and its stored in chainDB + genesisHash := rawdb.ReadCanonicalHash(chainDB, 0) + genesisBlock := rawdb.ReadBlock(wrapper, genesisHash, 0) + assertRLPEqual(t, blocks[0], genesisBlock) + has, _ := wrapper.bodyDB.Has(0) + require.False(t, has) + require.Equal(t, uint64(1), wrapper.minHeight) +} + +func assertRLPEqual(t *testing.T, expected, actual interface{}) { + t.Helper() + + expectedBytes, err := rlp.EncodeToBytes(expected) + require.NoError(t, err) + actualBytes, err := rlp.EncodeToBytes(actual) + require.NoError(t, err) + require.Equal(t, expectedBytes, actualBytes) +} diff --git a/plugin/evm/database/blockdb/migrator.go b/plugin/evm/database/blockdb/migrator.go new file mode 100644 index 0000000000..ad61ce47b6 --- /dev/null +++ b/plugin/evm/database/blockdb/migrator.go @@ -0,0 +1,494 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/utils/timer" + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/log" + "github.com/ava-labs/libevm/rlp" +) + +type migrationStatus int + +const ( + migrationNotStarted migrationStatus = iota + migrationInProgress + migrationCompleted + + logProgressInterval = 5 * time.Minute // Log every 5 minutes + compactionInterval = 250_000 // Compact every 250k blocks processed +) + +var ( + // migrationStatusKey stores the persisted progress state for the migrator. + migrationStatusKey = []byte("migration_status") + + // endBlockNumberKey stores the target block number to migrate to. + endBlockNumberKey = []byte("migration_end_block_number") +) + +// migrator migrates canonical block data and receipts from +// ethdb.Database into the height-indexed block and receipt databases. +type migrator struct { + stateDB database.Database + chainDB ethdb.Database + headerDB database.HeightIndex + bodyDB database.HeightIndex + receiptsDB database.HeightIndex + + status migrationStatus + mu sync.RWMutex // protects status/running/cancel + running bool + cancel context.CancelFunc + wg sync.WaitGroup + + processed uint64 + endHeight uint64 +} + +// NewMigrator creates a new block database migrator with +// current migration status and target migration end block number. 
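+//
+// Both values are loaded from stateDB; if no end height has been recorded yet,
+// the current head header number is read from chainDB and persisted once so
+// progress and ETA reporting stay stable across restarts.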
+func NewMigrator( + stateDB database.Database, + headerDB database.HeightIndex, + bodyDB database.HeightIndex, + receiptsDB database.HeightIndex, + chainDB ethdb.Database, +) (*migrator, error) { + m := &migrator{ + headerDB: headerDB, + bodyDB: bodyDB, + receiptsDB: receiptsDB, + stateDB: stateDB, + chainDB: chainDB, + } + + // load status + status, err := getMigrationStatus(stateDB) + if err != nil { + return nil, err + } + m.status = status + + // load end block height + endHeight, err := getEndBlockHeight(stateDB) + if err != nil { + return nil, err + } + + if endHeight == 0 { + if endHeight, err = loadAndSaveBlockEndHeight(stateDB, chainDB); err != nil { + return nil, err + } + } + m.endHeight = endHeight + + return m, nil +} + +func (b *migrator) Status() migrationStatus { + b.mu.RLock() + defer b.mu.RUnlock() + return b.status +} + +func (b *migrator) Stop() { + b.mu.Lock() + cancel := b.cancel + b.mu.Unlock() + if cancel != nil { + cancel() + // Wait for migration goroutine to finish cleanup + b.wg.Wait() + } +} + +func (b *migrator) Migrate() error { + if b.status == migrationCompleted { + return nil + } + ctx, err := b.beginRun() + if err != nil { + return err + } + + if err := b.setStatus(migrationInProgress); err != nil { + b.endRun() + return err + } + + b.wg.Add(1) + go func() { + defer b.wg.Done() + defer b.endRun() + if err := b.run(ctx); err != nil && !errors.Is(err, context.Canceled) { + log.Info("migration failed", "err", err) + } + }() + return nil +} + +func (b *migrator) beginRun() (context.Context, error) { + b.mu.Lock() + defer b.mu.Unlock() + + if b.running { + return nil, errors.New("migration already running") + } + ctx, cancel := context.WithCancel(context.Background()) + b.cancel = cancel + b.running = true + return ctx, nil +} + +func (b *migrator) endRun() { + b.mu.Lock() + defer b.mu.Unlock() + b.cancel = nil + b.running = false +} + +func (b *migrator) setStatus(s migrationStatus) error { + b.mu.Lock() + defer b.mu.Unlock() + if b.status == s { + return nil + } + if err := b.stateDB.Put(migrationStatusKey, []byte{byte(s)}); err != nil { + return err + } + b.status = s + return nil +} + +func (b *migrator) run(ctx context.Context) error { + var ( + etaTarget uint64 + etaTracker = timer.NewEtaTracker(10, 1.2) + startTime = time.Now() + timeOfNextLog = startTime.Add(logProgressInterval) + deleteBatch = b.chainDB.NewBatch() + lastCompactionNum uint64 + firstBlockInRange uint64 + lastBlockInRange uint64 + // Iterate over block bodies instead of headers since there are keys + // under the header prefix that we are not migrating. 
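+		// Body keys embed the block number big-endian before the hash, so the
+		// iterator visits blocks in ascending height order.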
+ iter = b.chainDB.NewIterator(chainDBBlockBodyPrefix, nil) + ) + + // Defer cleanup logic + defer func() { + // Release iterator (safe to call multiple times) + iter.Release() + + if err := deleteBatch.Write(); err != nil { + log.Error("failed to write final delete batch", "err", err) + } + + // Compact final range if we processed any blocks after last interval compaction + if firstBlockInRange > 0 && lastBlockInRange > 0 { + b.compactBlockRange(firstBlockInRange, lastBlockInRange) + } + + processingTime := time.Since(startTime) + log.Info("blockdb migration completed", + "blocks_processed", atomic.LoadUint64(&b.processed), + "total_processing_time", processingTime.String()) + }() + + log.Info("blockdb migration started") + + // iterator will iterate all block headers in ascending order by block number + for iter.Next() { + // Check if migration should be stopped + select { + case <-ctx.Done(): + log.Info("migration stopped", "blocks_processed", atomic.LoadUint64(&b.processed)) + return ctx.Err() + default: + // Continue with migration + } + + key := iter.Key() + if !shouldMigrateKey(b.chainDB, key) { + continue + } + + blockNum, hash := blockNumberAndHashFromKey(key) + + if etaTarget == 0 && b.endHeight > 0 && blockNum < b.endHeight { + etaTarget = b.endHeight - blockNum + etaTracker.AddSample(0, etaTarget, startTime) + } + + // Track the range of blocks for compaction + if firstBlockInRange == 0 { + firstBlockInRange = blockNum + } + lastBlockInRange = blockNum + + // Migrate block data (header + body) + if err := b.migrateBlock(blockNum, hash, iter.Value()); err != nil { + return fmt.Errorf("failed to migrate block data: %w", err) + } + + // Migrate receipts + if err := b.migrateReceipts(blockNum, hash); err != nil { + return fmt.Errorf("failed to migrate receipt data: %w", err) + } + + // Add deletes to batch + if err := b.deleteBlock(deleteBatch, blockNum, hash); err != nil { + return fmt.Errorf("failed to add block deletes to batch: %w", err) + } + processed := atomic.AddUint64(&b.processed, 1) + + if deleteBatch.ValueSize() > ethdb.IdealBatchSize { + if err := deleteBatch.Write(); err != nil { + return fmt.Errorf("failed to write delete batch: %w", err) + } + deleteBatch.Reset() + } + + // Compact every compactionInterval blocks + if processed-lastCompactionNum >= compactionInterval { + // Write any remaining deletes in batch before compaction + if deleteBatch.ValueSize() > 0 { + if err := deleteBatch.Write(); err != nil { + return fmt.Errorf("failed to write delete batch before compaction: %w", err) + } + deleteBatch.Reset() + } + + // Release iterator before compaction + iter.Release() + + // Compact the range we just processed + b.compactBlockRange(firstBlockInRange, lastBlockInRange) + + // Recreate iterator + start := encodeBlockNumber(blockNum + 1) + iter = b.chainDB.NewIterator(chainDBBlockBodyPrefix, start) + lastCompactionNum = processed + firstBlockInRange = 0 + lastBlockInRange = 0 + } + + // Log progress every logProgressInterval + if now := time.Now(); now.After(timeOfNextLog) { + logFields := []interface{}{ + "blocks_processed", processed, + "last_processed_height", blockNum, + "time_elapsed", time.Since(startTime), + } + if b.endHeight != 0 && etaTarget > 0 { + etaPtr, progressPercentage := etaTracker.AddSample(processed, etaTarget, now) + if etaPtr != nil { + logFields = append(logFields, "eta", etaPtr.String()) + logFields = append(logFields, "pctComplete", progressPercentage) + } + } + + log.Info("blockdb migration status", logFields...) 
+ timeOfNextLog = now.Add(logProgressInterval) + } + } + + if iter.Error() != nil { + return fmt.Errorf("failed to iterate over chainDB: %w", iter.Error()) + } + + if err := b.setStatus(migrationCompleted); err != nil { + log.Error("failed to save completed migration status", "err", err) + } + + return nil +} + +func (b *migrator) compactBlockRange(startBlock, endBlock uint64) { + startTime := time.Now() + + // Compact block headers + startHeaderKey := blockHeaderKey(startBlock, common.Hash{}) + endHeaderKey := blockHeaderKey(endBlock+1, common.Hash{}) + if err := b.chainDB.Compact(startHeaderKey, endHeaderKey); err != nil { + log.Error("failed to compact block headers in range", "start_block", startBlock, "end_block", endBlock, "err", err) + } + + // Compact block bodies + startBodyKey := blockBodyKey(startBlock, common.Hash{}) + endBodyKey := blockBodyKey(endBlock+1, common.Hash{}) + if err := b.chainDB.Compact(startBodyKey, endBodyKey); err != nil { + log.Error("failed to compact block bodies in range", "start_block", startBlock, "end_block", endBlock, "err", err) + } + + // Compact receipts for this range + startReceiptsKey := receiptsKey(startBlock, common.Hash{}) + endReceiptsKey := receiptsKey(endBlock+1, common.Hash{}) + if err := b.chainDB.Compact(startReceiptsKey, endReceiptsKey); err != nil { + log.Error("failed to compact receipts in range", "start_block", startBlock, "end_block", endBlock, "err", err) + } + + log.Info("compaction of block range completed", + "start_block", startBlock, + "end_block", endBlock, + "duration", time.Since(startTime)) +} + +func (b *migrator) migrateBlock(blockNum uint64, hash common.Hash, bodyBytes []byte) error { + header := rawdb.ReadHeader(b.chainDB, hash, blockNum) + headerBytes, err := rlp.EncodeToBytes(header) + if err != nil { + return fmt.Errorf("failed to encode block header: %w", err) + } + if err := writeHashAndData(b.headerDB, blockNum, hash, headerBytes); err != nil { + return fmt.Errorf("failed to write header to headersdb: %w", err) + } + if err := writeHashAndData(b.bodyDB, blockNum, hash, bodyBytes); err != nil { + return fmt.Errorf("failed to write body to bodiesdb: %w", err) + } + return nil +} + +func (b *migrator) migrateReceipts(blockNum uint64, hash common.Hash) error { + // Read raw receipt bytes directly from chainDB + receiptBytes := rawdb.ReadReceiptsRLP(b.chainDB, hash, blockNum) + if receiptBytes == nil { + // No receipts for this block, skip + return nil + } + + if err := writeHashAndData(b.receiptsDB, blockNum, hash, receiptBytes); err != nil { + return fmt.Errorf("failed to write receipts to receiptsDB: %w", err) + } + + return nil +} + +// deleteBlock adds delete operations for a block to the provided batch +func (b *migrator) deleteBlock(batch ethdb.Batch, blockNum uint64, hash common.Hash) error { + headerKey := blockHeaderKey(blockNum, hash) + if err := batch.Delete(headerKey); err != nil { + return fmt.Errorf("failed to delete header from chainDB: %w", err) + } + rawdb.DeleteBody(batch, hash, blockNum) + rawdb.DeleteReceipts(batch, hash, blockNum) + + return nil +} + +func (b *migrator) blocksProcessed() uint64 { + return atomic.LoadUint64(&b.processed) +} + +func getMigrationStatus(db database.Database) (migrationStatus, error) { + var status migrationStatus + has, err := db.Has(migrationStatusKey) + if err != nil { + return status, err + } + if !has { + return status, nil + } + b, err := db.Get(migrationStatusKey) + if err != nil { + return status, err + } + if len(b) != 1 { + return status, 
fmt.Errorf("invalid migration status encoding length=%d", len(b)) + } + return migrationStatus(b[0]), nil +} + +func getEndBlockHeight(db database.Database) (uint64, error) { + has, err := db.Has(endBlockNumberKey) + if err != nil { + return 0, err + } + if !has { + return 0, nil + } + blockNumberBytes, err := db.Get(endBlockNumberKey) + if err != nil { + return 0, err + } + if len(blockNumberBytes) != blockNumberSize { + return 0, fmt.Errorf("invalid block number encoding length=%d", len(blockNumberBytes)) + } + return binary.BigEndian.Uint64(blockNumberBytes), nil +} + +func loadAndSaveBlockEndHeight(stateDB database.Database, chainDB ethdb.Database) (uint64, error) { + headHash := rawdb.ReadHeadHeaderHash(chainDB) + if headHash == (common.Hash{}) { + return 0, nil + } + headBlockNumber := rawdb.ReadHeaderNumber(chainDB, headHash) + if headBlockNumber == nil || *headBlockNumber == 0 { + return 0, nil + } + endHeight := *headBlockNumber + if err := stateDB.Put(endBlockNumberKey, encodeBlockNumber(endHeight)); err != nil { + return 0, fmt.Errorf("failed to save head block number %d: %w", endHeight, err) + } + log.Info("blockdb migration: saved head block number", "head_block_number", endHeight) + return endHeight, nil +} + +func shouldMigrateKey(db ethdb.Database, key []byte) bool { + if !isBodyKey(key) { + return false + } + blockNum, hash := blockNumberAndHashFromKey(key) + + // Skip genesis blocks to avoid complicating state-sync min-height handling. + if blockNum == 0 { + return false + } + + canonicalHash := rawdb.ReadCanonicalHash(db, blockNum) + return canonicalHash == hash +} + +// blockHeaderKey = headerPrefix + num (uint64 big endian) + hash +func blockHeaderKey(number uint64, hash common.Hash) []byte { + return append(append(chainDBHeaderPrefix, encodeBlockNumber(number)...), hash.Bytes()...) +} + +// blockBodyKey = bodyPrefix + num (uint64 big endian) + hash +func blockBodyKey(number uint64, hash common.Hash) []byte { + return append(append(chainDBBlockBodyPrefix, encodeBlockNumber(number)...), hash.Bytes()...) +} + +// receiptsKey = receiptsPrefix + num (uint64 big endian) + hash +func receiptsKey(number uint64, hash common.Hash) []byte { + return append(append(chainDBReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...) +} + +// minBlockHeightToMigrate returns the smallest block number that should be migrated. +func minBlockHeightToMigrate(db ethdb.Database) *uint64 { + iter := db.NewIterator(chainDBBlockBodyPrefix, nil) + defer iter.Release() + for iter.Next() { + key := iter.Key() + if !shouldMigrateKey(db, key) { + continue + } + blockNum, _ := blockNumberAndHashFromKey(key) + return &blockNum + } + return nil +} diff --git a/plugin/evm/database/blockdb/migrator_test.go b/plugin/evm/database/blockdb/migrator_test.go new file mode 100644 index 0000000000..36d13a1893 --- /dev/null +++ b/plugin/evm/database/blockdb/migrator_test.go @@ -0,0 +1,359 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package blockdb + +import ( + "path/filepath" + "testing" + "time" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/x/blockdb" + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/params" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + evmdb "github.com/ava-labs/coreth/plugin/evm/database" +) + +// Tests migration status is correctly set to "not started" after initializing a new migrator. +func TestMigrator_StatusNotStarted(t *testing.T) { + dataDir := t.TempDir() + wrapper, _ := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationNotStarted) +} + +// Tests migration status transitions to "in progress" while a migration is actively running. +func TestMigrator_StatusInProgress(t *testing.T) { + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + blocks, receipts := createBlocks(t, 10) + writeBlocks(chainDB, blocks, receipts) + + // migrate with a slow block database + slowDB := &slowDatabase{ + HeightIndex: wrapper.bodyDB, + shouldSlow: func() bool { + return true + }, + } + wrapper.migrator.bodyDB = slowDB + require.NoError(t, wrapper.migrator.Migrate()) + + // Wait for it to be in progress + require.Eventually(t, func() bool { + return wrapper.migrator.Status() == migrationInProgress + }, 2*time.Second, 100*time.Millisecond) + + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationInProgress) +} + +// Tests migration status is correctly set to "completed" after a successful migration. 
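+// The completed status is persisted to stateDB, so a subsequent Migrate call
+// (for example after a restart) returns immediately without re-running.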
+func TestMigrator_StatusCompleted(t *testing.T) { + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + + blocks, receipts := createBlocks(t, 3) + writeBlocks(chainDB, blocks, receipts) + wrapper.migrator.bodyDB = wrapper.bodyDB + require.NoError(t, wrapper.migrator.Migrate()) + waitForMigratorCompletion(t, wrapper.migrator, 5*time.Second) + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationCompleted) +} + +func TestMigrator_Migration(t *testing.T) { + testCases := []struct { + name string + initStatus migrationStatus + toMigrateHeights []uint64 + migratedHeights []uint64 + expStatus migrationStatus + }{ + { + name: "migrate_5_blocks", + toMigrateHeights: []uint64{0, 1, 2, 3, 4}, + expStatus: migrationCompleted, + }, + { + name: "migrate_5_blocks_20_24", + toMigrateHeights: []uint64{20, 21, 22, 23, 24}, + expStatus: migrationCompleted, + }, + { + name: "migrate_non_consecutive_blocks", + toMigrateHeights: []uint64{20, 21, 22, 29, 30, 40}, + expStatus: migrationCompleted, + }, + { + name: "half_blocks_migrated", + initStatus: migrationInProgress, + toMigrateHeights: []uint64{6, 7, 8, 9, 10}, + migratedHeights: []uint64{0, 1, 2, 3, 4, 5}, + expStatus: migrationCompleted, + }, + { + name: "all_blocks_migrated_in_progress", + initStatus: migrationInProgress, + migratedHeights: []uint64{0, 1, 2, 3, 4, 5}, + expStatus: migrationCompleted, + }, + { + name: "empty_chainDB_no_blocks_to_migrate", + expStatus: migrationCompleted, + }, + { + name: "half_non_consecutive_blocks_migrated", + initStatus: migrationInProgress, + toMigrateHeights: []uint64{2, 3, 7, 8, 10}, + migratedHeights: []uint64{0, 1, 4, 5, 9}, + expStatus: migrationCompleted, + }, + { + name: "migration_already_completed", + initStatus: migrationCompleted, + migratedHeights: []uint64{0, 1, 2}, + expStatus: migrationCompleted, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + wrapper, chainDB := newDatabasesFromDir(t, dataDir) + t.Cleanup(func() { + require.NoError(t, wrapper.Close()) + }) + + // find the max block height to create + maxBlockHeight := uint64(0) + heightSets := [][]uint64{tc.toMigrateHeights, tc.migratedHeights} + for _, heights := range heightSets { + for _, height := range heights { + if height > maxBlockHeight { + maxBlockHeight = height + } + } + } + + // create blocks and receipts + blocks, receipts := createBlocks(t, int(maxBlockHeight)+1) + + // set initial state + if tc.initStatus != migrationNotStarted { + require.NoError(t, wrapper.migrator.setStatus(tc.initStatus)) + } + for _, height := range tc.toMigrateHeights { + writeBlocks(chainDB, []*types.Block{blocks[height]}, []types.Receipts{receipts[height]}) + } + for _, height := range tc.migratedHeights { + writeBlocks(wrapper, []*types.Block{blocks[height]}, []types.Receipts{receipts[height]}) + } + + // Migrate database + require.NoError(t, wrapper.migrator.Migrate()) + waitForMigratorCompletion(t, wrapper.migrator, 10*time.Second) + assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, tc.expStatus) + + // Verify that all blocks and receipts are accessible from the block wrapper + totalBlocks := len(tc.toMigrateHeights) + len(tc.migratedHeights) + allExpectedBlocks := make(map[uint64]*types.Block, totalBlocks) + allExpectedReceipts := make(map[uint64]types.Receipts, totalBlocks) + for _, heights := range heightSets { + for _, height := range heights { + 
allExpectedBlocks[height] = blocks[height]
+ allExpectedReceipts[height] = receipts[height]
+ }
+ }
+ require.Len(t, allExpectedBlocks, totalBlocks)
+ for blockNum, expectedBlock := range allExpectedBlocks {
+ actualBlock := rawdb.ReadBlock(wrapper, expectedBlock.Hash(), blockNum)
+ require.NotNil(t, actualBlock, "Block %d should be accessible", blockNum)
+ assertRLPEqual(t, expectedBlock, actualBlock)
+ expectedReceipts := allExpectedReceipts[blockNum]
+ actualReceipts := rawdb.ReadReceipts(wrapper, expectedBlock.Hash(), blockNum, expectedBlock.Time(), params.TestChainConfig)
+ assertRLPEqual(t, expectedReceipts, actualReceipts)
+
+ // verify chainDB no longer has any blocks or receipts (except for genesis)
+ if expectedBlock.NumberU64() != 0 {
+ require.False(t, rawdb.HasHeader(chainDB, expectedBlock.Hash(), expectedBlock.NumberU64()), "Block %d should not be in chainDB", expectedBlock.NumberU64())
+ require.False(t, rawdb.HasBody(chainDB, expectedBlock.Hash(), expectedBlock.NumberU64()))
+ require.False(t, rawdb.HasReceipts(chainDB, expectedBlock.Hash(), expectedBlock.NumberU64()))
+ }
+ }
+ })
+ }
+}
+
+// Tests that a migration in progress can be stopped and resumed, verifying
+// that the migration state is properly persisted and that the migration
+// can continue from where it left off after restart.
+func TestMigrator_AbruptStop(t *testing.T) {
+ dataDir := t.TempDir()
+ wrapper, chainDB := newDatabasesFromDir(t, dataDir)
+ t.Cleanup(func() {
+ if wrapper != nil {
+ require.NoError(t, wrapper.Close())
+ }
+ })
+
+ // Create blocks and write them to chainDB
+ blocks, receipts := createBlocks(t, 10)
+ writeBlocks(chainDB, blocks, receipts)
+
+ // Create a slow block database that slows down after 3 blocks
+ blockCount := 0
+ slowBdb := &slowDatabase{
+ HeightIndex: wrapper.bodyDB,
+ shouldSlow: func() bool {
+ blockCount++
+ return blockCount > 3
+ },
+ }
+
+ // Start migration with the slow block database and wait for 3 blocks to be migrated
+ wrapper.migrator.bodyDB = slowBdb
+ require.NoError(t, wrapper.migrator.Migrate())
+ require.Eventually(t, func() bool {
+ return wrapper.migrator.blocksProcessed() >= 3
+ }, 10*time.Second, 100*time.Millisecond)
+ require.NoError(t, wrapper.Close())
+
+ // Create new wrapper and verify migration status
+ wrapper, chainDB = newDatabasesFromDir(t, dataDir)
+ assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationInProgress)
+
+ // Check bodyDB and chainDB to ensure we have the correct number of blocks
+ chainDBBlocks := 0
+ blockdbBlocks := 0
+ wrapperBlocks := 0
+ for _, block := range blocks {
+ if rawdb.HasHeader(chainDB, block.Hash(), block.NumberU64()) {
+ chainDBBlocks++
+ }
+ has, err := wrapper.bodyDB.Has(block.NumberU64())
+ require.NoError(t, err)
+ if has {
+ blockdbBlocks++
+ }
+ if rawdb.HasHeader(wrapper, block.Hash(), block.NumberU64()) {
+ wrapperBlocks++
+ }
+ }
+ require.Equal(t, 10, chainDBBlocks+blockdbBlocks)
+ require.Equal(t, 10, wrapperBlocks)
+
+ // Start migration again and finish
+ require.NoError(t, wrapper.migrator.Migrate())
+ waitForMigratorCompletion(t, wrapper.migrator, 15*time.Second)
+ assertMigrationStatus(t, wrapper.migrator.stateDB, wrapper.migrator, migrationCompleted)
+
+ // Verify that all blocks are accessible from the new wrapper
+ for i, block := range blocks {
+ actualBlock := rawdb.ReadBlock(wrapper, block.Hash(), block.NumberU64())
+ require.NotNil(t, actualBlock, "Block %d should be accessible after resumption", block.NumberU64())
+ assertRLPEqual(t, block, actualBlock)
+ actualReceipts := rawdb.ReadReceipts(wrapper, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig)
+ assertRLPEqual(t, receipts[i], actualReceipts)
+
+ // chainDB no longer has any blocks except for genesis
+ if block.NumberU64() != 0 {
+ require.False(t, rawdb.HasHeader(chainDB, block.Hash(), block.NumberU64()))
+ require.False(t, rawdb.HasBody(chainDB, block.Hash(), block.NumberU64()))
+ require.False(t, rawdb.HasReceipts(chainDB, block.Hash(), block.NumberU64()))
+ }
+ }
+}
+
+// Tests that the genesis block is not migrated; all other written blocks should be.
+func TestMigrator_Genesis(t *testing.T) {
+ dataDir := t.TempDir()
+ base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry())
+ require.NoError(t, err)
+ chainDB := rawdb.NewDatabase(evmdb.WrapDatabase(base))
+ blocks, receipts := createBlocks(t, 10)
+
+ // Write only the genesis block and blocks 5-9
+ writeBlocks(chainDB, blocks[0:1], receipts[0:1])
+ writeBlocks(chainDB, blocks[5:10], receipts[5:10])
+
+ blockDBPath := filepath.Join(dataDir, "blockdb")
+ wrapper := New(base, chainDB, blockdb.DefaultConfig(), blockDBPath, logging.NoLog{}, prometheus.NewRegistry())
+ initialized, err := wrapper.InitWithStateSync(false)
+ require.NoError(t, err)
+ require.True(t, initialized)
+ t.Cleanup(func() {
+ require.NoError(t, wrapper.Close())
+ })
+
+ // Validate that the min height is 5, since the genesis block is not stored
+ // and the first block to migrate is block 5.
+ require.True(t, wrapper.initialized)
+ require.Equal(t, uint64(5), wrapper.minHeight)
+
+ // migrate and wait for completion
+ require.NoError(t, wrapper.migrator.Migrate())
+ waitForMigratorCompletion(t, wrapper.migrator, 10*time.Second)
+
+ // verify genesis block is not migrated
+ genesisHash := rawdb.ReadCanonicalHash(chainDB, 0)
+ require.True(t, rawdb.HasHeader(chainDB, genesisHash, 0))
+ has, err := wrapper.bodyDB.Has(0)
+ require.NoError(t, err)
+ require.False(t, has)
+
+ // verify blocks 1-4 are missing
+ for i := 1; i < 5; i++ {
+ hash := rawdb.ReadCanonicalHash(wrapper, uint64(i))
+ require.Equal(t, common.Hash{}, hash)
+ }
+
+ // verify blocks 5-9 are migrated
+ for _, expectedBlock := range blocks[5:10] {
+ actualBlock := rawdb.ReadBlock(wrapper, expectedBlock.Hash(), expectedBlock.NumberU64())
+ require.NotNil(t, actualBlock)
+ assertRLPEqual(t, expectedBlock, actualBlock)
+ has, err := wrapper.bodyDB.Has(expectedBlock.NumberU64())
+ require.NoError(t, err)
+ require.True(t, has)
+ }
+}
+
+func waitForMigratorCompletion(t *testing.T, migrator *migrator, timeout time.Duration) {
+ t.Helper()
+
+ deadline := time.Now().Add(timeout)
+ ticker := time.NewTicker(100 * time.Millisecond)
+ defer ticker.Stop()
+
+ for time.Now().Before(deadline) {
+ if migrator.Status() == migrationCompleted {
+ return
+ }
+ <-ticker.C
+ }
+
+ require.Failf(t, "migration did not complete within timeout", "timeout: %v", timeout)
+}
+
+func assertMigrationStatus(t *testing.T, db database.Database, migrator *migrator, expectedStatus migrationStatus) {
+ t.Helper()
+
+ require.Equal(t, expectedStatus, migrator.Status(), "migrator status should match expected")
+ diskStatus, err := getMigrationStatus(db)
+ require.NoError(t, err)
+ require.Equal(t, expectedStatus, diskStatus, "disk database status should match expected")
+}
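
The newDatabasesFromDir helper used throughout these tests is defined in database_test.go (earlier in this patch). As a minimal sketch of what it presumably does, mirroring the construction TestMigrator_Genesis performs inline; the signature, the use of blockdb.DefaultConfig(), and the InitWithStateSync(false) call are assumptions (it also assumes an ethdb import from github.com/ava-labs/libevm/ethdb), not the actual helper:

func newDatabasesFromDir(t *testing.T, dataDir string) (*Database, ethdb.Database) {
	t.Helper()

	// Base key-value store shared by the chain database and the blockdb state.
	base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry())
	require.NoError(t, err)

	// chainDB holds all non-block keys and is the source of blocks to migrate.
	chainDB := rawdb.NewDatabase(evmdb.WrapDatabase(base))

	// Wrap chainDB with the height-indexed block databases and initialize them
	// without state sync, as TestMigrator_Genesis does inline (assumed defaults).
	wrapper := New(base, chainDB, blockdb.DefaultConfig(), filepath.Join(dataDir, "blockdb"), logging.NoLog{}, prometheus.NewRegistry())
	_, err = wrapper.InitWithStateSync(false)
	require.NoError(t, err)

	return wrapper, chainDB
}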