Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/optimism/flashblocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ eyre.workspace = true

[dev-dependencies]
test-case.workspace = true
alloy-consensus.workspace = true
1 change: 1 addition & 0 deletions crates/optimism/flashblocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub use service::FlashBlockService;
pub use ws::{WsConnect, WsFlashBlockStream};

mod payload;
mod sequence;
mod service;
mod ws;

Expand Down
186 changes: 186 additions & 0 deletions crates/optimism/flashblocks/src/sequence.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
use crate::{ExecutionPayloadBaseV1, FlashBlock};
use alloy_eips::eip2718::WithEncoded;
use reth_primitives_traits::{Recovered, SignedTransaction};
use std::collections::BTreeMap;
use tracing::trace;

/// An ordered B-tree keeping the track of a sequence of [`FlashBlock`]s by their indices.
#[derive(Debug)]
pub(crate) struct FlashBlockSequence<T> {
/// tracks the individual flashblocks in order
///
/// With a blocktime of 2s and flashblock tick-rate of 200ms plus one extra flashblock per new
/// pending block, we expect 11 flashblocks per slot.
inner: BTreeMap<u64, PreparedFlashBlock<T>>,
}

impl<T> FlashBlockSequence<T>
where
T: SignedTransaction,
{
pub(crate) const fn new() -> Self {
Self { inner: BTreeMap::new() }
}

/// Inserts a new block into the sequence.
///
/// A [`FlashBlock`] with index 0 resets the set.
pub(crate) fn insert(&mut self, flashblock: FlashBlock) -> eyre::Result<()> {
if flashblock.index == 0 {
trace!(number=%flashblock.block_number(), "Tracking new flashblock sequence");
// Flash block at index zero resets the whole state
self.clear();
self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?);
return Ok(())
}

// only insert if we we previously received the same block, assume we received index 0
if self.block_number() == Some(flashblock.metadata.block_number) {
trace!(number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len() ,"Received followup flashblock");
self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?);
} else {
trace!(number=%flashblock.block_number(), index = %flashblock.index, current=?self.block_number() ,"Ignoring untracked flashblock following");
}

Ok(())
}

/// Returns the first block number
pub(crate) fn block_number(&self) -> Option<u64> {
Some(self.inner.values().next()?.block().metadata.block_number)
}

/// Returns the payload base of the first tracked flashblock.
pub(crate) fn payload_base(&self) -> Option<ExecutionPayloadBaseV1> {
self.inner.values().next()?.block().base.clone()
}

/// Iterator over sequence of executable transactions.
///
/// A flashblocks is not ready if there's missing previous flashblocks, i.e. there's a gap in
/// the sequence
///
/// Note: flashblocks start at `index 0`.
pub(crate) fn ready_transactions(
&self,
) -> impl Iterator<Item = WithEncoded<Recovered<T>>> + '_ {
self.inner
.values()
.enumerate()
.take_while(|(idx, block)| {
// flashblock index 0 is the first flashblock
block.block().index == *idx as u64
})
.flat_map(|(_, block)| block.txs.clone())
}

/// Returns the number of tracked flashblocks.
pub(crate) fn count(&self) -> usize {
self.inner.len()
}

fn clear(&mut self) {
self.inner.clear();
}
}

#[derive(Debug)]
struct PreparedFlashBlock<T> {
/// The prepared transactions, ready for execution
txs: Vec<WithEncoded<Recovered<T>>>,
/// The tracked flashblock
block: FlashBlock,
}

impl<T> PreparedFlashBlock<T> {
const fn block(&self) -> &FlashBlock {
&self.block
}
}

impl<T> PreparedFlashBlock<T>
where
T: SignedTransaction,
{
/// Creates a flashblock that is ready for execution by preparing all transactions
///
/// Returns an error if decoding or signer recovery fails.
fn new(block: FlashBlock) -> eyre::Result<Self> {
let mut txs = Vec::with_capacity(block.diff.transactions.len());
for encoded in block.diff.transactions.iter().cloned() {
let tx = T::decode_2718_exact(encoded.as_ref())?;
let signer = tx.try_recover()?;
let tx = WithEncoded::new(encoded, tx.with_signer(signer));
txs.push(tx);
}

Ok(Self { txs, block })
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::ExecutionPayloadFlashblockDeltaV1;
use alloy_consensus::{
transaction::SignerRecoverable, EthereumTxEnvelope, EthereumTypedTransaction, TxEip1559,
};
use alloy_eips::Encodable2718;
use alloy_primitives::{hex, Signature, TxKind, U256};

#[test]
fn test_sequence_stops_before_gap() {
let mut sequence = FlashBlockSequence::new();
let tx = EthereumTxEnvelope::new_unhashed(
EthereumTypedTransaction::<TxEip1559>::Eip1559(TxEip1559 {
chain_id: 4,
nonce: 26u64,
max_priority_fee_per_gas: 1500000000,
max_fee_per_gas: 1500000013,
gas_limit: 21_000u64,
to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
value: U256::from(3000000000000000000u64),
input: Default::default(),
access_list: Default::default(),
}),
Signature::new(
U256::from_be_bytes(hex!(
"59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd"
)),
U256::from_be_bytes(hex!(
"016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469"
)),
true,
),
);
let tx = Recovered::new_unchecked(tx.clone(), tx.recover_signer_unchecked().unwrap());

sequence
.insert(FlashBlock {
payload_id: Default::default(),
index: 0,
base: None,
diff: ExecutionPayloadFlashblockDeltaV1 {
transactions: vec![tx.encoded_2718().into()],
..Default::default()
},
metadata: Default::default(),
})
.unwrap();

sequence
.insert(FlashBlock {
payload_id: Default::default(),
index: 2,
base: None,
diff: Default::default(),
metadata: Default::default(),
})
.unwrap();

let actual_txs: Vec<_> = sequence.ready_transactions().collect();
let expected_txs = vec![WithEncoded::new(tx.encoded_2718().into(), tx)];

assert_eq!(actual_txs, expected_txs);
}
}
121 changes: 3 additions & 118 deletions crates/optimism/flashblocks/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{ExecutionPayloadBaseV1, FlashBlock};
use alloy_eips::{eip2718::WithEncoded, BlockNumberOrTag};
use crate::{sequence::FlashBlockSequence, ExecutionPayloadBaseV1, FlashBlock};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::B256;
use futures_util::{FutureExt, Stream, StreamExt};
use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions, ExecutedBlock};
Expand All @@ -9,14 +9,11 @@ use reth_evm::{
ConfigureEvm,
};
use reth_execution_types::ExecutionOutcome;
use reth_primitives_traits::{
AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, Recovered, SignedTransaction,
};
use reth_primitives_traits::{AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy};
use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State};
use reth_rpc_eth_types::{EthApiError, PendingBlock};
use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory};
use std::{
collections::BTreeMap,
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand Down Expand Up @@ -238,115 +235,3 @@ where
Poll::Pending
}
}

/// Simple wrapper around an ordered B-tree to keep track of a sequence of flashblocks by index.
#[derive(Debug)]
struct FlashBlockSequence<T> {
/// tracks the individual flashblocks in order
///
/// With a blocktime of 2s and flashblock tickrate of ~200ms, we expect 10 or 11 flashblocks
/// per slot.
inner: BTreeMap<u64, PreparedFlashBlock<T>>,
}

impl<T> FlashBlockSequence<T>
where
T: SignedTransaction,
{
const fn new() -> Self {
Self { inner: BTreeMap::new() }
}

/// Inserts a new block into the sequence.
///
/// A [`FlashBlock`] with index 0 resets the set.
fn insert(&mut self, flashblock: FlashBlock) -> eyre::Result<()> {
if flashblock.index == 0 {
trace!(number=%flashblock.block_number(), "Tracking new flashblock sequence");
// Flash block at index zero resets the whole state
self.clear();
self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?);
return Ok(())
}

// only insert if we we previously received the same block, assume we received index 0
if self.block_number() == Some(flashblock.metadata.block_number) {
trace!(number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len() ,"Received followup flashblock");
self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?);
} else {
trace!(number=%flashblock.block_number(), index = %flashblock.index, current=?self.block_number() ,"Ignoring untracked flashblock following");
}

Ok(())
}

/// Returns the number of tracked flashblocks.
fn count(&self) -> usize {
self.inner.len()
}

/// Returns the first block number
fn block_number(&self) -> Option<u64> {
Some(self.inner.values().next()?.block().metadata.block_number)
}

/// Returns the payload base of the first tracked flashblock.
fn payload_base(&self) -> Option<ExecutionPayloadBaseV1> {
self.inner.values().next()?.block().base.clone()
}

fn clear(&mut self) {
self.inner.clear();
}

/// Iterator over sequence of executable transactions.
///
/// A flashblocks is not ready if there's missing previous flashblocks, i.e. there's a gap in
/// the sequence
///
/// Note: flashblocks start at `index 0`.
fn ready_transactions(&self) -> impl Iterator<Item = WithEncoded<Recovered<T>>> + '_ {
self.inner
.values()
.enumerate()
.take_while(|(idx, block)| {
// flashblock index 0 is the first flashblock
block.block().index == *idx as u64
})
.flat_map(|(_, block)| block.txs.clone())
}
}

#[derive(Debug)]
struct PreparedFlashBlock<T> {
/// The prepared transactions, ready for execution
txs: Vec<WithEncoded<Recovered<T>>>,
/// The tracked flashblock
block: FlashBlock,
}

impl<T> PreparedFlashBlock<T> {
const fn block(&self) -> &FlashBlock {
&self.block
}
}

impl<T> PreparedFlashBlock<T>
where
T: SignedTransaction,
{
/// Creates a flashblock that is ready for execution by preparing all transactions
///
/// Returns an error if decoding or signer recovery fails.
fn new(block: FlashBlock) -> eyre::Result<Self> {
let mut txs = Vec::with_capacity(block.diff.transactions.len());
for encoded in block.diff.transactions.iter().cloned() {
let tx = T::decode_2718_exact(encoded.as_ref())?;
let signer = tx.try_recover()?;
let tx = WithEncoded::new(encoded, tx.with_signer(signer));
txs.push(tx);
}

Ok(Self { txs, block })
}
}
Loading