Skip to content

Commit cdcdd2a

Browse files
authored
Merge pull request #15 from buffrr/fetcher-updates
Refactor fetcher to use the main block source type
2 parents 57ae9bf + 769fdcd commit cdcdd2a

File tree

6 files changed

+52
-60
lines changed

6 files changed

+52
-60
lines changed

node/src/node.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@ use protocol::{
1616
};
1717
use serde::{Deserialize, Serialize};
1818

19-
use crate::store::{ChainState, ChainStore, LiveSnapshot, LiveStore, Sha256};
19+
use crate::{
20+
source::BitcoinRpcError,
21+
store::{ChainState, ChainStore, LiveSnapshot, LiveStore, Sha256},
22+
};
2023

2124
pub trait BlockSource {
22-
fn get_block_hash(&self, height: u32) -> Result<BlockHash>;
23-
fn get_block(&self, hash: &BlockHash) -> Result<Block>;
24-
fn get_median_time(&self) -> anyhow::Result<u64>;
25-
fn get_block_count(&self) -> Result<u64>;
25+
fn get_block_hash(&self, height: u32) -> Result<BlockHash, BitcoinRpcError>;
26+
fn get_block(&self, hash: &BlockHash) -> Result<Block, BitcoinRpcError>;
27+
fn get_median_time(&self) -> Result<u64, BitcoinRpcError>;
28+
fn get_block_count(&self) -> Result<u64, BitcoinRpcError>;
2629
}
2730

2831
#[derive(Debug, Clone)]

node/src/source.rs

Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use std::{
99
time::Duration,
1010
};
1111

12-
use anyhow::anyhow;
1312
use base64::Engine;
1413
use bitcoin::{Block, BlockHash, Txid};
1514
use hex::FromHexError;
@@ -34,8 +33,7 @@ pub struct BitcoinRpc {
3433
}
3534

3635
pub struct BlockFetcher {
37-
client: reqwest::blocking::Client,
38-
rpc: Arc<BitcoinRpc>,
36+
src: BitcoinBlockSource,
3937
job_id: Arc<AtomicUsize>,
4038
sender: std::sync::mpsc::SyncSender<BlockEvent>,
4139
num_workers: usize,
@@ -303,15 +301,13 @@ impl BitcoinRpcAuth {
303301

304302
impl BlockFetcher {
305303
pub fn new(
306-
rpc: BitcoinRpc,
307-
client: reqwest::blocking::Client,
304+
src: BitcoinBlockSource,
308305
num_workers: usize,
309306
) -> (Self, std::sync::mpsc::Receiver<BlockEvent>) {
310307
let (tx, rx) = std::sync::mpsc::sync_channel(12);
311308
(
312309
Self {
313-
client,
314-
rpc: Arc::new(rpc),
310+
src,
315311
job_id: Arc::new(AtomicUsize::new(0)),
316312
sender: tx,
317313
num_workers,
@@ -328,8 +324,7 @@ impl BlockFetcher {
328324
self.stop();
329325

330326
let job_id = self.job_id.load(Ordering::SeqCst);
331-
let task_client = self.client.clone();
332-
let task_rpc = self.rpc.clone();
327+
let task_src = self.src.clone();
333328
let current_task = self.job_id.clone();
334329
let task_sender = self.sender.clone();
335330
let num_workers = self.num_workers;
@@ -348,20 +343,19 @@ impl BlockFetcher {
348343
}
349344
last_check = Instant::now();
350345

351-
let tip: u32 =
352-
match task_rpc.send_json_blocking(&task_client, &task_rpc.get_block_count()) {
353-
Ok(t) => t,
354-
Err(e) => {
355-
_ = task_sender.send(BlockEvent::Error(BlockFetchError::RpcError(e)));
356-
return;
357-
}
358-
};
346+
let tip: u32 = match task_src.get_block_count() {
347+
Ok(t) => t as _,
348+
Err(e) => {
349+
_ = task_sender.send(BlockEvent::Error(BlockFetchError::RpcError(e)));
350+
return;
351+
}
352+
};
359353

360354
if tip > checkpoint.height {
361355
let res = Self::run_workers(
362356
job_id,
363357
current_task.clone(),
364-
task_rpc.clone(),
358+
task_src.clone(),
365359
task_sender.clone(),
366360
checkpoint,
367361
tip,
@@ -386,7 +380,7 @@ impl BlockFetcher {
386380
fn run_workers(
387381
job_id: usize,
388382
current_job: Arc<AtomicUsize>,
389-
rpc: Arc<BitcoinRpc>,
383+
src: BitcoinBlockSource,
390384
sender: std::sync::mpsc::SyncSender<BlockEvent>,
391385
start_block: ChainAnchor,
392386
end_height: u32,
@@ -400,7 +394,7 @@ impl BlockFetcher {
400394
queued_height: start_block.height + 1,
401395
end_height,
402396
ordered_sender: sender,
403-
rpc,
397+
src,
404398
num_workers,
405399
pool: ThreadPool::new(num_workers),
406400
};
@@ -409,14 +403,14 @@ impl BlockFetcher {
409403
}
410404

411405
pub fn fetch_block(
412-
rpc: &BitcoinRpc,
413-
client: &reqwest::blocking::Client,
406+
source: &BitcoinBlockSource,
414407
hash: &BlockHash,
415408
) -> Result<Block, BitcoinRpcError> {
416-
let block_req = rpc.get_block(&hash);
409+
let block_req = source.rpc.get_block(&hash);
417410
let id = block_req.id;
418-
let response = rpc
419-
.send_request_blocking(client, &block_req)?
411+
let response = source
412+
.rpc
413+
.send_request_blocking(&source.client, &block_req)?
420414
.error_for_status()?;
421415
let mut raw = response.bytes()?.to_vec();
422416

@@ -465,7 +459,7 @@ struct Workers {
465459
queued_height: u32,
466460
end_height: u32,
467461
ordered_sender: std::sync::mpsc::SyncSender<BlockEvent>,
468-
rpc: Arc<BitcoinRpc>,
462+
src: BitcoinBlockSource,
469463
num_workers: usize,
470464
pool: ThreadPool,
471465
}
@@ -521,7 +515,6 @@ impl Workers {
521515
}
522516

523517
fn run(&mut self) -> Result<ChainAnchor, BlockFetchError> {
524-
let client = reqwest::blocking::Client::new();
525518
let (tx, rx) = std::sync::mpsc::sync_channel(self.num_workers);
526519

527520
'queue_blocks: while !self.queued_all() {
@@ -534,8 +527,7 @@ impl Workers {
534527
return Err(BlockFetchError::ChannelClosed);
535528
}
536529
let tx = tx.clone();
537-
let rpc = self.rpc.clone();
538-
let task_client = client.clone();
530+
let rpc = self.src.clone();
539531
let task_sigterm = self.current_job.clone();
540532
let height = self.queued_height;
541533
let job_id = self.job_id;
@@ -545,9 +537,8 @@ impl Workers {
545537
return;
546538
}
547539
let result: Result<_, BitcoinRpcError> = (move || {
548-
let hash: BlockHash =
549-
rpc.send_json_blocking(&task_client, &rpc.get_block_hash(height))?;
550-
let block = BlockFetcher::fetch_block(&rpc, &task_client, &hash)?;
540+
let hash: BlockHash = rpc.get_block_hash(height)?;
541+
let block = BlockFetcher::fetch_block(&rpc, &hash)?;
551542
Ok((ChainAnchor { height, hash }, block))
552543
})();
553544
_ = tx.send(result);
@@ -675,7 +666,7 @@ impl ErrorForRpc for reqwest::Response {
675666
return Err(BitcoinRpcError::Rpc(e));
676667
}
677668

678-
return Ok(rpc_res.result.unwrap());
669+
Ok(rpc_res.result.unwrap())
679670
}
680671
}
681672

@@ -704,29 +695,31 @@ impl BitcoinBlockSource {
704695
}
705696

706697
impl BlockSource for BitcoinBlockSource {
707-
fn get_block_hash(&self, height: u32) -> anyhow::Result<BlockHash> {
698+
fn get_block_hash(&self, height: u32) -> Result<BlockHash, BitcoinRpcError> {
708699
Ok(self
709700
.rpc
710701
.send_json_blocking(&self.client, &self.rpc.get_block_hash(height))?)
711702
}
712703

713-
fn get_block(&self, hash: &BlockHash) -> anyhow::Result<Block> {
704+
fn get_block(&self, hash: &BlockHash) -> Result<Block, BitcoinRpcError> {
714705
Ok(self
715706
.rpc
716707
.send_json_blocking(&self.client, &self.rpc.get_block(hash))?)
717708
}
718709

719-
fn get_median_time(&self) -> anyhow::Result<u64> {
710+
fn get_median_time(&self) -> Result<u64, BitcoinRpcError> {
720711
let info: serde_json::Value = self
721712
.rpc
722713
.send_json_blocking(&self.client, &self.rpc.get_blockchain_info())?;
723714
if let Some(time) = info.get("mediantime").and_then(|t| t.as_u64()) {
724715
return Ok(time);
725716
}
726-
return Err(anyhow!("Could not fetch median time"));
717+
Err(BitcoinRpcError::Other(
718+
"Could not fetch median time".to_string(),
719+
))
727720
}
728721

729-
fn get_block_count(&self) -> anyhow::Result<u64> {
722+
fn get_block_count(&self) -> Result<u64, BitcoinRpcError> {
730723
Ok(self
731724
.rpc
732725
.send_json_blocking(&self.client, &self.rpc.get_block_count())?)

node/src/sync.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,7 @@ impl Spaced {
154154
start_block.hash, start_block.height
155155
);
156156

157-
let rpc = source.rpc.clone();
158-
let client = reqwest::blocking::Client::new();
159-
let (fetcher, receiver) = BlockFetcher::new(rpc.clone(), client.clone(), self.num_workers);
157+
let (fetcher, receiver) = BlockFetcher::new(source.clone(), self.num_workers);
160158
fetcher.start(start_block);
161159

162160
let mut shutdown_signal = shutdown.subscribe();

node/src/wallets.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,8 +284,7 @@ impl RpcWallet {
284284
mut shutdown: broadcast::Receiver<()>,
285285
num_workers: usize,
286286
) -> anyhow::Result<()> {
287-
let (fetcher, receiver) =
288-
BlockFetcher::new(source.rpc.clone(), source.client.clone(), num_workers);
287+
let (fetcher, receiver) = BlockFetcher::new(source.clone(), num_workers);
289288

290289
let mut wallet_tip = {
291290
let tip = wallet.coins.local_chain().tip();

node/tests/fetcher_tests.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ use std::{
55

66
use anyhow::Result;
77
use protocol::{bitcoin::BlockHash, constants::ChainAnchor};
8-
use reqwest::blocking::Client;
9-
use spaced::source::{BitcoinRpc, BitcoinRpcAuth, BlockEvent, BlockFetcher};
8+
use spaced::source::{BitcoinBlockSource, BitcoinRpc, BitcoinRpcAuth, BlockEvent, BlockFetcher};
109
use testutil::TestRig;
1110

1211
async fn setup(blocks: u64) -> Result<(TestRig, u64, BlockHash)> {
@@ -21,16 +20,16 @@ async fn setup(blocks: u64) -> Result<(TestRig, u64, BlockHash)> {
2120
fn test_block_fetching_from_bitcoin_rpc() -> Result<()> {
2221
const GENERATED_BLOCKS: u64 = 10;
2322

24-
let (rig, mut height, hash) = tokio::runtime::Runtime::new()?
25-
.block_on(setup(GENERATED_BLOCKS))?;
26-
let fetcher_rpc = BitcoinRpc::new(
23+
let (rig, mut height, hash) =
24+
tokio::runtime::Runtime::new()?.block_on(setup(GENERATED_BLOCKS))?;
25+
let fetcher_rpc = BitcoinBlockSource::new(BitcoinRpc::new(
2726
&rig.bitcoind.rpc_url(),
2827
BitcoinRpcAuth::UserPass("user".to_string(), "password".to_string()),
28+
));
29+
let (fetcher, receiver) = BlockFetcher::new(
30+
fetcher_rpc.clone(),
31+
8
2932
);
30-
31-
let client = Client::new();
32-
let (fetcher, receiver) = BlockFetcher::new(fetcher_rpc.clone(), client.clone(), 8);
33-
3433
fetcher.start(ChainAnchor { hash, height: 0 });
3534

3635
let timeout = Duration::from_secs(5);

node/tests/wallet_tests.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::str::FromStr;
2+
23
use protocol::bitcoin::{Address, Amount};
3-
use spaced::rpc::RpcClient;
4-
use spaced::wallets::AddressKind;
4+
use spaced::{rpc::RpcClient, wallets::AddressKind};
55
use testutil::TestRig;
66

77
async fn setup() -> anyhow::Result<TestRig> {
@@ -22,7 +22,7 @@ async fn it_should_create_and_fund_wallet(rig: &TestRig) -> anyhow::Result<()> {
2222
.wallet_get_new_address(name.clone(), AddressKind::Coin)
2323
.await?,
2424
)?
25-
.assume_checked();
25+
.assume_checked();
2626
// have the rig send some coins
2727
rig.send(&addr, Amount::from_sat(1000_000)).await?;
2828
// mine the transaction

0 commit comments

Comments
 (0)