Skip to content

Commit eea6de3

Browse files
committed
Merge branch 'main' into refactor/physical_plan_trait
2 parents 9754542 + 4fca845 commit eea6de3

File tree

6 files changed

+38
-24
lines changed

6 files changed

+38
-24
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ reqwest-hickory-resolver = "0.2"
467467
ringbuffer = "0.14.2"
468468
rmp-serde = "1.1.1"
469469
roaring = { version = "^0.10", features = ["serde"] }
470-
rotbl = { version = "0.2.7", features = [] }
470+
rotbl = { version = "0.2.8", features = [] }
471471
rust_decimal = "1.26"
472472
rustix = "0.38.37"
473473
rustls = { version = "0.23.27", features = ["ring", "tls12"], default-features = false }

src/meta/raft-store/src/leveled_store/leveled_map/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,11 @@ impl LeveledMap {
146146
self.immutable_levels = b;
147147
}
148148

149-
/// Replace several bottom immutable levels and persisted level
150-
/// with the compacted data.
151-
pub fn replace_with_compacted(&mut self, mut compactor: Compactor, db: DB) {
149+
/// Replace bottom immutable levels and persisted level with compacted data.
150+
///
151+
/// **Important**: Do not drop the compactor within this function when called
152+
/// under a state machine lock, as dropping may take ~250ms.
153+
pub fn replace_with_compacted(&mut self, compactor: &mut Compactor, db: DB) {
152154
let len = compactor.immutable_levels.len();
153155
let corresponding_index = compactor
154156
.immutable_levels
@@ -169,7 +171,7 @@ impl LeveledMap {
169171

170172
self.persisted = Some(db);
171173

172-
info!("compaction finished replacing the db");
174+
info!("replace_with_compacted: finished replacing the db");
173175
}
174176

175177
/// Get a singleton `Compactor` instance specific to `self`.

src/meta/raft-store/src/sm_v003/writer_v003.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use databend_common_meta_types::snapshot_db::DB;
1919
use databend_common_meta_types::sys_data::SysData;
2020
use futures::Stream;
2121
use futures_util::TryStreamExt;
22-
use log::debug;
2322
use log::info;
2423
use rotbl::v001::SeqMarked;
2524
use tokio::sync::mpsc;
@@ -93,7 +92,7 @@ impl WriterV003 {
9392
mut kv_rx: mpsc::Receiver<WriteEntry<(String, SeqMarked), (MetaSnapshotId, SysData)>>,
9493
) -> Result<DB, io::Error> {
9594
while let Some(ent) = kv_rx.blocking_recv() {
96-
debug!(entry :? =(&ent); "write kv");
95+
// debug!(entry :? =(&ent); "write kv");
9796

9897
let (k, v) = match ent {
9998
WriteEntry::Data(ent) => ent,

src/meta/service/src/store/lock_guards.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ where T: fmt::Display
5858
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
5959
write!(
6060
f,
61-
"StateMachineLock-Write-Guard({}): {}",
62-
self.purpose, self.inner
61+
"StateMachineLock-Write-Guard: {} {}",
62+
self.inner, self.purpose,
6363
)
6464
}
6565
}
@@ -68,8 +68,8 @@ impl<T: ?Sized> Drop for WriteGuard<'_, T> {
6868
fn drop(&mut self) {
6969
let elapsed = self.acquired.elapsed();
7070
info!(
71-
"StateMachineLock-Write-Guard({}) released after {:?}",
72-
self.purpose, elapsed
71+
"StateMachineLock-Write-Release: total: {:?}; {}",
72+
elapsed, self.purpose,
7373
);
7474
}
7575
}
@@ -105,8 +105,8 @@ where T: fmt::Display
105105
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106106
write!(
107107
f,
108-
"StateMachineLock-Read-Guard({}): {}",
109-
self.purpose, self.inner
108+
"StateMachineLock-Read-Guard: {} {}",
109+
self.inner, self.purpose
110110
)
111111
}
112112
}
@@ -115,8 +115,8 @@ impl<T: ?Sized> Drop for ReadGuard<'_, T> {
115115
fn drop(&mut self) {
116116
let elapsed = self.acquired.elapsed();
117117
info!(
118-
"StateMachineLock-Read-Guard({}) released after {:?}",
119-
self.purpose, elapsed
118+
"StateMachineLock-Read-Release: total: {:?}; {}",
119+
elapsed, self.purpose,
120120
);
121121
}
122122
}

src/meta/service/src/store/store_inner.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -188,10 +188,10 @@ impl RaftStoreInner {
188188
let g = self
189189
.state_machine
190190
.read()
191-
.with_timing(|_output, total, busy| {
191+
.with_timing(|_output, total, _busy| {
192192
info!(
193-
"StateMachineLock-Read-Acquire-Elapsed: total: {:?}, busy: {:?}; {}",
194-
total, busy, &for_what
193+
"StateMachineLock-Read-Acquire: total: {:?}; {}",
194+
total, &for_what
195195
);
196196
})
197197
.await;
@@ -208,10 +208,10 @@ impl RaftStoreInner {
208208
let g = self
209209
.state_machine
210210
.write()
211-
.with_timing(|_output, total, busy| {
211+
.with_timing(|_output, total, _busy| {
212212
info!(
213-
"StateMachineLock-Write-Acquire-Elapsed: total: {:?}, busy: {:?}; {}",
214-
total, busy, &for_what
213+
"StateMachineLock-Write-Acquire: total: {:?}; {}",
214+
total, &for_what
215215
);
216216
})
217217
.await;
@@ -313,9 +313,22 @@ impl RaftStoreInner {
313313
.get_state_machine_write("do_build_snapshot-replace-compacted")
314314
.await;
315315
sm.levels_mut()
316-
.replace_with_compacted(compactor, db.clone());
316+
.replace_with_compacted(&mut compactor, db.clone());
317+
info!("do_build_snapshot-replace_with_compacted returned");
317318
}
318319

320+
// Consume and drop compactor, dropping it may involve blocking IO
321+
databend_common_base::runtime::spawn_blocking(move || {
322+
let drop_start = std::time::Instant::now();
323+
drop(compactor);
324+
info!(
325+
"do_build_snapshot-compactor-drop completed in {:?}",
326+
drop_start.elapsed()
327+
);
328+
})
329+
.await
330+
.expect("compactor drop task should not panic");
331+
319332
// Clean old snapshot
320333
ss_store.new_loader().clean_old_snapshots().await?;
321334
info!("do_build_snapshot clean_old_snapshots complete");

0 commit comments

Comments
 (0)