Skip to content

Commit 4e5181d

Browse files
committed
fix: thread shutdown leaks
Signed-off-by: Alexandre Rulleau <[email protected]>
1 parent c2690e7 commit 4e5181d

File tree

7 files changed

+161
-54
lines changed

7 files changed

+161
-54
lines changed

datadog-ipc/tarpc/src/trace.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@ use rand::Rng;
2222
use std::{
2323
fmt::{self, Formatter},
2424
num::{NonZeroU128, NonZeroU64},
25+
sync::atomic::{AtomicU64, Ordering},
2526
};
2627
#[cfg(feature = "opentelemetry")]
2728
use tracing_opentelemetry::OpenTelemetrySpanExt;
2829

30+
/// Global atomic counter for generating unique span IDs
31+
static SPAN_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
32+
2933
/// A context for tracing the execution of processes, distributed or otherwise.
3034
///
3135
/// Consists of a span identifying an event, an optional parent span identifying a causal event
@@ -80,9 +84,11 @@ pub enum SamplingDecision {
8084
impl Context {
8185
/// Constructs a new context with the trace ID and sampling decision inherited from the parent.
8286
pub(crate) fn new_child(&self) -> Self {
87+
// Use atomic counter instead of rand to avoid TLS allocation
88+
let span_id_value = SPAN_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
8389
Self {
8490
trace_id: self.trace_id,
85-
span_id: SpanId::random(&mut rand::thread_rng()),
91+
span_id: SpanId(span_id_value),
8692
sampling_decision: self.sampling_decision,
8793
}
8894
}

datadog-sidecar-ffi/src/lib.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryAct
3333
use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader};
3434
#[cfg(unix)]
3535
use datadog_sidecar::{
36-
connect_worker_unix, shutdown_master_listener_unix, start_master_listener_unix,
36+
clear_inherited_listener_unix, connect_worker_unix, shutdown_master_listener_unix,
37+
start_master_listener_unix,
3738
};
3839
#[cfg(windows)]
3940
use datadog_sidecar::{
@@ -359,6 +360,19 @@ pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError {
359360
MaybeError::None
360361
}
361362

363+
#[no_mangle]
364+
pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError {
365+
#[cfg(unix)]
366+
{
367+
try_c!(clear_inherited_listener_unix());
368+
}
369+
#[cfg(windows)]
370+
{
371+
// Windows doesn't use fork, so no inherited state to clear
372+
}
373+
MaybeError::None
374+
}
375+
362376
#[no_mangle]
363377
pub extern "C" fn ddog_sidecar_ping(transport: &mut Box<SidecarTransport>) -> MaybeError {
364378
try_c!(blocking::ping(transport));

datadog-sidecar/src/entry.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,12 @@ where
129129
// Shutdown final sender so the receiver can complete
130130
drop(shutdown_complete_tx);
131131

132-
// Await everything else to completion
133-
_ = telemetry_handle.await;
132+
// Await everything else to completion with timeouts to ensure we don't hang
133+
let shutdown_timeout = Duration::from_millis(500);
134+
135+
_ = tokio::time::timeout(shutdown_timeout, telemetry_handle).await;
134136
server.shutdown();
135-
_ = server.trace_flusher.join().await;
137+
_ = tokio::time::timeout(shutdown_timeout, server.trace_flusher.join()).await;
136138

137139
Ok(())
138140
}
@@ -153,14 +155,9 @@ where
153155

154156
let (listener, cancel) = acquire_listener()?;
155157

156-
let result = runtime
158+
runtime
157159
.block_on(main_loop(listener, Arc::new(cancel)))
158-
.map_err(|e| e.into());
159-
160-
// Wait 1 second to shut down properly
161-
runtime.shutdown_timeout(std::time::Duration::from_secs(1));
162-
163-
result
160+
.map_err(|e| e.into())
164161
}
165162

166163
pub fn daemonize(listener: IpcServer, mut cfg: Config) -> anyhow::Result<()> {

datadog-sidecar/src/service/queue_id.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use rand::Rng;
54
use serde::{Deserialize, Serialize};
5+
use std::sync::atomic::{AtomicU64, Ordering};
66

77
/// `QueueId` is a struct that represents a unique identifier for a queue.
88
/// It contains a single field, `inner`, which is a 64-bit unsigned integer.
@@ -12,11 +12,15 @@ pub struct QueueId {
1212
pub(crate) inner: u64,
1313
}
1414

15+
/// Global atomic counter for generating unique queue IDs
16+
static QUEUE_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
17+
1518
impl QueueId {
1619
/// Generates a new unique `QueueId`.
1720
///
18-
/// This method generates a random 64-bit unsigned integer between 1 (inclusive) and `u64::MAX`
19-
/// (exclusive) and uses it as the `inner` value of the new `QueueId`.
21+
/// This method uses an atomic counter to generate monotonically increasing
22+
/// unique IDs. The counter starts at 1 and increments with each call.
23+
/// This approach avoids TLS allocations from random number generators.
2024
///
2125
/// # Examples
2226
///
@@ -27,7 +31,7 @@ impl QueueId {
2731
/// ```
2832
pub fn new_unique() -> Self {
2933
Self {
30-
inner: rand::thread_rng().gen_range(1u64..u64::MAX),
34+
inner: QUEUE_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
3135
}
3236
}
3337
}

datadog-sidecar/src/setup/unix.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::sync::LazyLock;
4+
use std::sync::{
5+
atomic::{AtomicU16, Ordering},
6+
LazyLock,
7+
};
58
use std::{
69
env, fs, io,
710
os::unix::{
@@ -83,7 +86,10 @@ impl Liaison for SharedDirLiaison {
8386
}
8487

8588
fn ipc_per_process() -> Self {
86-
static PROCESS_RANDOM_ID: LazyLock<u16> = LazyLock::new(rand::random);
89+
// Use atomic counter instead of rand::random to avoid TLS allocation
90+
static PROCESS_ID_COUNTER: AtomicU16 = AtomicU16::new(1);
91+
static PROCESS_RANDOM_ID: LazyLock<u16> =
92+
LazyLock::new(|| PROCESS_ID_COUNTER.fetch_add(1, Ordering::Relaxed));
8793

8894
let pid = std::process::id();
8995
let liason_path = env::temp_dir().join(format!("libdatadog.{}.{pid}", *PROCESS_RANDOM_ID));

datadog-sidecar/src/unix.rs

Lines changed: 108 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,89 @@ pub fn start_master_listener_unix(master_pid: i32) -> io::Result<()> {
5151
let handle = thread::Builder::new()
5252
.name("dd-sidecar".into())
5353
.spawn(move || {
54-
let acquire_listener = move || -> io::Result<_> {
55-
std_listener.set_nonblocking(true)?;
56-
let listener = UnixListener::from_std(std_listener.try_clone()?)?;
57-
let cancel = {
58-
let fd = listener.as_raw_fd();
59-
move || stop_listening(fd)
60-
};
61-
Ok((move |handler| accept_socket_loop(listener, handler), cancel))
54+
// Use blocking I/O - no shared tokio Runtime needed
55+
// This makes the code fork-safe
56+
use crate::service::sidecar_server::SidecarServer;
57+
let runtime = match tokio::runtime::Builder::new_current_thread()
58+
.enable_all()
59+
.build()
60+
{
61+
Ok(rt) => rt,
62+
Err(e) => {
63+
error!("Failed to create runtime for server initialization: {}", e);
64+
return;
65+
}
6266
};
6367

64-
let _ = enter_listener_loop(acquire_listener).map_err(|e| {
65-
error!("enter_listener_loop failed: {}", e);
66-
e
67-
});
68+
let server = runtime.block_on(async { SidecarServer::default() });
69+
70+
loop {
71+
match std_listener.accept() {
72+
Ok((stream, _addr)) => {
73+
let server = server.clone();
74+
// Spawn a detached thread for each connection
75+
// Threads are not joined during shutdown to avoid blocking on active connections
76+
// Each thread will exit naturally when its connection closes
77+
if let Err(e) = thread::Builder::new().name("dd-conn-handler".into()).spawn(
78+
move || {
79+
// Create a minimal single-threaded runtime for this connection only
80+
// This runtime will be dropped when the connection closes
81+
let runtime = match tokio::runtime::Builder::new_current_thread()
82+
.enable_all()
83+
.build()
84+
{
85+
Ok(rt) => rt,
86+
Err(e) => {
87+
error!("Failed to create runtime for connection: {}", e);
88+
return;
89+
}
90+
};
91+
92+
runtime.block_on(async move {
93+
// Convert std UnixStream to tokio UnixStream
94+
if let Err(e) = stream.set_nonblocking(true) {
95+
error!("Failed to set nonblocking: {}", e);
96+
return;
97+
}
98+
99+
let tokio_stream = match UnixStream::from_std(stream) {
100+
Ok(s) => s,
101+
Err(e) => {
102+
error!("Failed to convert stream: {}", e);
103+
return;
104+
}
105+
};
106+
107+
// Handle the connection using existing async infrastructure
108+
use datadog_ipc::platform::AsyncChannel;
109+
110+
// Use the cloned shared server
111+
server
112+
.accept_connection(AsyncChannel::from(tokio_stream))
113+
.await;
114+
});
115+
},
116+
) {
117+
error!("Failed to spawn handler thread: {}", e);
118+
}
119+
}
120+
Err(e) => {
121+
match e.kind() {
122+
io::ErrorKind::Interrupted => continue,
123+
io::ErrorKind::InvalidInput => break, // Socket shut down
124+
_ => {
125+
error!("Accept error: {}", e);
126+
thread::sleep(Duration::from_millis(100));
127+
}
128+
}
129+
}
130+
}
131+
}
132+
133+
info!("Master listener stopped accepting connections");
134+
135+
// Shutdown the server - connection threads will finish naturally
136+
server.shutdown();
68137
})
69138
.map_err(io::Error::other)?;
70139

@@ -95,6 +164,7 @@ pub fn connect_worker_unix(master_pid: i32) -> io::Result<SidecarTransport> {
95164
}
96165
}
97166

167+
error!("Worker failed to connect after 10 attempts");
98168
Err(last_error.unwrap_or_else(|| io::Error::other("Connection failed")))
99169
}
100170

@@ -112,28 +182,35 @@ pub fn shutdown_master_listener_unix() -> io::Result<()> {
112182

113183
if let Some((handle, fd)) = listener_data {
114184
stop_listening(fd);
185+
let _ = handle.join();
186+
}
115187

116-
// Try to join with a timeout to avoid hanging the shutdown
117-
// We spawn a helper thread to do the join so we can implement a timeout
118-
let (tx, rx) = std::sync::mpsc::channel();
119-
std::thread::spawn(move || {
120-
let result = handle.join();
121-
let _ = tx.send(result);
122-
});
123-
124-
// Wait up to 2 seconds for clean shutdown (including time for tokio runtime shutdown)
125-
match rx.recv_timeout(Duration::from_millis(2000)) {
126-
Ok(Ok(())) => {
127-
// Clean shutdown
128-
}
129-
Ok(Err(_)) => {
130-
error!("Listener thread panicked during shutdown");
131-
}
132-
Err(_) => {
133-
// Timeout - thread didn't exit in time
134-
// This is acceptable as the OS will clean up when the process exits
188+
Ok(())
189+
}
190+
191+
/// Clears inherited resources in child processes after fork().
192+
/// With the new blocking I/O approach, we only need to forget the listener thread handle.
193+
/// Each connection creates its own short-lived runtime, so there's no global runtime to inherit.
194+
pub fn clear_inherited_listener_unix() -> io::Result<()> {
195+
info!("Child process clearing inherited listener state");
196+
match MASTER_LISTENER.lock() {
197+
Ok(mut guard) => {
198+
if let Some((handle, _fd)) = guard.take() {
199+
info!("Child forgetting inherited listener thread handle");
200+
// Forget the handle without joining - parent owns the thread
201+
std::mem::forget(handle);
202+
info!("Child successfully forgot listener handle");
203+
} else {
204+
info!("Child found no listener to clear");
135205
}
136206
}
207+
Err(e) => {
208+
error!(
209+
"Failed to acquire lock for clearing inherited listener: {}",
210+
e
211+
);
212+
return Err(io::Error::other("Mutex poisoned"));
213+
}
137214
}
138215

139216
Ok(())

datadog-sidecar/src/windows.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,7 @@ async fn accept_pipe_loop(
147147
.ok_or_else(|| io::Error::from(io::ErrorKind::InvalidInput))?;
148148

149149
let raw_handle = pipe_listener.as_raw_handle();
150-
let mut pipe = unsafe {
151-
NamedPipeServer::from_raw_handle(raw_handle)
152-
}?;
150+
let mut pipe = unsafe { NamedPipeServer::from_raw_handle(raw_handle) }?;
153151

154152
loop {
155153
match pipe.connect().await {
@@ -287,21 +285,26 @@ pub fn shutdown_master_listener_windows() -> io::Result<()> {
287285
stop_listening_on_handle(raw);
288286

289287
let (tx, rx) = std::sync::mpsc::channel();
290-
std::thread::spawn(move || {
288+
let helper_handle = std::thread::spawn(move || {
291289
let result = handle.join();
292290
let _ = tx.send(result);
293291
});
294292

295293
// Wait up to 500ms for proper shutdown
296294
match rx.recv_timeout(Duration::from_millis(500)) {
297-
Ok(Ok(())) => { }
295+
Ok(Ok(())) => {}
298296
Ok(Err(_)) => {
299297
error!("Listener thread panicked during shutdown");
300298
}
301299
Err(err) => {
302300
error!("Timeout waiting for listener thread to shut down: {}", err);
303301
}
304302
}
303+
304+
// Join the helper thread to clean up its TLS
305+
if let Err(_) = helper_handle.join() {
306+
error!("Helper thread panicked");
307+
}
305308
}
306309

307310
Ok(())

0 commit comments

Comments
 (0)