Skip to content

Commit dbc714a

Browse files
committed
[ReplicatedLoglet] Remote append
Summary: Implements a remote loglet append calls to leader sequencer
1 parent 4d8e757 commit dbc714a

File tree

6 files changed

+663
-11
lines changed

6 files changed

+663
-11
lines changed

crates/bifrost/src/loglet/error.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
use std::fmt::Debug;
1212
use std::sync::Arc;
1313

14-
use restate_core::ShutdownError;
14+
use restate_core::{network::NetworkError, ShutdownError};
1515
use restate_types::errors::{IntoMaybeRetryable, MaybeRetryableError};
1616

1717
#[derive(Debug, Clone, thiserror::Error)]
@@ -68,3 +68,13 @@ impl From<OperationError> for AppendError {
6868
}
6969
}
7070
}
71+
72+
impl From<NetworkError> for OperationError {
73+
fn from(value: NetworkError) -> Self {
74+
match value {
75+
NetworkError::Shutdown(err) => OperationError::Shutdown(err),
76+
// todo(azmy): are all network errors retryable?
77+
_ => OperationError::retryable(value),
78+
}
79+
}
80+
}

crates/bifrost/src/loglet/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ impl LogletCommitResolver {
168168
pub fn offset(self, offset: LogletOffset) {
169169
let _ = self.tx.send(Ok(offset));
170170
}
171+
172+
pub fn error(self, err: AppendError) {
173+
let _ = self.tx.send(Err(err));
174+
}
171175
}
172176

173177
pub struct LogletCommit {

crates/bifrost/src/providers/replicated_loglet/loglet.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,22 @@ impl<T: TransportConnect> ReplicatedLoglet<T> {
109109
log_server_manager,
110110
})
111111
}
112+
113+
pub fn networking(&self) -> &Networking<T> {
114+
&self.networking
115+
}
116+
117+
pub fn params(&self) -> &ReplicatedLogletParams {
118+
&self.my_params
119+
}
120+
121+
pub fn log_id(&self) -> LogId {
122+
self.log_id
123+
}
124+
125+
pub fn segment_index(&self) -> SegmentIndex {
126+
self.segment_index
127+
}
112128
}
113129

114130
#[derive(derive_more::Debug, derive_more::IsVariant)]
@@ -142,8 +158,8 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
142158
async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
143159
match self.sequencer {
144160
SequencerAccess::Local { ref handle } => handle.enqueue_batch(payloads).await,
145-
SequencerAccess::Remote { .. } => {
146-
todo!("Access to remote sequencers is not implemented yet")
161+
SequencerAccess::Remote { ref sequencers_rpc } => {
162+
sequencers_rpc.append(self, payloads).await
147163
}
148164
}
149165
}

0 commit comments

Comments
 (0)