diff --git a/crates/bid-scraper/README.md b/crates/bid-scraper/README.md index b755708c3..e1c4c92e9 100644 --- a/crates/bid-scraper/README.md +++ b/crates/bid-scraper/README.md @@ -74,3 +74,13 @@ This publisher connects to a relay using bloxroute websocket bids protocol. |relay_name| Be sure to use unique names. Maybe we can take it from the bloxroute_url?| |auth_header|string or env var|Added as "Authorization" header. Example: "env:BLOXROUTE_AUTH_HEADER"| + +* Titan websocket (type = "titan-ws") +This publisher connects to a relay using titan websocket top bid protocol (almost identical to Ultrasound). + + ##### Fields + | Name | Type | Comments | + |------|------|-------------| + |titan_url|string|Url to connect to. Example: "ws://us.titanrelay.xyz/builder/top_bid"| + |relay_name|string|Be sure to use unique names. Maybe we can take it from the titan_url?| + |api_token|string|Mandatory auth used as header X-Api-Token| \ No newline at end of file diff --git a/crates/bid-scraper/src/bid_scraper.rs b/crates/bid-scraper/src/bid_scraper.rs index 8f98a6575..d2e36816e 100644 --- a/crates/bid-scraper/src/bid_scraper.rs +++ b/crates/bid-scraper/src/bid_scraper.rs @@ -10,6 +10,7 @@ use crate::{ config::{NamedPublisherConfig, PublisherConfig}, get_timestamp_f64, headers_publisher::{HeadersPublisherService, RelayHeadersPublisherConfig}, + titan_ws_publisher::{TitanWsConnectionHandler, TitanWsPublisher, TitanWsPublisherConfig}, types::{PublisherType, ScrapedRelayBlockBid}, ultrasound_ws_publisher::{ UltrasoundWsConnectionHandler, UltrasoundWsPublisher, UltrasoundWsPublisherConfig, @@ -53,6 +54,26 @@ impl PublisherFactory for Ul } } +struct TitanWsFactory; +impl PublisherFactory for TitanWsFactory { + async fn create_publisher( + cfg: TitanWsPublisherConfig, + name: String, + sender: Arc, + cancel: CancellationToken, + ) -> eyre::Result { + Ok(TitanWsPublisher::new( + TitanWsConnectionHandler::new(cfg.clone(), name.clone()), + sender, + cancel, + ) + .await) + } + async fn run(publisher: TitanWsPublisher) { + publisher.run().await + } +} + struct BloxrouteWsFactory; impl PublisherFactory for BloxrouteWsFactory { async fn create_publisher( @@ -144,6 +165,14 @@ pub fn run( global_cancel.clone(), )); } + PublisherConfig::TitanWs(cfg) => { + tokio::spawn(start_publisher::<_, _, TitanWsFactory>( + cfg, + named_publisher.name, + sender.clone(), + global_cancel.clone(), + )); + } PublisherConfig::BloxrouteWs(cfg) => { tokio::spawn(start_publisher::<_, _, BloxrouteWsFactory>( cfg, diff --git a/crates/bid-scraper/src/config.rs b/crates/bid-scraper/src/config.rs index f61d1d3fa..fe6210708 100644 --- a/crates/bid-scraper/src/config.rs +++ b/crates/bid-scraper/src/config.rs @@ -4,7 +4,7 @@ use serde_with::serde_as; use crate::{ best_bid_ws_connector::ExternalWsPublisherConfig, bids_publisher::RelayBidsPublisherConfig, bloxroute_ws_publisher::BloxrouteWsPublisherConfig, - headers_publisher::RelayHeadersPublisherConfig, + headers_publisher::RelayHeadersPublisherConfig, titan_ws_publisher::TitanWsPublisherConfig, ultrasound_ws_publisher::UltrasoundWsPublisherConfig, }; @@ -14,6 +14,7 @@ pub enum PublisherConfig { RelayBids(RelayBidsPublisherConfig), RelayHeaders(RelayHeadersPublisherConfig), UltrasoundWs(UltrasoundWsPublisherConfig), + TitanWs(TitanWsPublisherConfig), BloxrouteWs(BloxrouteWsPublisherConfig), ExternalWs(ExternalWsPublisherConfig), } diff --git a/crates/bid-scraper/src/lib.rs b/crates/bid-scraper/src/lib.rs index baef5e7fd..a2473526d 100644 --- a/crates/bid-scraper/src/lib.rs +++ b/crates/bid-scraper/src/lib.rs @@ -1,19 +1,19 @@ use std::time::Duration; -pub mod bids_publisher; -pub mod bloxroute_ws_publisher; -mod slot; -pub mod ultrasound_ws_publisher; - pub mod best_bid_ws_connector; pub mod bid_scraper; pub mod bid_scraper_client; pub mod bid_sender; +pub mod bids_publisher; +pub mod bloxroute_ws_publisher; pub mod config; pub mod headers_publisher; pub mod reconnect; pub mod relay_api_publisher; +mod slot; +pub mod titan_ws_publisher; pub mod types; +pub mod ultrasound_ws_publisher; pub mod ws_publisher; pub type DynResult = std::result::Result>; diff --git a/crates/bid-scraper/src/titan_ws_publisher.rs b/crates/bid-scraper/src/titan_ws_publisher.rs new file mode 100644 index 000000000..94069562d --- /dev/null +++ b/crates/bid-scraper/src/titan_ws_publisher.rs @@ -0,0 +1,67 @@ +//! PUAJ: copy/paste from ultrasound_ws_publisher.rs +use crate::{ + types::{top_bid_update::parse_message, PublisherType, ScrapedRelayBlockBid}, + ws_publisher::{ConnectionHandler, Service}, +}; +use eyre::Context; +use futures::stream::{SplitSink, SplitStream}; +use serde::Deserialize; +use tokio::net::TcpStream; +use tokio_tungstenite::{ + tungstenite::{http::Request, protocol::Message}, + MaybeTlsStream, WebSocketStream, +}; + +#[derive(Debug, Clone, Deserialize, PartialEq)] +pub struct TitanWsPublisherConfig { + /// Url to connect to. Example: "ws://us.titanrelay.xyz/builder/top_bid" + pub titan_url: String, + /// Be sure to use unique names. Maybe we can take it from the titan_url? + pub relay_name: String, + /// used as header X-Api-Token, for use with titan builder direct endpoint + pub api_token: String, +} + +pub struct TitanWsConnectionHandler { + cfg: TitanWsPublisherConfig, + name: String, +} + +impl TitanWsConnectionHandler { + pub fn new(cfg: TitanWsPublisherConfig, name: String) -> Self { + Self { cfg, name } + } +} + +impl ConnectionHandler for TitanWsConnectionHandler { + fn url(&self) -> String { + self.cfg.titan_url.clone() + } + fn configure_request(&self, request: &mut Request<()>) -> eyre::Result<()> { + let headers = request.headers_mut(); + let api_token_header_value = + tokio_tungstenite::tungstenite::http::HeaderValue::from_str(&self.cfg.api_token) + .wrap_err("Invalid header value for 'X-Api-Token'")?; + headers.insert("X-Api-Token", api_token_header_value); + Ok(()) + } + + async fn init_connection( + &self, + _write: &mut SplitSink>, Message>, + _read: &mut SplitStream>>, + ) -> eyre::Result<()> { + Ok(()) + } + + fn parse(&self, message: Message) -> eyre::Result> { + parse_message( + message, + &self.cfg.relay_name, + &self.name, + PublisherType::TitanWs, + ) + } +} + +pub type TitanWsPublisher = Service; diff --git a/crates/bid-scraper/src/types/bid.rs b/crates/bid-scraper/src/types/bid.rs index fdb3c78a8..ad29e2abf 100644 --- a/crates/bid-scraper/src/types/bid.rs +++ b/crates/bid-scraper/src/types/bid.rs @@ -19,6 +19,8 @@ pub enum PublisherType { BloxrouteWs, #[serde(rename = "external_ws")] ExternalWs, + #[serde(rename = "titan_ws")] + TitanWs, } impl PublisherType { @@ -31,6 +33,7 @@ impl PublisherType { PublisherType::UltrasoundWs => true, PublisherType::BloxrouteWs => false, PublisherType::ExternalWs => true, + PublisherType::TitanWs => true, } } } diff --git a/crates/bid-scraper/src/types/bid_update.rs b/crates/bid-scraper/src/types/bid_update.rs deleted file mode 100644 index ff47a994c..000000000 --- a/crates/bid-scraper/src/types/bid_update.rs +++ /dev/null @@ -1,16 +0,0 @@ -use alloy_primitives::{Address, BlockHash, U256}; -use alloy_rpc_types_beacon::BlsPublicKey; -use ssz_derive::Decode; - -#[derive(Debug, Decode)] -pub struct TopBidUpdate { - /// Millisecond timestamp at which this became the top bid - pub timestamp: u64, - pub slot: u64, - pub block_number: u64, - pub block_hash: BlockHash, - pub parent_hash: BlockHash, - pub builder_pubkey: BlsPublicKey, - pub fee_recipient: Address, - pub value: U256, -} diff --git a/crates/bid-scraper/src/types/mod.rs b/crates/bid-scraper/src/types/mod.rs index 8af03989a..81308b6cf 100644 --- a/crates/bid-scraper/src/types/mod.rs +++ b/crates/bid-scraper/src/types/mod.rs @@ -1,33 +1,3 @@ -use crate::get_timestamp_f64; - pub mod bid; pub use bid::{PublisherType, ScrapedRelayBlockBid}; - -mod bid_update; -pub use bid_update::TopBidUpdate; - -pub fn block_bid_from_update( - update: TopBidUpdate, - relay_name: &str, - publisher_name: &str, - publisher_type: PublisherType, -) -> ScrapedRelayBlockBid { - ScrapedRelayBlockBid { - publisher_name: publisher_name.to_owned(), - publisher_type: publisher_type.to_owned(), - builder_pubkey: Some(update.builder_pubkey), - relay_name: relay_name.to_owned(), - parent_hash: update.parent_hash, - block_hash: update.block_hash, - seen_time: get_timestamp_f64(), - relay_time: Some(update.timestamp as f64 / 1000.), - value: update.value, - slot_number: update.slot, - gas_used: None, - fee_recipient: Some(update.fee_recipient), - proposer_fee_recipient: None, - optimistic_submission: None, - block_number: update.block_number, - extra_data: None, - } -} +pub mod top_bid_update; diff --git a/crates/bid-scraper/src/types/top_bid_update.rs b/crates/bid-scraper/src/types/top_bid_update.rs new file mode 100644 index 000000000..3b11bf973 --- /dev/null +++ b/crates/bid-scraper/src/types/top_bid_update.rs @@ -0,0 +1,71 @@ +use crate::{ + get_timestamp_f64, + types::{PublisherType, ScrapedRelayBlockBid}, +}; +use alloy_primitives::{Address, BlockHash, U256}; +use alloy_rpc_types_beacon::BlsPublicKey; +use eyre::eyre; +use ssz::Decode; +use ssz_derive::Decode; +use tokio_tungstenite::tungstenite::Message; +use tracing::debug; + +/// Common to Ultrasound and Titan. +#[derive(Debug, Decode)] +struct TopBidUpdate { + /// Millisecond timestamp at which this became the top bid + pub timestamp: u64, + pub slot: u64, + pub block_number: u64, + pub block_hash: BlockHash, + pub parent_hash: BlockHash, + pub builder_pubkey: BlsPublicKey, + pub fee_recipient: Address, + pub value: U256, +} + +fn block_bid_from_update( + update: TopBidUpdate, + relay_name: &str, + publisher_name: &str, + publisher_type: PublisherType, +) -> ScrapedRelayBlockBid { + ScrapedRelayBlockBid { + publisher_name: publisher_name.to_owned(), + publisher_type: publisher_type.to_owned(), + builder_pubkey: Some(update.builder_pubkey), + relay_name: relay_name.to_owned(), + parent_hash: update.parent_hash, + block_hash: update.block_hash, + seen_time: get_timestamp_f64(), + relay_time: Some(update.timestamp as f64 / 1000.), + value: update.value, + slot_number: update.slot, + gas_used: None, + fee_recipient: Some(update.fee_recipient), + proposer_fee_recipient: None, + optimistic_submission: None, + block_number: update.block_number, + extra_data: None, + } +} + +pub fn parse_message( + message: Message, + relay_name: &str, + publisher_name: &str, + publisher_type: PublisherType, +) -> eyre::Result> { + match message { + Message::Binary(data) => { + let update = + TopBidUpdate::from_ssz_bytes(&data).map_err(|_| eyre!("unable to deserialize"))?; + debug!("Got message: {:?}", update); + let bid = block_bid_from_update(update, relay_name, publisher_name, publisher_type); + Ok(Some(bid)) + } + _ => { + eyre::bail!("Unhandled {publisher_type:?} WS message: {message:?}"); + } + } +} diff --git a/crates/bid-scraper/src/ultrasound_ws_publisher.rs b/crates/bid-scraper/src/ultrasound_ws_publisher.rs index dcaf71ae3..a14cf897f 100644 --- a/crates/bid-scraper/src/ultrasound_ws_publisher.rs +++ b/crates/bid-scraper/src/ultrasound_ws_publisher.rs @@ -1,17 +1,15 @@ use crate::{ - types::{block_bid_from_update, PublisherType, ScrapedRelayBlockBid, TopBidUpdate}, + types::{top_bid_update::parse_message, PublisherType, ScrapedRelayBlockBid}, ws_publisher::{ConnectionHandler, Service}, }; -use eyre::{eyre, Context}; +use eyre::Context; use futures::stream::{SplitSink, SplitStream}; use serde::Deserialize; -use ssz::Decode; use tokio::net::TcpStream; use tokio_tungstenite::{ tungstenite::{http::Request, protocol::Message}, MaybeTlsStream, WebSocketStream, }; -use tracing::debug; #[derive(Debug, Clone, Deserialize, PartialEq)] pub struct UltrasoundWsPublisherConfig { @@ -64,23 +62,12 @@ impl ConnectionHandler for UltrasoundWsConnectionHandler { } fn parse(&self, message: Message) -> eyre::Result> { - match message { - Message::Binary(data) => { - let update = TopBidUpdate::from_ssz_bytes(&data) - .map_err(|_| eyre!("unable to deserialize"))?; - debug!("Got message: {:?}", update); - let bid = block_bid_from_update( - update, - &self.cfg.relay_name, - &self.name, - PublisherType::UltrasoundWs, - ); - Ok(Some(bid)) - } - _ => { - eyre::bail!("Unhandled ultrasound WS message: {:?}", message); - } - } + parse_message( + message, + &self.cfg.relay_name, + &self.name, + PublisherType::UltrasoundWs, + ) } } diff --git a/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/types.rs b/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/types.rs index cba475ae4..ad2f3de8a 100644 --- a/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/types.rs +++ b/crates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/types.rs @@ -27,6 +27,7 @@ pub enum PublisherType { UltrasoundWs = 2, BloxrouteWs = 3, ExternalWs = 4, + TitanWs = 5, } impl From for PublisherType { @@ -37,6 +38,7 @@ impl From for PublisherType { bid_scraper::types::PublisherType::UltrasoundWs => PublisherType::UltrasoundWs, bid_scraper::types::PublisherType::BloxrouteWs => PublisherType::BloxrouteWs, bid_scraper::types::PublisherType::ExternalWs => PublisherType::ExternalWs, + bid_scraper::types::PublisherType::TitanWs => PublisherType::TitanWs, } } } @@ -49,6 +51,7 @@ impl From for bid_scraper::types::PublisherType { PublisherType::UltrasoundWs => bid_scraper::types::PublisherType::UltrasoundWs, PublisherType::BloxrouteWs => bid_scraper::types::PublisherType::BloxrouteWs, PublisherType::ExternalWs => bid_scraper::types::PublisherType::ExternalWs, + PublisherType::TitanWs => bid_scraper::types::PublisherType::TitanWs, } } } diff --git a/crates/rbuilder-operator/src/flashbots_config.rs b/crates/rbuilder-operator/src/flashbots_config.rs index 4dbb75edd..f48556258 100644 --- a/crates/rbuilder-operator/src/flashbots_config.rs +++ b/crates/rbuilder-operator/src/flashbots_config.rs @@ -36,7 +36,7 @@ use serde::Deserialize; use serde_with::serde_as; use time::OffsetDateTime; use tokio_util::sync::CancellationToken; -use tracing::{error, warn}; +use tracing::warn; use url::Url; use crate::{ diff --git a/crates/rbuilder-operator/src/true_block_value_push/blocks_processor_backend.rs b/crates/rbuilder-operator/src/true_block_value_push/blocks_processor_backend.rs index 6d10761da..76e7132fc 100644 --- a/crates/rbuilder-operator/src/true_block_value_push/blocks_processor_backend.rs +++ b/crates/rbuilder-operator/src/true_block_value_push/blocks_processor_backend.rs @@ -5,7 +5,6 @@ use crate::{ use alloy_signer_local::PrivateKeySigner; use jsonrpsee::core::client::ClientT; use tokio::runtime::Runtime; -use tracing::error; use super::best_true_value_pusher::{Backend, BuiltBlockInfo}; diff --git a/crates/rbuilder-operator/src/true_block_value_push/redis_backend.rs b/crates/rbuilder-operator/src/true_block_value_push/redis_backend.rs index be3e4bf85..6977ff303 100644 --- a/crates/rbuilder-operator/src/true_block_value_push/redis_backend.rs +++ b/crates/rbuilder-operator/src/true_block_value_push/redis_backend.rs @@ -1,5 +1,4 @@ use redis::Commands; -use tracing::error; use super::best_true_value_pusher::{Backend, BuiltBlockInfo}; diff --git a/crates/rbuilder-primitives/src/serialize.rs b/crates/rbuilder-primitives/src/serialize.rs index 47599876f..763f44bc4 100644 --- a/crates/rbuilder-primitives/src/serialize.rs +++ b/crates/rbuilder-primitives/src/serialize.rs @@ -12,7 +12,6 @@ use reth_chainspec::MAINNET; use serde::{Deserialize, Deserializer, Serialize}; use serde_with::serde_as; use thiserror::Error; -use tracing::error; use uuid::Uuid; /// Encoding mode for raw transactions (https://eips.ethereum.org/EIPS/eip-4844) diff --git a/crates/rbuilder/src/backtest/build_block/synthetic_orders.rs b/crates/rbuilder/src/backtest/build_block/synthetic_orders.rs index 6d3ecdfb4..22cbe4578 100644 --- a/crates/rbuilder/src/backtest/build_block/synthetic_orders.rs +++ b/crates/rbuilder/src/backtest/build_block/synthetic_orders.rs @@ -1,5 +1,5 @@ use alloy_primitives::B256; -use clap::{command, Parser}; +use clap::Parser; use rbuilder_config::load_toml_config; use rbuilder_primitives::{ Bundle, MempoolTx, Metadata, Order, TransactionSignedEcRecoveredWithBlobs, LAST_BUNDLE_VERSION, diff --git a/crates/rbuilder/src/building/builders/block_building_helper.rs b/crates/rbuilder/src/building/builders/block_building_helper.rs index c9dfbe7a8..5bbd70d34 100644 --- a/crates/rbuilder/src/building/builders/block_building_helper.rs +++ b/crates/rbuilder/src/building/builders/block_building_helper.rs @@ -7,7 +7,7 @@ use std::{ }; use time::OffsetDateTime; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, trace, warn}; use crate::{ building::{ diff --git a/crates/rbuilder/src/live_builder/wallet_balance_watcher.rs b/crates/rbuilder/src/live_builder/wallet_balance_watcher.rs index c076b240a..016e9f07c 100644 --- a/crates/rbuilder/src/live_builder/wallet_balance_watcher.rs +++ b/crates/rbuilder/src/live_builder/wallet_balance_watcher.rs @@ -3,7 +3,7 @@ use std::time::Duration; use alloy_primitives::{utils::format_ether, Address, BlockNumber, U256}; use reth::providers::ProviderError; use time::{error, OffsetDateTime}; -use tracing::{error, info, warn}; +use tracing::{info, warn}; use crate::{ provider::StateProviderFactory, diff --git a/crates/rbuilder/src/telemetry/metrics/mod.rs b/crates/rbuilder/src/telemetry/metrics/mod.rs index 20a355386..d22404fef 100644 --- a/crates/rbuilder/src/telemetry/metrics/mod.rs +++ b/crates/rbuilder/src/telemetry/metrics/mod.rs @@ -736,6 +736,7 @@ pub fn inc_bids_received(bid: &ScrapedRelayBlockBid) { bid_scraper::types::PublisherType::UltrasoundWs => "ultrasound_ws", bid_scraper::types::PublisherType::BloxrouteWs => "bloxroute_ws", bid_scraper::types::PublisherType::ExternalWs => "external_ws", + bid_scraper::types::PublisherType::TitanWs => "titan_ws", }; BIDS_RECEIVED .with_label_values(&[relay_name, publisher_name, publisher_type])