Skip to content

Commit 348f659

Browse files
authored
fix: more robust sampling thread (#4)
I was incorrectly shutting the sample collection thread down whenever an exception from OCaml was raised. What we really want is to log the exception and try again, since occasionally we may get a "stream corrupted" exception, which just means some entries in the ring buffer were overwritten. This ultimately isn't a big deal unless it happens a lot, and if it does, that will be obvious from the large number of missing samples.

## Test plan

I modified Example.ml to generate enough samples that this exception occasionally pops up, then ran pyro caml and confirmed that it no longer takes down the collection thread.
2 parents 1900682 + dd58b3b commit 348f659

File tree

4 files changed

+40
-31
lines changed

4 files changed

+40
-31
lines changed

example/Example.ml

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
(*****************************************************************************)
1717
(* Prelude *)
1818
(*****************************************************************************)
19-
2019
let example_func2 () =
2120
My_module.do_short_thing () ;
2221
My_module.alloc_thing ()
@@ -32,18 +31,10 @@ let () =
3231
@@ fun () ->
3332
Printf.printf "Starting loop\n" ;
3433
flush_all () ;
35-
let[@pyro_profile] do_main_thing () =
36-
let i = 0 in
37-
while i < 1 do
34+
let do_main_thing () =
35+
while true do
3836
example_func () ; example_func3 () ; example_func () ; example_func3 ()
3937
done
4038
in
41-
let d1 = Domain.spawn (fun () -> do_main_thing ()) in
42-
let d2 = Domain.spawn (fun () -> do_main_thing ()) in
43-
let d3 = Domain.spawn (fun () -> do_main_thing ()) in
44-
let d4 = Domain.spawn (fun () -> do_main_thing ()) in
45-
do_main_thing () ;
46-
Domain.join d1 ;
47-
Domain.join d2 ;
48-
Domain.join d3 ;
49-
Domain.join d4
39+
let domains = List.init 8 (fun _ -> Domain.spawn (fun () -> do_main_thing ())) in
40+
List.iter Domain.join domains

example/My_module.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ let f x y z =
2727
let c = b - y in
2828
c
2929

30+
3031
let alloc_thing () =
3132
let random_list =
3233
List.init 1000 (fun _ ->

example/dune

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
(package pyro-caml-ppx)
33
(name example)
44
(public_name example)
5-
(libraries pyro-caml-instruments pyro-caml-ppx unix)
5+
(libraries pyro-caml-instruments unix)
66
(modes exe)
77
(preprocess
8-
(pps ppx_pyro_caml))
8+
(pps ppx_pyro_caml --auto))
99
(foreign_stubs
1010
(language c)
1111
(names example_stubs)

src/backend.rs

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010
path::{Path, PathBuf},
1111
sync::{
1212
atomic::{AtomicBool, Ordering},
13-
Arc, Mutex, OnceLock,
13+
Arc, Mutex, MutexGuard, OnceLock,
1414
},
1515
thread::{self, JoinHandle},
1616
};
@@ -70,7 +70,7 @@ pub struct CamlSpy {
7070
ruleset: Arc<Mutex<Ruleset>>,
7171
backend_config: Arc<Mutex<BackendConfig>>,
7272
config: CamlSpyConfig,
73-
sampler_thread: Option<JoinHandle<Result<()>>>,
73+
sampler_thread: Option<JoinHandle<()>>,
7474
}
7575

7676
impl CamlSpy {
@@ -126,15 +126,22 @@ impl Backend for CamlSpy {
126126
log::debug!(target:LOG_TAG, "starting sampler thread");
127127
while running.load(Ordering::Relaxed) {
128128
log::trace!(target:LOG_TAG, "acquiring backend config and cursor");
129-
let backend_config = backend_config.lock()?;
129+
let backend_config = backend_config
130+
.lock()
131+
.expect("Could not take backend config lock"); // we only have one sampler thread and one reporting thread so this should never fail unless something is seriously wrong
130132

131133
let cursor = config.acquire_cursor();
132134
log::trace!(target:LOG_TAG, "sampling...");
133135
let mut stack_frames: Vec<StackTrace> = OCAML_GC
134-
.with_borrow(|gc| ocaml_intf::read_poll(gc, cursor, config.sample_interval()))?
136+
.with_borrow(|gc| ocaml_intf::read_poll(gc, cursor, config.sample_interval()))
137+
.unwrap_or_else(|e| {
138+
log::error!(target:LOG_TAG, "Error reading from OCaml runtime: {:?}", e);
139+
vec![]
140+
})
135141
.into_iter()
136142
.map(|st| st.into_stack_trace(backend_config.deref(), config.pid))
137143
.collect();
144+
138145
log::trace!(target:LOG_TAG, "done sampling");
139146

140147
if stack_frames.is_empty() {
@@ -145,8 +152,12 @@ impl Backend for CamlSpy {
145152

146153
log::trace!(target:LOG_TAG, "recording stack frames");
147154
for st in stack_frames.into_iter() {
148-
let stack_trace = st + &ruleset.lock()?.clone();
149-
buffer.lock()?.record(stack_trace).unwrap();
155+
let stack_trace = st + &ruleset.lock().expect("Failed to lock ruleset").clone();
156+
buffer
157+
.lock()
158+
.expect("Failed to lock buffer") // we only have one sampler thread and one sending reports so we should never fail to obtain the lock unless something is seriously wrong
159+
.record(stack_trace)
160+
.expect("Failed to record stack traces to buffer"); // So this seems to just be a result in the pyroscope sdk for no reason
150161
}
151162
log::trace!(target:LOG_TAG, "recorded stack trace");
152163
log::trace!(target:LOG_TAG, "sleeping for {} ms", 1000 / config.sample_rate);
@@ -155,8 +166,7 @@ impl Backend for CamlSpy {
155166
1000 / config.sample_rate as u64,
156167
));
157168
}
158-
log::debug!(target:LOG_TAG, "sampler thread exiting");
159-
Ok(())
169+
log::debug!(target:LOG_TAG, "sampler thread exiting")
160170
});
161171
self.sampler_thread = Some(sampler);
162172
Ok(())
@@ -168,19 +178,26 @@ impl Backend for CamlSpy {
168178
self.sampler_thread
169179
.ok_or_else(|| PyroscopeError::new("CamlSpy: Failed to unwrap sampler thread"))?
170180
.join()
171-
.unwrap_or_else(|_| {
172-
Err(PyroscopeError::new(
173-
"CamlSpy: Failed to join sampler thread",
174-
))
181+
.map_err(|e| {
182+
PyroscopeError::new(
183+
format!("CamlSpy: Failed to join sampler thread: {:?}", e).as_str(),
184+
)
175185
})?;
176186
Ok(())
177187
}
178188

179189
fn report(&mut self) -> Result<Vec<Report>> {
180-
let report: StackBuffer = self.buffer.lock()?.deref().to_owned();
181-
let reports: Vec<Report> = report.into();
182-
183-
self.buffer.lock()?.clear();
190+
// first check if the thread has finished
191+
if let Some(handle) = &self.sampler_thread {
192+
if handle.is_finished() {
193+
return Err(PyroscopeError::new(
194+
"CamlSpy: Sampler thread has exited unexpectedly",
195+
));
196+
}
197+
}
198+
let mut buffer: MutexGuard<'_, StackBuffer> = self.buffer.lock()?;
199+
let reports: Vec<Report> = buffer.deref().to_owned().into();
200+
buffer.clear();
184201
Ok(reports)
185202
}
186203

0 commit comments

Comments
 (0)