diff --git a/Cargo.lock b/Cargo.lock index 5224dedd..d237f4ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,15 +68,12 @@ dependencies = [ "chrono", "config", "csv", - "dashmap", "fjall", "hex", "imbl", "itertools 0.14.0", - "rayon", "serde", - "serde_json", - "serde_with 3.15.1", + "tempfile", "tokio", "tracing", ] diff --git a/common/src/address.rs b/common/src/address.rs index 1dd20723..46aed140 100644 --- a/common/src/address.rs +++ b/common/src/address.rs @@ -274,6 +274,7 @@ impl ShelleyAddress { ShelleyAddressPaymentPart::ScriptHash(data) => (data, 1), }; + // TODO: MH - make delegation hash an Option> type let (delegation_hash, delegation_bits): (Vec, u8) = match &self.delegation { ShelleyAddressDelegationPart::None => (Vec::new(), 3), ShelleyAddressDelegationPart::StakeKeyHash(hash) => (hash.to_vec(), 0), diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index 0bef0201..d6abf232 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -1,8 +1,6 @@ use std::collections::HashMap; -use crate::{ - DRepChoice, KeyHash, PoolId, PoolLiveStakeInfo, RewardType, StakeAddress, TxIdentifier, -}; +use crate::{DRepChoice, PoolId, PoolLiveStakeInfo, RewardType, StakeAddress, TxIdentifier}; pub const DEFAULT_ACCOUNTS_QUERY_TOPIC: (&str, &str) = ("accounts-state-query-topic", "cardano.query.accounts"); @@ -59,17 +57,17 @@ pub enum AccountsStateQueryResponse { AccountAssets(AccountAssets), AccountAssetsTotals(AccountAssetsTotals), AccountUTxOs(AccountUTxOs), - AccountsUtxoValuesMap(HashMap), + AccountsUtxoValuesMap(HashMap), AccountsUtxoValuesSum(u64), - AccountsBalancesMap(HashMap), + AccountsBalancesMap(HashMap), AccountsBalancesSum(u64), // Epochs-related responses ActiveStakes(u64), - /// Vec<(PoolId, StakeKey, ActiveStakeAmount)> - SPDDByEpoch(Vec<(PoolId, KeyHash, u64)>), - /// Vec<(StakeKey, ActiveStakeAmount)> - SPDDByEpochAndPool(Vec<(KeyHash, u64)>), + /// Vec<(PoolId, StakeAddress, ActiveStakeAmount)> + SPDDByEpoch(Vec<(PoolId, StakeAddress, u64)>), + /// Vec<(StakeAddress, ActiveStakeAmount)> + SPDDByEpochAndPool(Vec<(StakeAddress, u64)>), // Pools-related responses OptimalPoolSizing(Option), @@ -79,7 +77,7 @@ pub enum AccountsStateQueryResponse { // DReps-related responses DrepDelegators(DrepDelegators), - AccountsDrepDelegationsMap(HashMap>), + AccountsDrepDelegationsMap(HashMap>), NotFound, Error(String), @@ -186,10 +184,10 @@ pub struct OptimalPoolSizing { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PoolDelegators { - pub delegators: Vec<(KeyHash, u64)>, + pub delegators: Vec<(StakeAddress, u64)>, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DrepDelegators { - pub delegators: Vec<(KeyHash, u64)>, + pub delegators: Vec<(StakeAddress, u64)>, } diff --git a/common/src/queries/pools.rs b/common/src/queries/pools.rs index 04d9f59f..bb3e9d7d 100644 --- a/common/src/queries/pools.rs +++ b/common/src/queries/pools.rs @@ -1,6 +1,6 @@ use crate::{ - queries::governance::VoteRecord, rational_number::RationalNumber, KeyHash, PoolEpochState, - PoolId, PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, Relay, + queries::governance::VoteRecord, rational_number::RationalNumber, PoolEpochState, PoolId, + PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, Relay, StakeAddress, }; pub const DEFAULT_POOLS_QUERY_TOPIC: (&str, &str) = @@ -94,5 +94,5 @@ pub struct PoolActiveStakeInfo { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PoolDelegators { - pub delegators: Vec<(KeyHash, u64)>, + pub delegators: Vec<(StakeAddress, u64)>, } diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs index 43f38ea3..9c200f82 100644 --- a/common/src/stake_addresses.rs +++ b/common/src/stake_addresses.rs @@ -1,6 +1,6 @@ use crate::{ math::update_value_with_delta, messages::DRepDelegationDistribution, DRepChoice, - DRepCredential, DelegatedStake, KeyHash, Lovelace, PoolId, PoolLiveStakeInfo, StakeAddress, + DRepCredential, DelegatedStake, Lovelace, PoolId, PoolLiveStakeInfo, StakeAddress, StakeAddressDelta, Withdrawal, }; use anyhow::Result; @@ -163,15 +163,15 @@ impl StakeAddressMap { } /// Get Pool Delegators with live_stakes - pub fn get_pool_delegators(&self, pool_operator: &PoolId) -> Vec<(KeyHash, u64)> { + pub fn get_pool_delegators(&self, pool_operator: &PoolId) -> Vec<(StakeAddress, u64)> { // Find stake addresses delegated to pool_operator - let delegators: Vec<(KeyHash, u64)> = self + let delegators: Vec<(StakeAddress, u64)> = self .inner .iter() .filter_map(|(stake_address, sas)| match sas.delegated_spo.as_ref() { Some(delegated_spo) => { if delegated_spo.eq(pool_operator) { - Some((*stake_address.get_hash(), sas.utxo_value + sas.rewards)) + Some((stake_address.clone(), sas.utxo_value + sas.rewards)) } else { None } @@ -184,15 +184,15 @@ impl StakeAddressMap { } /// Get DRep Delegators with live_stakes - pub fn get_drep_delegators(&self, drep: &DRepChoice) -> Vec<(KeyHash, u64)> { + pub fn get_drep_delegators(&self, drep: &DRepChoice) -> Vec<(StakeAddress, u64)> { // Find stake addresses delegated to drep - let delegators: Vec<(KeyHash, u64)> = self + let delegators: Vec<(StakeAddress, u64)> = self .inner .iter() .filter_map(|(stake_address, sas)| match sas.delegated_drep.as_ref() { Some(delegated_drep) => { if delegated_drep.eq(drep) { - Some((*stake_address.get_hash(), sas.utxo_value)) + Some((stake_address.clone(), sas.utxo_value)) } else { None } @@ -209,14 +209,13 @@ impl StakeAddressMap { pub fn get_accounts_utxo_values_map( &self, stake_addresses: &[StakeAddress], - ) -> Option> { + ) -> Option> { let mut map = HashMap::new(); for stake_address in stake_addresses { let account = self.get(stake_address)?; let utxo_value = account.utxo_value; - let key_hash = stake_address.get_hash(); - map.insert(*key_hash, utxo_value); + map.insert(stake_address.clone(), utxo_value); } Some(map) @@ -227,14 +226,13 @@ impl StakeAddressMap { pub fn get_accounts_balances_map( &self, stake_addresses: &[StakeAddress], - ) -> Option> { + ) -> Option> { let mut map = HashMap::new(); for stake_address in stake_addresses { let account = self.get(stake_address)?; let balance = account.utxo_value + account.rewards; - let key_hash = stake_address.get_hash(); - map.insert(*key_hash, balance); + map.insert(stake_address.clone(), balance); } Some(map) @@ -245,14 +243,13 @@ impl StakeAddressMap { pub fn get_drep_delegations_map( &self, stake_addresses: &[StakeAddress], - ) -> Option>> { + ) -> Option>> { let mut map = HashMap::new(); for stake_address in stake_addresses { let account = self.get(stake_address)?; let maybe_drep = account.delegated_drep.clone(); - let key_hash = stake_address.get_hash(); - map.insert(*key_hash, maybe_drep); + map.insert(stake_address.clone(), maybe_drep); } Some(map) @@ -323,7 +320,7 @@ impl StakeAddressMap { /// Dump current Stake Pool Delegation Distribution State /// (Stake Key, Active Stakes Amount)> - pub fn dump_spdd_state(&self) -> HashMap> { + pub fn dump_spdd_state(&self) -> HashMap> { let entries: Vec<_> = self .inner .par_iter() @@ -332,9 +329,9 @@ impl StakeAddressMap { }) .collect(); - let mut result: HashMap> = HashMap::new(); + let mut result: HashMap> = HashMap::new(); for (spo, entry) in entries { - result.entry(spo).or_default().push((entry.0.get_credential().get_hash(), entry.1)); + result.entry(spo).or_default().push((entry.0, entry.1)); } result } @@ -552,7 +549,7 @@ impl StakeAddressMap { mod tests { use super::*; use crate::hash::Hash; - use crate::{NetworkId, StakeAddress, StakeCredential}; + use crate::{KeyHash, NetworkId, StakeAddress, StakeCredential}; const STAKE_KEY_HASH: KeyHash = KeyHash::new([0x99; 28]); const STAKE_KEY_HASH_2: KeyHash = KeyHash::new([0xaa; 28]); @@ -1248,8 +1245,8 @@ mod tests { let map = stake_addresses.get_accounts_utxo_values_map(&keys).unwrap(); assert_eq!(map.len(), 2); - assert_eq!(map.get(addr1.get_hash()).copied().unwrap(), 1000); - assert_eq!(map.get(addr2.get_hash()).copied().unwrap(), 2000); + assert_eq!(map.get(&addr1).copied().unwrap(), 1000); + assert_eq!(map.get(&addr2).copied().unwrap(), 2000); } #[test] @@ -1362,8 +1359,8 @@ mod tests { let map = stake_addresses.get_accounts_balances_map(&addresses).unwrap(); assert_eq!(map.len(), 2); - assert_eq!(map.get(addr1.get_hash()).copied().unwrap(), 1100); - assert_eq!(map.get(addr2.get_hash()).copied().unwrap(), 2000); + assert_eq!(map.get(&addr1).copied().unwrap(), 1100); + assert_eq!(map.get(&addr2).copied().unwrap(), 2000); } #[test] @@ -1470,15 +1467,9 @@ mod tests { let map = stake_addresses.get_drep_delegations_map(&addresses).unwrap(); assert_eq!(map.len(), 3); - assert_eq!( - map.get(addr1.get_hash()).unwrap(), - &Some(DRepChoice::Abstain) - ); - assert_eq!( - map.get(addr2.get_hash()).unwrap(), - &Some(DRepChoice::Key(DREP_HASH)) - ); - assert_eq!(map.get(addr3.get_hash()).unwrap(), &None); + assert_eq!(map.get(&addr1).unwrap(), &Some(DRepChoice::Abstain)); + assert_eq!(map.get(&addr2).unwrap(), &Some(DRepChoice::Key(DREP_HASH))); + assert_eq!(map.get(&addr3).unwrap(), &None); } #[test] diff --git a/modules/accounts_state/Cargo.toml b/modules/accounts_state/Cargo.toml index 8bbf4909..b71e9ef9 100644 --- a/modules/accounts_state/Cargo.toml +++ b/modules/accounts_state/Cargo.toml @@ -17,18 +17,17 @@ anyhow = { workspace = true } bigdecimal = "0.4.8" chrono = { workspace = true } config = { workspace = true } -dashmap = { workspace = true } hex = { workspace = true } imbl = { workspace = true } serde = { workspace = true } -serde_json = { workspace = true } -serde_with = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } fjall = "2.11.2" -rayon = "1.10.0" csv = "1.3.1" itertools = "0.14.0" +[dev-dependencies] +tempfile = "3" + [lib] path = "src/accounts_state.rs" diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs index 0a041e0e..d051eb02 100644 --- a/modules/accounts_state/src/spo_distribution_store.rs +++ b/modules/accounts_state/src/spo_distribution_store.rs @@ -1,25 +1,24 @@ -use std::collections::HashMap; - -use acropolis_common::types::AddrKeyhash; -use acropolis_common::PoolId; +use acropolis_common::{PoolId, StakeAddress}; use anyhow::Result; use fjall::{Config, Keyspace, PartitionCreateOptions}; +use std::collections::HashMap; const POOL_ID_LENGTH: usize = 28; -const STAKE_KEY_LEN: usize = 28; +const STAKE_ADDRESS_LEN: usize = 29; // 1 byte header + 28 bytes hash const EPOCH_LEN: usize = 8; -const TOTAL_KEY_LEN: usize = EPOCH_LEN + POOL_ID_LENGTH + STAKE_KEY_LEN; +const TOTAL_KEY_LEN: usize = EPOCH_LEN + POOL_ID_LENGTH + STAKE_ADDRESS_LEN; // Batch size balances commit overhead vs memory usage // ~720KB per batch (72 bytes × 10,000) // ~130 commits for typical epoch (~1.3M delegations) const BATCH_SIZE: usize = 10_000; -fn encode_key(epoch: u64, pool_id: &PoolId, stake_key: &AddrKeyhash) -> Vec { +fn encode_key(epoch: u64, pool_id: &PoolId, stake_address: &StakeAddress) -> Vec { let mut key = Vec::with_capacity(TOTAL_KEY_LEN); key.extend_from_slice(&epoch.to_be_bytes()); key.extend_from_slice(pool_id.as_ref()); - key.extend_from_slice(stake_key.as_ref()); + key.extend_from_slice(&stake_address.to_binary()); + key } @@ -30,11 +29,13 @@ fn encode_epoch_pool_prefix(epoch: u64, pool_id: &PoolId) -> Vec { prefix } -fn decode_key(key: &[u8]) -> Result<(u64, PoolId, AddrKeyhash)> { +fn decode_key(key: &[u8]) -> Result<(u64, PoolId, StakeAddress)> { let epoch = u64::from_be_bytes(key[..EPOCH_LEN].try_into()?); let pool_id = key[EPOCH_LEN..EPOCH_LEN + POOL_ID_LENGTH].try_into()?; - let stake_key = key[EPOCH_LEN + POOL_ID_LENGTH..].try_into()?; - Ok((epoch, pool_id, stake_key)) + let stake_address_bytes = &key[EPOCH_LEN + POOL_ID_LENGTH..]; + let stake_address = StakeAddress::from_binary(stake_address_bytes)?; + + Ok((epoch, pool_id, stake_address)) } /// Encode epoch completion marker key @@ -101,7 +102,7 @@ impl SPDDStore { pub fn store_spdd( &mut self, epoch: u64, - spdd_state: HashMap>, + spdd_state: HashMap>, ) -> fjall::Result<()> { if self.is_epoch_complete(epoch)? { return Ok(()); @@ -111,8 +112,8 @@ impl SPDDStore { let mut batch = self.keyspace.batch(); let mut count = 0; for (pool_id, delegations) in spdd_state { - for (stake_key, amount) in delegations { - let key = encode_key(epoch, &pool_id, &stake_key); + for (stake_address, amount) in delegations { + let key = encode_key(epoch, &pool_id, &stake_address); let value = amount.to_be_bytes(); batch.insert(&self.spdd, key, value); @@ -183,7 +184,7 @@ impl SPDDStore { Ok(deleted_epochs) } - pub fn query_by_epoch(&self, epoch: u64) -> Result> { + pub fn query_by_epoch(&self, epoch: u64) -> Result> { if !self.is_epoch_complete(epoch)? { return Err(anyhow::anyhow!("Epoch SPDD Data is not complete")); } @@ -192,9 +193,9 @@ impl SPDDStore { let mut result = Vec::new(); for item in self.spdd.prefix(prefix) { let (key, value) = item?; - let (_, pool_id, stake_key) = decode_key(&key)?; + let (_, pool_id, stake_address) = decode_key(&key)?; let amount = u64::from_be_bytes(value.as_ref().try_into()?); - result.push((pool_id, stake_key, amount)); + result.push((pool_id, stake_address, amount)); } Ok(result) } @@ -203,7 +204,7 @@ impl SPDDStore { &self, epoch: u64, pool_id: &PoolId, - ) -> Result> { + ) -> Result> { if !self.is_epoch_complete(epoch)? { return Err(anyhow::anyhow!("Epoch SPDD Data is not complete")); } @@ -212,9 +213,9 @@ impl SPDDStore { let mut result = Vec::new(); for item in self.spdd.prefix(prefix) { let (key, value) = item?; - let (_, _, stake_key) = decode_key(&key)?; + let (_, _, stake_address) = decode_key(&key)?; let amount = u64::from_be_bytes(value.as_ref().try_into()?); - result.push((stake_key, amount)); + result.push((stake_address, amount)); } Ok(result) } @@ -224,34 +225,42 @@ impl SPDDStore { mod tests { use super::*; use acropolis_common::crypto::keyhash_224; - use acropolis_common::types::AddrKeyhash; - use acropolis_common::PoolId; - - const DB_PATH: &str = "spdd_db"; + use acropolis_common::NetworkId::Mainnet; + use acropolis_common::{PoolId, StakeCredential}; + use tempfile::TempDir; fn test_pool_hash(byte: u8) -> PoolId { keyhash_224(&[byte]).into() } - fn test_addr_hash(byte: u8) -> AddrKeyhash { - keyhash_224(&[byte]) + fn test_stake_address(byte: u8) -> StakeAddress { + StakeAddress::new(StakeCredential::AddrKeyHash(keyhash_224(&[byte])), Mainnet) } #[test] - fn test_store_spdd_state() { + fn test_store_and_query_spdd() { + let temp_dir = TempDir::new().unwrap(); let mut spdd_store = - SPDDStore::new(std::path::Path::new(DB_PATH), 1).expect("Failed to create SPDD store"); - let mut spdd_state: HashMap> = HashMap::new(); + SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store"); + let mut spdd_state: HashMap> = HashMap::new(); spdd_state.insert( test_pool_hash(0x01), - vec![(test_addr_hash(0x10), 100), (test_addr_hash(0x11), 150)], + vec![ + (test_stake_address(0x10), 100), + (test_stake_address(0x11), 150), + ], ); spdd_state.insert( test_pool_hash(0x02), - vec![(test_addr_hash(0x20), 200), (test_addr_hash(0x21), 250)], + vec![ + (test_stake_address(0x20), 200), + (test_stake_address(0x21), 250), + ], ); + assert!(spdd_store.store_spdd(1, spdd_state).is_ok()); + assert!(spdd_store.is_epoch_complete(1).unwrap()); let result = spdd_store.query_by_epoch(1).unwrap(); assert_eq!(result.len(), 4); @@ -263,31 +272,81 @@ mod tests { } #[test] - fn test_prune_old_epochs() { - let mut spdd_store = SPDDStore::new(std::path::Path::new("spdd_prune_test_db"), 2) - .expect("Failed to create SPDD store"); + fn test_retention_pruning() { + let temp_dir = TempDir::new().unwrap(); + let mut spdd_store = + SPDDStore::new(temp_dir.path(), 2).expect("Failed to create SPDD store"); + // Store epochs 1, 2, 3 for epoch in 1..=3 { - let mut spdd_state: HashMap> = HashMap::new(); - + let mut spdd_state: HashMap> = HashMap::new(); spdd_state.insert( test_pool_hash(epoch as u8), vec![ - (test_addr_hash(0x10), epoch * 100), - (test_addr_hash(0x11), epoch * 150), + (test_stake_address(0x10), epoch * 100), + (test_stake_address(0x11), epoch * 150), ], ); spdd_store.store_spdd(epoch, spdd_state).expect("Failed to store SPDD state"); } + // Epoch 1 should be pruned (retention=2, so keep epochs 2 and 3) assert!(!spdd_store.is_epoch_complete(1).unwrap()); assert!(spdd_store.is_epoch_complete(2).unwrap()); assert!(spdd_store.is_epoch_complete(3).unwrap()); assert!(spdd_store.query_by_epoch(1).is_err()); - let result = spdd_store.query_by_epoch(2).unwrap(); - assert_eq!(result.len(), 2); - let result = spdd_store.query_by_epoch(3).unwrap(); - assert_eq!(result.len(), 2); + assert!(spdd_store.query_by_epoch(2).is_ok()); + assert!(spdd_store.query_by_epoch(3).is_ok()); + } + + #[test] + fn test_query_incomplete_epoch() { + let temp_dir = TempDir::new().unwrap(); + let spdd_store = SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store"); + + assert!(!spdd_store.is_epoch_complete(999).unwrap()); + assert!(spdd_store.query_by_epoch(999).is_err()); + assert!(spdd_store.query_by_epoch_and_pool(999, &test_pool_hash(0x01)).is_err()); + } + + #[test] + fn test_remove_epoch_data() { + let temp_dir = TempDir::new().unwrap(); + let mut spdd_store = + SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store"); + + let mut spdd_state: HashMap> = HashMap::new(); + spdd_state.insert( + test_pool_hash(0x01), + vec![ + (test_stake_address(0x10), 100), + (test_stake_address(0x11), 150), + ], + ); + + spdd_store.store_spdd(1, spdd_state).unwrap(); + assert!(spdd_store.is_epoch_complete(1).unwrap()); + + let deleted = spdd_store.remove_epoch_data(1).unwrap(); + assert_eq!(deleted, 2); + assert!(!spdd_store.is_epoch_complete(1).unwrap()); + + let deleted = spdd_store.remove_epoch_data(999).unwrap(); + assert_eq!(deleted, 0); + } + + #[test] + fn test_encode_decode_roundtrip() { + let epoch = 12345u64; + let pool_id = test_pool_hash(0x42); + let stake_address = test_stake_address(0x99); + + let encoded = encode_key(epoch, &pool_id, &stake_address); + let (decoded_epoch, decoded_pool, decoded_stake) = decode_key(&encoded).unwrap(); + + assert_eq!(decoded_epoch, epoch); + assert_eq!(decoded_pool, pool_id); + assert_eq!(decoded_stake, stake_address); } } diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index 63f9ccfe..dd823267 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -15,9 +15,8 @@ use acropolis_common::{ protocol_params::ProtocolParams, stake_addresses::{StakeAddressMap, StakeAddressState}, BlockInfo, DRepChoice, DRepCredential, DelegatedStake, InstantaneousRewardSource, - InstantaneousRewardTarget, KeyHash, Lovelace, MoveInstantaneousReward, PoolId, - PoolLiveStakeInfo, PoolRegistration, Pot, SPORewards, StakeAddress, StakeRewardDelta, - TxCertificate, + InstantaneousRewardTarget, Lovelace, MoveInstantaneousReward, PoolId, PoolLiveStakeInfo, + PoolRegistration, Pot, SPORewards, StakeAddress, StakeRewardDelta, TxCertificate, }; use anyhow::Result; use imbl::OrdMap; @@ -170,12 +169,12 @@ impl State { } /// Get Pool Delegators with live_stakes - pub fn get_pool_delegators(&self, pool_operator: &PoolId) -> Vec<(KeyHash, u64)> { + pub fn get_pool_delegators(&self, pool_operator: &PoolId) -> Vec<(StakeAddress, u64)> { self.stake_addresses.lock().unwrap().get_pool_delegators(pool_operator) } /// Get Drep Delegators with live_stakes - pub fn get_drep_delegators(&self, drep: &DRepChoice) -> Vec<(KeyHash, u64)> { + pub fn get_drep_delegators(&self, drep: &DRepChoice) -> Vec<(StakeAddress, u64)> { self.stake_addresses.lock().unwrap().get_drep_delegators(drep) } @@ -183,7 +182,7 @@ impl State { pub fn get_accounts_utxo_values_map( &self, stake_keys: &[StakeAddress], - ) -> Option> { + ) -> Option> { let stake_addresses = self.stake_addresses.lock().ok()?; // If lock fails, return None stake_addresses.get_accounts_utxo_values_map(stake_keys) } @@ -198,7 +197,7 @@ impl State { pub fn get_accounts_balances_map( &self, stake_keys: &[StakeAddress], - ) -> Option> { + ) -> Option> { let stake_addresses = self.stake_addresses.lock().ok()?; // If lock fails, return None stake_addresses.get_accounts_balances_map(stake_keys) } @@ -218,7 +217,7 @@ impl State { pub fn get_drep_delegations_map( &self, stake_keys: &[StakeAddress], - ) -> Option>> { + ) -> Option>> { let stake_addresses = self.stake_addresses.lock().ok()?; // If lock fails, return None stake_addresses.get_drep_delegations_map(stake_keys) } @@ -559,7 +558,7 @@ impl State { stake_addresses.generate_spdd() } - pub fn dump_spdd_state(&self) -> HashMap> { + pub fn dump_spdd_state(&self) -> HashMap> { let stake_addresses = self.stake_addresses.lock().unwrap(); stake_addresses.dump_spdd_state() } @@ -964,9 +963,9 @@ mod tests { use acropolis_common::crypto::{keyhash_224, keyhash_256}; use acropolis_common::{ protocol_params::ConwayParams, rational_number::RationalNumber, Anchor, Committee, - Constitution, CostModel, DRepVotingThresholds, NetworkId, PoolVotingThresholds, Pot, - PotDelta, Ratio, Registration, StakeAddress, StakeAddressDelta, StakeAndVoteDelegation, - StakeCredential, StakeRegistrationAndStakeAndVoteDelegation, + Constitution, CostModel, DRepVotingThresholds, KeyHash, NetworkId, PoolVotingThresholds, + Pot, PotDelta, Ratio, Registration, StakeAddress, StakeAddressDelta, + StakeAndVoteDelegation, StakeCredential, StakeRegistrationAndStakeAndVoteDelegation, StakeRegistrationAndVoteDelegation, TxCertificateWithPos, TxIdentifier, VoteDelegation, VrfKeyHash, Withdrawal, }; diff --git a/modules/drep_state/src/state.rs b/modules/drep_state/src/state.rs index 054e55a5..f2056721 100644 --- a/modules/drep_state/src/state.rs +++ b/modules/drep_state/src/state.rs @@ -474,12 +474,12 @@ impl State { context: &Arc>, delegators: &[(&StakeAddress, &DRepChoice)], ) -> Result<()> { - let mut stake_key_to_input = HashMap::with_capacity(delegators.len()); + let mut stake_address_to_drep = HashMap::with_capacity(delegators.len()); let mut stake_addresses = Vec::with_capacity(delegators.len()); - for &(sc, drep) in delegators { - stake_addresses.push(sc.clone()); - stake_key_to_input.insert(sc.get_credential().get_hash(), (sc, drep)); + for &(stake_address, drep) in delegators { + stake_addresses.push(stake_address.clone()); + stake_address_to_drep.insert(stake_address, drep); } let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( @@ -500,9 +500,9 @@ impl State { } }; - for (stake_key, old_drep_opt) in result_map { - let &(delegator, new_drep_choice) = match stake_key_to_input.get(&stake_key) { - Some(pair) => pair, + for (stake_address, old_drep_opt) in result_map { + let new_drep_choice = match stake_address_to_drep.get(&stake_address) { + Some(&drep) => drep, None => continue, }; @@ -516,10 +516,7 @@ impl State { if old_drep_cred != new_drep_cred { self.update_historical(&old_drep_cred, false, |entry| { if let Some(delegators) = entry.delegators.as_mut() { - delegators.retain(|s| { - s.get_credential().get_hash() - != delegator.get_credential().get_hash() - }); + delegators.retain(|s| s.get_hash() != stake_address.get_hash()); } })?; } @@ -529,8 +526,8 @@ impl State { // Add delegator to new DRep match self.update_historical(&new_drep_cred, true, |entry| { if let Some(delegators) = entry.delegators.as_mut() { - if !delegators.contains(delegator) { - delegators.push(delegator.clone()); + if !delegators.contains(&stake_address) { + delegators.push(stake_address.clone()); } } }) { diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs index 089bf972..74f57bdc 100644 --- a/modules/rest_blockfrost/src/handlers/epochs.rs +++ b/modules/rest_blockfrost/src/handlers/epochs.rs @@ -15,7 +15,7 @@ use acropolis_common::{ spdd::{SPDDStateQuery, SPDDStateQueryResponse}, utils::query_state, }, - NetworkId, PoolId, StakeAddress, StakeCredential, + PoolId, }; use anyhow::{anyhow, Result}; use caryatid_sdk::Context; @@ -428,37 +428,6 @@ pub async fn handle_epoch_total_stakes_blockfrost( return Ok(RESTResponse::with_text(404, "Epoch not found")); } - // Query current network from parameters-state - let current_network_msg = Arc::new(Message::StateQuery(StateQuery::Parameters( - ParametersStateQuery::GetNetworkName, - ))); - let current_network = query_state( - &context, - &handlers_config.parameters_query_topic, - current_network_msg, - |message| match message { - Message::StateQueryResponse(StateQueryResponse::Parameters( - ParametersStateQueryResponse::NetworkName(network), - )) => Ok(network), - _ => Err(anyhow::anyhow!( - "Unexpected message type while retrieving current network" - )), - }, - ) - .await?; - - let network = match current_network.as_str() { - "mainnet" => NetworkId::Mainnet, - "testnet" => NetworkId::Testnet, - unknown => { - return Ok(RESTResponse::with_text( - 500, - format!("Internal server error while retrieving current network: {unknown}") - .as_str(), - )) - } - }; - // Query SPDD by epoch from accounts-state let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( AccountsStateQuery::GetSPDDByEpoch { @@ -486,16 +455,13 @@ pub async fn handle_epoch_total_stakes_blockfrost( .await?; let spdd_response = spdd .into_iter() - .map(|(pool_id, stake_key_hash, amount)| { - let stake_address = StakeAddress { - network: network.clone(), - credential: StakeCredential::AddrKeyHash(stake_key_hash), - } - .to_string() - .map_err(|e| anyhow::anyhow!("Failed to convert stake address to string: {e}"))?; + .map(|(pool_id, stake_address, amount)| { + let bech32 = stake_address + .to_string() + .map_err(|e| anyhow::anyhow!("Failed to convert stake address to string {}", e))?; Ok(SPDDByEpochItemRest { pool_id, - stake_address, + stake_address: bech32, amount, }) }) @@ -564,37 +530,6 @@ pub async fn handle_epoch_pool_stakes_blockfrost( return Ok(RESTResponse::with_text(404, "Epoch not found")); } - // Query current network from parameters-state - let current_network_msg = Arc::new(Message::StateQuery(StateQuery::Parameters( - ParametersStateQuery::GetNetworkName, - ))); - let current_network = query_state( - &context, - &handlers_config.parameters_query_topic, - current_network_msg, - |message| match message { - Message::StateQueryResponse(StateQueryResponse::Parameters( - ParametersStateQueryResponse::NetworkName(network), - )) => Ok(network), - _ => Err(anyhow::anyhow!( - "Unexpected message type while retrieving current network" - )), - }, - ) - .await?; - - let network = match current_network.as_str() { - "mainnet" => NetworkId::Mainnet, - "testnet" => NetworkId::Testnet, - unknown => { - return Ok(RESTResponse::with_text( - 500, - format!("Internal server error while retrieving current network: {unknown}") - .as_str(), - )) - } - }; - // Query SPDD by epoch and pool from accounts-state let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( AccountsStateQuery::GetSPDDByEpochAndPool { @@ -623,16 +558,12 @@ pub async fn handle_epoch_pool_stakes_blockfrost( .await?; let spdd_response = spdd .into_iter() - .map(|(key_hash, amount)| { - let stake_address = StakeAddress { - network: network.clone(), - credential: StakeCredential::AddrKeyHash(key_hash), - } - .to_string() - .map_err(|e| anyhow::anyhow!("Failed to convert stake address to string: {e}"))?; - + .map(|(stake_address, amount)| { + let bech32 = stake_address + .to_string() + .map_err(|e| anyhow::anyhow!("Failed to convert stake address to string {}", e))?; Ok(SPDDByEpochAndPoolItemRest { - stake_address, + stake_address: bech32, amount, }) }) diff --git a/modules/rest_blockfrost/src/handlers/governance.rs b/modules/rest_blockfrost/src/handlers/governance.rs index c607fa1a..20440e46 100644 --- a/modules/rest_blockfrost/src/handlers/governance.rs +++ b/modules/rest_blockfrost/src/handlers/governance.rs @@ -10,13 +10,13 @@ use acropolis_common::{ accounts::{AccountsStateQuery, AccountsStateQueryResponse}, governance::{GovernanceStateQuery, GovernanceStateQueryResponse}, }, - Credential, GovActionId, KeyHash, TxHash, Voter, + Credential, GovActionId, TxHash, Voter, }; use anyhow::{anyhow, Result}; use caryatid_sdk::Context; use reqwest::Client; use serde_json::Value; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; pub async fn handle_dreps_list_blockfrost( context: Arc>, @@ -191,28 +191,6 @@ pub async fn handle_drep_delegators_blockfrost( Message::StateQueryResponse(StateQueryResponse::Governance( GovernanceStateQueryResponse::DRepDelegators(delegators), )) => { - let stake_key_to_bech32: HashMap = match delegators - .addresses - .iter() - .map(|addr| { - let credential = addr.get_credential(); - let bech32 = credential - .to_stake_bech32() - .map_err(|_| anyhow!("Failed to encode stake address"))?; - let key_hash = credential.get_hash(); - Ok((key_hash, bech32)) - }) - .collect::>>() - { - Ok(map) => map, - Err(e) => { - return Ok(RESTResponse::with_text( - 500, - &format!("Internal error: {e}"), - )); - } - }; - let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( AccountsStateQuery::GetAccountsUtxoValuesMap { stake_addresses: delegators.addresses.clone(), @@ -227,27 +205,31 @@ pub async fn handle_drep_delegators_blockfrost( Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::AccountsUtxoValuesMap(map), )) => { - let mut response = Vec::new(); - - for (key, amount) in map { - let Some(bech32) = stake_key_to_bech32.get(&key) else { - return Ok(RESTResponse::with_text( + let response: Result> = map + .into_iter() + .map(|(stake_address, amount)| { + let bech32 = stake_address + .to_string() + .map_err(|e| anyhow!("Failed to encode stake address {}", e))?; + + Ok(serde_json::json!({ + "address": bech32, + "amount": amount.to_string(), + })) + }) + .collect(); + + match response { + Ok(response) => match serde_json::to_string_pretty(&response) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( 500, - "Internal error: missing Bech32 for stake key", - )); - }; - - response.push(serde_json::json!({ - "address": bech32, - "amount": amount.to_string(), - })); - } - - match serde_json::to_string_pretty(&response) { - Ok(json) => Ok(RESTResponse::with_json(200, &json)), + &format!("Failed to serialize DRep delegators: {e}"), + )), + }, Err(e) => Ok(RESTResponse::with_text( 500, - &format!("Failed to serialize DRep delegators: {e}"), + &format!("Internal error: {e}"), )), } } @@ -280,7 +262,6 @@ pub async fn handle_drep_delegators_blockfrost( _ => Ok(RESTResponse::with_text(500, "Unexpected message type")), } } - pub async fn handle_drep_metadata_blockfrost( context: Arc>, params: Vec, diff --git a/modules/rest_blockfrost/src/handlers/pools.rs b/modules/rest_blockfrost/src/handlers/pools.rs index 07f35a47..4b14d116 100644 --- a/modules/rest_blockfrost/src/handlers/pools.rs +++ b/modules/rest_blockfrost/src/handlers/pools.rs @@ -17,7 +17,7 @@ use acropolis_common::{ utils::query_state, }, rest_helper::ToCheckedF64, - PoolId, PoolRetirement, PoolUpdateAction, StakeCredential, TxIdentifier, + PoolId, PoolRetirement, PoolUpdateAction, TxIdentifier, }; use anyhow::Result; use caryatid_sdk::Context; @@ -992,9 +992,9 @@ pub async fn handle_pool_delegators_blockfrost( }; let mut delegators_rest = Vec::::new(); - for (d, l) in pool_delegators { - let bech32 = StakeCredential::AddrKeyHash(d) - .to_stake_bech32() + for (stake_address, l) in pool_delegators { + let bech32 = stake_address + .to_string() .map_err(|e| anyhow::anyhow!("Invalid stake address in pool delegators: {e}"))?; delegators_rest.push(PoolDelegatorRest { address: bech32, diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index bb849525..74fd0f00 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -10,8 +10,8 @@ use acropolis_common::{ params::TECHNICAL_PARAMETER_POOL_RETIRE_MAX_EPOCH, queries::governance::VoteRecord, stake_addresses::StakeAddressMap, - BlockInfo, KeyHash, PoolId, PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, - Relay, StakeAddress, TxCertificate, TxHash, TxIdentifier, Voter, VotingProcedures, + BlockInfo, PoolId, PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, Relay, + StakeAddress, TxCertificate, TxHash, TxIdentifier, Voter, VotingProcedures, }; use anyhow::Result; use imbl::HashMap; @@ -175,7 +175,7 @@ impl State { } /// Get Pool Delegators - pub fn get_pool_delegators(&self, pool_operator: &PoolId) -> Option> { + pub fn get_pool_delegators(&self, pool_operator: &PoolId) -> Option> { let stake_addresses = self.stake_addresses.as_ref()?; let historical_spos = self.historical_spos.as_ref()?;