Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions glean-core/rlb/examples/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,14 @@ fn main() {
glean::initialize(cfg, client_info);

glean_metrics::sample_boolean.set(true);
_ = glean_metrics::sample_boolean.test_get_value(None);

PrototypePing.submit(None);

glean_core::dispatcher::launch(|| {
std::thread::sleep(std::time::Duration::from_secs(15));
});

glean::shutdown();
std::thread::sleep(std::time::Duration::from_secs(10));
}
4 changes: 4 additions & 0 deletions glean-core/src/dispatcher/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ pub fn kill() -> Result<(), DispatchError> {
join_dispatcher_thread()
}

pub fn force_kill() -> Result<(), DispatchError> {
guard().force_kill()
}

/// Shuts down the dispatch queue.
///
/// This will initiate a shutdown of the worker thread
Expand Down
19 changes: 19 additions & 0 deletions glean-core/src/dispatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ struct DispatchGuard {

/// Sender for the unbounded queue.
sender: Sender<Command>,

running: Arc<AtomicBool>,
}

impl DispatchGuard {
Expand Down Expand Up @@ -196,6 +198,11 @@ impl DispatchGuard {
Ok(())
}

fn force_kill(&mut self) -> Result<(), DispatchError> {
self.running.store(false, Ordering::Release);
Ok(())
}

/// Flushes the pre-init buffer.
///
/// This function blocks until tasks queued prior to this call are finished.
Expand Down Expand Up @@ -264,10 +271,14 @@ impl Dispatcher {

let queue_preinit = Arc::new(AtomicBool::new(true));
let overflow_count = Arc::new(AtomicUsize::new(0));
let running = Arc::new(AtomicBool::new(true));

let inner_running = running.clone();
let worker = thread::Builder::new()
.name("glean.dispatcher".into())
.spawn(move || {
let running = inner_running;

match block_receiver.recv() {
Err(_) => {
// The other side was disconnected.
Expand All @@ -288,6 +299,13 @@ impl Dispatcher {
loop {
use Command::*;

if !running.load(Ordering::Relaxed) {
log::info!(
"Not running anymore. Bailing out with potential tasks pending."
);
break;
}

match receiver.recv() {
Ok(Shutdown) => {
break;
Expand Down Expand Up @@ -333,6 +351,7 @@ impl Dispatcher {
block_sender,
preinit_sender,
sender,
running,
};

Dispatcher {
Expand Down
5 changes: 3 additions & 2 deletions glean-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#![allow(clippy::significant_drop_in_scrutinee)]
#![allow(clippy::uninlined_format_args)]
#![deny(rustdoc::broken_intra_doc_links)]
#![deny(missing_docs)]
//#![deny(missing_docs)]

//! Glean is a modern approach for recording and sending Telemetry data.
//!
Expand Down Expand Up @@ -38,7 +38,7 @@ mod core_metrics;
mod coverage;
mod database;
mod debug;
mod dispatcher;
pub mod dispatcher;
mod error;
mod error_recording;
mod event_database;
Expand Down Expand Up @@ -706,6 +706,7 @@ pub fn shutdown() {
log::error!(
"Timeout while blocking on the dispatcher. No further shutdown cleanup will happen."
);
dispatcher::force_kill().unwrap();
return;
}

Expand Down