1 change: 1 addition & 0 deletions common/src/messages.rs
@@ -311,6 +311,7 @@ pub enum CardanoMessage {

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum SnapshotMessage {
Startup, // subscribers should listen for incremental snapshot data
Bootstrap(SnapshotStateMessage),
DumpRequest(SnapshotDumpMessage),
Dump(SnapshotStateMessage),
80 changes: 80 additions & 0 deletions common/src/snapshot/NOTES.md
@@ -0,0 +1,80 @@
# Bootstrapping from a Snapshot file
We can boot an Acropolis node either from genesis, replaying all of the blocks up to
some point, or from a snapshot file. This module provides the components
needed to boot from a snapshot file. See [snapshot_bootstrapper](../../../modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs) for the process that drives these helpers.

Booting from a snapshot takes minutes instead of the hours it takes to boot from
genesis. It also allows booting from a chosen epoch, which makes it possible to
write tests that depend only on that epoch's data. We also skip some of the
problematic early eras and will typically boot from Conway with snapshots around
epochs 305, 306, and 307; three consecutive epochs provide enough context to
correctly calculate the rewards.

The required data for bootstrapping are:
- snapshot files (each has an associated epoch number and point)
- nonces
- headers

## Snapshot Files
The snapshots come from the Amaru project. In their words,
"the snapshots we generated are different [from a Mithril snapshot]: they're
the actual ledger state; i.e. the in-memory state that is constructed by iterating over each block up to a specific point. So, it's all the UTxOs, the set of pending governance actions, the account balance, etc.
If you get this from a trusted source, you don't need to do any replay, you can just start up and load this from disk.
The format of these is completely non-standard; we just forked the haskell node and spit out whatever we needed to in CBOR."

Snapshot files are referenced by their epoch number in the config.json file below.

See [Amaru snapshot format](../../../docs/amaru-snapshot-structure.md)

## Configuration files
There is a per-network path to each bootstrap configuration file. The network
should be one of `mainnet`, `preprod`, `preview`, or `testnet_<magic>`, where
`magic` is a 32-bit unsigned value denoting a particular testnet.
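
For illustration, a hypothetical helper that resolves a network name under this
convention might look like the following (the magic values are the well-known
ones for the public networks; this helper is not part of the codebase):

```rust
/// Hypothetical sketch: map a network name to its magic value, following
/// the naming convention above. Not part of the actual bootstrapper API.
fn network_magic(name: &str) -> Option<u32> {
    match name {
        "mainnet" => Some(764824073),
        "preprod" => Some(1),
        "preview" => Some(2),
        _ => name.strip_prefix("testnet_")?.parse().ok(),
    }
}
```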

The bootstrapper will be given a path to a directory that is expected to contain
the following files: config.json, snapshots.json, nonces.json, and headers.json.
The path is used as a prefix to resolve the per-network configuration files
needed for bootstrapping. Given a source directory `data` and a network name of
`preview`, the expected layout for configuration files would be as follows
(sample contents are sketched after the list):

* `data/preview/config.json`: a list of epochs to load.
* `data/preview/snapshots.json`: a list of `Snapshot` values (epoch, point, url).
* `data/preview/nonces.json`: a list of `InitialNonces` values.
* `data/preview/headers.json`: a list of `Point`s.
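
For example, `data/preview/config.json` might contain just the epoch list:

```json
[305, 306, 307]
```

and `data/preview/snapshots.json` would hold one entry per epoch (the field
names follow the `Snapshot` description above; the point and URL here are made
up for illustration):

```json
[
  {
    "epoch": 305,
    "point": "12345678.0123456789abcdef",
    "url": "https://example.com/snapshots/preview/305.cbor"
  }
]
```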

These files are loaded by [snapshot_bootstrapper](../../../modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs) during bootup.

## Bootstrapping sequence

The bootstrapper will be started with an argument that specifies a network,
e.g. "mainnet". From the network, it builds a path to the configuration
and snapshot files as shown above, then loads the data contained or described
in those files. config.json holds a list of (typically three) epochs that
index into snapshots.json to find the corresponding URL and metadata for each
snapshot file. Loading occurs in this order (sketched in code after the list):

* publish `SnapshotMessage::Startup`
* download the snapshots (on demand; may have already been done externally)
* parse each snapshot and publish their data on the message bus
* read nonces and publish
* read headers and publish
* publish `CardanoMessage::GenesisComplete(GenesisCompleteMessage {...})`
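
A minimal, self-contained sketch of that ordering is below; the message enum
and `publish` function are stand-ins for the real `Message` types and
`context.message_bus.publish(...)` call, and the download/parse steps are
elided as comments:

```rust
// Stand-in message type; the real code uses Message, SnapshotMessage and
// CardanoMessage from acropolis_common.
enum Msg {
    Startup,
    SnapshotData(Vec<u8>),
    Nonces(Vec<u8>),
    Headers(Vec<u8>),
    GenesisComplete,
}

// Stand-in for the message-bus publish call.
async fn publish(_topic: &str, _msg: Msg) -> anyhow::Result<()> {
    Ok(())
}

async fn bootstrap() -> anyhow::Result<()> {
    publish("cardano.snapshot", Msg::Startup).await?; // 1. announce start
    // 2. download the snapshots here if they are not already on disk
    // 3. stream-parse each snapshot, publishing its data as it is decoded
    publish("cardano.snapshot", Msg::SnapshotData(Vec::new())).await?;
    publish("cardano.snapshot", Msg::Nonces(Vec::new())).await?; // 4. nonces
    publish("cardano.snapshot", Msg::Headers(Vec::new())).await?; // 5. headers
    publish("cardano.sequence.bootstrapped", Msg::GenesisComplete).await // 6. done
}
```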

Modules in the system will have subscribed to the Startup message, and also to
the individual structural data update messages, before the bootstrapper runs
the above sequence. Upon receiving the `Startup` message, they will use the
data messages to populate their state, history (for BlockFrost), and whatever
else is required so that they are ready to operate when the
`GenesisCompleteMessage` arrives.
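
From a subscriber's point of view, the lifecycle amounts to a small state
machine. The sketch below uses a stand-in message type and is not any module's
actual code:

```rust
// Stand-in message type; real modules match on Message/SnapshotMessage variants.
enum BusMsg {
    Startup,
    Data(Vec<u8>),
    GenesisComplete,
}

#[derive(Default)]
struct ModuleState {
    entries: Vec<Vec<u8>>,
    ready: bool,
}

impl ModuleState {
    fn on_message(&mut self, msg: BusMsg) {
        match msg {
            BusMsg::Startup => self.entries.clear(), // incremental snapshot data follows
            BusMsg::Data(d) => self.entries.push(d), // accumulate published state
            BusMsg::GenesisComplete => self.ready = true, // safe to serve requests now
        }
    }
}
```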

## Data update messages

The bootstrapper publishes data as it parses the snapshot files, nonces, and
headers. Snapshot parsing is done while streaming the data to keep the memory
footprint low. As elements of the file are parsed, callbacks hand the data to
the bootstrapper, which publishes it on the message bus.

There are TODO markers in [snapshot_bootstrapper](../../../modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs) that show where to add the
publishing of the parsed snapshot data.
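
One possible shape for that publishing code: because the parser callbacks are
synchronous while the message bus is async, parsed chunks could be handed to an
async publisher task over a channel. This channel-based design is an assumption
for illustration, not what the module currently does:

```rust
// Assumed design: forward parsed chunks from the synchronous parser callbacks
// to an async publishing task via an unbounded channel.
use tokio::sync::mpsc;

// Stand-in for the parsed chunk types (PoolInfo, AccountState, ...).
struct Chunk(Vec<u8>);

struct ChannelHandler {
    tx: mpsc::UnboundedSender<Chunk>,
}

impl ChannelHandler {
    // Would be called from a parser callback such as on_pools(); must not block.
    fn forward(&self, chunk: Chunk) -> anyhow::Result<()> {
        self.tx.send(chunk).map_err(|_| anyhow::anyhow!("publisher task closed"))
    }
}

async fn run_publisher(mut rx: mpsc::UnboundedReceiver<Chunk>) {
    while let Some(_chunk) = rx.recv().await {
        // publish the chunk on the message bus here
    }
}
```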



263 changes: 249 additions & 14 deletions modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs
@@ -1,16 +1,41 @@
use std::sync::Arc;

use acropolis_common::{
ledger_state::LedgerState,
messages::{Message, SnapshotMessage, SnapshotStateMessage},
genesis_values::GenesisValues,
messages::{CardanoMessage, GenesisCompleteMessage, Message},
snapshot::{
streaming_snapshot::{
DRepCallback, DRepInfo, GovernanceProposal, PoolCallback, PoolInfo, ProposalCallback,
SnapshotCallbacks, SnapshotMetadata, StakeCallback, UtxoCallback, UtxoEntry,
},
StreamingSnapshotParser,
},
stake_addresses::AccountState,
BlockHash, BlockInfo, BlockStatus, Era,
};
use anyhow::{Context as AnyhowContext, Result};
use anyhow::Result;
use caryatid_sdk::{module, Context, Module};
use config::Config;
use tokio::time::Instant;
use tracing::{error, info, info_span, Instrument};

const DEFAULT_SNAPSHOT_TOPIC: &str = "cardano.snapshot";
const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.start";
const DEFAULT_COMPLETION_TOPIC: &str = "cardano.sequence.bootstrapped";

/// Callback handler that accumulates snapshot data and builds state
struct SnapshotHandler {
context: Arc<Context<Message>>,
snapshot_topic: String,

// Accumulated data from callbacks
metadata: Option<SnapshotMetadata>,
utxo_count: u64,
pools: Vec<PoolInfo>,
accounts: Vec<AccountState>,
dreps: Vec<DRepInfo>,
proposals: Vec<GovernanceProposal>,
}

#[module(
message_type(Message),
@@ -19,23 +44,200 @@ const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.start";
)]
pub struct SnapshotBootstrapper;

impl SnapshotHandler {
fn new(context: Arc<Context<Message>>, snapshot_topic: String) -> Self {
Self {
context,
snapshot_topic,
metadata: None,
utxo_count: 0,
pools: Vec::new(),
accounts: Vec::new(),
dreps: Vec::new(),
proposals: Vec::new(),
}
}

/// Build BlockInfo from accumulated metadata
fn build_block_info(&self) -> Result<BlockInfo> {
let metadata =
self.metadata.as_ref().ok_or_else(|| anyhow::anyhow!("No metadata available"))?;

// Create a synthetic BlockInfo representing the snapshot state
// This represents the last block included in the snapshot
Ok(BlockInfo {
status: BlockStatus::Immutable, // Snapshot blocks are immutable
slot: 0, // TODO: Extract from snapshot metadata if available
number: 0, // TODO: Extract from snapshot metadata if available
hash: BlockHash::default(), // TODO: Extract from snapshot metadata if available
epoch: metadata.epoch,
epoch_slot: 0, // TODO: Extract from snapshot metadata if available
new_epoch: false, // Not necessarily a new epoch
timestamp: 0, // TODO: Extract from snapshot metadata if available
era: Era::Conway, // TODO: Determine from snapshot or config
})
}

/// Build GenesisValues from snapshot data
fn build_genesis_values(&self) -> Result<GenesisValues> {
// TODO: These values should ideally come from the snapshot or configuration
// For now, using defaults for Conway era
Ok(GenesisValues {
byron_timestamp: 1506203091, // Byron mainnet genesis timestamp
shelley_epoch: 208, // Shelley started at epoch 208 on mainnet
shelley_epoch_len: 432000, // 5 days in seconds
shelley_genesis_hash: [
// Shelley mainnet genesis hash (placeholder - should be from config)
0x1a, 0x3d, 0x98, 0x7a, 0x95, 0xad, 0xd2, 0x3e, 0x4f, 0x4d, 0x2d, 0x78, 0x74, 0x9f,
0x96, 0x65, 0xd4, 0x1e, 0x48, 0x3e, 0xf2, 0xa2, 0x22, 0x9c, 0x4b, 0x0b, 0xf3, 0x9f,
0xad, 0x7d, 0x5e, 0x27,
],
})
Comment on lines +83 to +95 — Copilot AI (Nov 3, 2025):

The hardcoded Shelley genesis hash is incorrect. Based on common/src/genesis_values.rs, the correct mainnet hash is 1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81 (hex). The current bytes decode to 1a3d987a95add23e4f4d2d78749f9665d41e483ef2a2229c4b0bf39fad7d5e27, which doesn't match. Consider using GenesisValues::mainnet() instead of hardcoding.

Suggested change: replace the hardcoded GenesisValues block above with:

// Use canonical mainnet genesis values to avoid hardcoding and ensure correctness
Ok(GenesisValues::mainnet())
}

async fn publish_start(&self) -> Result<()> {
anyhow::Context::context(
self.context
.message_bus
.publish(
&self.snapshot_topic,
Arc::new(Message::Snapshot(
acropolis_common::messages::SnapshotMessage::Startup,
)),
)
.await,
"Failed to publish start message",
)
}

async fn publish_completion(
&self,
block_info: BlockInfo,
genesis_values: GenesisValues,
) -> Result<()> {
let message = Message::Cardano((
block_info,
CardanoMessage::GenesisComplete(GenesisCompleteMessage {
values: genesis_values,
}),
));

anyhow::Context::context(
self.context.message_bus.publish(&self.snapshot_topic, Arc::new(message)).await,
"Failed to publish completion",
)
}
}

impl UtxoCallback for SnapshotHandler {
fn on_utxo(&mut self, _utxo: UtxoEntry) -> Result<()> {
self.utxo_count += 1;

// Log progress every million UTXOs
if self.utxo_count.is_multiple_of(1_000_000) {
info!("Processed {} UTXOs", self.utxo_count);
}
// TODO: Accumulate UTXO data if needed or send in chunks to UTXOState processor
Ok(())
}
}

impl PoolCallback for SnapshotHandler {
fn on_pools(&mut self, pools: Vec<PoolInfo>) -> Result<()> {
info!("Received {} pools", pools.len());
Copilot AI (Nov 3, 2025):

Continuously extending vectors without capacity pre-allocation can lead to multiple reallocations. Consider pre-allocating capacity based on metadata.utxo_count or other metadata fields when available, or document the expected memory growth pattern for large snapshots.

Suggested change: after the info! line, add:

// Pre-allocate capacity if metadata is available and we haven't already reserved enough.
if let Some(metadata) = &self.metadata {
    let expected = metadata.pool_count as usize;
    if self.pools.capacity() < expected {
        self.pools.reserve_exact(expected - self.pools.capacity());
    }
}
// If metadata is not available, the pools vector will grow as needed (may cause reallocations).
self.pools.extend(pools);
// TODO: Publish pool data.
Ok(())
}
}

impl StakeCallback for SnapshotHandler {
fn on_accounts(&mut self, accounts: Vec<AccountState>) -> Result<()> {
info!("Received {} accounts", accounts.len());
self.accounts.extend(accounts);
Copilot AI (Nov 3, 2025): same vector pre-allocation concern as noted for pools above.
// TODO: Publish account data.
Ok(())
}
}

impl DRepCallback for SnapshotHandler {
fn on_dreps(&mut self, dreps: Vec<DRepInfo>) -> Result<()> {
info!("Received {} DReps", dreps.len());
self.dreps.extend(dreps);
Copilot AI (Nov 3, 2025): same vector pre-allocation concern as noted for pools above.
// TODO: Publish DRep data.

Ok(())
}
}

impl ProposalCallback for SnapshotHandler {
fn on_proposals(&mut self, proposals: Vec<GovernanceProposal>) -> Result<()> {
info!("Received {} proposals", proposals.len());
self.proposals.extend(proposals);
Copilot AI (Nov 3, 2025): same vector pre-allocation concern as noted for pools above.
// TODO: Publish proposal data.
Ok(())
}
}

impl SnapshotCallbacks for SnapshotHandler {
fn on_metadata(&mut self, metadata: SnapshotMetadata) -> Result<()> {
info!("Received snapshot metadata for epoch {}", metadata.epoch);
info!(" - UTXOs: {:?}", metadata.utxo_count);
info!(
" - Pot balances: treasury={}, reserves={}, deposits={}",
metadata.pot_balances.treasury,
metadata.pot_balances.reserves,
metadata.pot_balances.deposits
);
info!(
" - Previous epoch blocks: {}",
metadata.blocks_previous_epoch.len()
);
info!(
" - Current epoch blocks: {}",
metadata.blocks_current_epoch.len()
);

self.metadata = Some(metadata);
Ok(())
}

fn on_complete(&mut self) -> Result<()> {
info!("Snapshot parsing completed");
info!("Final statistics:");
info!(" - UTXOs processed: {}", self.utxo_count);
info!(" - Pools: {}", self.pools.len());
info!(" - Accounts: {}", self.accounts.len());
info!(" - DReps: {}", self.dreps.len());
info!(" - Proposals: {}", self.proposals.len());

// We could send a Resolver reference from here for large data, i.e. the UTXO set,
// which could be a file reference. For a file reference, we'd extend the parser to
// give us a callback value with the offset into the file; and we'd make the streaming
// UTXO parser public and reusable, adding it to the resolver implementation.
Ok(())
}
}

impl SnapshotBootstrapper {
pub async fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> {
let file_path = config
.get_string("snapshot-path")
.inspect_err(|e| error!("failed to find snapshot-path config: {e}"))?;

let ledger_state =
LedgerState::from_directory(file_path).context("failed to load ledger state")?;

let startup_topic =
config.get_string("startup-topic").unwrap_or(DEFAULT_STARTUP_TOPIC.to_string());

let snapshot_topic =
config.get_string("snapshot-topic").unwrap_or(DEFAULT_SNAPSHOT_TOPIC.to_string());
info!("Publishing snapshots on '{snapshot_topic}'");

let completion_topic =
config.get_string("completion-topic").unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string());
info!("Completing with '{completion_topic}'");

let mut subscription = context.subscribe(&startup_topic).await?;

context.clone().run(async move {
let Ok(_) = subscription.read().await else {
return;
@@ -44,19 +246,52 @@ impl SnapshotBootstrapper {

let span = info_span!("snapshot_bootstrapper.handle");
async {
let spo_state_message = Message::Snapshot(SnapshotMessage::Bootstrap(
SnapshotStateMessage::SPOState(ledger_state.spo_state),
));
context
.message_bus
.publish(&snapshot_topic, Arc::new(spo_state_message))
.await
.unwrap_or_else(|e| error!("failed to publish: {e}"));
if let Err(e) =
Self::process_snapshot(&file_path, context.clone(), &completion_topic).await
{
error!("Failed to process snapshot: {}", e);
}
}
.instrument(span)
.await;
});

Ok(())
}

async fn process_snapshot(
file_path: &str,
context: Arc<Context<Message>>,
completion_topic: &str,
) -> Result<()> {
let parser = StreamingSnapshotParser::new(file_path);
let mut callbacks = SnapshotHandler::new(context.clone(), completion_topic.to_string());
Copilot AI (Nov 3, 2025):

The snapshot_topic field in SnapshotHandler is being initialized with completion_topic instead of snapshot_topic. This causes publish_start() to publish to the wrong topic. The handler should receive the snapshot_topic value, not completion_topic.

info!(
"Starting snapshot parsing and publishing from: {}",
file_path
);
let start = Instant::now();

callbacks.publish_start().await?;

// Parse the snapshot with our callback handler
parser.parse(&mut callbacks)?;

let duration = start.elapsed();
info!(
"✓ Parse and publish completed successfully in {:.2?}",
duration
);

// Build the final state from accumulated data
let block_info = callbacks.build_block_info()?;
let genesis_values = callbacks.build_genesis_values()?;

// Publish completion message to trigger next phase (e.g., Mithril)
callbacks.publish_completion(block_info, genesis_values).await?;

info!("Snapshot bootstrap completed successfully");
Ok(())
}
}