4 changes: 2 additions & 2 deletions crates/chain-orchestrator/src/error.rs
@@ -1,7 +1,7 @@
use alloy_json_rpc::RpcError;
use alloy_primitives::B256;
use alloy_transport::TransportErrorKind;
use scroll_db::{DatabaseError, L1MessageStart};
use scroll_db::{DatabaseError, L1MessageKey};

/// A type that represents an error that occurred in the chain orchestrator.
#[derive(Debug, thiserror::Error)]
@@ -28,7 +28,7 @@ pub enum ChainOrchestratorError {
},
/// An L1 message was not found in the database.
#[error("L1 message not found at {0}")]
L1MessageNotFound(L1MessageStart),
L1MessageNotFound(L1MessageKey),
/// A gap was detected in the L1 message queue: the previous message before index {0} is
/// missing.
#[error("L1 message queue gap detected at index {0}, previous L1 message not found")]
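For reference, a minimal sketch of how the renamed error variant renders with the new key type. The import path for ChainOrchestratorError is a guess, and the output relies on the Display impl for L1MessageKey introduced later in this diff:

use alloy_primitives::B256;
use rollup_node_chain_orchestrator::ChainOrchestratorError; // assumed export path
use scroll_db::L1MessageKey;

fn main() {
    // Prints "L1 message not found at QueueIndex(7)".
    let by_index = ChainOrchestratorError::L1MessageNotFound(L1MessageKey::QueueIndex(7));
    println!("{by_index}");

    // Prints "L1 message not found at TransactionHash(0x…)" with the full 32-byte hash.
    let by_hash =
        ChainOrchestratorError::L1MessageNotFound(L1MessageKey::TransactionHash(B256::ZERO));
    println!("{by_hash}");
}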
12 changes: 8 additions & 4 deletions crates/chain-orchestrator/src/lib.rs
@@ -19,7 +19,7 @@ use scroll_alloy_hardforks::ScrollHardforks;
use scroll_alloy_network::Scroll;
use scroll_db::{
Database, DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider,
DatabaseWriteOperations, L1MessageStart, UnwindResult,
DatabaseWriteOperations, L1MessageKey, UnwindResult,
};
use scroll_network::NewBlockWithPeer;
use std::{
@@ -838,7 +838,7 @@ async fn compute_l1_message_queue_hash(
})
.await?
.map(|m| m.queue_hash)
.ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))?
.ok_or(DatabaseError::L1MessageNotFound(L1MessageKey::QueueIndex(index)))?
.unwrap_or_default()
.to_vec();
input.append(&mut l1_message.tx_hash().to_vec());
@@ -1078,7 +1078,9 @@ async fn validate_l1_messages(
let l1_message_stream = Retry::default()
.retry("get_l1_messages", || async {
let messages = tx
.get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx)))
.get_l1_messages(
l1_message_hashes.first().map(|tx| L1MessageKey::TransactionHash(*tx)),
)
.await?;
Ok::<_, ChainOrchestratorError>(messages)
})
@@ -1093,7 +1095,9 @@
.await
.map(|m| m.map(|msg| msg.transaction.tx_hash()))
.transpose()?
.ok_or(ChainOrchestratorError::L1MessageNotFound(L1MessageStart::Hash(message_hash)))?;
.ok_or(ChainOrchestratorError::L1MessageNotFound(L1MessageKey::TransactionHash(
message_hash,
)))?;

// If the received and expected L1 messages do not match return an error.
if message_hash != expected_hash {
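As an illustration of the lookup-by-hash flow above, a simplified sketch: the helper name and the early-exit error are assumptions, and the real validate_l1_messages additionally retries and compares each stored hash against the block's hashes:

use alloy_primitives::B256;
use futures::StreamExt;
use rollup_node_primitives::L1MessageEnvelope;
use scroll_db::{DatabaseError, DatabaseReadOperations, L1MessageKey};

// Hypothetical helper: stream `n` stored L1 messages starting from the first
// transaction hash referenced by a block, erroring with the key that was used
// for the lookup if the database runs out of messages.
async fn stored_messages_from<T: DatabaseReadOperations>(
    tx: &T,
    first_hash: B256,
    n: usize,
) -> Result<Vec<L1MessageEnvelope>, DatabaseError> {
    let stream = tx.get_l1_messages(Some(L1MessageKey::TransactionHash(first_hash))).await?;
    futures::pin_mut!(stream);

    let mut messages = Vec::with_capacity(n);
    while messages.len() < n {
        match stream.next().await {
            Some(message) => messages.push(message?),
            // The stream ended early: report the key the lookup started from.
            None => {
                return Err(DatabaseError::L1MessageNotFound(L1MessageKey::TransactionHash(
                    first_hash,
                )))
            }
        }
    }
    Ok(messages)
}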
6 changes: 3 additions & 3 deletions crates/database/db/src/error.rs
@@ -1,4 +1,4 @@
use super::L1MessageStart;
use super::L1MessageKey;
use sea_orm::sqlx::Error as SqlxError;

/// The error type for database operations.
@@ -17,6 +17,6 @@ pub enum DatabaseError {
#[error("failed to serde metadata value: {0}")]
MetadataSerdeError(#[from] serde_json::Error),
/// The L1 message was not found in database.
#[error("L1 message at index [{0}] not found in database")]
L1MessageNotFound(L1MessageStart),
#[error("L1 message at key [{0}] not found in database")]
L1MessageNotFound(L1MessageKey),
}
4 changes: 1 addition & 3 deletions crates/database/db/src/lib.rs
@@ -15,9 +15,7 @@ mod models;
pub use models::*;

mod operations;
pub use operations::{
DatabaseReadOperations, DatabaseWriteOperations, L1MessageStart, UnwindResult,
};
pub use operations::{DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey, UnwindResult};

mod transaction;
pub use transaction::{DatabaseTransactionProvider, TXMut, TX};
73 changes: 48 additions & 25 deletions crates/database/db/src/operations.rs
@@ -531,18 +531,34 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync {
/// `start` point.
async fn get_l1_messages<'a>(
&'a self,
start: Option<L1MessageStart>,
start: Option<L1MessageKey>,
) -> Result<impl Stream<Item = Result<L1MessageEnvelope, DatabaseError>> + 'a, DatabaseError>
{
let queue_index = match start {
Some(L1MessageStart::Index(i)) => i,
Some(L1MessageStart::Hash(ref h)) => {
Some(L1MessageKey::QueueIndex(i)) => i,
Some(L1MessageKey::TransactionHash(ref h)) => {
// Lookup message by hash
let record = models::l1_message::Entity::find()
.filter(models::l1_message::Column::Hash.eq(h.to_vec()))
.one(self.get_connection())
.await?
.ok_or_else(|| DatabaseError::L1MessageNotFound(L1MessageStart::Hash(*h)))?;
.ok_or_else(|| {
DatabaseError::L1MessageNotFound(L1MessageKey::TransactionHash(*h))
})?;

record.queue_index as u64
}
Some(L1MessageKey::QueueHash(ref h)) => {
// Lookup message by queue hash
let record = models::l1_message::Entity::find()
.filter(
Condition::all()
.add(models::l1_message::Column::QueueHash.is_not_null())
.add(models::l1_message::Column::QueueHash.eq(h.to_vec())),
)
.one(self.get_connection())
.await?
.ok_or_else(|| DatabaseError::L1MessageNotFound(L1MessageKey::QueueHash(*h)))?;

record.queue_index as u64
}
@@ -691,36 +707,43 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync {
}
}

/// This type defines the start of an L1 message stream.
/// A key for an L1 message stored in the database.
///
/// It can either be an index, which is the queue index of the first message to return, or a hash,
/// which is the hash of the first message to return.
/// It can either be the queue index, queue hash or the transaction hash.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum L1MessageStart {
/// Start from the provided queue index.
Index(u64),
/// Start from the provided queue hash.
Hash(B256),
pub enum L1MessageKey {
/// The queue index of the message.
QueueIndex(u64),
/// The queue hash of the message.
QueueHash(B256),
/// The transaction hash of the message.
TransactionHash(B256),
}

impl fmt::Display for L1MessageStart {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Index(index) => write!(f, "Index({index})"),
Self::Hash(hash) => write!(f, "Hash({hash:#x})"),
}
impl L1MessageKey {
/// Create a new [`L1MessageKey`] from a queue index.
pub const fn from_queue_index(index: u64) -> Self {
Self::QueueIndex(index)
}

/// Create a new [`L1MessageKey`] from a queue hash.
pub const fn from_queue_hash(hash: B256) -> Self {
Self::QueueHash(hash)
}
}

impl From<u64> for L1MessageStart {
fn from(value: u64) -> Self {
Self::Index(value)
/// Create a new [`L1MessageKey`] from a transaction hash.
pub const fn from_transaction_hash(hash: B256) -> Self {
Self::TransactionHash(hash)
}
}

impl From<B256> for L1MessageStart {
fn from(value: B256) -> Self {
Self::Hash(value)
impl fmt::Display for L1MessageKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::QueueIndex(index) => write!(f, "QueueIndex({index})"),
Self::QueueHash(hash) => write!(f, "QueueHash({hash:#x})"),
Self::TransactionHash(hash) => write!(f, "TransactionHash({hash:#x})"),
}
}
}

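A small usage sketch of the new key type and its Display output, assuming scroll_db re-exports L1MessageKey as shown in lib.rs above:

use alloy_primitives::B256;
use scroll_db::L1MessageKey;

fn main() {
    // The three ways to identify an L1 message in the database.
    let by_index = L1MessageKey::from_queue_index(42);
    let by_queue_hash = L1MessageKey::from_queue_hash(B256::ZERO);
    let by_tx_hash = L1MessageKey::from_transaction_hash(B256::ZERO);

    assert_eq!(by_index.to_string(), "QueueIndex(42)");
    // The hash variants render the full 0x-prefixed hash.
    assert!(by_queue_hash.to_string().starts_with("QueueHash(0x"));
    assert!(by_tx_hash.to_string().starts_with("TransactionHash(0x"));
}

Note that the old From<u64> and From<B256> conversions are removed, so call sites must now name the variant explicitly; that is what the .into() replacements in the derivation-pipeline and sequencer hunks below do.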
14 changes: 10 additions & 4 deletions crates/derivation-pipeline/src/lib.rs
@@ -30,7 +30,7 @@ use rollup_node_primitives::{
use rollup_node_providers::{BlockDataProvider, L1Provider};
use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes};
use scroll_codec::{decoding::payload::PayloadData, Codec};
use scroll_db::{Database, DatabaseReadOperations, DatabaseTransactionProvider};
use scroll_db::{Database, DatabaseReadOperations, DatabaseTransactionProvider, L1MessageKey};
use tokio::time::Interval;

/// A future that resolves to a stream of [`ScrollPayloadAttributesWithBatchInfo`].
@@ -352,20 +352,26 @@ async fn iter_l1_messages_from_payload<L1P: L1Provider>(
let total_l1_messages = data.blocks.iter().map(|b| b.context.num_l1_messages as u64).sum();

let messages = if let Some(index) = data.queue_index_start() {
provider.get_n_messages(index.into(), total_l1_messages).await.map_err(Into::into)?
provider
.get_n_messages(L1MessageKey::from_queue_index(index), total_l1_messages)
.await
.map_err(Into::into)?
} else if let Some(hash) = data.prev_l1_message_queue_hash() {
// If the message queue hash is zero then we should use the V2 L1 message queue start
// index. We must apply this branch logic because we do not have a L1
// message associated with a queue hash of ZERO (we only compute a queue
// hash for the first L1 message of the V2 contract).
if hash == &B256::ZERO {
provider
.get_n_messages(l1_v2_message_queue_start_index.into(), total_l1_messages)
.get_n_messages(
L1MessageKey::from_queue_index(l1_v2_message_queue_start_index),
total_l1_messages,
)
.await
.map_err(Into::into)?
} else {
let mut messages = provider
.get_n_messages((*hash).into(), total_l1_messages + 1)
.get_n_messages(L1MessageKey::from_queue_hash(*hash), total_l1_messages + 1)
.await
.map_err(Into::into)?;
// we skip the first l1 message, as we are interested in the one starting after
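The branch above chooses the starting key in three ways; a hypothetical standalone helper that captures the same selection logic (the function name and parameters are illustrative, not part of the PR):

use alloy_primitives::B256;
use scroll_db::L1MessageKey;

/// Mirrors the selection above: prefer an explicit queue index; a zero previous
/// queue hash falls back to the V2 queue start index; otherwise start from the
/// queue hash itself (the caller then skips the first returned message).
fn select_start_key(
    queue_index_start: Option<u64>,
    prev_queue_hash: Option<B256>,
    l1_v2_message_queue_start_index: u64,
) -> Option<L1MessageKey> {
    match (queue_index_start, prev_queue_hash) {
        (Some(index), _) => Some(L1MessageKey::from_queue_index(index)),
        (None, Some(hash)) if hash == B256::ZERO => {
            Some(L1MessageKey::from_queue_index(l1_v2_message_queue_start_index))
        }
        (None, Some(hash)) => Some(L1MessageKey::from_queue_hash(hash)),
        (None, None) => None,
    }
}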
6 changes: 3 additions & 3 deletions crates/manager/src/manager/event.rs
@@ -3,7 +3,7 @@ use reth_scroll_primitives::ScrollBlock;
use rollup_node_chain_orchestrator::ChainOrchestratorEvent;
use rollup_node_signer::SignerEvent;
use rollup_node_watcher::L1Notification;
use scroll_db::L1MessageStart;
use scroll_db::L1MessageKey;
use scroll_engine::ConsolidationOutcome;
use scroll_network::NewBlockWithPeer;

@@ -33,8 +33,8 @@ pub enum RollupManagerEvent {
},
/// A block has been received containing an L1 message that is not in the database.
L1MessageMissingInDatabase {
/// The L1 message start index or hash.
start: L1MessageStart,
/// The L1 message key.
key: L1MessageKey,
},
/// An event was received from the L1 watcher.
L1NotificationEvent(L1Notification),
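A sketch of how a consumer of the renamed field might report the miss; the handler is hypothetical and assumes the tracing crate is available:

use scroll_db::L1MessageKey;

// Hypothetical event handler: the `key` field now says exactly which lookup
// failed, so the log can name the queue index, queue hash, or transaction hash.
fn on_missing_l1_message(key: &L1MessageKey) {
    match key {
        L1MessageKey::QueueIndex(index) => {
            tracing::warn!("L1 message missing in database at queue index {index}")
        }
        L1MessageKey::QueueHash(hash) => {
            tracing::warn!("L1 message missing in database for queue hash {hash:#x}")
        }
        L1MessageKey::TransactionHash(hash) => {
            tracing::warn!("L1 message missing in database for transaction hash {hash:#x}")
        }
    }
}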
2 changes: 1 addition & 1 deletion crates/manager/src/manager/mod.rs
@@ -435,7 +435,7 @@ where

if let Some(event_sender) = self.event_sender.as_ref() {
event_sender.notify(RollupManagerEvent::L1MessageMissingInDatabase {
start: start.clone(),
key: start.clone(),
});
}
}
4 changes: 2 additions & 2 deletions crates/node/tests/e2e.rs
@@ -37,7 +37,7 @@ use rollup_node_sequencer::L1MessageInclusionMode;
use rollup_node_watcher::L1Notification;
use scroll_alloy_consensus::TxL1Message;
use scroll_alloy_rpc_types::Transaction as ScrollAlloyTransaction;
use scroll_db::{test_utils::setup_test_db, L1MessageStart};
use scroll_db::{test_utils::setup_test_db, L1MessageKey};
use scroll_network::NewBlockWithPeer;
use scroll_wire::{ScrollWireConfig, ScrollWireProtocolHandler};
use std::{
@@ -1719,7 +1719,7 @@ async fn can_reject_l2_block_with_unknown_l1_message() -> eyre::Result<()> {
wait_for_event_5s(
&mut node1_rnm_events,
RollupManagerEvent::L1MessageMissingInDatabase {
start: L1MessageStart::Hash(b256!(
key: L1MessageKey::TransactionHash(b256!(
"0x0a2f8e75392ab51a26a2af835042c614eb141cd934fe1bdd4934c10f2fe17e98"
)),
},
4 changes: 2 additions & 2 deletions crates/node/tests/sync.rs
@@ -423,7 +423,7 @@ async fn test_should_consolidate_after_optimistic_sync() -> eyre::Result<()> {
// message.
wait_n_events(
&mut follower_events,
|e| matches!(e, RollupManagerEvent::L1MessageMissingInDatabase { start: _ }),
|e| matches!(e, RollupManagerEvent::L1MessageMissingInDatabase { key: _ }),
1,
)
.await;
@@ -599,7 +599,7 @@ async fn test_consolidation() -> eyre::Result<()> {
// Assert that the follower node rejects the new block as it hasn't received the L1 message.
wait_n_events(
&mut follower_events,
|e| matches!(e, RollupManagerEvent::L1MessageMissingInDatabase { start: _ }),
|e| matches!(e, RollupManagerEvent::L1MessageMissingInDatabase { key: _ }),
1,
)
.await;
8 changes: 3 additions & 5 deletions crates/providers/src/l1/message.rs
@@ -2,9 +2,7 @@ use crate::L1ProviderError;

use futures::{StreamExt, TryStreamExt};
use rollup_node_primitives::L1MessageEnvelope;
use scroll_db::{
DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, L1MessageStart,
};
use scroll_db::{DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, L1MessageKey};

/// An instance of the trait can provide L1 messages iterators.
#[async_trait::async_trait]
@@ -24,7 +22,7 @@ pub trait L1MessageProvider: Send + Sync {
/// avoid capturing the lifetime of `self`.
async fn get_n_messages(
&self,
start: L1MessageStart,
start: L1MessageKey,
n: u64,
) -> Result<Vec<L1MessageEnvelope>, Self::Error>;
}
@@ -38,7 +36,7 @@

async fn get_n_messages(
&self,
start: L1MessageStart,
start: L1MessageKey,
n: u64,
) -> Result<Vec<L1MessageEnvelope>, Self::Error> {
let tx = self.tx().await?;
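A minimal sketch of calling the updated trait, generic over any L1MessageProvider implementation; the helper is illustrative and assumes the trait is exported from rollup_node_providers:

use rollup_node_primitives::L1MessageEnvelope;
use rollup_node_providers::L1MessageProvider;
use scroll_db::L1MessageKey;

// Fetch `n` L1 messages starting at a queue index from any provider implementation.
async fn fetch_from_index<P: L1MessageProvider>(
    provider: &P,
    index: u64,
    n: u64,
) -> Result<Vec<L1MessageEnvelope>, P::Error> {
    provider.get_n_messages(L1MessageKey::from_queue_index(index), n).await
}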
4 changes: 2 additions & 2 deletions crates/providers/src/l1/mod.rs
@@ -9,7 +9,7 @@ use alloy_eips::eip4844::Blob;
use alloy_primitives::B256;
use alloy_transport::{RpcError, TransportErrorKind};
use rollup_node_primitives::L1MessageEnvelope;
use scroll_db::{DatabaseError, L1MessageStart};
use scroll_db::{DatabaseError, L1MessageKey};

/// An instance of the trait can be used to provide L1 data.
pub trait L1Provider: BlobProvider + L1MessageProvider {}
@@ -77,7 +77,7 @@ impl<L1MP: L1MessageProvider, BP: Sync + Send> L1MessageProvider for FullL1Provi

async fn get_n_messages(
&self,
start: L1MessageStart,
start: L1MessageKey,
n: u64,
) -> Result<Vec<L1MessageEnvelope>, Self::Error> {
self.l1_message_provider.get_n_messages(start, n).await
4 changes: 2 additions & 2 deletions crates/providers/src/test_utils.rs
@@ -6,7 +6,7 @@ use std::{collections::HashMap, sync::Arc};
use alloy_eips::eip4844::Blob;
use alloy_primitives::B256;
use rollup_node_primitives::L1MessageEnvelope;
use scroll_db::L1MessageStart;
use scroll_db::L1MessageKey;

/// Implementation of the [`crate::L1Provider`] that never returns blobs.
#[derive(Clone, Default, Debug)]
@@ -34,7 +34,7 @@ impl<P: L1MessageProvider + Send + Sync> L1MessageProvider for MockL1Provider<P>

async fn get_n_messages(
&self,
start: L1MessageStart,
start: L1MessageKey,
n: u64,
) -> Result<Vec<L1MessageEnvelope>, Self::Error> {
self.l1_messages_provider.get_n_messages(start, n).await
3 changes: 2 additions & 1 deletion crates/sequencer/src/lib.rs
@@ -26,6 +26,7 @@ pub use error::SequencerError;

mod metrics;
pub use metrics::SequencerMetrics;
use scroll_db::L1MessageKey;

/// A type alias for the payload building job future.
pub type PayloadBuildingJobFuture =
@@ -250,7 +251,7 @@ async fn build_payload_attributes<P: L1MessageProvider + Unpin + Send + Sync + '

// Collect L1 messages to include in payload.
let db_l1_messages = provider
.get_n_messages(l1_messages_queue_index.into(), max_l1_messages)
.get_n_messages(L1MessageKey::from_queue_index(l1_messages_queue_index), max_l1_messages)
.await
.map_err(Into::<L1ProviderError>::into)?;

Expand Down