diff --git a/.changelog/6321.feature.md b/.changelog/6321.feature.md new file mode 100644 index 00000000000..f14494fade2 --- /dev/null +++ b/.changelog/6321.feature.md @@ -0,0 +1,7 @@ +go/oasis-node: Add new command for offline pruning of consensus databases + +A new experimental command `oasis-node storage prune-experimental` +was added, enabling offline pruning as specified in the configuration. + +Operators are encouraged to run this command whenever they change pruning +configuration, to ensure the node is healthy when it starts. diff --git a/docs/oasis-node/cli.md b/docs/oasis-node/cli.md index 464540bf148..80a4941952c 100644 --- a/docs/oasis-node/cli.md +++ b/docs/oasis-node/cli.md @@ -336,7 +336,7 @@ oasis1qqncl383h8458mr9cytatygctzwsx02n4c5f8ed7 ### compact-experimental -Run +Run (when the node is not running): ```sh oasis-node storage compact-experimental --config /path/to/config/file @@ -364,3 +364,26 @@ may stay constant or not be reclaimed for a very long time. This command gives operators manual control to release disk space during maintenance periods. + +### prune-experimental + +Run (when the node is not running): + +```sh +oasis-node storage prune-experimental --config /path/to/config/file +``` + +to trigger manual pruning of consensus database instances: + +```sh +{"caller":"storage.go:433","level":"info","module":"cmd/storage", \ +"msg":"Starting consensus databases pruning. This may take a while...", \ +"ts":"2025-10-23T11:02:11.129822974Z"} +``` + +Operators should run this whenever they change pruning configuration, e.g. when +enabling it for the first time, or later changing it to retain less data. This +way they guarantee the node is healthy when it starts. + +Following successful pruning, to release disk space, they are encouraged to run +[the compaction command](#compact-experimental). 
diff --git a/go/consensus/cometbft/abci/prune.go b/go/consensus/cometbft/abci/prune.go index 6aa24599536..487bac1a4e5 100644 --- a/go/consensus/cometbft/abci/prune.go +++ b/go/consensus/cometbft/abci/prune.go @@ -200,6 +200,8 @@ func (p *genericPruner) canPrune(v int64) error { return nil } +// Warning: When registering a new handler DO NOT forget to update the logic +// for the "oasis-node storage prune-experimental" command as well. func (p *genericPruner) RegisterHandler(handler consensus.StatePruneHandler) { p.Lock() defer p.Unlock() diff --git a/go/consensus/cometbft/db/init.go b/go/consensus/cometbft/db/init.go index 167c7dcfb0a..34161d8dc43 100644 --- a/go/consensus/cometbft/db/init.go +++ b/go/consensus/cometbft/db/init.go @@ -2,19 +2,24 @@ package db import ( + "fmt" + dbm "github.com/cometbft/cometbft-db" + cmtconfig "github.com/cometbft/cometbft/config" "github.com/cometbft/cometbft/node" + cmtnode "github.com/cometbft/cometbft/node" + "github.com/cometbft/cometbft/state" "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/db/badger" ) -// GetBackendName returns the currently configured CometBFT database backend. -func GetBackendName() string { +// BackendName returns the currently configured CometBFT database backend. +func BackendName() string { return badger.BackendName } -// GetProvider returns the currently configured CometBFT DBProvider. -func GetProvider() (node.DBProvider, error) { +// Provider returns the currently configured CometBFT DBProvider. +func Provider() (node.DBProvider, error) { return badger.DBProvider, nil } @@ -22,3 +27,36 @@ func GetProvider() (node.DBProvider, error) { func New(fn string, noSuffix bool) (dbm.DB, error) { return badger.New(fn, noSuffix) } + +// OpenBlockstoreDB opens a CometBFT managed blockstore DB. +// +// This function is a hack as CometBFT does not expose a way to access the underlying databases. 
+func OpenBlockstoreDB(provider cmtnode.DBProvider, cfg *cmtconfig.Config) (dbm.DB, error) { + // NOTE: DBContext uses a full CometBFT config but the only thing that is actually used + // is the data dir field. + db, err := provider(&cmtnode.DBContext{ID: "blockstore", Config: cfg}) + if err != nil { + return nil, fmt.Errorf("failed to open blockstore: %w", err) + } + + return db, nil +} + +// OpenStateDB opens a CometBFT managed state DB. +// +// This function is a hack as CometBFT does not expose a way to access the underlying databases. +func OpenStateDB(provider cmtnode.DBProvider, cfg *cmtconfig.Config) (dbm.DB, error) { + // NOTE: DBContext uses a full CometBFT config but the only thing that is actually used + // is the data dir field. + db, err := provider(&cmtnode.DBContext{ID: "state", Config: cfg}) + if err != nil { + return nil, fmt.Errorf("failed to open state db: %w", err) + } + + return db, nil +} + +// OpenStateStore constructs a new state store using default options. +func OpenStateStore(stateDB dbm.DB) state.Store { + return state.NewStore(stateDB, state.StoreOptions{}) +} diff --git a/go/consensus/cometbft/full/archive.go b/go/consensus/cometbft/full/archive.go index ebcb258423a..9f8a829894d 100644 --- a/go/consensus/cometbft/full/archive.go +++ b/go/consensus/cometbft/full/archive.go @@ -7,11 +7,9 @@ import ( "sync" "time" - dbm "github.com/cometbft/cometbft-db" abcicli "github.com/cometbft/cometbft/abci/client" cmtconfig "github.com/cometbft/cometbft/config" cmtsync "github.com/cometbft/cometbft/libs/sync" - cmtnode "github.com/cometbft/cometbft/node" cmtproxy "github.com/cometbft/cometbft/proxy" cmtcore "github.com/cometbft/cometbft/rpc/core" "github.com/cometbft/cometbft/state" @@ -194,31 +192,27 @@ func NewArchive(ctx context.Context, cfg ArchiveConfig) (consensusAPI.Service, e logger := tmcommon.NewLogAdapter(!config.GlobalConfig.Consensus.LogDebug) srv.abciClient = abcicli.NewLocalClient(new(cmtsync.Mutex), srv.mux.Mux()) - dbProvider, err 
:= db.GetProvider() - if err != nil { - return nil, err - } cmtConfig := cmtconfig.DefaultConfig() _ = viper.Unmarshal(&cmtConfig) cmtConfig.SetRoot(filepath.Join(srv.dataDir, tmcommon.StateDir)) - // NOTE: DBContext uses a full CometBFT config but the only thing that is actually used - // is the data dir field. - srv.blockStoreDB, err = dbProvider(&cmtnode.DBContext{ID: "blockstore", Config: cmtConfig}) + dbProvider, err := db.Provider() + if err != nil { + return nil, fmt.Errorf("failed to obtain db provider: %w", err) + } + + srv.blockStoreDB, err = db.OpenBlockstoreDB(dbProvider, cmtConfig) if err != nil { return nil, err } srv.blockStoreDB = db.WithCloser(srv.blockStoreDB, srv.dbCloser) - // NOTE: DBContext uses a full CometBFT config but the only thing that is actually used - // is the data dir field. - var stateDB dbm.DB - stateDB, err = dbProvider(&cmtnode.DBContext{ID: "state", Config: cmtConfig}) + stateDB, err := db.OpenStateDB(dbProvider, cmtConfig) if err != nil { return nil, err } stateDB = db.WithCloser(stateDB, srv.dbCloser) - srv.stateStore = state.NewStore(stateDB, state.StoreOptions{}) + srv.stateStore = db.OpenStateStore(stateDB) srv.eb = cmttypes.NewEventBus() // Setup minimal CometBFT environment needed to support consensus queries. 
diff --git a/go/consensus/cometbft/full/full.go b/go/consensus/cometbft/full/full.go index 29d76a1a66c..b6470b3ca99 100644 --- a/go/consensus/cometbft/full/full.go +++ b/go/consensus/cometbft/full/full.go @@ -668,7 +668,7 @@ func (t *fullService) lazyInit() error { // nolint: gocyclo return t.genesisDoc, nil } - dbProvider, err := db.GetProvider() + dbProvider, err := db.Provider() if err != nil { t.Logger.Error("failed to obtain database provider", "err", err, diff --git a/go/oasis-node/cmd/common/common.go b/go/oasis-node/cmd/common/common.go index d20e4d5476a..14efa632f11 100644 --- a/go/oasis-node/cmd/common/common.go +++ b/go/oasis-node/cmd/common/common.go @@ -2,8 +2,10 @@ package common import ( + "errors" "fmt" "io" + "io/fs" "os" "path/filepath" "strings" @@ -71,6 +73,20 @@ func InternalSocketPath() string { return filepath.Join(DataDir(), InternalSocketName) } +// IsNodeRunning returns true when the node is running. +func IsNodeRunning() (bool, error) { + path := InternalSocketPath() + + if _, err := os.Stat(path); err != nil { + if errors.Is(err, fs.ErrNotExist) { + return false, nil + } + return false, fmt.Errorf("stat %s: %w", path, err) + } + + return true, nil +} + // IsNodeCmd returns true iff the current command is the ekiden node. 
func IsNodeCmd() bool { return isNodeCmd diff --git a/go/oasis-node/cmd/storage/storage.go b/go/oasis-node/cmd/storage/storage.go index baccd32a3ef..e343b707278 100644 --- a/go/oasis-node/cmd/storage/storage.go +++ b/go/oasis-node/cmd/storage/storage.go @@ -6,11 +6,14 @@ import ( "errors" "fmt" "io/fs" + "math" "os" "path/filepath" "strings" "time" + cmtconfig "github.com/cometbft/cometbft/config" + cmtBlockstore "github.com/cometbft/cometbft/store" badgerDB "github.com/dgraph-io/badger/v4" "github.com/spf13/cobra" @@ -20,12 +23,16 @@ import ( "github.com/oasisprotocol/oasis-core/go/config" "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/abci" cmtCommon "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/common" + cmtConfig "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/config" + cmtDB "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/db" cmtDBProvider "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/db/badger" cmdCommon "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common" roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" "github.com/oasisprotocol/oasis-core/go/runtime/bundle" runtimeConfig "github.com/oasisprotocol/oasis-core/go/runtime/config" "github.com/oasisprotocol/oasis-core/go/runtime/history" + "github.com/oasisprotocol/oasis-core/go/runtime/registry" + "github.com/oasisprotocol/oasis-core/go/storage/api" db "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/badger" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/node" @@ -70,6 +77,13 @@ WARNING: Ensure you have at least as much of a free disk as your largest databas RunE: doDBCompactions, } + pruneCmd = &cobra.Command{ + Use: "prune-experimental", + Args: cobra.NoArgs, + Short: "EXPERIMENTAL: trigger pruning for all consensus databases", + RunE: doPrune, + } + logger = logging.GetLogger("cmd/storage") pretty = cmdCommon.Isatty(1) @@ -305,6 +319,15 @@ func 
doDBCompactions(_ *cobra.Command, args []string) error { cmdCommon.EarlyLogAndExit(err) } + running, err := cmdCommon.IsNodeRunning() + if err != nil { + return fmt.Errorf("failed to ensure the node is not running: %w", err) + } + + if running { + return fmt.Errorf("compaction can only be done when the node is not running") + } + dataDir := cmdCommon.DataDir() logger.Info("Starting database compactions. This may take a while...") @@ -385,6 +408,16 @@ func flattenBadgerDB(db *badgerDB.DB, logger *logging.Logger) error { } func compactConsensusNodeDB(dataDir string) error { + ndb, close, err := openConsensusNodeDB(dataDir) + if err != nil { + return fmt.Errorf("failed to open consensus NodeDB: %w", err) + } + defer close() + + return ndb.Compact() +} + +func openConsensusNodeDB(dataDir string) (api.NodeDB, func(), error) { ldb, ndb, _, err := abci.InitStateStorage( &abci.ApplicationConfig{ DataDir: filepath.Join(dataDir, cmtCommon.StateDir), @@ -395,15 +428,215 @@ func compactConsensusNodeDB(dataDir string) error { }, ) if err != nil { - return fmt.Errorf("failed to initialize ABCI storage backend: %w", err) + return nil, nil, fmt.Errorf("failed to initialize ABCI storage backend: %w", err) } - // Close the resources. Both Close and Cleanup only close NodeDB. - // Closing both here, to prevent resource leaks if things change in the future. - defer ndb.Close() - defer ldb.Cleanup() + // Close and Cleanup both only close NodeDB. Still closing both explicitly, + // to prevent resource leaks if things change in the future. 
+ close := func() { + ndb.Close() + ldb.Cleanup() + } - return ndb.Compact() + return ndb, close, nil +} + +func doPrune(_ *cobra.Command, args []string) error { + if err := cmdCommon.Init(); err != nil { + cmdCommon.EarlyLogAndExit(err) + } + + running, err := cmdCommon.IsNodeRunning() + if err != nil { + return fmt.Errorf("failed to ensure the node is not running: %w", err) + } + + if running { + return fmt.Errorf("pruning can only be done when the node is not running") + } + + if config.GlobalConfig.Consensus.Prune.Strategy == cmtConfig.PruneStrategyNone { + logger.Info("skipping consensus pruning since disabled in the config") + return nil + } + + runtimes, err := registry.GetConfiguredRuntimeIDs() + if err != nil { + return fmt.Errorf("failed to get configured runtimes: %w", err) + } + + logger.Info("Starting consensus databases pruning. This may take a while...") + + if err := pruneConsensusDBs( + cmdCommon.DataDir(), + config.GlobalConfig.Consensus.Prune.NumKept, + runtimes, + ); err != nil { + return fmt.Errorf("failed to prune consensus databases: %w", err) + } + + return nil +} + +func pruneConsensusDBs(dataDir string, numKept uint64, runtimes []common.Namespace) error { + ndb, close, err := openConsensusNodeDB(dataDir) + if err != nil { + return fmt.Errorf("failed to open NodeDB: %w", err) + } + defer close() + + latest, ok := ndb.GetLatestVersion() + if !ok { + logger.Info("skipping pruning as state db is empty") + return nil + } + + if latest < numKept { + logger.Info("skipping pruning as the latest version is smaller than the number of versions to keep") + return nil + } + + // In case of configured runtimes, do not prune past the earliest reindexed + // consensus height, so that light history can be populated correctly. 
+ minReindexed, err := minReindexedHeight(dataDir, runtimes) + if err != nil { + return fmt.Errorf("failed to fetch earliest reindexed consensus height: %w", err) + } + + retainHeight := min( + latest-numKept, // underflow not possible due to if above. + uint64(minReindexed), + ) + + if err := pruneConsensusNodeDB(ndb, retainHeight); err != nil { + return fmt.Errorf("failed to prune application state: %w", err) + } + + if err := pruneCometDBs(dataDir, int64(retainHeight)); err != nil { + return fmt.Errorf("failed to prune CometBFT managed databases: %w", err) + } + + return nil +} + +func pruneConsensusNodeDB(ndb db.NodeDB, retainHeight uint64) error { + startHeight := ndb.GetEarliestVersion() + + if retainHeight <= startHeight { + logger.Info("consensus state already pruned", "retain_height", retainHeight, "start_height", startHeight) + return nil + } + + logger.Info("pruning consensus state", "start_height", startHeight, "retain_height", retainHeight) + for h := startHeight; h < retainHeight; h++ { + if err := ndb.Prune(h); err != nil { + return fmt.Errorf("failed to prune version %d: %w", h, err) + } + + if h%10_000 == 0 { // periodically sync to disk + if err := ndb.Sync(); err != nil { + return fmt.Errorf("failed to sync NodeDB: %w", err) + } + logger.Debug("forcing NodeDB disk sync during pruning", "version", h) + } + } + + if err := ndb.Sync(); err != nil { + return fmt.Errorf("failed to sync NodeDB: %w", err) + } + + return nil +} + +// minReindexedHeight returns the smallest consensus height reindexed by any +// of the configured runtimes. +// +// In case of no configured runtimes it returns max int64. 
+func minReindexedHeight(dataDir string, runtimes []common.Namespace) (int64, error) { + fetchLastReindexedHeight := func(runtimeID common.Namespace) (int64, error) { + rtDir := runtimeConfig.GetRuntimeStateDir(dataDir, runtimeID) + + history, err := history.New(runtimeID, rtDir, history.NewNonePrunerFactory(), true) + if err != nil { + return 0, fmt.Errorf("failed to open new light history: %w", err) + } + defer history.Close() + + h, err := history.LastConsensusHeight() + if err != nil { + return 0, fmt.Errorf("failed to get last consensus height: %w", err) + } + + return h, nil + } + + var minH int64 = math.MaxInt64 + for _, rt := range runtimes { + h, err := fetchLastReindexedHeight(rt) + if err != nil { + return 0, fmt.Errorf("failed to fetch last reindexed height for %s: %w", rt, err) + } + + if h < minH { + minH = h + } + } + + return minH, nil +} + +func pruneCometDBs(dataDir string, retainHeight int64) error { + cmtConfig := cmtconfig.DefaultConfig() + cmtConfig.SetRoot(filepath.Join(dataDir, cmtCommon.StateDir)) + + dbProvider, err := cmtDB.Provider() + if err != nil { + return fmt.Errorf("failed to obtain db provider: %w", err) + } + + blockstoreDB, err := cmtDB.OpenBlockstoreDB(dbProvider, cmtConfig) + if err != nil { + return fmt.Errorf("failed to open blockstore: %w", err) + } + blockstore := cmtBlockstore.NewBlockStore(blockstoreDB) + defer blockstore.Close() + + // Mimic the upstream pruning logic from CometBFT + // (see https://github.com/oasisprotocol/cometbft/blob/653c9a0c95ac0f91a0c8c11efb9aa21c98407af6/state/execution.go#L655): + // 1. Get the base from the blockstore + // 2. Prune blockstore + // 3. Prune statestore + // + // This ordering is problematic: if the blockstore pruning succeeds (updating the base) but + // state DB pruning fails or is interrupted, a subsequent pruning run will skip already + // pruned blocks while leaving part of the state DB unpruned. 
+ base := blockstore.Base() + if retainHeight <= base { + logger.Info("blockstore and state db already pruned") + return nil + } + + logger.Info("pruning consensus blockstore", "base", base, "retain_height", retainHeight) + n, err := blockstore.PruneBlocks(retainHeight) + if err != nil { + return fmt.Errorf("failed to prune blocks (retain height: %d): %w", retainHeight, err) + } + logger.Info("blockstore pruning finished", "pruned", n) + + stateDB, err := cmtDB.OpenStateDB(dbProvider, cmtConfig) + if err != nil { + return fmt.Errorf("failed to open state db: %w", err) + } + state := cmtDB.OpenStateStore(stateDB) + defer state.Close() + + logger.Info("pruning consensus states", "base", base, "retain_height", retainHeight) + if err := state.PruneStates(base, retainHeight); err != nil { + return fmt.Errorf("failed to prune state db (start: %d, end: %d): %w", base, retainHeight, err) + } + logger.Info("state db pruning finished") + + return nil } // Register registers the client sub-command and all of its children. @@ -414,5 +647,6 @@ func Register(parentCmd *cobra.Command) { storageCmd.AddCommand(storageCheckCmd) storageCmd.AddCommand(storageRenameNsCmd) storageCmd.AddCommand(storageCompactCmd) + storageCmd.AddCommand(pruneCmd) parentCmd.AddCommand(storageCmd) } diff --git a/go/runtime/registry/config.go b/go/runtime/registry/config.go index 856e7ac6434..4d61706f34b 100644 --- a/go/runtime/registry/config.go +++ b/go/runtime/registry/config.go @@ -26,7 +26,10 @@ func getLocalConfig(runtimeID common.Namespace, compID component.ID) map[string] return compCfg.Config } -func getConfiguredRuntimeIDs() ([]common.Namespace, error) { +// GetConfiguredRuntimeIDs returns namespaces of all the configured runtimes. +// +// Configuration is validated against the current mode of the node. +func GetConfiguredRuntimeIDs() ([]common.Namespace, error) { // Check if any runtimes are configured to be hosted. 
runtimes := make(map[common.Namespace]struct{}) for _, cfg := range config.GlobalConfig.Runtime.Runtimes { diff --git a/go/runtime/registry/registry.go b/go/runtime/registry/registry.go index 5974ef46def..d8f624a12d6 100644 --- a/go/runtime/registry/registry.go +++ b/go/runtime/registry/registry.go @@ -708,7 +708,7 @@ func New( consensus consensus.Service, ) (Registry, error) { // Get configured runtime IDs. - runtimeIDs, err := getConfiguredRuntimeIDs() + runtimeIDs, err := GetConfiguredRuntimeIDs() if err != nil { return nil, err }