Skip to content

Commit 3169afd

Browse files
authored
add: ETCM-12254 bridge data caching source (#993)
1 parent 982082a commit 3169afd

File tree

9 files changed

+466
-61
lines changed

9 files changed

+466
-61
lines changed

demo/node/src/data_sources.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use authority_selection_inherents::AuthoritySelectionDataSource;
22
use pallet_sidechain_rpc::SidechainRpcDataSource;
33
use partner_chains_db_sync_data_sources::{
4-
BlockDataSourceImpl, CandidatesDataSourceImpl, GovernedMapDataSourceCachedImpl,
5-
McFollowerMetrics, McHashDataSourceImpl, NativeTokenManagementDataSourceImpl,
6-
SidechainRpcDataSourceImpl, StakeDistributionDataSourceImpl, TokenBridgeDataSourceImpl,
4+
BlockDataSourceImpl, CachedTokenBridgeDataSourceImpl, CandidatesDataSourceImpl,
5+
GovernedMapDataSourceCachedImpl, McFollowerMetrics, McHashDataSourceImpl,
6+
NativeTokenManagementDataSourceImpl, SidechainRpcDataSourceImpl,
7+
StakeDistributionDataSourceImpl,
78
};
89
use partner_chains_demo_runtime::AccountId;
910
use partner_chains_mock_data_sources::{
@@ -115,6 +116,7 @@ pub fn create_mock_data_sources()
115116
pub const CANDIDATES_FOR_EPOCH_CACHE_SIZE: usize = 64;
116117
pub const STAKE_CACHE_SIZE: usize = 100;
117118
pub const GOVERNED_MAP_CACHE_SIZE: u16 = 100;
119+
pub const BRIDGE_TRANSFER_CACHE_LOOKAHEAD: u32 = 1000;
118120

119121
pub async fn create_cached_db_sync_data_sources(
120122
metrics_opt: Option<McFollowerMetrics>,
@@ -147,10 +149,15 @@ pub async fn create_cached_db_sync_data_sources(
147149
pool.clone(),
148150
metrics_opt.clone(),
149151
GOVERNED_MAP_CACHE_SIZE,
150-
block,
152+
block.clone(),
151153
)
152154
.await?,
153155
),
154-
bridge: Arc::new(TokenBridgeDataSourceImpl::new(pool, metrics_opt)),
156+
bridge: Arc::new(CachedTokenBridgeDataSourceImpl::new(
157+
pool,
158+
metrics_opt,
159+
block,
160+
BRIDGE_TRANSFER_CACHE_LOOKAHEAD,
161+
)),
155162
})
156163
}

toolkit/bridge/pallet/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ pub mod pallet {
113113
impl<T: Config> Pallet<T> {
114114
/// Inherent extrinsic that handles all incoming transfers in the current block
115115
#[pallet::call_index(0)]
116-
#[pallet::weight(T::WeightInfo::handle_transfers(transfers.len() as u32))]
116+
#[pallet::weight((T::WeightInfo::handle_transfers(transfers.len() as u32), DispatchClass::Mandatory))]
117117
pub fn handle_transfers(
118118
origin: OriginFor<T>,
119119
transfers: BoundedVec<BridgeTransferV1<T::Recipient>, T::MaxTransfersPerBlock>,

toolkit/bridge/primitives/src/lib.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ use alloc::vec::*;
88
use parity_scale_codec::{Decode, DecodeWithMemTracking, Encode, MaxEncodedLen};
99
use scale_info::TypeInfo;
1010
use serde::{Deserialize, Serialize};
11-
use sidechain_domain::{AssetName, MainchainAddress, McBlockHash, McBlockNumber, PolicyId, UtxoId};
11+
use sidechain_domain::{
12+
AssetId, AssetName, MainchainAddress, McBlockHash, McBlockNumber, PolicyId, UtxoId,
13+
};
1214
use sp_inherents::*;
1315

1416
#[cfg(feature = "std")]
@@ -40,6 +42,16 @@ pub struct MainChainScripts {
4042
pub illiquid_circulation_supply_validator_address: MainchainAddress,
4143
}
4244

45+
impl MainChainScripts {
46+
/// Return full asset ID fo the bridged token (minting policy ID and asset name)
47+
pub fn asset_id(&self) -> AssetId {
48+
AssetId {
49+
policy_id: self.token_policy_id.clone(),
50+
asset_name: self.token_asset_name.clone(),
51+
}
52+
}
53+
}
54+
4355
#[cfg(feature = "std")]
4456
impl MainChainScripts {
4557
/// Reads the main chain script values from environment
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
use super::*;
2+
use crate::BlockDataSourceImpl;
3+
use crate::db_model::BridgeUtxo;
4+
use futures::lock::Mutex;
5+
use sidechain_domain::{MainchainBlock, McBlockHash, UtxoId};
6+
use std::{cmp::min, collections::HashMap, error::Error, sync::Arc};
7+
8+
/// Bridge transfer data source with block range-based caching
9+
///
10+
/// This data source caches utxos in some range [from_block, to_block] (inclusive) and serves
11+
/// queries from the cache. In case of a cache miss, the cache is first replaced before serving
12+
/// data. The cache is filled with utxos in range:
13+
/// [lower_query_bound, min(upper_query_bound + cache_lookahead, current_stable_tip)]
14+
///
15+
/// In case of queries where the lower bound is a UTXO, all UTXOs from the containing
16+
/// block are stored. Technically servable case where the lower bound UTXO is the last one in its
17+
/// block but the block is not stored, is treated as a cache miss.
18+
pub struct CachedTokenBridgeDataSourceImpl {
19+
/// Postgres connection pool
20+
pool: PgPool,
21+
/// Prometheus metrics client
22+
metrics_opt: Option<McFollowerMetrics>,
23+
/// Configuration used by Db-Sync
24+
db_sync_config: DbSyncConfigurationProvider,
25+
/// [BlockDataSourceImpl] instance shared with other data sources for cache reuse.
26+
blocks: Arc<BlockDataSourceImpl>,
27+
/// Internal data cache
28+
cache: Arc<Mutex<TokenUtxoCache>>,
29+
/// Number of additional blocks that should be loaded into cache when refreshing
30+
cache_lookahead: u32,
31+
}
32+
33+
#[derive(Default)]
34+
pub(crate) struct TokenUtxoCache {
35+
mc_scripts: MainChainScripts,
36+
start_block: BlockNumber,
37+
end_block: BlockNumber,
38+
transfers: Vec<BridgeUtxo>,
39+
utxo_cache: HashMap<UtxoId, BridgeUtxo>,
40+
}
41+
42+
impl TokenUtxoCache {
43+
pub(crate) fn new() -> Self {
44+
Self::default()
45+
}
46+
47+
pub(crate) fn set_mc_scripts(&mut self, mc_scripts: MainChainScripts) {
48+
if self.mc_scripts != mc_scripts {
49+
self.mc_scripts = mc_scripts;
50+
self.transfers = vec![];
51+
self.start_block = BlockNumber(0);
52+
self.end_block = BlockNumber(0);
53+
}
54+
}
55+
56+
pub(crate) fn set_cached_transfers(
57+
&mut self,
58+
start_block: BlockNumber,
59+
end_block: BlockNumber,
60+
transfers: Vec<BridgeUtxo>,
61+
) {
62+
self.start_block = start_block;
63+
self.end_block = end_block;
64+
self.utxo_cache = transfers.iter().map(|utxo| (utxo.utxo_id(), utxo.clone())).collect();
65+
self.transfers = transfers;
66+
}
67+
68+
pub(crate) fn serve_from_cache(
69+
&self,
70+
checkpoint: &BridgeCheckpoint,
71+
to_block: BlockNumber,
72+
max_transfers: u32,
73+
) -> Option<Vec<BridgeUtxo>> {
74+
if self.end_block < to_block {
75+
return None;
76+
}
77+
78+
let skip_pred: Box<dyn FnMut(&&BridgeUtxo) -> bool> = match checkpoint {
79+
BridgeCheckpoint::Block { number }
80+
if self.start_block <= number.saturating_add(1u32) =>
81+
{
82+
Box::new(move |utxo| *number > utxo.block_number)
83+
},
84+
BridgeCheckpoint::Utxo { block_number, tx_ix, tx_out_ix }
85+
if self.start_block <= *block_number =>
86+
{
87+
Box::new(move |utxo| utxo.ordering_key() <= (*block_number, *tx_ix, *tx_out_ix))
88+
},
89+
_ => return None,
90+
};
91+
92+
Some(
93+
self.transfers
94+
.iter()
95+
.skip_while(skip_pred)
96+
.take_while(|utxo| to_block.0 >= utxo.block_number.0)
97+
.take(max_transfers as usize)
98+
.cloned()
99+
.collect(),
100+
)
101+
}
102+
103+
pub(crate) fn find_utxo_in_cache(&self, utxo_id: &UtxoId) -> Option<BridgeUtxo> {
104+
self.utxo_cache.get(utxo_id).cloned()
105+
}
106+
}
107+
108+
observed_async_trait!(
109+
impl<RecipientAddress> TokenBridgeDataSource<RecipientAddress> for CachedTokenBridgeDataSourceImpl
110+
where
111+
RecipientAddress: Debug,
112+
RecipientAddress: (for<'a> TryFrom<&'a [u8]>),
113+
{
114+
async fn get_transfers(
115+
&self,
116+
main_chain_scripts: MainChainScripts,
117+
data_checkpoint: BridgeDataCheckpoint,
118+
max_transfers: u32,
119+
current_mc_block_hash: McBlockHash,
120+
) -> Result<
121+
(Vec<BridgeTransferV1<RecipientAddress>>, BridgeDataCheckpoint),
122+
Box<dyn std::error::Error + Send + Sync>,
123+
> {
124+
self.set_cache_mc_scripts(main_chain_scripts.clone()).await;
125+
126+
let to_block = self.get_block_by_hash(&current_mc_block_hash).await?.number.into();
127+
128+
let data_checkpoint = self.resolve_data_checkpoint(&data_checkpoint).await?;
129+
130+
let utxos =
131+
match self.try_serve_from_cache(&data_checkpoint, to_block, max_transfers).await {
132+
Some(utxos) => utxos,
133+
None => {
134+
self.fill_cache(main_chain_scripts, &data_checkpoint, to_block).await?;
135+
self.try_serve_from_cache(&data_checkpoint, to_block, max_transfers)
136+
.await
137+
.ok_or("Data should be present in cache after filling cache succeeded")?
138+
},
139+
};
140+
141+
let new_checkpoint = match utxos.last() {
142+
Some(utxo) if (utxos.len() as u32) >= max_transfers => {
143+
BridgeDataCheckpoint::Utxo(utxo.utxo_id())
144+
},
145+
_ => BridgeDataCheckpoint::Block(to_block.into()),
146+
};
147+
148+
let transfers = utxos.into_iter().flat_map(utxo_to_transfer).collect();
149+
150+
Ok((transfers, new_checkpoint))
151+
}
152+
}
153+
);
154+
155+
impl CachedTokenBridgeDataSourceImpl {
156+
/// Crates a new token bridge data source
157+
pub fn new(
158+
pool: PgPool,
159+
metrics_opt: Option<McFollowerMetrics>,
160+
blocks: Arc<BlockDataSourceImpl>,
161+
cache_lookahead: u32,
162+
) -> Self {
163+
Self {
164+
db_sync_config: DbSyncConfigurationProvider::new(pool.clone()),
165+
pool,
166+
metrics_opt,
167+
blocks,
168+
cache: Arc::new(Mutex::new(TokenUtxoCache::new())),
169+
cache_lookahead,
170+
}
171+
}
172+
173+
async fn set_cache_mc_scripts(&self, main_chain_scripts: MainChainScripts) {
174+
let mut cache = self.cache.lock().await;
175+
cache.set_mc_scripts(main_chain_scripts.clone());
176+
}
177+
178+
async fn try_serve_from_cache(
179+
&self,
180+
data_checkpoint: &BridgeCheckpoint,
181+
to_block: BlockNumber,
182+
max_transfers: u32,
183+
) -> Option<Vec<BridgeUtxo>> {
184+
let cache = self.cache.lock().await;
185+
cache.serve_from_cache(data_checkpoint, to_block, max_transfers)
186+
}
187+
188+
async fn fill_cache(
189+
&self,
190+
main_chain_scripts: MainChainScripts,
191+
data_checkpoint: &BridgeCheckpoint,
192+
to_block: BlockNumber,
193+
) -> Result<(), Box<dyn Error + Send + Sync>> {
194+
let from_block = data_checkpoint.get_block_number();
195+
196+
// We want to load all data in the block of `data_checkpoint`, so we go one block back.
197+
let effective_data_checkpoint =
198+
BridgeCheckpoint::Block { number: from_block.saturating_sub(1u32) };
199+
200+
let latest_block = self.get_latest_stable_block().await?.unwrap_or(to_block);
201+
202+
let to_block: BlockNumber =
203+
min(to_block.saturating_add(self.cache_lookahead), latest_block);
204+
205+
let utxos = get_bridge_utxos_tx(
206+
self.db_sync_config.get_tx_in_config().await?,
207+
&self.pool,
208+
&main_chain_scripts.illiquid_circulation_supply_validator_address.clone().into(),
209+
main_chain_scripts.asset_id().into(),
210+
effective_data_checkpoint,
211+
to_block.into(),
212+
None,
213+
)
214+
.await?;
215+
216+
self.set_cached_transfers(from_block, to_block, utxos).await;
217+
218+
Ok(())
219+
}
220+
221+
async fn set_cached_transfers(
222+
&self,
223+
start_block: BlockNumber,
224+
end_block: BlockNumber,
225+
utxos: Vec<BridgeUtxo>,
226+
) {
227+
let mut cache = self.cache.lock().await;
228+
cache.set_cached_transfers(start_block, end_block, utxos);
229+
}
230+
231+
async fn get_latest_stable_block(
232+
&self,
233+
) -> Result<Option<BlockNumber>, Box<dyn Error + Send + Sync>> {
234+
let latest_block_timestamp = self.blocks.get_latest_block_info().await?.timestamp;
235+
Ok(self
236+
.blocks
237+
.get_latest_stable_block_for(latest_block_timestamp.into())
238+
.await?
239+
.map(|block| block.number.into()))
240+
}
241+
242+
async fn get_block_info_for_utxo(
243+
&self,
244+
utxo_id: &UtxoId,
245+
) -> Result<TxBlockInfo, Box<dyn Error + Send + Sync>> {
246+
get_block_info_for_utxo(&self.pool, utxo_id.tx_hash.into())
247+
.await?
248+
.ok_or(format!("Could not find block info for utxo: {utxo_id:?}").into())
249+
}
250+
251+
async fn resolve_data_checkpoint(
252+
&self,
253+
data_checkpoint: &BridgeDataCheckpoint,
254+
) -> Result<BridgeCheckpoint, Box<dyn Error + Send + Sync>> {
255+
match data_checkpoint {
256+
BridgeDataCheckpoint::Block(number) => {
257+
Ok(BridgeCheckpoint::Block { number: (*number).into() })
258+
},
259+
BridgeDataCheckpoint::Utxo(utxo) => {
260+
match self.cache.lock().await.find_utxo_in_cache(&utxo) {
261+
Some(utxo) => Ok(BridgeCheckpoint::Utxo {
262+
block_number: utxo.block_number,
263+
tx_ix: utxo.tx_ix,
264+
tx_out_ix: utxo.utxo_ix,
265+
}),
266+
None => {
267+
let TxBlockInfo { block_number, tx_ix } =
268+
self.get_block_info_for_utxo(&utxo).await?;
269+
Ok(BridgeCheckpoint::Utxo {
270+
block_number,
271+
tx_ix,
272+
tx_out_ix: utxo.index.into(),
273+
})
274+
},
275+
}
276+
},
277+
}
278+
}
279+
280+
async fn get_block_by_hash(
281+
&self,
282+
mc_block_hash: &McBlockHash,
283+
) -> Result<MainchainBlock, Box<dyn Error + Send + Sync>> {
284+
Ok(self
285+
.blocks
286+
.get_block_by_hash(mc_block_hash.clone())
287+
.await?
288+
.ok_or(format!("Could not find block for hash {mc_block_hash:?}"))?)
289+
}
290+
}

0 commit comments

Comments
 (0)