Skip to content

Commit e072b0a

Browse files
committed
[Sequencer] Renames for clarity
1 parent 0525be8 commit e072b0a

File tree

2 files changed

+19
-19
lines changed

2 files changed

+19
-19
lines changed

crates/bifrost/src/providers/replicated_loglet/sequencer/append.rs renamed to crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::{
3636

3737
const DEFAULT_BACKOFF_TIME: Duration = Duration::from_millis(1000);
3838

39-
enum AppenderState {
39+
enum SequencerAppenderState {
4040
Wave {
4141
// nodes that should be avoided by the spread selector
4242
graylist: NodeSet,
@@ -46,7 +46,7 @@ enum AppenderState {
4646
}
4747

4848
/// Appender makes sure a batch of records will run to completion
49-
pub(crate) struct Appender<T> {
49+
pub(crate) struct SequencerAppender<T> {
5050
sequencer_shared_state: Arc<SequencerSharedState>,
5151
log_server_manager: RemoteLogServerManager,
5252
store_router: RpcRouter<Store>,
@@ -61,7 +61,7 @@ pub(crate) struct Appender<T> {
6161
configuration: Live<Configuration>,
6262
}
6363

64-
impl<T: TransportConnect> Appender<T> {
64+
impl<T: TransportConnect> SequencerAppender<T> {
6565
#[allow(clippy::too_many_arguments)]
6666
pub fn new(
6767
sequencer_shared_state: Arc<SequencerSharedState>,
@@ -88,7 +88,7 @@ impl<T: TransportConnect> Appender<T> {
8888

8989
pub async fn run(mut self) {
9090
// initial wave has 0 replicated and 0 gray listed node
91-
let mut state = AppenderState::Wave {
91+
let mut state = SequencerAppenderState::Wave {
9292
graylist: NodeSet::empty(),
9393
};
9494

@@ -104,17 +104,17 @@ impl<T: TransportConnect> Appender<T> {
104104

105105
loop {
106106
state = match state {
107-
AppenderState::Done => break,
108-
AppenderState::Wave {
107+
SequencerAppenderState::Done => break,
108+
SequencerAppenderState::Wave {
109109
graylist: gray_list,
110110
} => self.wave(gray_list).await,
111-
AppenderState::Backoff => {
111+
SequencerAppenderState::Backoff => {
112112
// since backoff can be None, or run out of iterations,
113113
// but appender should never give up we fall back to fixed backoff
114114
let delay = retry.next().unwrap_or(DEFAULT_BACKOFF_TIME);
115115
tokio::time::sleep(delay).await;
116116

117-
AppenderState::Wave {
117+
SequencerAppenderState::Wave {
118118
// todo: introduce some backoff strategy
119119
graylist: NodeSet::empty(),
120120
}
@@ -123,7 +123,7 @@ impl<T: TransportConnect> Appender<T> {
123123
}
124124
}
125125

126-
async fn wave(&mut self, mut gray_list: NodeSet) -> AppenderState {
126+
async fn wave(&mut self, mut gray_list: NodeSet) -> SequencerAppenderState {
127127
// select the spread
128128
let spread = match self.sequencer_shared_state.selector.select(
129129
&mut rand::thread_rng(),
@@ -136,10 +136,10 @@ impl<T: TransportConnect> Appender<T> {
136136
// gray list was empty during spread selection!
137137
// yet we couldn't find a spread. there is
138138
// no reason to retry immediately.
139-
return AppenderState::Backoff;
139+
return SequencerAppenderState::Backoff;
140140
}
141141
// otherwise, we retry without a gray list.
142-
return AppenderState::Wave {
142+
return SequencerAppenderState::Wave {
143143
graylist: NodeSet::empty(),
144144
};
145145
}
@@ -169,7 +169,7 @@ impl<T: TransportConnect> Appender<T> {
169169
// write quorum
170170

171171
// we basically try again with a new set of gray_list
172-
return AppenderState::Wave {
172+
return SequencerAppenderState::Wave {
173173
graylist: gray_list,
174174
};
175175
}
@@ -178,7 +178,7 @@ impl<T: TransportConnect> Appender<T> {
178178
self.send_wave(servers).await
179179
}
180180

181-
async fn send_wave(&mut self, spread_servers: Vec<RemoteLogServer>) -> AppenderState {
181+
async fn send_wave(&mut self, spread_servers: Vec<RemoteLogServer>) -> SequencerAppenderState {
182182
let last_offset = self.records.last_offset(self.first_offset).unwrap();
183183

184184
let mut checker = NodeSetChecker::new(
@@ -233,7 +233,7 @@ impl<T: TransportConnect> Appender<T> {
233233
// timed out!
234234
// none of the pending tasks has finished in time! we will assume all pending server
235235
// are gray listed and try again
236-
return AppenderState::Wave {
236+
return SequencerAppenderState::Wave {
237237
graylist: pending_servers,
238238
};
239239
}
@@ -296,9 +296,9 @@ impl<T: TransportConnect> Appender<T> {
296296
}
297297

298298
if checker.check_write_quorum(|attr| *attr) {
299-
AppenderState::Done
299+
SequencerAppenderState::Done
300300
} else {
301-
AppenderState::Wave {
301+
SequencerAppenderState::Wave {
302302
graylist: pending_servers,
303303
}
304304
}

crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11-
mod append;
11+
mod appender;
1212

1313
use std::sync::{atomic::AtomicU32, Arc};
1414

@@ -33,7 +33,7 @@ use super::{
3333
replication::spread_selector::{SelectorStrategy, SpreadSelector},
3434
};
3535
use crate::loglet::{util::TailOffsetWatch, LogletCommit};
36-
use append::Appender;
36+
use appender::SequencerAppender;
3737

3838
#[derive(thiserror::Error, Debug)]
3939
pub enum SequencerError {
@@ -202,7 +202,7 @@ impl<T: TransportConnect> Sequencer<T> {
202202

203203
let (loglet_commit, commit_resolver) = LogletCommit::deferred();
204204

205-
let appender = Appender::new(
205+
let appender = SequencerAppender::new(
206206
Arc::clone(&self.sequencer_shared_state),
207207
self.log_server_manager.clone(),
208208
self.rpc_router.clone(),

0 commit comments

Comments
 (0)