Skip to content

Commit c07bef2

Browse files
authored
Titan scraper (#832)
## 📝 Summary Added support for scraping titan's top bid stream. Also fixes lint problems due to rust update. ## ✅ I have completed the following steps: * [X] Run `make lint` * [X] Run `make test` * [ ] Added tests (if applicable) * [X] Wrote a letter to Santa.
1 parent fa12cf3 commit c07bef2

File tree

19 files changed

+204
-81
lines changed

19 files changed

+204
-81
lines changed

crates/bid-scraper/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,13 @@ This publisher connects to a relay using bloxroute websocket bids protocol.
7474
|relay_name| Be sure to use unique names. Maybe we can take it from the bloxroute_url?|
7575
|auth_header|string or env var|Added as "Authorization" header. Example: "env:BLOXROUTE_AUTH_HEADER"|
7676

77+
78+
* Titan websocket (type = "titan-ws")
79+
This publisher connects to a relay using titan websocket top bid protocol (almost identical to Ultrasound).
80+
81+
##### Fields
82+
| Name | Type | Comments |
83+
|------|------|-------------|
84+
|titan_url|string|Url to connect to. Example: "ws://us.titanrelay.xyz/builder/top_bid"|
85+
|relay_name|string|Be sure to use unique names. Maybe we can take it from the titan_url?|
86+
|api_token|string|Mandatory auth used as header X-Api-Token|

crates/bid-scraper/src/bid_scraper.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::{
1010
config::{NamedPublisherConfig, PublisherConfig},
1111
get_timestamp_f64,
1212
headers_publisher::{HeadersPublisherService, RelayHeadersPublisherConfig},
13+
titan_ws_publisher::{TitanWsConnectionHandler, TitanWsPublisher, TitanWsPublisherConfig},
1314
types::{PublisherType, ScrapedRelayBlockBid},
1415
ultrasound_ws_publisher::{
1516
UltrasoundWsConnectionHandler, UltrasoundWsPublisher, UltrasoundWsPublisherConfig,
@@ -53,6 +54,26 @@ impl PublisherFactory<UltrasoundWsPublisherConfig, UltrasoundWsPublisher> for Ul
5354
}
5455
}
5556

57+
struct TitanWsFactory;
58+
impl PublisherFactory<TitanWsPublisherConfig, TitanWsPublisher> for TitanWsFactory {
59+
async fn create_publisher(
60+
cfg: TitanWsPublisherConfig,
61+
name: String,
62+
sender: Arc<dyn BidSender>,
63+
cancel: CancellationToken,
64+
) -> eyre::Result<TitanWsPublisher> {
65+
Ok(TitanWsPublisher::new(
66+
TitanWsConnectionHandler::new(cfg.clone(), name.clone()),
67+
sender,
68+
cancel,
69+
)
70+
.await)
71+
}
72+
async fn run(publisher: TitanWsPublisher) {
73+
publisher.run().await
74+
}
75+
}
76+
5677
struct BloxrouteWsFactory;
5778
impl PublisherFactory<BloxrouteWsPublisherConfig, BloxrouteWsPublisher> for BloxrouteWsFactory {
5879
async fn create_publisher(
@@ -144,6 +165,14 @@ pub fn run(
144165
global_cancel.clone(),
145166
));
146167
}
168+
PublisherConfig::TitanWs(cfg) => {
169+
tokio::spawn(start_publisher::<_, _, TitanWsFactory>(
170+
cfg,
171+
named_publisher.name,
172+
sender.clone(),
173+
global_cancel.clone(),
174+
));
175+
}
147176
PublisherConfig::BloxrouteWs(cfg) => {
148177
tokio::spawn(start_publisher::<_, _, BloxrouteWsFactory>(
149178
cfg,

crates/bid-scraper/src/config.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use serde_with::serde_as;
44
use crate::{
55
best_bid_ws_connector::ExternalWsPublisherConfig, bids_publisher::RelayBidsPublisherConfig,
66
bloxroute_ws_publisher::BloxrouteWsPublisherConfig,
7-
headers_publisher::RelayHeadersPublisherConfig,
7+
headers_publisher::RelayHeadersPublisherConfig, titan_ws_publisher::TitanWsPublisherConfig,
88
ultrasound_ws_publisher::UltrasoundWsPublisherConfig,
99
};
1010

@@ -14,6 +14,7 @@ pub enum PublisherConfig {
1414
RelayBids(RelayBidsPublisherConfig),
1515
RelayHeaders(RelayHeadersPublisherConfig),
1616
UltrasoundWs(UltrasoundWsPublisherConfig),
17+
TitanWs(TitanWsPublisherConfig),
1718
BloxrouteWs(BloxrouteWsPublisherConfig),
1819
ExternalWs(ExternalWsPublisherConfig),
1920
}

crates/bid-scraper/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
use std::time::Duration;
22

3-
pub mod bids_publisher;
4-
pub mod bloxroute_ws_publisher;
5-
mod slot;
6-
pub mod ultrasound_ws_publisher;
7-
83
pub mod best_bid_ws_connector;
94
pub mod bid_scraper;
105
pub mod bid_scraper_client;
116
pub mod bid_sender;
7+
pub mod bids_publisher;
8+
pub mod bloxroute_ws_publisher;
129
pub mod config;
1310
pub mod headers_publisher;
1411
pub mod reconnect;
1512
pub mod relay_api_publisher;
13+
mod slot;
14+
pub mod titan_ws_publisher;
1615
pub mod types;
16+
pub mod ultrasound_ws_publisher;
1717
pub mod ws_publisher;
1818

1919
pub type DynResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
//! PUAJ: copy/paste from ultrasound_ws_publisher.rs
2+
use crate::{
3+
types::{top_bid_update::parse_message, PublisherType, ScrapedRelayBlockBid},
4+
ws_publisher::{ConnectionHandler, Service},
5+
};
6+
use eyre::Context;
7+
use futures::stream::{SplitSink, SplitStream};
8+
use serde::Deserialize;
9+
use tokio::net::TcpStream;
10+
use tokio_tungstenite::{
11+
tungstenite::{http::Request, protocol::Message},
12+
MaybeTlsStream, WebSocketStream,
13+
};
14+
15+
#[derive(Debug, Clone, Deserialize, PartialEq)]
16+
pub struct TitanWsPublisherConfig {
17+
/// Url to connect to. Example: "ws://us.titanrelay.xyz/builder/top_bid"
18+
pub titan_url: String,
19+
/// Be sure to use unique names. Maybe we can take it from the titan_url?
20+
pub relay_name: String,
21+
/// used as header X-Api-Token, for use with titan builder direct endpoint
22+
pub api_token: String,
23+
}
24+
25+
pub struct TitanWsConnectionHandler {
26+
cfg: TitanWsPublisherConfig,
27+
name: String,
28+
}
29+
30+
impl TitanWsConnectionHandler {
31+
pub fn new(cfg: TitanWsPublisherConfig, name: String) -> Self {
32+
Self { cfg, name }
33+
}
34+
}
35+
36+
impl ConnectionHandler for TitanWsConnectionHandler {
37+
fn url(&self) -> String {
38+
self.cfg.titan_url.clone()
39+
}
40+
fn configure_request(&self, request: &mut Request<()>) -> eyre::Result<()> {
41+
let headers = request.headers_mut();
42+
let api_token_header_value =
43+
tokio_tungstenite::tungstenite::http::HeaderValue::from_str(&self.cfg.api_token)
44+
.wrap_err("Invalid header value for 'X-Api-Token'")?;
45+
headers.insert("X-Api-Token", api_token_header_value);
46+
Ok(())
47+
}
48+
49+
async fn init_connection(
50+
&self,
51+
_write: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
52+
_read: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
53+
) -> eyre::Result<()> {
54+
Ok(())
55+
}
56+
57+
fn parse(&self, message: Message) -> eyre::Result<Option<ScrapedRelayBlockBid>> {
58+
parse_message(
59+
message,
60+
&self.cfg.relay_name,
61+
&self.name,
62+
PublisherType::TitanWs,
63+
)
64+
}
65+
}
66+
67+
pub type TitanWsPublisher = Service<TitanWsConnectionHandler>;

crates/bid-scraper/src/types/bid.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ pub enum PublisherType {
1919
BloxrouteWs,
2020
#[serde(rename = "external_ws")]
2121
ExternalWs,
22+
#[serde(rename = "titan_ws")]
23+
TitanWs,
2224
}
2325

2426
impl PublisherType {
@@ -31,6 +33,7 @@ impl PublisherType {
3133
PublisherType::UltrasoundWs => true,
3234
PublisherType::BloxrouteWs => false,
3335
PublisherType::ExternalWs => true,
36+
PublisherType::TitanWs => true,
3437
}
3538
}
3639
}

crates/bid-scraper/src/types/bid_update.rs

Lines changed: 0 additions & 16 deletions
This file was deleted.
Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,3 @@
1-
use crate::get_timestamp_f64;
2-
31
pub mod bid;
42
pub use bid::{PublisherType, ScrapedRelayBlockBid};
5-
6-
mod bid_update;
7-
pub use bid_update::TopBidUpdate;
8-
9-
pub fn block_bid_from_update(
10-
update: TopBidUpdate,
11-
relay_name: &str,
12-
publisher_name: &str,
13-
publisher_type: PublisherType,
14-
) -> ScrapedRelayBlockBid {
15-
ScrapedRelayBlockBid {
16-
publisher_name: publisher_name.to_owned(),
17-
publisher_type: publisher_type.to_owned(),
18-
builder_pubkey: Some(update.builder_pubkey),
19-
relay_name: relay_name.to_owned(),
20-
parent_hash: update.parent_hash,
21-
block_hash: update.block_hash,
22-
seen_time: get_timestamp_f64(),
23-
relay_time: Some(update.timestamp as f64 / 1000.),
24-
value: update.value,
25-
slot_number: update.slot,
26-
gas_used: None,
27-
fee_recipient: Some(update.fee_recipient),
28-
proposer_fee_recipient: None,
29-
optimistic_submission: None,
30-
block_number: update.block_number,
31-
extra_data: None,
32-
}
33-
}
3+
pub mod top_bid_update;
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
use crate::{
2+
get_timestamp_f64,
3+
types::{PublisherType, ScrapedRelayBlockBid},
4+
};
5+
use alloy_primitives::{Address, BlockHash, U256};
6+
use alloy_rpc_types_beacon::BlsPublicKey;
7+
use eyre::eyre;
8+
use ssz::Decode;
9+
use ssz_derive::Decode;
10+
use tokio_tungstenite::tungstenite::Message;
11+
use tracing::debug;
12+
13+
/// Common to Ultrasound and Titan.
14+
#[derive(Debug, Decode)]
15+
struct TopBidUpdate {
16+
/// Millisecond timestamp at which this became the top bid
17+
pub timestamp: u64,
18+
pub slot: u64,
19+
pub block_number: u64,
20+
pub block_hash: BlockHash,
21+
pub parent_hash: BlockHash,
22+
pub builder_pubkey: BlsPublicKey,
23+
pub fee_recipient: Address,
24+
pub value: U256,
25+
}
26+
27+
fn block_bid_from_update(
28+
update: TopBidUpdate,
29+
relay_name: &str,
30+
publisher_name: &str,
31+
publisher_type: PublisherType,
32+
) -> ScrapedRelayBlockBid {
33+
ScrapedRelayBlockBid {
34+
publisher_name: publisher_name.to_owned(),
35+
publisher_type: publisher_type.to_owned(),
36+
builder_pubkey: Some(update.builder_pubkey),
37+
relay_name: relay_name.to_owned(),
38+
parent_hash: update.parent_hash,
39+
block_hash: update.block_hash,
40+
seen_time: get_timestamp_f64(),
41+
relay_time: Some(update.timestamp as f64 / 1000.),
42+
value: update.value,
43+
slot_number: update.slot,
44+
gas_used: None,
45+
fee_recipient: Some(update.fee_recipient),
46+
proposer_fee_recipient: None,
47+
optimistic_submission: None,
48+
block_number: update.block_number,
49+
extra_data: None,
50+
}
51+
}
52+
53+
pub fn parse_message(
54+
message: Message,
55+
relay_name: &str,
56+
publisher_name: &str,
57+
publisher_type: PublisherType,
58+
) -> eyre::Result<Option<ScrapedRelayBlockBid>> {
59+
match message {
60+
Message::Binary(data) => {
61+
let update =
62+
TopBidUpdate::from_ssz_bytes(&data).map_err(|_| eyre!("unable to deserialize"))?;
63+
debug!("Got message: {:?}", update);
64+
let bid = block_bid_from_update(update, relay_name, publisher_name, publisher_type);
65+
Ok(Some(bid))
66+
}
67+
_ => {
68+
eyre::bail!("Unhandled {publisher_type:?} WS message: {message:?}");
69+
}
70+
}
71+
}

crates/bid-scraper/src/ultrasound_ws_publisher.rs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
use crate::{
2-
types::{block_bid_from_update, PublisherType, ScrapedRelayBlockBid, TopBidUpdate},
2+
types::{top_bid_update::parse_message, PublisherType, ScrapedRelayBlockBid},
33
ws_publisher::{ConnectionHandler, Service},
44
};
5-
use eyre::{eyre, Context};
5+
use eyre::Context;
66
use futures::stream::{SplitSink, SplitStream};
77
use serde::Deserialize;
8-
use ssz::Decode;
98
use tokio::net::TcpStream;
109
use tokio_tungstenite::{
1110
tungstenite::{http::Request, protocol::Message},
1211
MaybeTlsStream, WebSocketStream,
1312
};
14-
use tracing::debug;
1513

1614
#[derive(Debug, Clone, Deserialize, PartialEq)]
1715
pub struct UltrasoundWsPublisherConfig {
@@ -64,23 +62,12 @@ impl ConnectionHandler for UltrasoundWsConnectionHandler {
6462
}
6563

6664
fn parse(&self, message: Message) -> eyre::Result<Option<ScrapedRelayBlockBid>> {
67-
match message {
68-
Message::Binary(data) => {
69-
let update = TopBidUpdate::from_ssz_bytes(&data)
70-
.map_err(|_| eyre!("unable to deserialize"))?;
71-
debug!("Got message: {:?}", update);
72-
let bid = block_bid_from_update(
73-
update,
74-
&self.cfg.relay_name,
75-
&self.name,
76-
PublisherType::UltrasoundWs,
77-
);
78-
Ok(Some(bid))
79-
}
80-
_ => {
81-
eyre::bail!("Unhandled ultrasound WS message: {:?}", message);
82-
}
83-
}
65+
parse_message(
66+
message,
67+
&self.cfg.relay_name,
68+
&self.name,
69+
PublisherType::UltrasoundWs,
70+
)
8471
}
8572
}
8673

0 commit comments

Comments
 (0)