Skip to content

Commit 62db52f

Browse files
committed
[ReplicatedLoglet] Remote append
Summary: Implements a remote loglet append calls to leader sequencer
1 parent 89d0aa5 commit 62db52f

File tree

8 files changed

+705
-13
lines changed

8 files changed

+705
-13
lines changed

crates/bifrost/src/loglet/error.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
use std::fmt::Debug;
1212
use std::sync::Arc;
1313

14-
use restate_core::ShutdownError;
14+
use restate_core::{network::NetworkError, ShutdownError};
1515
use restate_types::errors::{IntoMaybeRetryable, MaybeRetryableError};
1616

1717
#[derive(Debug, Clone, thiserror::Error)]
@@ -68,3 +68,13 @@ impl From<OperationError> for AppendError {
6868
}
6969
}
7070
}
71+
72+
impl From<NetworkError> for OperationError {
73+
fn from(value: NetworkError) -> Self {
74+
match value {
75+
NetworkError::Shutdown(err) => OperationError::Shutdown(err),
76+
// todo(azmy): are all network errors retryable?
77+
_ => OperationError::retryable(value),
78+
}
79+
}
80+
}

crates/bifrost/src/loglet/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ impl LogletCommitResolver {
168168
pub fn offset(self, offset: LogletOffset) {
169169
let _ = self.tx.send(Ok(offset));
170170
}
171+
172+
pub fn error(self, err: AppendError) {
173+
let _ = self.tx.send(Err(err));
174+
}
171175
}
172176

173177
pub struct LogletCommit {

crates/bifrost/src/providers/replicated_loglet/loglet.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::providers::replicated_loglet::tasks::SealTask;
3131

3232
use super::log_server_manager::RemoteLogServerManager;
3333
use super::record_cache::RecordCache;
34+
use super::remote_sequencer::RemoteSequencer;
3435
use super::rpc_routers::{LogServersRpc, SequencersRpc};
3536

3637
#[derive(derive_more::Debug)]
@@ -95,7 +96,14 @@ impl<T: TransportConnect> ReplicatedLoglet<T> {
9596
}
9697
} else {
9798
SequencerAccess::Remote {
98-
sequencers_rpc: sequencers_rpc.clone(),
99+
handle: RemoteSequencer::new(
100+
log_id,
101+
segment_index,
102+
my_params.clone(),
103+
networking.clone(),
104+
known_global_tail.clone(),
105+
sequencers_rpc.clone(),
106+
),
99107
}
100108
};
101109
Ok(Self {
@@ -116,7 +124,7 @@ impl<T: TransportConnect> ReplicatedLoglet<T> {
116124
pub enum SequencerAccess<T> {
117125
/// The sequencer is remote (or retired/preempted)
118126
#[debug("Remote")]
119-
Remote { sequencers_rpc: SequencersRpc },
127+
Remote { handle: RemoteSequencer<T> },
120128
/// We are the loglet leaders
121129
#[debug("Local")]
122130
Local { handle: Sequencer<T> },
@@ -143,9 +151,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
143151
async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
144152
match self.sequencer {
145153
SequencerAccess::Local { ref handle } => handle.enqueue_batch(payloads).await,
146-
SequencerAccess::Remote { .. } => {
147-
todo!("Access to remote sequencers is not implemented yet")
148-
}
154+
SequencerAccess::Remote { ref handle } => handle.append(payloads).await,
149155
}
150156
}
151157

crates/bifrost/src/providers/replicated_loglet/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ mod network;
1616
mod provider;
1717
#[allow(dead_code)]
1818
mod record_cache;
19+
#[allow(dead_code)]
20+
mod remote_sequencer;
1921
pub mod replication;
2022
mod rpc_routers;
2123
#[allow(dead_code)]

0 commit comments

Comments
 (0)