Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/bid-scraper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,13 @@ This publisher connects to a relay using bloxroute websocket bids protocol.
|relay_name| Be sure to use unique names. Maybe we can take it from the bloxroute_url?|
|auth_header|string or env var|Added as "Authorization" header. Example: "env:BLOXROUTE_AUTH_HEADER"|


* Titan websocket (type = "titan-ws")
This publisher connects to a relay using titan websocket top bid protocol (almost identical to Ultrasound).

##### Fields
| Name | Type | Comments |
|------|------|-------------|
|titan_url|string|Url to connect to. Example: "ws://us.titanrelay.xyz/builder/top_bid"|
|relay_name|string|Be sure to use unique names. Maybe we can take it from the titan_url?|
|api_token|string|Mandatory auth used as header X-Api-Token|
29 changes: 29 additions & 0 deletions crates/bid-scraper/src/bid_scraper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
config::{NamedPublisherConfig, PublisherConfig},
get_timestamp_f64,
headers_publisher::{HeadersPublisherService, RelayHeadersPublisherConfig},
titan_ws_publisher::{TitanWsConnectionHandler, TitanWsPublisher, TitanWsPublisherConfig},
types::{PublisherType, ScrapedRelayBlockBid},
ultrasound_ws_publisher::{
UltrasoundWsConnectionHandler, UltrasoundWsPublisher, UltrasoundWsPublisherConfig,
Expand Down Expand Up @@ -53,6 +54,26 @@ impl PublisherFactory<UltrasoundWsPublisherConfig, UltrasoundWsPublisher> for Ul
}
}

struct TitanWsFactory;
impl PublisherFactory<TitanWsPublisherConfig, TitanWsPublisher> for TitanWsFactory {
async fn create_publisher(
cfg: TitanWsPublisherConfig,
name: String,
sender: Arc<dyn BidSender>,
cancel: CancellationToken,
) -> eyre::Result<TitanWsPublisher> {
Ok(TitanWsPublisher::new(
TitanWsConnectionHandler::new(cfg.clone(), name.clone()),
sender,
cancel,
)
.await)
}
async fn run(publisher: TitanWsPublisher) {
publisher.run().await
}
}

struct BloxrouteWsFactory;
impl PublisherFactory<BloxrouteWsPublisherConfig, BloxrouteWsPublisher> for BloxrouteWsFactory {
async fn create_publisher(
Expand Down Expand Up @@ -144,6 +165,14 @@ pub fn run(
global_cancel.clone(),
));
}
PublisherConfig::TitanWs(cfg) => {
tokio::spawn(start_publisher::<_, _, TitanWsFactory>(
cfg,
named_publisher.name,
sender.clone(),
global_cancel.clone(),
));
}
PublisherConfig::BloxrouteWs(cfg) => {
tokio::spawn(start_publisher::<_, _, BloxrouteWsFactory>(
cfg,
Expand Down
3 changes: 2 additions & 1 deletion crates/bid-scraper/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use serde_with::serde_as;
use crate::{
best_bid_ws_connector::ExternalWsPublisherConfig, bids_publisher::RelayBidsPublisherConfig,
bloxroute_ws_publisher::BloxrouteWsPublisherConfig,
headers_publisher::RelayHeadersPublisherConfig,
headers_publisher::RelayHeadersPublisherConfig, titan_ws_publisher::TitanWsPublisherConfig,
ultrasound_ws_publisher::UltrasoundWsPublisherConfig,
};

Expand All @@ -14,6 +14,7 @@ pub enum PublisherConfig {
RelayBids(RelayBidsPublisherConfig),
RelayHeaders(RelayHeadersPublisherConfig),
UltrasoundWs(UltrasoundWsPublisherConfig),
TitanWs(TitanWsPublisherConfig),
BloxrouteWs(BloxrouteWsPublisherConfig),
ExternalWs(ExternalWsPublisherConfig),
}
Expand Down
10 changes: 5 additions & 5 deletions crates/bid-scraper/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use std::time::Duration;

pub mod bids_publisher;
pub mod bloxroute_ws_publisher;
mod slot;
pub mod ultrasound_ws_publisher;

pub mod best_bid_ws_connector;
pub mod bid_scraper;
pub mod bid_scraper_client;
pub mod bid_sender;
pub mod bids_publisher;
pub mod bloxroute_ws_publisher;
pub mod config;
pub mod headers_publisher;
pub mod reconnect;
pub mod relay_api_publisher;
mod slot;
pub mod titan_ws_publisher;
pub mod types;
pub mod ultrasound_ws_publisher;
pub mod ws_publisher;

pub type DynResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;
Expand Down
67 changes: 67 additions & 0 deletions crates/bid-scraper/src/titan_ws_publisher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! PUAJ: copy/paste from ultrasound_ws_publisher.rs
use crate::{
types::{top_bid_update::parse_message, PublisherType, ScrapedRelayBlockBid},
ws_publisher::{ConnectionHandler, Service},
};
use eyre::Context;
use futures::stream::{SplitSink, SplitStream};
use serde::Deserialize;
use tokio::net::TcpStream;
use tokio_tungstenite::{
tungstenite::{http::Request, protocol::Message},
MaybeTlsStream, WebSocketStream,
};

#[derive(Debug, Clone, Deserialize, PartialEq)]
pub struct TitanWsPublisherConfig {
/// Url to connect to. Example: "ws://us.titanrelay.xyz/builder/top_bid"
pub titan_url: String,
/// Be sure to use unique names. Maybe we can take it from the titan_url?
pub relay_name: String,
/// used as header X-Api-Token, for use with titan builder direct endpoint
pub api_token: String,
}

pub struct TitanWsConnectionHandler {
cfg: TitanWsPublisherConfig,
name: String,
}

impl TitanWsConnectionHandler {
pub fn new(cfg: TitanWsPublisherConfig, name: String) -> Self {
Self { cfg, name }
}
}

impl ConnectionHandler for TitanWsConnectionHandler {
fn url(&self) -> String {
self.cfg.titan_url.clone()
}
fn configure_request(&self, request: &mut Request<()>) -> eyre::Result<()> {
let headers = request.headers_mut();
let api_token_header_value =
tokio_tungstenite::tungstenite::http::HeaderValue::from_str(&self.cfg.api_token)
.wrap_err("Invalid header value for 'X-Api-Token'")?;
headers.insert("X-Api-Token", api_token_header_value);
Ok(())
}

async fn init_connection(
&self,
_write: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
_read: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) -> eyre::Result<()> {
Ok(())
}

fn parse(&self, message: Message) -> eyre::Result<Option<ScrapedRelayBlockBid>> {
parse_message(
message,
&self.cfg.relay_name,
&self.name,
PublisherType::TitanWs,
)
}
}

pub type TitanWsPublisher = Service<TitanWsConnectionHandler>;
3 changes: 3 additions & 0 deletions crates/bid-scraper/src/types/bid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub enum PublisherType {
BloxrouteWs,
#[serde(rename = "external_ws")]
ExternalWs,
#[serde(rename = "titan_ws")]
TitanWs,
}

impl PublisherType {
Expand All @@ -31,6 +33,7 @@ impl PublisherType {
PublisherType::UltrasoundWs => true,
PublisherType::BloxrouteWs => false,
PublisherType::ExternalWs => true,
PublisherType::TitanWs => true,
}
}
}
Expand Down
16 changes: 0 additions & 16 deletions crates/bid-scraper/src/types/bid_update.rs

This file was deleted.

32 changes: 1 addition & 31 deletions crates/bid-scraper/src/types/mod.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,3 @@
use crate::get_timestamp_f64;

pub mod bid;
pub use bid::{PublisherType, ScrapedRelayBlockBid};

mod bid_update;
pub use bid_update::TopBidUpdate;

pub fn block_bid_from_update(
update: TopBidUpdate,
relay_name: &str,
publisher_name: &str,
publisher_type: PublisherType,
) -> ScrapedRelayBlockBid {
ScrapedRelayBlockBid {
publisher_name: publisher_name.to_owned(),
publisher_type: publisher_type.to_owned(),
builder_pubkey: Some(update.builder_pubkey),
relay_name: relay_name.to_owned(),
parent_hash: update.parent_hash,
block_hash: update.block_hash,
seen_time: get_timestamp_f64(),
relay_time: Some(update.timestamp as f64 / 1000.),
value: update.value,
slot_number: update.slot,
gas_used: None,
fee_recipient: Some(update.fee_recipient),
proposer_fee_recipient: None,
optimistic_submission: None,
block_number: update.block_number,
extra_data: None,
}
}
pub mod top_bid_update;
71 changes: 71 additions & 0 deletions crates/bid-scraper/src/types/top_bid_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use crate::{
get_timestamp_f64,
types::{PublisherType, ScrapedRelayBlockBid},
};
use alloy_primitives::{Address, BlockHash, U256};
use alloy_rpc_types_beacon::BlsPublicKey;
use eyre::eyre;
use ssz::Decode;
use ssz_derive::Decode;
use tokio_tungstenite::tungstenite::Message;
use tracing::debug;

/// Common to Ultrasound and Titan.
#[derive(Debug, Decode)]
struct TopBidUpdate {
/// Millisecond timestamp at which this became the top bid
pub timestamp: u64,
pub slot: u64,
pub block_number: u64,
pub block_hash: BlockHash,
pub parent_hash: BlockHash,
pub builder_pubkey: BlsPublicKey,
pub fee_recipient: Address,
pub value: U256,
}

fn block_bid_from_update(
update: TopBidUpdate,
relay_name: &str,
publisher_name: &str,
publisher_type: PublisherType,
) -> ScrapedRelayBlockBid {
ScrapedRelayBlockBid {
publisher_name: publisher_name.to_owned(),
publisher_type: publisher_type.to_owned(),
builder_pubkey: Some(update.builder_pubkey),
relay_name: relay_name.to_owned(),
parent_hash: update.parent_hash,
block_hash: update.block_hash,
seen_time: get_timestamp_f64(),
relay_time: Some(update.timestamp as f64 / 1000.),
value: update.value,
slot_number: update.slot,
gas_used: None,
fee_recipient: Some(update.fee_recipient),
proposer_fee_recipient: None,
optimistic_submission: None,
block_number: update.block_number,
extra_data: None,
}
}

pub fn parse_message(
message: Message,
relay_name: &str,
publisher_name: &str,
publisher_type: PublisherType,
) -> eyre::Result<Option<ScrapedRelayBlockBid>> {
match message {
Message::Binary(data) => {
let update =
TopBidUpdate::from_ssz_bytes(&data).map_err(|_| eyre!("unable to deserialize"))?;
debug!("Got message: {:?}", update);
let bid = block_bid_from_update(update, relay_name, publisher_name, publisher_type);
Ok(Some(bid))
}
_ => {
eyre::bail!("Unhandled {publisher_type:?} WS message: {message:?}");
}
}
}
29 changes: 8 additions & 21 deletions crates/bid-scraper/src/ultrasound_ws_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use crate::{
types::{block_bid_from_update, PublisherType, ScrapedRelayBlockBid, TopBidUpdate},
types::{top_bid_update::parse_message, PublisherType, ScrapedRelayBlockBid},
ws_publisher::{ConnectionHandler, Service},
};
use eyre::{eyre, Context};
use eyre::Context;
use futures::stream::{SplitSink, SplitStream};
use serde::Deserialize;
use ssz::Decode;
use tokio::net::TcpStream;
use tokio_tungstenite::{
tungstenite::{http::Request, protocol::Message},
MaybeTlsStream, WebSocketStream,
};
use tracing::debug;

#[derive(Debug, Clone, Deserialize, PartialEq)]
pub struct UltrasoundWsPublisherConfig {
Expand Down Expand Up @@ -64,23 +62,12 @@ impl ConnectionHandler for UltrasoundWsConnectionHandler {
}

fn parse(&self, message: Message) -> eyre::Result<Option<ScrapedRelayBlockBid>> {
match message {
Message::Binary(data) => {
let update = TopBidUpdate::from_ssz_bytes(&data)
.map_err(|_| eyre!("unable to deserialize"))?;
debug!("Got message: {:?}", update);
let bid = block_bid_from_update(
update,
&self.cfg.relay_name,
&self.name,
PublisherType::UltrasoundWs,
);
Ok(Some(bid))
}
_ => {
eyre::bail!("Unhandled ultrasound WS message: {:?}", message);
}
}
parse_message(
message,
&self.cfg.relay_name,
&self.name,
PublisherType::UltrasoundWs,
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub enum PublisherType {
UltrasoundWs = 2,
BloxrouteWs = 3,
ExternalWs = 4,
TitanWs = 5,
}

impl From<bid_scraper::types::PublisherType> for PublisherType {
Expand All @@ -37,6 +38,7 @@ impl From<bid_scraper::types::PublisherType> for PublisherType {
bid_scraper::types::PublisherType::UltrasoundWs => PublisherType::UltrasoundWs,
bid_scraper::types::PublisherType::BloxrouteWs => PublisherType::BloxrouteWs,
bid_scraper::types::PublisherType::ExternalWs => PublisherType::ExternalWs,
bid_scraper::types::PublisherType::TitanWs => PublisherType::TitanWs,
}
}
}
Expand All @@ -49,6 +51,7 @@ impl From<PublisherType> for bid_scraper::types::PublisherType {
PublisherType::UltrasoundWs => bid_scraper::types::PublisherType::UltrasoundWs,
PublisherType::BloxrouteWs => bid_scraper::types::PublisherType::BloxrouteWs,
PublisherType::ExternalWs => bid_scraper::types::PublisherType::ExternalWs,
PublisherType::TitanWs => bid_scraper::types::PublisherType::TitanWs,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/rbuilder-operator/src/flashbots_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use serde::Deserialize;
use serde_with::serde_as;
use time::OffsetDateTime;
use tokio_util::sync::CancellationToken;
use tracing::{error, warn};
use tracing::warn;
use url::Url;

use crate::{
Expand Down
Loading
Loading