Skip to content

Commit 5f2d07b

Browse files
committed
feat: support threaded connectivity
Signed-off-by: Alexandre Rulleau <[email protected]>
1 parent ee5d57c commit 5f2d07b

File tree

8 files changed

+335
-18
lines changed

8 files changed

+335
-18
lines changed

datadog-sidecar-ffi/src/lib.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ use datadog_sidecar::service::{
3232
};
3333
use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryActions};
3434
use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader};
35+
#[cfg(unix)]
36+
use datadog_sidecar::{connect_worker_unix, start_master_listener_unix};
37+
#[cfg(windows)]
38+
use datadog_sidecar::{
39+
connect_worker_windows, start_master_listener_windows, transport_from_owned_handle,
40+
};
3541
use datadog_trace_utils::msgpack_encoder;
3642
use ddcommon::tag::Tag;
3743
use ddcommon::Endpoint;
@@ -305,8 +311,6 @@ pub extern "C" fn ddog_remote_config_reader_drop(_: Box<RemoteConfigReader>) {}
305311
#[no_mangle]
306312
pub extern "C" fn ddog_sidecar_transport_drop(_: Box<SidecarTransport>) {}
307313

308-
/// # Safety
309-
/// Caller must ensure the process is safe to fork, at the time when this method is called
310314
#[no_mangle]
311315
pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) -> MaybeError {
312316
let cfg = datadog_sidecar::config::FromEnv::config();
@@ -317,6 +321,38 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) -
317321
MaybeError::None
318322
}
319323

324+
#[no_mangle]
325+
pub extern "C" fn ddog_sidecar_connect_master(master_pid: i32) -> MaybeError {
326+
#[cfg(unix)]
327+
{
328+
try_c!(start_master_listener_unix(master_pid));
329+
}
330+
#[cfg(windows)]
331+
{
332+
try_c!(start_master_listener_windows(master_pid));
333+
}
334+
MaybeError::None
335+
}
336+
337+
#[no_mangle]
338+
pub extern "C" fn ddog_sidecar_connect_worker(
339+
master_pid: i32,
340+
connection: &mut *mut SidecarTransport,
341+
) -> MaybeError {
342+
#[cfg(unix)]
343+
{
344+
let transport = Box::new(try_c!(connect_worker_unix(master_pid)));
345+
*connection = Box::into_raw(transport);
346+
}
347+
#[cfg(windows)]
348+
{
349+
let handle = try_c!(connect_worker_windows(master_pid));
350+
let transport = Box::new(try_c!(transport_from_owned_handle(handle)));
351+
*connection = Box::into_raw(transport);
352+
}
353+
MaybeError::None
354+
}
355+
320356
#[no_mangle]
321357
pub extern "C" fn ddog_sidecar_ping(transport: &mut Box<SidecarTransport>) -> MaybeError {
322358
try_c!(blocking::ping(transport));

datadog-sidecar/src/config.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ const ENV_SIDECAR_APPSEC_LOCK_FILE_PATH: &str = "_DD_SIDECAR_APPSEC_LOCK_FILE_PA
3636
const ENV_SIDECAR_APPSEC_LOG_FILE_PATH: &str = "_DD_SIDECAR_APPSEC_LOG_FILE_PATH";
3737
const ENV_SIDECAR_APPSEC_LOG_LEVEL: &str = "_DD_SIDECAR_APPSEC_LOG_LEVEL";
3838

39+
const ENV_SIDECAR_CONNECT_TO_MASTER_PID: &str = "_DD_SIDECAR_CONNECT_TO_MASTER_PID";
40+
3941
#[derive(Debug, Copy, Clone, Default)]
4042
pub enum IpcMode {
4143
#[default]
@@ -84,6 +86,7 @@ pub struct Config {
8486
pub crashtracker_endpoint: Option<Endpoint>,
8587
pub appsec_config: Option<AppSecConfig>,
8688
pub max_memory: usize,
89+
pub connect_to_master_pid: i32,
8790
}
8891

8992
#[derive(Debug, Clone)]
@@ -128,6 +131,12 @@ impl Config {
128131
format!("{}", self.max_memory).into(),
129132
);
130133
}
134+
if self.connect_to_master_pid != 0 {
135+
res.insert(
136+
ENV_SIDECAR_CONNECT_TO_MASTER_PID,
137+
format!("{}", self.connect_to_master_pid).into(),
138+
);
139+
}
131140
res
132141
}
133142
}
@@ -241,9 +250,17 @@ impl FromEnv {
241250
crashtracker_endpoint: Self::crashtracker_endpoint(),
242251
appsec_config: Self::appsec_config(),
243252
max_memory: Self::max_memory(),
253+
connect_to_master_pid: Self::connect_to_master_pid(),
244254
}
245255
}
246256

257+
fn connect_to_master_pid() -> i32 {
258+
std::env::var(ENV_SIDECAR_CONNECT_TO_MASTER_PID)
259+
.ok()
260+
.and_then(|s| s.parse().ok())
261+
.unwrap_or(0)
262+
}
263+
247264
fn appsec_config() -> Option<AppSecConfig> {
248265
let shared_lib_path = std::env::var_os(ENV_SIDECAR_APPSEC_SHARED_LIB_PATH)?;
249266
let socket_file_path = std::env::var_os(ENV_SIDECAR_APPSEC_SOCKET_FILE_PATH)?;

datadog-sidecar/src/entry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ pub fn daemonize(listener: IpcServer, mut cfg: Config) -> anyhow::Result<()> {
219219

220220
pub fn start_or_connect_to_sidecar(cfg: Config) -> anyhow::Result<SidecarTransport> {
221221
let liaison = match cfg.ipc_mode {
222-
config::IpcMode::Shared => setup::DefaultLiason::ipc_shared(),
223-
config::IpcMode::InstancePerProcess => setup::DefaultLiason::ipc_per_process(),
222+
config::IpcMode::Shared => setup::DefaultLiaison::ipc_shared(),
223+
config::IpcMode::InstancePerProcess => setup::DefaultLiaison::ipc_per_process(),
224224
};
225225

226226
let err = match liaison.attempt_listen() {

datadog-sidecar/src/setup/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ pub trait Liaison: Sized {
2323
fn attempt_listen(&self) -> io::Result<Option<IpcServer>>;
2424
fn ipc_shared() -> Self;
2525
fn ipc_per_process() -> Self;
26+
fn for_master_pid(master_pid: u32) -> Self;
2627
}

datadog-sidecar/src/setup/unix.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ impl Liaison for SharedDirLiaison {
8989
let liason_path = env::temp_dir().join(format!("libdatadog.{}.{pid}", *PROCESS_RANDOM_ID));
9090
Self::new(liason_path)
9191
}
92+
93+
fn for_master_pid(master_pid: u32) -> Self {
94+
Self::new(env::temp_dir().join(format!("libdatadog.{}", master_pid)))
95+
}
9296
}
9397

9498
impl SharedDirLiaison {
@@ -141,7 +145,7 @@ mod linux {
141145
pub struct AbstractUnixSocketLiaison {
142146
path: PathBuf,
143147
}
144-
pub type DefaultLiason = AbstractUnixSocketLiaison;
148+
pub type DefaultLiaison = AbstractUnixSocketLiaison;
145149

146150
impl Liaison for AbstractUnixSocketLiaison {
147151
fn connect_to_server(&self) -> io::Result<Channel> {
@@ -173,6 +177,14 @@ mod linux {
173177
));
174178
Self { path }
175179
}
180+
181+
fn for_master_pid(master_pid: u32) -> Self {
182+
let path = PathBuf::from(format!(
183+
concat!("libdatadog/", crate::sidecar_version!(), ".{}.sock"),
184+
master_pid
185+
));
186+
Self { path }
187+
}
176188
}
177189

178190
impl Default for AbstractUnixSocketLiaison {
@@ -193,7 +205,7 @@ mod linux {
193205
pub use linux::*;
194206

195207
#[cfg(target_os = "macos")]
196-
pub type DefaultLiason = SharedDirLiaison;
208+
pub type DefaultLiaison = SharedDirLiaison;
197209

198210
#[cfg(test)]
199211
mod tests {

datadog-sidecar/src/setup/windows.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,10 @@ impl Liaison for NamedPipeLiaison {
166166
fn ipc_per_process() -> Self {
167167
Self::new(format!("libdatadog_{}_", unsafe { getpid() }))
168168
}
169+
170+
fn for_master_pid(master_pid: u32) -> Self {
171+
Self::new(env::temp_dir().join(format!("libdatadog.{}", master_pid)))
172+
}
169173
}
170174

171175
impl NamedPipeLiaison {
@@ -197,7 +201,7 @@ impl Default for NamedPipeLiaison {
197201
}
198202
}
199203

200-
pub type DefaultLiason = NamedPipeLiaison;
204+
pub type DefaultLiaison = NamedPipeLiaison;
201205

202206
#[cfg(test)]
203207
mod tests {

datadog-sidecar/src/unix.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33

44
use spawn_worker::{getpid, SpawnWorker, Stdio, TrampolineData};
55

6-
use std::ffi::CString;
6+
use crate::service::blocking::SidecarTransport;
7+
use crate::setup::{DefaultLiaison, Liaison};
8+
use std::ffi::{CStr, CString};
79
use std::os::unix::net::UnixListener as StdUnixListener;
810

911
use crate::config::Config;
@@ -28,10 +30,45 @@ use datadog_crashtracker::{
2830
#[cfg(target_os = "linux")]
2931
use spawn_worker::{entrypoint, get_dl_path_raw};
3032
#[cfg(target_os = "linux")]
31-
use std::ffi::CStr;
33+
use std::thread;
3234
#[cfg(target_os = "linux")]
3335
use tracing::warn;
3436

37+
pub fn start_master_listener_unix(master_pid: i32) -> io::Result<()> {
38+
let liaison = DefaultLiaison::for_master_pid(master_pid as u32);
39+
40+
// Try to acquire the listening endpoint via the liaison
41+
let std_listener = match liaison.attempt_listen()? {
42+
Some(l) => l,
43+
None => return Ok(()),
44+
};
45+
46+
let _ = thread::Builder::new()
47+
.name("dd-sidecar".into())
48+
.spawn(move || {
49+
let acquire_listener = move || -> io::Result<_> {
50+
std_listener.set_nonblocking(true)?;
51+
let listener = UnixListener::from_std(std_listener.try_clone()?)?;
52+
let cancel = {
53+
let fd = listener.as_raw_fd();
54+
move || stop_listening(fd)
55+
};
56+
Ok((move |handler| accept_socket_loop(listener, handler), cancel))
57+
};
58+
59+
let _ = enter_listener_loop(acquire_listener);
60+
})
61+
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
62+
63+
Ok(())
64+
}
65+
66+
pub fn connect_worker_unix(master_pid: i32) -> io::Result<SidecarTransport> {
67+
let liaison = DefaultLiaison::for_master_pid(master_pid as u32);
68+
let channel = liaison.connect_to_server()?;
69+
Ok(channel.into())
70+
}
71+
3572
#[no_mangle]
3673
#[allow(unused)]
3774
pub extern "C" fn ddog_daemon_entry_point(trampoline_data: &TrampolineData) {

0 commit comments

Comments
 (0)