From 8c08f4440d203e40cdca7beef0ccaed2357c739b Mon Sep 17 00:00:00 2001 From: Matthew Hounslow Date: Thu, 30 Oct 2025 15:06:33 -0700 Subject: [PATCH 1/8] Refactor: Replace `KeyHash` with `StakeAddress` for improved clarity and consistency across modules --- codec/src/block.rs | 2 +- common/src/queries/accounts.rs | 16 ++++---- common/src/queries/pools.rs | 6 +-- common/src/stake_addresses.rs | 41 +++++++++---------- .../src/spo_distribution_store.rs | 4 +- modules/accounts_state/src/state.rs | 10 ++--- modules/drep_state/src/state.rs | 12 +++--- .../rest_blockfrost/src/handlers/epochs.rs | 25 +++-------- .../src/handlers/governance.rs | 21 ++++------ modules/rest_blockfrost/src/handlers/pools.rs | 8 ++-- modules/rest_blockfrost/src/types.rs | 13 +----- modules/spo_state/src/state.rs | 4 +- modules/tx_unpacker/src/tx_unpacker.rs | 4 +- 13 files changed, 69 insertions(+), 97 deletions(-) diff --git a/codec/src/block.rs b/codec/src/block.rs index f0b3b440..b898534c 100644 --- a/codec/src/block.rs +++ b/codec/src/block.rs @@ -16,7 +16,7 @@ pub fn map_to_block_issuer( let digest = keyhash_224(vkey); if let Some(issuer) = shelley_genesis_delegates .values() - .find(|v| v.delegate == digest.to_vec()) + .find(|v| v.delegate == digest) .map(|i| BlockIssuer::GenesisDelegate(i.clone())) { Some(issuer) diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index 4470cf7b..9f04e621 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -57,17 +57,17 @@ pub enum AccountsStateQueryResponse { AccountAssets(AccountAssets), AccountAssetsTotals(AccountAssetsTotals), AccountUTxOs(AccountUTxOs), - AccountsUtxoValuesMap(HashMap), + AccountsUtxoValuesMap(HashMap), AccountsUtxoValuesSum(u64), - AccountsBalancesMap(HashMap), + AccountsBalancesMap(HashMap), AccountsBalancesSum(u64), // Epochs-related responses ActiveStakes(u64), - /// Vec<(PoolId, StakeKey, ActiveStakeAmount)> - SPDDByEpoch(Vec<(PoolId, KeyHash, u64)>), - /// Vec<(StakeKey, ActiveStakeAmount)> - SPDDByEpochAndPool(Vec<(KeyHash, u64)>), + /// Vec<(PoolId, StakeAddress, ActiveStakeAmount)> + SPDDByEpoch(Vec<(PoolId, StakeAddress, u64)>), + /// Vec<(StakeAddress, ActiveStakeAmount)> + SPDDByEpochAndPool(Vec<(StakeAddress, u64)>), // Pools-related responses OptimalPoolSizing(Option), @@ -77,7 +77,7 @@ pub enum AccountsStateQueryResponse { // DReps-related responses DrepDelegators(DrepDelegators), - AccountsDrepDelegationsMap(HashMap>), + AccountsDrepDelegationsMap(HashMap>), NotFound, Error(String), @@ -173,7 +173,7 @@ pub struct OptimalPoolSizing { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PoolDelegators { - pub delegators: Vec<(KeyHash, u64)>, + pub delegators: Vec<(StakeAddress, u64)>, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/common/src/queries/pools.rs b/common/src/queries/pools.rs index 04d9f59f..bb3e9d7d 100644 --- a/common/src/queries/pools.rs +++ b/common/src/queries/pools.rs @@ -1,6 +1,6 @@ use crate::{ - queries::governance::VoteRecord, rational_number::RationalNumber, KeyHash, PoolEpochState, - PoolId, PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, Relay, + queries::governance::VoteRecord, rational_number::RationalNumber, PoolEpochState, PoolId, + PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, Relay, StakeAddress, }; pub const DEFAULT_POOLS_QUERY_TOPIC: (&str, &str) = @@ -94,5 +94,5 @@ pub struct PoolActiveStakeInfo { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PoolDelegators { - pub delegators: Vec<(KeyHash, u64)>, + pub delegators: Vec<(StakeAddress, u64)>, } diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs index df617d42..1676f519 100644 --- a/common/src/stake_addresses.rs +++ b/common/src/stake_addresses.rs @@ -166,15 +166,15 @@ impl StakeAddressMap { } /// Get Pool Delegators with live_stakes - pub fn get_pool_delegators(&self, pool_operator: &PoolId) -> Vec<(KeyHash, u64)> { + pub fn get_pool_delegators(&self, pool_operator: &PoolId) -> Vec<(StakeAddress, u64)> { // Find stake addresses delegated to pool_operator - let delegators: Vec<(KeyHash, u64)> = self + let delegators: Vec<(StakeAddress, u64)> = self .inner .iter() .filter_map(|(stake_address, sas)| match sas.delegated_spo.as_ref() { Some(delegated_spo) => { if delegated_spo.eq(pool_operator) { - Some((*stake_address.get_hash(), sas.utxo_value + sas.rewards)) + Some((stake_address.clone(), sas.utxo_value + sas.rewards)) } else { None } @@ -212,14 +212,13 @@ impl StakeAddressMap { pub fn get_accounts_utxo_values_map( &self, stake_addresses: &[StakeAddress], - ) -> Option> { + ) -> Option> { let mut map = HashMap::new(); for stake_address in stake_addresses { let account = self.get(stake_address)?; let utxo_value = account.utxo_value; - let key_hash = stake_address.get_hash(); - map.insert(*key_hash, utxo_value); + map.insert(stake_address.clone(), utxo_value); } Some(map) @@ -230,14 +229,13 @@ impl StakeAddressMap { pub fn get_accounts_balances_map( &self, stake_addresses: &[StakeAddress], - ) -> Option> { + ) -> Option> { let mut map = HashMap::new(); for stake_address in stake_addresses { let account = self.get(stake_address)?; let balance = account.utxo_value + account.rewards; - let key_hash = stake_address.get_hash(); - map.insert(*key_hash, balance); + map.insert(stake_address.clone(), balance); } Some(map) @@ -248,14 +246,13 @@ impl StakeAddressMap { pub fn get_drep_delegations_map( &self, stake_addresses: &[StakeAddress], - ) -> Option>> { + ) -> Option>> { let mut map = HashMap::new(); for stake_address in stake_addresses { let account = self.get(stake_address)?; let maybe_drep = account.delegated_drep.clone(); - let key_hash = stake_address.get_hash(); - map.insert(*key_hash, maybe_drep); + map.insert(stake_address.clone(), maybe_drep); } Some(map) @@ -326,7 +323,7 @@ impl StakeAddressMap { /// Dump current Stake Pool Delegation Distribution State /// (Stake Key, Active Stakes Amount)> - pub fn dump_spdd_state(&self) -> HashMap> { + pub fn dump_spdd_state(&self) -> HashMap> { let entries: Vec<_> = self .inner .par_iter() @@ -335,9 +332,9 @@ impl StakeAddressMap { }) .collect(); - let mut result: HashMap> = HashMap::new(); + let mut result: HashMap> = HashMap::new(); for (spo, entry) in entries { - result.entry(spo).or_default().push((entry.0.get_credential().get_hash(), entry.1)); + result.entry(spo).or_default().push((entry.0, entry.1)); } result } @@ -1244,8 +1241,8 @@ mod tests { let map = stake_addresses.get_accounts_utxo_values_map(&keys).unwrap(); assert_eq!(map.len(), 2); - assert_eq!(map.get(&addr1.get_hash()).copied().unwrap(), 1000); - assert_eq!(map.get(&addr2.get_hash()).copied().unwrap(), 2000); + assert_eq!(map.get(&addr1).copied().unwrap(), 1000); + assert_eq!(map.get(&addr2).copied().unwrap(), 2000); } #[test] @@ -1358,8 +1355,8 @@ mod tests { let map = stake_addresses.get_accounts_balances_map(&addresses).unwrap(); assert_eq!(map.len(), 2); - assert_eq!(map.get(&addr1.get_hash()).copied().unwrap(), 1100); - assert_eq!(map.get(&addr2.get_hash()).copied().unwrap(), 2000); + assert_eq!(map.get(&addr1).copied().unwrap(), 1100); + assert_eq!(map.get(&addr2).copied().unwrap(), 2000); } #[test] @@ -1467,14 +1464,14 @@ mod tests { assert_eq!(map.len(), 3); assert_eq!( - map.get(&addr1.get_hash()).unwrap(), + map.get(&addr1).unwrap(), &Some(DRepChoice::Abstain) ); assert_eq!( - map.get(&addr2.get_hash()).unwrap(), + map.get(&addr2).unwrap(), &Some(DRepChoice::Key(DREP_HASH)) ); - assert_eq!(map.get(&addr3.get_hash()).unwrap(), &None); + assert_eq!(map.get(&addr3).unwrap(), &None); } #[test] diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs index cdf6b02e..e01b33cc 100644 --- a/modules/accounts_state/src/spo_distribution_store.rs +++ b/modules/accounts_state/src/spo_distribution_store.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; - +use std::ops::Add; use acropolis_common::types::AddrKeyhash; -use acropolis_common::PoolId; +use acropolis_common::{PoolId, StakeAddress}; use anyhow::Result; use fjall::{Config, Keyspace, PartitionCreateOptions}; diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index 0a13ca12..112d0e86 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -172,7 +172,7 @@ impl State { } /// Get Pool Delegators with live_stakes - pub fn get_pool_delegators(&self, pool_operator: &PoolId) -> Vec<(KeyHash, u64)> { + pub fn get_pool_delegators(&self, pool_operator: &PoolId) -> Vec<(StakeAddress, u64)> { self.stake_addresses.lock().unwrap().get_pool_delegators(pool_operator) } @@ -185,7 +185,7 @@ impl State { pub fn get_accounts_utxo_values_map( &self, stake_keys: &[StakeAddress], - ) -> Option> { + ) -> Option> { let stake_addresses = self.stake_addresses.lock().ok()?; // If lock fails, return None stake_addresses.get_accounts_utxo_values_map(stake_keys) } @@ -200,7 +200,7 @@ impl State { pub fn get_accounts_balances_map( &self, stake_keys: &[StakeAddress], - ) -> Option> { + ) -> Option> { let stake_addresses = self.stake_addresses.lock().ok()?; // If lock fails, return None stake_addresses.get_accounts_balances_map(stake_keys) } @@ -220,7 +220,7 @@ impl State { pub fn get_drep_delegations_map( &self, stake_keys: &[StakeAddress], - ) -> Option>> { + ) -> Option>> { let stake_addresses = self.stake_addresses.lock().ok()?; // If lock fails, return None stake_addresses.get_drep_delegations_map(stake_keys) } @@ -572,7 +572,7 @@ impl State { stake_addresses.generate_spdd() } - pub fn dump_spdd_state(&self) -> HashMap> { + pub fn dump_spdd_state(&self) -> HashMap> { let stake_addresses = self.stake_addresses.lock().unwrap(); stake_addresses.dump_spdd_state() } diff --git a/modules/drep_state/src/state.rs b/modules/drep_state/src/state.rs index 6f48b4a0..3fe47336 100644 --- a/modules/drep_state/src/state.rs +++ b/modules/drep_state/src/state.rs @@ -474,12 +474,12 @@ impl State { context: &Arc>, delegators: &[(&StakeAddress, &DRepChoice)], ) -> Result<()> { - let mut stake_key_to_input = HashMap::with_capacity(delegators.len()); + let mut stake_address_to_input = HashMap::with_capacity(delegators.len()); let mut stake_addresses = Vec::with_capacity(delegators.len()); - for &(sc, drep) in delegators { - stake_addresses.push(sc.clone()); - stake_key_to_input.insert(sc.get_credential().get_hash(), (sc, drep)); + for &(stake_address, drep) in delegators { + stake_addresses.push(stake_address.clone()); + stake_address_to_input.insert(stake_address, (stake_address, drep)); } let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( @@ -500,8 +500,8 @@ impl State { } }; - for (stake_key, old_drep_opt) in result_map { - let &(delegator, new_drep_choice) = match stake_key_to_input.get(&stake_key) { + for (stake_address, old_drep_opt) in result_map { + let &(delegator, new_drep_choice) = match stake_address_to_input.get(&stake_address) { Some(pair) => pair, None => continue, }; diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs index 6eab93f0..273be07e 100644 --- a/modules/rest_blockfrost/src/handlers/epochs.rs +++ b/modules/rest_blockfrost/src/handlers/epochs.rs @@ -15,7 +15,7 @@ use acropolis_common::{ spdd::{SPDDStateQuery, SPDDStateQueryResponse}, utils::query_state, }, - NetworkId, PoolId, StakeAddress, StakeCredential, + NetworkId, PoolId, }; use anyhow::{anyhow, Result}; use caryatid_sdk::Context; @@ -486,16 +486,10 @@ pub async fn handle_epoch_total_stakes_blockfrost( .await?; let spdd_response = spdd .into_iter() - .map(|(pool_id, stake_key_hash, amount)| { - let stake_address = StakeAddress { - network: network.clone(), - credential: StakeCredential::AddrKeyHash(stake_key_hash), - } - .to_string() - .map_err(|e| anyhow::anyhow!("Failed to convert stake address to string: {e}"))?; + .map(|(pool_id, stake_address, amount)| { Ok(SPDDByEpochItemRest { - pool_id: pool_id.to_vec(), - stake_address, + pool_id, + stake_address: stake_address.to_string().unwrap(), amount, }) }) @@ -623,16 +617,9 @@ pub async fn handle_epoch_pool_stakes_blockfrost( .await?; let spdd_response = spdd .into_iter() - .map(|(key_hash, amount)| { - let stake_address = StakeAddress { - network: network.clone(), - credential: StakeCredential::AddrKeyHash(key_hash), - } - .to_string() - .map_err(|e| anyhow::anyhow!("Failed to convert stake address to string: {e}"))?; - + .map(|(stake_address, amount)| { Ok(SPDDByEpochAndPoolItemRest { - stake_address, + stake_address: stake_address.to_string().unwrap(), amount, }) }) diff --git a/modules/rest_blockfrost/src/handlers/governance.rs b/modules/rest_blockfrost/src/handlers/governance.rs index c607fa1a..c9ecf472 100644 --- a/modules/rest_blockfrost/src/handlers/governance.rs +++ b/modules/rest_blockfrost/src/handlers/governance.rs @@ -4,14 +4,10 @@ use crate::types::{ DRepInfoREST, DRepMetadataREST, DRepUpdateREST, DRepVoteREST, DRepsListREST, ProposalVoteREST, VoterRoleREST, }; -use acropolis_common::{ - messages::{Message, RESTResponse, StateQuery, StateQueryResponse}, - queries::{ - accounts::{AccountsStateQuery, AccountsStateQueryResponse}, - governance::{GovernanceStateQuery, GovernanceStateQueryResponse}, - }, - Credential, GovActionId, KeyHash, TxHash, Voter, -}; +use acropolis_common::{messages::{Message, RESTResponse, StateQuery, StateQueryResponse}, queries::{ + accounts::{AccountsStateQuery, AccountsStateQueryResponse}, + governance::{GovernanceStateQuery, GovernanceStateQueryResponse}, +}, Credential, GovActionId, StakeAddress, TxHash, Voter}; use anyhow::{anyhow, Result}; use caryatid_sdk::Context; use reqwest::Client; @@ -191,7 +187,7 @@ pub async fn handle_drep_delegators_blockfrost( Message::StateQueryResponse(StateQueryResponse::Governance( GovernanceStateQueryResponse::DRepDelegators(delegators), )) => { - let stake_key_to_bech32: HashMap = match delegators + let stake_address_to_bech32: HashMap = match delegators .addresses .iter() .map(|addr| { @@ -199,8 +195,7 @@ pub async fn handle_drep_delegators_blockfrost( let bech32 = credential .to_stake_bech32() .map_err(|_| anyhow!("Failed to encode stake address"))?; - let key_hash = credential.get_hash(); - Ok((key_hash, bech32)) + Ok((addr.clone(), bech32)) }) .collect::>>() { @@ -229,8 +224,8 @@ pub async fn handle_drep_delegators_blockfrost( )) => { let mut response = Vec::new(); - for (key, amount) in map { - let Some(bech32) = stake_key_to_bech32.get(&key) else { + for (stake_address, amount) in map { + let Some(bech32) = stake_address_to_bech32.get(&stake_address) else { return Ok(RESTResponse::with_text( 500, "Internal error: missing Bech32 for stake key", diff --git a/modules/rest_blockfrost/src/handlers/pools.rs b/modules/rest_blockfrost/src/handlers/pools.rs index 8d402c40..cbed1c2f 100644 --- a/modules/rest_blockfrost/src/handlers/pools.rs +++ b/modules/rest_blockfrost/src/handlers/pools.rs @@ -17,7 +17,7 @@ use acropolis_common::{ utils::query_state, }, rest_helper::ToCheckedF64, - PoolId, PoolRetirement, PoolUpdateAction, StakeCredential, TxIdentifier, + PoolId, PoolRetirement, PoolUpdateAction, TxIdentifier, }; use anyhow::Result; use caryatid_sdk::Context; @@ -1004,9 +1004,9 @@ pub async fn handle_pool_delegators_blockfrost( }; let mut delegators_rest = Vec::::new(); - for (d, l) in pool_delegators { - let bech32 = StakeCredential::AddrKeyHash(d.clone()) - .to_stake_bech32() + for (stake_address, l) in pool_delegators { + let bech32 = stake_address + .to_string() .map_err(|e| anyhow::anyhow!("Invalid stake address in pool delegators: {e}"))?; delegators_rest.push(PoolDelegatorRest { address: bech32, diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 81e51040..41fd9b5f 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -1,14 +1,5 @@ use crate::cost_models::{PLUTUS_V1, PLUTUS_V2, PLUTUS_V3}; -use acropolis_common::{ - messages::EpochActivityMessage, - protocol_params::{Nonce, NonceVariant, ProtocolParams}, - queries::blocks::BlockInfo, - queries::governance::DRepActionUpdate, - rest_helper::ToCheckedF64, - serialization::{DisplayFromBech32, PoolPrefix}, - AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, KeyHash, PolicyAsset, - PoolEpochState, PoolUpdateAction, Relay, TxHash, Vote, VrfKeyHash, -}; +use acropolis_common::{messages::EpochActivityMessage, protocol_params::{Nonce, NonceVariant, ProtocolParams}, queries::blocks::BlockInfo, queries::governance::DRepActionUpdate, rest_helper::ToCheckedF64, serialization::{DisplayFromBech32, PoolPrefix}, AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, KeyHash, PolicyAsset, PoolEpochState, PoolId, PoolUpdateAction, Relay, TxHash, Vote, VrfKeyHash}; use anyhow::Result; use num_traits::ToPrimitive; use rust_decimal::Decimal; @@ -63,7 +54,7 @@ pub struct BlockInfoREST(pub BlockInfo); pub struct SPDDByEpochItemRest { pub stake_address: String, #[serde_as(as = "DisplayFromBech32")] - pub pool_id: Vec, + pub pool_id: PoolId, #[serde_as(as = "DisplayFromStr")] pub amount: u64, } diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index 771d75b8..7d41a23e 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -10,7 +10,7 @@ use acropolis_common::{ params::TECHNICAL_PARAMETER_POOL_RETIRE_MAX_EPOCH, queries::governance::VoteRecord, stake_addresses::StakeAddressMap, - BlockInfo, KeyHash, PoolId, PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, + BlockInfo, PoolId, PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, Relay, StakeAddress, TxCertificate, TxHash, TxIdentifier, Voter, VotingProcedures, }; use anyhow::Result; @@ -175,7 +175,7 @@ impl State { } /// Get Pool Delegators - pub fn get_pool_delegators(&self, pool_operator: &PoolId) -> Option> { + pub fn get_pool_delegators(&self, pool_operator: &PoolId) -> Option> { let stake_addresses = self.stake_addresses.as_ref()?; let historical_spos = self.historical_spos.as_ref()?; diff --git a/modules/tx_unpacker/src/tx_unpacker.rs b/modules/tx_unpacker/src/tx_unpacker.rs index c42afcc3..92a9cfc2 100644 --- a/modules/tx_unpacker/src/tx_unpacker.rs +++ b/modules/tx_unpacker/src/tx_unpacker.rs @@ -51,7 +51,9 @@ impl TxUnpacker { for (hash, vote) in proposals.iter() { match map(vote) { - Ok(upd) => update.proposals.push((hash.to_vec(), upd)), + Ok(upd) => { + update.proposals.push((GenesisKeyhash::try_from(hash.as_slice()).unwrap(), upd)) + } Err(e) => error!("Cannot convert alonzo protocol param update {vote:?}: {e}"), } } From d2798aa5bb465cc9748adf2ce250266bdea0d5f5 Mon Sep 17 00:00:00 2001 From: Matthew Hounslow Date: Fri, 31 Oct 2025 11:12:40 -0700 Subject: [PATCH 2/8] Replace `AddrKeyhash` with `StakeAddress` in spo_distribution_store.rs and use to_/from_binary to encode the entire stake address (network + hash) so we can retrieve the stake address out of it. --- common/src/address.rs | 1 + .../src/spo_distribution_store.rs | 90 ++++++++++--------- .../rest_blockfrost/src/handlers/epochs.rs | 64 +------------ modules/rest_blockfrost/src/types.rs | 1 - 4 files changed, 48 insertions(+), 108 deletions(-) diff --git a/common/src/address.rs b/common/src/address.rs index 1dd20723..46aed140 100644 --- a/common/src/address.rs +++ b/common/src/address.rs @@ -274,6 +274,7 @@ impl ShelleyAddress { ShelleyAddressPaymentPart::ScriptHash(data) => (data, 1), }; + // TODO: MH - make delegation hash an Option> type let (delegation_hash, delegation_bits): (Vec, u8) = match &self.delegation { ShelleyAddressDelegationPart::None => (Vec::new(), 3), ShelleyAddressDelegationPart::StakeKeyHash(hash) => (hash.to_vec(), 0), diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs index 6305fbd1..10f4082c 100644 --- a/modules/accounts_state/src/spo_distribution_store.rs +++ b/modules/accounts_state/src/spo_distribution_store.rs @@ -1,26 +1,22 @@ -use std::collections::HashMap; -use std::ops::Add; -use acropolis_common::types::AddrKeyhash; use acropolis_common::{PoolId, StakeAddress}; use anyhow::Result; use fjall::{Config, Keyspace, PartitionCreateOptions}; +use std::collections::HashMap; const POOL_ID_LENGTH: usize = 28; -const STAKE_KEY_LEN: usize = 28; +const STAKE_ADDRESS_LEN: usize = 29; // 1 byte header + 28 bytes hash const EPOCH_LEN: usize = 8; -const TOTAL_KEY_LEN: usize = EPOCH_LEN + POOL_ID_LENGTH + STAKE_KEY_LEN; +const TOTAL_KEY_LEN: usize = EPOCH_LEN + POOL_ID_LENGTH + STAKE_ADDRESS_LEN; -// Batch size balances commit overhead vs memory usage -// ~720KB per batch (72 bytes × 10,000) -// ~130 commits for typical epoch (~1.3M delegations) const BATCH_SIZE: usize = 10_000; -fn encode_key(epoch: u64, pool_id: &PoolId, stake_key: &AddrKeyhash) -> Vec { +fn encode_key(epoch: u64, pool_id: &PoolId, stake_address: &StakeAddress) -> Result> { let mut key = Vec::with_capacity(TOTAL_KEY_LEN); key.extend_from_slice(&epoch.to_be_bytes()); key.extend_from_slice(pool_id.as_ref()); - key.extend_from_slice(stake_key.as_ref()); - key + key.extend_from_slice(&stake_address.to_binary()); + + Ok(key) } fn encode_epoch_pool_prefix(epoch: u64, pool_id: &PoolId) -> Vec { @@ -30,29 +26,32 @@ fn encode_epoch_pool_prefix(epoch: u64, pool_id: &PoolId) -> Vec { prefix } -fn decode_key(key: &[u8]) -> Result<(u64, PoolId, AddrKeyhash)> { +fn decode_key(key: &[u8]) -> Result<(u64, PoolId, StakeAddress)> { + if key.len() != TOTAL_KEY_LEN { + anyhow::bail!( + "Invalid key length: expected {}, got {}", + TOTAL_KEY_LEN, + key.len() + ); + } + let epoch = u64::from_be_bytes(key[..EPOCH_LEN].try_into()?); let pool_id = key[EPOCH_LEN..EPOCH_LEN + POOL_ID_LENGTH].try_into()?; - let stake_key = key[EPOCH_LEN + POOL_ID_LENGTH..].try_into()?; - Ok((epoch, pool_id, stake_key)) + + let stake_address_bytes = &key[EPOCH_LEN + POOL_ID_LENGTH..]; + let stake_address = StakeAddress::from_binary(stake_address_bytes)?; + + Ok((epoch, pool_id, stake_address)) } -/// Encode epoch completion marker key fn encode_epoch_marker(epoch: u64) -> Vec { epoch.to_be_bytes().to_vec() } pub struct SPDDStore { keyspace: Keyspace, - /// Partition for all SPDD data - /// Key format: epoch(8 bytes) + pool_id + stake_key - /// Value: amount(8 bytes) spdd: fjall::PartitionHandle, - /// Partition for epoch completion markers - /// Key format: epoch(8 bytes) - /// Value: "complete" epoch_markers: fjall::PartitionHandle, - /// Maximum number of epochs to retain (None = unlimited) retention_epochs: u64, } @@ -101,8 +100,8 @@ impl SPDDStore { pub fn store_spdd( &mut self, epoch: u64, - spdd_state: HashMap>, - ) -> fjall::Result<()> { + spdd_state: HashMap>, + ) -> Result<()> { if self.is_epoch_complete(epoch)? { return Ok(()); } @@ -111,8 +110,8 @@ impl SPDDStore { let mut batch = self.keyspace.batch(); let mut count = 0; for (pool_id, delegations) in spdd_state { - for (stake_key, amount) in delegations { - let key = encode_key(epoch, &pool_id, &stake_key); + for (stake_address, amount) in delegations { + let key = encode_key(epoch, &pool_id, &stake_address)?; let value = amount.to_be_bytes(); batch.insert(&self.spdd, key, value); @@ -128,7 +127,6 @@ impl SPDDStore { batch.commit()?; } - // Mark epoch as complete (single key operation) let marker_key = encode_epoch_marker(epoch); self.epoch_markers.insert(marker_key, b"complete")?; @@ -141,7 +139,6 @@ impl SPDDStore { } pub fn remove_epoch_data(&self, epoch: u64) -> fjall::Result { - // Remove epoch marker first - if process fails midway, epoch will be marked incomplete let marker_key = encode_epoch_marker(epoch); self.epoch_markers.remove(marker_key)?; @@ -183,7 +180,7 @@ impl SPDDStore { Ok(deleted_epochs) } - pub fn query_by_epoch(&self, epoch: u64) -> Result> { + pub fn query_by_epoch(&self, epoch: u64) -> Result> { if !self.is_epoch_complete(epoch)? { return Err(anyhow::anyhow!("Epoch SPDD Data is not complete")); } @@ -192,9 +189,9 @@ impl SPDDStore { let mut result = Vec::new(); for item in self.spdd.prefix(prefix) { let (key, value) = item?; - let (_, pool_id, stake_key) = decode_key(&key)?; + let (_, pool_id, stake_address) = decode_key(&key)?; let amount = u64::from_be_bytes(value.as_ref().try_into()?); - result.push((pool_id, stake_key, amount)); + result.push((pool_id, stake_address, amount)); } Ok(result) } @@ -203,7 +200,7 @@ impl SPDDStore { &self, epoch: u64, pool_id: &PoolId, - ) -> Result> { + ) -> Result> { if !self.is_epoch_complete(epoch)? { return Err(anyhow::anyhow!("Epoch SPDD Data is not complete")); } @@ -212,9 +209,9 @@ impl SPDDStore { let mut result = Vec::new(); for item in self.spdd.prefix(prefix) { let (key, value) = item?; - let (_, _, stake_key) = decode_key(&key)?; + let (_, _, stake_address) = decode_key(&key)?; let amount = u64::from_be_bytes(value.as_ref().try_into()?); - result.push((stake_key, amount)); + result.push((stake_address, amount)); } Ok(result) } @@ -224,8 +221,7 @@ impl SPDDStore { mod tests { use super::*; use acropolis_common::crypto::keyhash_224; - use acropolis_common::types::AddrKeyhash; - use acropolis_common::PoolId; + use acropolis_common::{NetworkId, PoolId, StakeCredential}; const DB_PATH: &str = "spdd_db"; @@ -233,23 +229,29 @@ mod tests { keyhash_224(&[byte]).into() } - fn test_addr_hash(byte: u8) -> AddrKeyhash { - keyhash_224(&[byte]) + fn test_stake_address(byte: u8, network: NetworkId) -> StakeAddress { + StakeAddress::new(StakeCredential::AddrKeyHash(keyhash_224(&[byte])), network) } #[test] fn test_store_spdd_state() { let mut spdd_store = SPDDStore::new(std::path::Path::new(DB_PATH), 1).expect("Failed to create SPDD store"); - let mut spdd_state: HashMap> = HashMap::new(); + let mut spdd_state: HashMap> = HashMap::new(); spdd_state.insert( test_pool_hash(0x01), - vec![(test_addr_hash(0x10), 100), (test_addr_hash(0x11), 150)], + vec![ + (test_stake_address(0x10, NetworkId::Mainnet), 100), + (test_stake_address(0x11, NetworkId::Mainnet), 150), + ], ); spdd_state.insert( test_pool_hash(0x02), - vec![(test_addr_hash(0x20), 200), (test_addr_hash(0x21), 250)], + vec![ + (test_stake_address(0x20, NetworkId::Testnet), 200), + (test_stake_address(0x21, NetworkId::Testnet), 250), + ], ); assert!(spdd_store.store_spdd(1, spdd_state).is_ok()); @@ -268,13 +270,13 @@ mod tests { .expect("Failed to create SPDD store"); for epoch in 1..=3 { - let mut spdd_state: HashMap> = HashMap::new(); + let mut spdd_state: HashMap> = HashMap::new(); spdd_state.insert( test_pool_hash(epoch as u8), vec![ - (test_addr_hash(0x10), epoch * 100), - (test_addr_hash(0x11), epoch * 150), + (test_stake_address(0x10, NetworkId::Mainnet), epoch * 100), + (test_stake_address(0x11, NetworkId::Mainnet), epoch * 150), ], ); spdd_store.store_spdd(epoch, spdd_state).expect("Failed to store SPDD state"); diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs index 273be07e..35767841 100644 --- a/modules/rest_blockfrost/src/handlers/epochs.rs +++ b/modules/rest_blockfrost/src/handlers/epochs.rs @@ -15,7 +15,7 @@ use acropolis_common::{ spdd::{SPDDStateQuery, SPDDStateQueryResponse}, utils::query_state, }, - NetworkId, PoolId, + PoolId, }; use anyhow::{anyhow, Result}; use caryatid_sdk::Context; @@ -428,37 +428,6 @@ pub async fn handle_epoch_total_stakes_blockfrost( return Ok(RESTResponse::with_text(404, "Epoch not found")); } - // Query current network from parameters-state - let current_network_msg = Arc::new(Message::StateQuery(StateQuery::Parameters( - ParametersStateQuery::GetNetworkName, - ))); - let current_network = query_state( - &context, - &handlers_config.parameters_query_topic, - current_network_msg, - |message| match message { - Message::StateQueryResponse(StateQueryResponse::Parameters( - ParametersStateQueryResponse::NetworkName(network), - )) => Ok(network), - _ => Err(anyhow::anyhow!( - "Unexpected message type while retrieving current network" - )), - }, - ) - .await?; - - let network = match current_network.as_str() { - "mainnet" => NetworkId::Mainnet, - "testnet" => NetworkId::Testnet, - unknown => { - return Ok(RESTResponse::with_text( - 500, - format!("Internal server error while retrieving current network: {unknown}") - .as_str(), - )) - } - }; - // Query SPDD by epoch from accounts-state let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( AccountsStateQuery::GetSPDDByEpoch { @@ -558,37 +527,6 @@ pub async fn handle_epoch_pool_stakes_blockfrost( return Ok(RESTResponse::with_text(404, "Epoch not found")); } - // Query current network from parameters-state - let current_network_msg = Arc::new(Message::StateQuery(StateQuery::Parameters( - ParametersStateQuery::GetNetworkName, - ))); - let current_network = query_state( - &context, - &handlers_config.parameters_query_topic, - current_network_msg, - |message| match message { - Message::StateQueryResponse(StateQueryResponse::Parameters( - ParametersStateQueryResponse::NetworkName(network), - )) => Ok(network), - _ => Err(anyhow::anyhow!( - "Unexpected message type while retrieving current network" - )), - }, - ) - .await?; - - let network = match current_network.as_str() { - "mainnet" => NetworkId::Mainnet, - "testnet" => NetworkId::Testnet, - unknown => { - return Ok(RESTResponse::with_text( - 500, - format!("Internal server error while retrieving current network: {unknown}") - .as_str(), - )) - } - }; - // Query SPDD by epoch and pool from accounts-state let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( AccountsStateQuery::GetSPDDByEpochAndPool { diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 8a704ea9..f798c6c4 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -8,7 +8,6 @@ use acropolis_common::{ AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, KeyHash, PolicyAsset, PoolEpochState, PoolId, PoolUpdateAction, Relay, TxHash, Vote, VrfKeyHash, }; -use acropolis_common::{messages::EpochActivityMessage, protocol_params::{Nonce, NonceVariant, ProtocolParams}, queries::blocks::BlockInfo, queries::governance::DRepActionUpdate, rest_helper::ToCheckedF64, serialization::{DisplayFromBech32, PoolPrefix}, AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, KeyHash, PolicyAsset, PoolEpochState, PoolId, PoolUpdateAction, Relay, TxHash, Vote, VrfKeyHash}; use anyhow::Result; use num_traits::ToPrimitive; use rust_decimal::Decimal; From 61a6d4800795f310bd7ee78f3fe9825203c5861e Mon Sep 17 00:00:00 2001 From: Matthew Hounslow Date: Fri, 31 Oct 2025 11:12:58 -0700 Subject: [PATCH 3/8] Replace `AddrKeyhash` with `StakeAddress` in spo_distribution_store.rs and use to_/from_binary to encode the entire stake address (network + hash) so we can retrieve the stake address out of it. --- common/src/stake_addresses.rs | 10 ++-------- modules/rest_blockfrost/src/handlers/governance.rs | 12 ++++++++---- modules/spo_state/src/state.rs | 4 ++-- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs index 46b0003b..288e8ac9 100644 --- a/common/src/stake_addresses.rs +++ b/common/src/stake_addresses.rs @@ -1467,14 +1467,8 @@ mod tests { let map = stake_addresses.get_drep_delegations_map(&addresses).unwrap(); assert_eq!(map.len(), 3); - assert_eq!( - map.get(&addr1).unwrap(), - &Some(DRepChoice::Abstain) - ); - assert_eq!( - map.get(&addr2).unwrap(), - &Some(DRepChoice::Key(DREP_HASH)) - ); + assert_eq!(map.get(&addr1).unwrap(), &Some(DRepChoice::Abstain)); + assert_eq!(map.get(&addr2).unwrap(), &Some(DRepChoice::Key(DREP_HASH))); assert_eq!(map.get(&addr3).unwrap(), &None); } diff --git a/modules/rest_blockfrost/src/handlers/governance.rs b/modules/rest_blockfrost/src/handlers/governance.rs index c9ecf472..2c56b173 100644 --- a/modules/rest_blockfrost/src/handlers/governance.rs +++ b/modules/rest_blockfrost/src/handlers/governance.rs @@ -4,10 +4,14 @@ use crate::types::{ DRepInfoREST, DRepMetadataREST, DRepUpdateREST, DRepVoteREST, DRepsListREST, ProposalVoteREST, VoterRoleREST, }; -use acropolis_common::{messages::{Message, RESTResponse, StateQuery, StateQueryResponse}, queries::{ - accounts::{AccountsStateQuery, AccountsStateQueryResponse}, - governance::{GovernanceStateQuery, GovernanceStateQueryResponse}, -}, Credential, GovActionId, StakeAddress, TxHash, Voter}; +use acropolis_common::{ + messages::{Message, RESTResponse, StateQuery, StateQueryResponse}, + queries::{ + accounts::{AccountsStateQuery, AccountsStateQueryResponse}, + governance::{GovernanceStateQuery, GovernanceStateQueryResponse}, + }, + Credential, GovActionId, StakeAddress, TxHash, Voter, +}; use anyhow::{anyhow, Result}; use caryatid_sdk::Context; use reqwest::Client; diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index 50b0d43f..74fd0f00 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -10,8 +10,8 @@ use acropolis_common::{ params::TECHNICAL_PARAMETER_POOL_RETIRE_MAX_EPOCH, queries::governance::VoteRecord, stake_addresses::StakeAddressMap, - BlockInfo, PoolId, PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, - Relay, StakeAddress, TxCertificate, TxHash, TxIdentifier, Voter, VotingProcedures, + BlockInfo, PoolId, PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, Relay, + StakeAddress, TxCertificate, TxHash, TxIdentifier, Voter, VotingProcedures, }; use anyhow::Result; use imbl::HashMap; From 981ff154fdeeb74a6aca82938c80554f2094183d Mon Sep 17 00:00:00 2001 From: Matthew Hounslow Date: Fri, 31 Oct 2025 11:20:22 -0700 Subject: [PATCH 4/8] Replace `KeyHash` with `StakeAddress` in `get_drep_delegators` --- common/src/queries/accounts.rs | 6 ++---- common/src/stake_addresses.rs | 10 +++++----- modules/accounts_state/src/state.rs | 2 +- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index fe662d12..d6abf232 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -1,8 +1,6 @@ use std::collections::HashMap; -use crate::{ - DRepChoice, KeyHash, PoolId, PoolLiveStakeInfo, RewardType, StakeAddress, TxIdentifier, -}; +use crate::{DRepChoice, PoolId, PoolLiveStakeInfo, RewardType, StakeAddress, TxIdentifier}; pub const DEFAULT_ACCOUNTS_QUERY_TOPIC: (&str, &str) = ("accounts-state-query-topic", "cardano.query.accounts"); @@ -191,5 +189,5 @@ pub struct PoolDelegators { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DrepDelegators { - pub delegators: Vec<(KeyHash, u64)>, + pub delegators: Vec<(StakeAddress, u64)>, } diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs index 288e8ac9..9c200f82 100644 --- a/common/src/stake_addresses.rs +++ b/common/src/stake_addresses.rs @@ -1,6 +1,6 @@ use crate::{ math::update_value_with_delta, messages::DRepDelegationDistribution, DRepChoice, - DRepCredential, DelegatedStake, KeyHash, Lovelace, PoolId, PoolLiveStakeInfo, StakeAddress, + DRepCredential, DelegatedStake, Lovelace, PoolId, PoolLiveStakeInfo, StakeAddress, StakeAddressDelta, Withdrawal, }; use anyhow::Result; @@ -184,15 +184,15 @@ impl StakeAddressMap { } /// Get DRep Delegators with live_stakes - pub fn get_drep_delegators(&self, drep: &DRepChoice) -> Vec<(KeyHash, u64)> { + pub fn get_drep_delegators(&self, drep: &DRepChoice) -> Vec<(StakeAddress, u64)> { // Find stake addresses delegated to drep - let delegators: Vec<(KeyHash, u64)> = self + let delegators: Vec<(StakeAddress, u64)> = self .inner .iter() .filter_map(|(stake_address, sas)| match sas.delegated_drep.as_ref() { Some(delegated_drep) => { if delegated_drep.eq(drep) { - Some((*stake_address.get_hash(), sas.utxo_value)) + Some((stake_address.clone(), sas.utxo_value)) } else { None } @@ -549,7 +549,7 @@ impl StakeAddressMap { mod tests { use super::*; use crate::hash::Hash; - use crate::{NetworkId, StakeAddress, StakeCredential}; + use crate::{KeyHash, NetworkId, StakeAddress, StakeCredential}; const STAKE_KEY_HASH: KeyHash = KeyHash::new([0x99; 28]); const STAKE_KEY_HASH_2: KeyHash = KeyHash::new([0xaa; 28]); diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index d15b79a0..58a09c49 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -175,7 +175,7 @@ impl State { } /// Get Drep Delegators with live_stakes - pub fn get_drep_delegators(&self, drep: &DRepChoice) -> Vec<(KeyHash, u64)> { + pub fn get_drep_delegators(&self, drep: &DRepChoice) -> Vec<(StakeAddress, u64)> { self.stake_addresses.lock().unwrap().get_drep_delegators(drep) } From 4caab300ffa30aa724f75b661ba74dfc2f20476c Mon Sep 17 00:00:00 2001 From: Matthew Hounslow Date: Fri, 31 Oct 2025 11:30:48 -0700 Subject: [PATCH 5/8] Refactor: Simplify `encode_key` in `spo_distribution_store.rs` by removing `Result` return type and add back documentation for SPDDStore methods and fields --- .../src/spo_distribution_store.rs | 21 +++++++++++++++---- modules/accounts_state/src/state.rs | 11 ++-------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs index 10f4082c..dc9a97cc 100644 --- a/modules/accounts_state/src/spo_distribution_store.rs +++ b/modules/accounts_state/src/spo_distribution_store.rs @@ -8,15 +8,18 @@ const STAKE_ADDRESS_LEN: usize = 29; // 1 byte header + 28 bytes hash const EPOCH_LEN: usize = 8; const TOTAL_KEY_LEN: usize = EPOCH_LEN + POOL_ID_LENGTH + STAKE_ADDRESS_LEN; +// Batch size balances commit overhead vs memory usage +// ~720KB per batch (72 bytes × 10,000) +// ~130 commits for typical epoch (~1.3M delegations) const BATCH_SIZE: usize = 10_000; -fn encode_key(epoch: u64, pool_id: &PoolId, stake_address: &StakeAddress) -> Result> { +fn encode_key(epoch: u64, pool_id: &PoolId, stake_address: &StakeAddress) -> Vec { let mut key = Vec::with_capacity(TOTAL_KEY_LEN); key.extend_from_slice(&epoch.to_be_bytes()); key.extend_from_slice(pool_id.as_ref()); key.extend_from_slice(&stake_address.to_binary()); - Ok(key) + key } fn encode_epoch_pool_prefix(epoch: u64, pool_id: &PoolId) -> Vec { @@ -44,14 +47,22 @@ fn decode_key(key: &[u8]) -> Result<(u64, PoolId, StakeAddress)> { Ok((epoch, pool_id, stake_address)) } +/// Encode epoch completion marker key fn encode_epoch_marker(epoch: u64) -> Vec { epoch.to_be_bytes().to_vec() } pub struct SPDDStore { keyspace: Keyspace, + /// Partition for all SPDD data + /// Key format: epoch(8 bytes) + pool_id + stake_key + /// Value: amount(8 bytes) spdd: fjall::PartitionHandle, + /// Partition for epoch completion markers + /// Key format: epoch(8 bytes) + /// Value: "complete" epoch_markers: fjall::PartitionHandle, + /// Maximum number of epochs to retain (None = unlimited) retention_epochs: u64, } @@ -101,7 +112,7 @@ impl SPDDStore { &mut self, epoch: u64, spdd_state: HashMap>, - ) -> Result<()> { + ) -> fjall::Result<()> { if self.is_epoch_complete(epoch)? { return Ok(()); } @@ -111,7 +122,7 @@ impl SPDDStore { let mut count = 0; for (pool_id, delegations) in spdd_state { for (stake_address, amount) in delegations { - let key = encode_key(epoch, &pool_id, &stake_address)?; + let key = encode_key(epoch, &pool_id, &stake_address); let value = amount.to_be_bytes(); batch.insert(&self.spdd, key, value); @@ -127,6 +138,7 @@ impl SPDDStore { batch.commit()?; } + // Mark epoch as complete (single key operation) let marker_key = encode_epoch_marker(epoch); self.epoch_markers.insert(marker_key, b"complete")?; @@ -139,6 +151,7 @@ impl SPDDStore { } pub fn remove_epoch_data(&self, epoch: u64) -> fjall::Result { + // Remove epoch marker first - if process fails midway, epoch will be marked incomplete let marker_key = encode_epoch_marker(epoch); self.epoch_markers.remove(marker_key)?; diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index 58a09c49..ea107360 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -15,7 +15,7 @@ use acropolis_common::{ protocol_params::ProtocolParams, stake_addresses::{StakeAddressMap, StakeAddressState}, BlockInfo, DRepChoice, DRepCredential, DelegatedStake, InstantaneousRewardSource, - InstantaneousRewardTarget, KeyHash, Lovelace, MoveInstantaneousReward, PoolId, + InstantaneousRewardTarget, Lovelace, MoveInstantaneousReward, PoolId, PoolLiveStakeInfo, PoolRegistration, Pot, SPORewards, StakeAddress, StakeRewardDelta, TxCertificate, }; @@ -962,14 +962,7 @@ impl State { mod tests { use super::*; use acropolis_common::crypto::{keyhash_224, keyhash_256}; - use acropolis_common::{ - protocol_params::ConwayParams, rational_number::RationalNumber, Anchor, Committee, - Constitution, CostModel, DRepVotingThresholds, NetworkId, PoolVotingThresholds, Pot, - PotDelta, Ratio, Registration, StakeAddress, StakeAddressDelta, StakeAndVoteDelegation, - StakeCredential, StakeRegistrationAndStakeAndVoteDelegation, - StakeRegistrationAndVoteDelegation, TxCertificateWithPos, TxIdentifier, VoteDelegation, - VrfKeyHash, Withdrawal, - }; + use acropolis_common::{protocol_params::ConwayParams, rational_number::RationalNumber, Anchor, Committee, Constitution, CostModel, DRepVotingThresholds, KeyHash, NetworkId, PoolVotingThresholds, Pot, PotDelta, Ratio, Registration, StakeAddress, StakeAddressDelta, StakeAndVoteDelegation, StakeCredential, StakeRegistrationAndStakeAndVoteDelegation, StakeRegistrationAndVoteDelegation, TxCertificateWithPos, TxIdentifier, VoteDelegation, VrfKeyHash, Withdrawal}; // Helper to create a StakeAddress from a byte slice fn create_address(hash: &[u8]) -> StakeAddress { From aeb3a09f64a339be87d3cf5e76f6a102325f12af Mon Sep 17 00:00:00 2001 From: Matthew Hounslow Date: Fri, 31 Oct 2025 11:48:27 -0700 Subject: [PATCH 6/8] Refactor: Simplify Bech32 encoding for `StakeAddress` in governance and epochs handlers --- .../src/spo_distribution_store.rs | 32 +++++---- modules/accounts_state/src/state.rs | 14 ++-- .../rest_blockfrost/src/handlers/epochs.rs | 10 ++- .../src/handlers/governance.rs | 66 +++++++------------ 4 files changed, 62 insertions(+), 60 deletions(-) diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs index dc9a97cc..9040dc4e 100644 --- a/modules/accounts_state/src/spo_distribution_store.rs +++ b/modules/accounts_state/src/spo_distribution_store.rs @@ -1,5 +1,5 @@ use acropolis_common::{PoolId, StakeAddress}; -use anyhow::Result; +use anyhow::{anyhow, Context, Result}; use fjall::{Config, Keyspace, PartitionCreateOptions}; use std::collections::HashMap; @@ -30,19 +30,27 @@ fn encode_epoch_pool_prefix(epoch: u64, pool_id: &PoolId) -> Vec { } fn decode_key(key: &[u8]) -> Result<(u64, PoolId, StakeAddress)> { - if key.len() != TOTAL_KEY_LEN { - anyhow::bail!( - "Invalid key length: expected {}, got {}", - TOTAL_KEY_LEN, - key.len() - ); - } - - let epoch = u64::from_be_bytes(key[..EPOCH_LEN].try_into()?); - let pool_id = key[EPOCH_LEN..EPOCH_LEN + POOL_ID_LENGTH].try_into()?; + let epoch_bytes: [u8; EPOCH_LEN] = key[..EPOCH_LEN] + .try_into() + .map_err(|_| anyhow!("Failed to extract epoch bytes (offset 0-{})", EPOCH_LEN))?; + let epoch = u64::from_be_bytes(epoch_bytes); + + let pool_id: PoolId = key[EPOCH_LEN..EPOCH_LEN + POOL_ID_LENGTH].try_into().map_err(|_| { + anyhow!( + "Failed to extract pool ID bytes (offset {}-{})", + EPOCH_LEN, + EPOCH_LEN + POOL_ID_LENGTH + ) + })?; let stake_address_bytes = &key[EPOCH_LEN + POOL_ID_LENGTH..]; - let stake_address = StakeAddress::from_binary(stake_address_bytes)?; + let stake_address = StakeAddress::from_binary(stake_address_bytes).with_context(|| { + format!( + "Failed to decode stake address from {} bytes at offset {}", + stake_address_bytes.len(), + EPOCH_LEN + POOL_ID_LENGTH + ) + })?; Ok((epoch, pool_id, stake_address)) } diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index ea107360..dd823267 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -15,9 +15,8 @@ use acropolis_common::{ protocol_params::ProtocolParams, stake_addresses::{StakeAddressMap, StakeAddressState}, BlockInfo, DRepChoice, DRepCredential, DelegatedStake, InstantaneousRewardSource, - InstantaneousRewardTarget, Lovelace, MoveInstantaneousReward, PoolId, - PoolLiveStakeInfo, PoolRegistration, Pot, SPORewards, StakeAddress, StakeRewardDelta, - TxCertificate, + InstantaneousRewardTarget, Lovelace, MoveInstantaneousReward, PoolId, PoolLiveStakeInfo, + PoolRegistration, Pot, SPORewards, StakeAddress, StakeRewardDelta, TxCertificate, }; use anyhow::Result; use imbl::OrdMap; @@ -962,7 +961,14 @@ impl State { mod tests { use super::*; use acropolis_common::crypto::{keyhash_224, keyhash_256}; - use acropolis_common::{protocol_params::ConwayParams, rational_number::RationalNumber, Anchor, Committee, Constitution, CostModel, DRepVotingThresholds, KeyHash, NetworkId, PoolVotingThresholds, Pot, PotDelta, Ratio, Registration, StakeAddress, StakeAddressDelta, StakeAndVoteDelegation, StakeCredential, StakeRegistrationAndStakeAndVoteDelegation, StakeRegistrationAndVoteDelegation, TxCertificateWithPos, TxIdentifier, VoteDelegation, VrfKeyHash, Withdrawal}; + use acropolis_common::{ + protocol_params::ConwayParams, rational_number::RationalNumber, Anchor, Committee, + Constitution, CostModel, DRepVotingThresholds, KeyHash, NetworkId, PoolVotingThresholds, + Pot, PotDelta, Ratio, Registration, StakeAddress, StakeAddressDelta, + StakeAndVoteDelegation, StakeCredential, StakeRegistrationAndStakeAndVoteDelegation, + StakeRegistrationAndVoteDelegation, TxCertificateWithPos, TxIdentifier, VoteDelegation, + VrfKeyHash, Withdrawal, + }; // Helper to create a StakeAddress from a byte slice fn create_address(hash: &[u8]) -> StakeAddress { diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs index 35767841..74f57bdc 100644 --- a/modules/rest_blockfrost/src/handlers/epochs.rs +++ b/modules/rest_blockfrost/src/handlers/epochs.rs @@ -456,9 +456,12 @@ pub async fn handle_epoch_total_stakes_blockfrost( let spdd_response = spdd .into_iter() .map(|(pool_id, stake_address, amount)| { + let bech32 = stake_address + .to_string() + .map_err(|e| anyhow::anyhow!("Failed to convert stake address to string {}", e))?; Ok(SPDDByEpochItemRest { pool_id, - stake_address: stake_address.to_string().unwrap(), + stake_address: bech32, amount, }) }) @@ -556,8 +559,11 @@ pub async fn handle_epoch_pool_stakes_blockfrost( let spdd_response = spdd .into_iter() .map(|(stake_address, amount)| { + let bech32 = stake_address + .to_string() + .map_err(|e| anyhow::anyhow!("Failed to convert stake address to string {}", e))?; Ok(SPDDByEpochAndPoolItemRest { - stake_address: stake_address.to_string().unwrap(), + stake_address: bech32, amount, }) }) diff --git a/modules/rest_blockfrost/src/handlers/governance.rs b/modules/rest_blockfrost/src/handlers/governance.rs index 2c56b173..20440e46 100644 --- a/modules/rest_blockfrost/src/handlers/governance.rs +++ b/modules/rest_blockfrost/src/handlers/governance.rs @@ -10,13 +10,13 @@ use acropolis_common::{ accounts::{AccountsStateQuery, AccountsStateQueryResponse}, governance::{GovernanceStateQuery, GovernanceStateQueryResponse}, }, - Credential, GovActionId, StakeAddress, TxHash, Voter, + Credential, GovActionId, TxHash, Voter, }; use anyhow::{anyhow, Result}; use caryatid_sdk::Context; use reqwest::Client; use serde_json::Value; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; pub async fn handle_dreps_list_blockfrost( context: Arc>, @@ -191,27 +191,6 @@ pub async fn handle_drep_delegators_blockfrost( Message::StateQueryResponse(StateQueryResponse::Governance( GovernanceStateQueryResponse::DRepDelegators(delegators), )) => { - let stake_address_to_bech32: HashMap = match delegators - .addresses - .iter() - .map(|addr| { - let credential = addr.get_credential(); - let bech32 = credential - .to_stake_bech32() - .map_err(|_| anyhow!("Failed to encode stake address"))?; - Ok((addr.clone(), bech32)) - }) - .collect::>>() - { - Ok(map) => map, - Err(e) => { - return Ok(RESTResponse::with_text( - 500, - &format!("Internal error: {e}"), - )); - } - }; - let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( AccountsStateQuery::GetAccountsUtxoValuesMap { stake_addresses: delegators.addresses.clone(), @@ -226,27 +205,31 @@ pub async fn handle_drep_delegators_blockfrost( Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::AccountsUtxoValuesMap(map), )) => { - let mut response = Vec::new(); - - for (stake_address, amount) in map { - let Some(bech32) = stake_address_to_bech32.get(&stake_address) else { - return Ok(RESTResponse::with_text( + let response: Result> = map + .into_iter() + .map(|(stake_address, amount)| { + let bech32 = stake_address + .to_string() + .map_err(|e| anyhow!("Failed to encode stake address {}", e))?; + + Ok(serde_json::json!({ + "address": bech32, + "amount": amount.to_string(), + })) + }) + .collect(); + + match response { + Ok(response) => match serde_json::to_string_pretty(&response) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( 500, - "Internal error: missing Bech32 for stake key", - )); - }; - - response.push(serde_json::json!({ - "address": bech32, - "amount": amount.to_string(), - })); - } - - match serde_json::to_string_pretty(&response) { - Ok(json) => Ok(RESTResponse::with_json(200, &json)), + &format!("Failed to serialize DRep delegators: {e}"), + )), + }, Err(e) => Ok(RESTResponse::with_text( 500, - &format!("Failed to serialize DRep delegators: {e}"), + &format!("Internal error: {e}"), )), } } @@ -279,7 +262,6 @@ pub async fn handle_drep_delegators_blockfrost( _ => Ok(RESTResponse::with_text(500, "Unexpected message type")), } } - pub async fn handle_drep_metadata_blockfrost( context: Arc>, params: Vec, From 4037f8551bc2a87b5f477f7403425fe9a9e4306f Mon Sep 17 00:00:00 2001 From: Matthew Hounslow Date: Fri, 31 Oct 2025 12:56:14 -0700 Subject: [PATCH 7/8] Add `tempfile` for temporary directory management and add SPDDStore tests --- Cargo.lock | 5 +- modules/accounts_state/Cargo.toml | 5 +- .../src/spo_distribution_store.rs | 97 ++++++++++++++----- 3 files changed, 77 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5224dedd..d237f4ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,15 +68,12 @@ dependencies = [ "chrono", "config", "csv", - "dashmap", "fjall", "hex", "imbl", "itertools 0.14.0", - "rayon", "serde", - "serde_json", - "serde_with 3.15.1", + "tempfile", "tokio", "tracing", ] diff --git a/modules/accounts_state/Cargo.toml b/modules/accounts_state/Cargo.toml index 8bbf4909..98eb6bd1 100644 --- a/modules/accounts_state/Cargo.toml +++ b/modules/accounts_state/Cargo.toml @@ -17,18 +17,15 @@ anyhow = { workspace = true } bigdecimal = "0.4.8" chrono = { workspace = true } config = { workspace = true } -dashmap = { workspace = true } hex = { workspace = true } imbl = { workspace = true } serde = { workspace = true } -serde_json = { workspace = true } -serde_with = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } fjall = "2.11.2" -rayon = "1.10.0" csv = "1.3.1" itertools = "0.14.0" +tempfile = "3" [lib] path = "src/accounts_state.rs" diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs index 9040dc4e..e9019196 100644 --- a/modules/accounts_state/src/spo_distribution_store.rs +++ b/modules/accounts_state/src/spo_distribution_store.rs @@ -242,39 +242,42 @@ impl SPDDStore { mod tests { use super::*; use acropolis_common::crypto::keyhash_224; - use acropolis_common::{NetworkId, PoolId, StakeCredential}; - - const DB_PATH: &str = "spdd_db"; + use acropolis_common::NetworkId::Mainnet; + use acropolis_common::{PoolId, StakeCredential}; + use tempfile::TempDir; fn test_pool_hash(byte: u8) -> PoolId { keyhash_224(&[byte]).into() } - fn test_stake_address(byte: u8, network: NetworkId) -> StakeAddress { - StakeAddress::new(StakeCredential::AddrKeyHash(keyhash_224(&[byte])), network) + fn test_stake_address(byte: u8) -> StakeAddress { + StakeAddress::new(StakeCredential::AddrKeyHash(keyhash_224(&[byte])), Mainnet) } #[test] - fn test_store_spdd_state() { + fn test_store_and_query_spdd() { + let temp_dir = TempDir::new().unwrap(); let mut spdd_store = - SPDDStore::new(std::path::Path::new(DB_PATH), 1).expect("Failed to create SPDD store"); - let mut spdd_state: HashMap> = HashMap::new(); + SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store"); + let mut spdd_state: HashMap> = HashMap::new(); spdd_state.insert( test_pool_hash(0x01), vec![ - (test_stake_address(0x10, NetworkId::Mainnet), 100), - (test_stake_address(0x11, NetworkId::Mainnet), 150), + (test_stake_address(0x10), 100), + (test_stake_address(0x11), 150), ], ); spdd_state.insert( test_pool_hash(0x02), vec![ - (test_stake_address(0x20, NetworkId::Testnet), 200), - (test_stake_address(0x21, NetworkId::Testnet), 250), + (test_stake_address(0x20), 200), + (test_stake_address(0x21), 250), ], ); + assert!(spdd_store.store_spdd(1, spdd_state).is_ok()); + assert!(spdd_store.is_epoch_complete(1).unwrap()); let result = spdd_store.query_by_epoch(1).unwrap(); assert_eq!(result.len(), 4); @@ -286,31 +289,81 @@ mod tests { } #[test] - fn test_prune_old_epochs() { - let mut spdd_store = SPDDStore::new(std::path::Path::new("spdd_prune_test_db"), 2) - .expect("Failed to create SPDD store"); + fn test_retention_pruning() { + let temp_dir = TempDir::new().unwrap(); + let mut spdd_store = + SPDDStore::new(temp_dir.path(), 2).expect("Failed to create SPDD store"); + // Store epochs 1, 2, 3 for epoch in 1..=3 { let mut spdd_state: HashMap> = HashMap::new(); - spdd_state.insert( test_pool_hash(epoch as u8), vec![ - (test_stake_address(0x10, NetworkId::Mainnet), epoch * 100), - (test_stake_address(0x11, NetworkId::Mainnet), epoch * 150), + (test_stake_address(0x10), epoch * 100), + (test_stake_address(0x11), epoch * 150), ], ); spdd_store.store_spdd(epoch, spdd_state).expect("Failed to store SPDD state"); } + // Epoch 1 should be pruned (retention=2, so keep epochs 2 and 3) assert!(!spdd_store.is_epoch_complete(1).unwrap()); assert!(spdd_store.is_epoch_complete(2).unwrap()); assert!(spdd_store.is_epoch_complete(3).unwrap()); assert!(spdd_store.query_by_epoch(1).is_err()); - let result = spdd_store.query_by_epoch(2).unwrap(); - assert_eq!(result.len(), 2); - let result = spdd_store.query_by_epoch(3).unwrap(); - assert_eq!(result.len(), 2); + assert!(spdd_store.query_by_epoch(2).is_ok()); + assert!(spdd_store.query_by_epoch(3).is_ok()); + } + + #[test] + fn test_query_incomplete_epoch() { + let temp_dir = TempDir::new().unwrap(); + let spdd_store = SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store"); + + assert!(!spdd_store.is_epoch_complete(999).unwrap()); + assert!(spdd_store.query_by_epoch(999).is_err()); + assert!(spdd_store.query_by_epoch_and_pool(999, &test_pool_hash(0x01)).is_err()); + } + + #[test] + fn test_remove_epoch_data() { + let temp_dir = TempDir::new().unwrap(); + let mut spdd_store = + SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store"); + + let mut spdd_state: HashMap> = HashMap::new(); + spdd_state.insert( + test_pool_hash(0x01), + vec![ + (test_stake_address(0x10), 100), + (test_stake_address(0x11), 150), + ], + ); + + spdd_store.store_spdd(1, spdd_state).unwrap(); + assert!(spdd_store.is_epoch_complete(1).unwrap()); + + let deleted = spdd_store.remove_epoch_data(1).unwrap(); + assert_eq!(deleted, 2); + assert!(!spdd_store.is_epoch_complete(1).unwrap()); + + let deleted = spdd_store.remove_epoch_data(999).unwrap(); + assert_eq!(deleted, 0); + } + + #[test] + fn test_encode_decode_roundtrip() { + let epoch = 12345u64; + let pool_id = test_pool_hash(0x42); + let stake_address = test_stake_address(0x99); + + let encoded = encode_key(epoch, &pool_id, &stake_address); + let (decoded_epoch, decoded_pool, decoded_stake) = decode_key(&encoded).unwrap(); + + assert_eq!(decoded_epoch, epoch); + assert_eq!(decoded_pool, pool_id); + assert_eq!(decoded_stake, stake_address); } } From 6511204280f06005f4009187c0669e8f10f7dd3b Mon Sep 17 00:00:00 2001 From: Matthew Hounslow Date: Fri, 31 Oct 2025 14:54:41 -0700 Subject: [PATCH 8/8] Refactor: Rename variables for clarity in `drep_state.rs`, simplify `decode_key` in `spo_distribution_store.rs`, and add `tempfile` as a dev dependency --- modules/accounts_state/Cargo.toml | 2 ++ .../src/spo_distribution_store.rs | 25 +++---------------- modules/drep_state/src/state.rs | 17 ++++++------- 3 files changed, 13 insertions(+), 31 deletions(-) diff --git a/modules/accounts_state/Cargo.toml b/modules/accounts_state/Cargo.toml index 98eb6bd1..b71e9ef9 100644 --- a/modules/accounts_state/Cargo.toml +++ b/modules/accounts_state/Cargo.toml @@ -25,6 +25,8 @@ tracing = { workspace = true } fjall = "2.11.2" csv = "1.3.1" itertools = "0.14.0" + +[dev-dependencies] tempfile = "3" [lib] diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs index e9019196..d051eb02 100644 --- a/modules/accounts_state/src/spo_distribution_store.rs +++ b/modules/accounts_state/src/spo_distribution_store.rs @@ -1,5 +1,5 @@ use acropolis_common::{PoolId, StakeAddress}; -use anyhow::{anyhow, Context, Result}; +use anyhow::Result; use fjall::{Config, Keyspace, PartitionCreateOptions}; use std::collections::HashMap; @@ -30,27 +30,10 @@ fn encode_epoch_pool_prefix(epoch: u64, pool_id: &PoolId) -> Vec { } fn decode_key(key: &[u8]) -> Result<(u64, PoolId, StakeAddress)> { - let epoch_bytes: [u8; EPOCH_LEN] = key[..EPOCH_LEN] - .try_into() - .map_err(|_| anyhow!("Failed to extract epoch bytes (offset 0-{})", EPOCH_LEN))?; - let epoch = u64::from_be_bytes(epoch_bytes); - - let pool_id: PoolId = key[EPOCH_LEN..EPOCH_LEN + POOL_ID_LENGTH].try_into().map_err(|_| { - anyhow!( - "Failed to extract pool ID bytes (offset {}-{})", - EPOCH_LEN, - EPOCH_LEN + POOL_ID_LENGTH - ) - })?; - + let epoch = u64::from_be_bytes(key[..EPOCH_LEN].try_into()?); + let pool_id = key[EPOCH_LEN..EPOCH_LEN + POOL_ID_LENGTH].try_into()?; let stake_address_bytes = &key[EPOCH_LEN + POOL_ID_LENGTH..]; - let stake_address = StakeAddress::from_binary(stake_address_bytes).with_context(|| { - format!( - "Failed to decode stake address from {} bytes at offset {}", - stake_address_bytes.len(), - EPOCH_LEN + POOL_ID_LENGTH - ) - })?; + let stake_address = StakeAddress::from_binary(stake_address_bytes)?; Ok((epoch, pool_id, stake_address)) } diff --git a/modules/drep_state/src/state.rs b/modules/drep_state/src/state.rs index df3a650a..f2056721 100644 --- a/modules/drep_state/src/state.rs +++ b/modules/drep_state/src/state.rs @@ -474,12 +474,12 @@ impl State { context: &Arc>, delegators: &[(&StakeAddress, &DRepChoice)], ) -> Result<()> { - let mut stake_address_to_input = HashMap::with_capacity(delegators.len()); + let mut stake_address_to_drep = HashMap::with_capacity(delegators.len()); let mut stake_addresses = Vec::with_capacity(delegators.len()); for &(stake_address, drep) in delegators { stake_addresses.push(stake_address.clone()); - stake_address_to_input.insert(stake_address, (stake_address, drep)); + stake_address_to_drep.insert(stake_address, drep); } let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( @@ -501,8 +501,8 @@ impl State { }; for (stake_address, old_drep_opt) in result_map { - let &(delegator, new_drep_choice) = match stake_address_to_input.get(&stake_address) { - Some(pair) => pair, + let new_drep_choice = match stake_address_to_drep.get(&stake_address) { + Some(&drep) => drep, None => continue, }; @@ -516,10 +516,7 @@ impl State { if old_drep_cred != new_drep_cred { self.update_historical(&old_drep_cred, false, |entry| { if let Some(delegators) = entry.delegators.as_mut() { - delegators.retain(|s| { - s.get_credential().get_hash() - != delegator.get_credential().get_hash() - }); + delegators.retain(|s| s.get_hash() != stake_address.get_hash()); } })?; } @@ -529,8 +526,8 @@ impl State { // Add delegator to new DRep match self.update_historical(&new_drep_cred, true, |entry| { if let Some(delegators) = entry.delegators.as_mut() { - if !delegators.contains(delegator) { - delegators.push(delegator.clone()); + if !delegators.contains(&stake_address) { + delegators.push(stake_address.clone()); } } }) {