Skip to content

Commit 9b3c815

Browse files
committed
feat: support threaded connectivity
Signed-off-by: Alexandre Rulleau <[email protected]>
1 parent 636b792 commit 9b3c815

File tree

8 files changed

+335
-16
lines changed

8 files changed

+335
-16
lines changed

datadog-sidecar-ffi/src/lib.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ use datadog_sidecar::agent_remote_config::{
2222
new_reader, reader_from_shm, AgentRemoteConfigEndpoint, AgentRemoteConfigWriter,
2323
};
2424
use datadog_sidecar::config;
25+
#[cfg(unix)]
26+
use datadog_sidecar::{start_master_listener_unix, connect_worker_unix};
27+
#[cfg(windows)]
28+
use datadog_sidecar::{start_master_listener_windows, connect_worker_windows, transport_from_owned_handle};
2529
use datadog_sidecar::config::LogMethod;
2630
use datadog_sidecar::crashtracker::crashtracker_unix_socket_path;
2731
use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener};
@@ -305,8 +309,6 @@ pub extern "C" fn ddog_remote_config_reader_drop(_: Box<RemoteConfigReader>) {}
305309
#[no_mangle]
306310
pub extern "C" fn ddog_sidecar_transport_drop(_: Box<SidecarTransport>) {}
307311

308-
/// # Safety
309-
/// Caller must ensure the process is safe to fork, at the time when this method is called
310312
#[no_mangle]
311313
pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) -> MaybeError {
312314
let cfg = datadog_sidecar::config::FromEnv::config();
@@ -317,6 +319,38 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) -
317319
MaybeError::None
318320
}
319321

322+
#[no_mangle]
323+
pub extern "C" fn ddog_sidecar_connect_master(master_pid: i32) -> MaybeError {
324+
#[cfg(unix)]
325+
{
326+
try_c!(start_master_listener_unix(master_pid));
327+
}
328+
#[cfg(windows)]
329+
{
330+
try_c!(start_master_listener_windows(master_pid));
331+
}
332+
MaybeError::None
333+
}
334+
335+
#[no_mangle]
336+
pub extern "C" fn ddog_sidecar_connect_worker(
337+
master_pid: i32,
338+
connection: &mut *mut SidecarTransport
339+
) -> MaybeError {
340+
#[cfg(unix)]
341+
{
342+
let transport = Box::new(try_c!(connect_worker_unix(master_pid)));
343+
*connection = Box::into_raw(transport);
344+
}
345+
#[cfg(windows)]
346+
{
347+
let handle = try_c!(connect_worker_windows(master_pid));
348+
let transport = Box::new(try_c!(transport_from_owned_handle(handle)));
349+
*connection = Box::into_raw(transport);
350+
}
351+
MaybeError::None
352+
}
353+
320354
#[no_mangle]
321355
pub extern "C" fn ddog_sidecar_ping(transport: &mut Box<SidecarTransport>) -> MaybeError {
322356
try_c!(blocking::ping(transport));

datadog-sidecar/src/config.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ const ENV_SIDECAR_APPSEC_LOCK_FILE_PATH: &str = "_DD_SIDECAR_APPSEC_LOCK_FILE_PA
3636
const ENV_SIDECAR_APPSEC_LOG_FILE_PATH: &str = "_DD_SIDECAR_APPSEC_LOG_FILE_PATH";
3737
const ENV_SIDECAR_APPSEC_LOG_LEVEL: &str = "_DD_SIDECAR_APPSEC_LOG_LEVEL";
3838

39+
const ENV_SIDECAR_CONNECT_TO_MASTER_PID: &str = "_DD_SIDECAR_CONNECT_TO_MASTER_PID";
40+
3941
#[derive(Debug, Copy, Clone)]
4042
pub enum IpcMode {
4143
Shared,
@@ -94,6 +96,7 @@ pub struct Config {
9496
pub crashtracker_endpoint: Option<Endpoint>,
9597
pub appsec_config: Option<AppSecConfig>,
9698
pub max_memory: usize,
99+
pub connect_to_master_pid: i32,
97100
}
98101

99102
#[derive(Debug, Clone)]
@@ -138,6 +141,12 @@ impl Config {
138141
format!("{}", self.max_memory).into(),
139142
);
140143
}
144+
if self.connect_to_master_pid != 0 {
145+
res.insert(
146+
ENV_SIDECAR_CONNECT_TO_MASTER_PID,
147+
format!("{}", self.connect_to_master_pid).into(),
148+
);
149+
}
141150
res
142151
}
143152
}
@@ -251,9 +260,17 @@ impl FromEnv {
251260
crashtracker_endpoint: Self::crashtracker_endpoint(),
252261
appsec_config: Self::appsec_config(),
253262
max_memory: Self::max_memory(),
263+
connect_to_master_pid: Self::connect_to_master_pid(),
254264
}
255265
}
256266

267+
fn connect_to_master_pid() -> i32 {
268+
std::env::var(ENV_SIDECAR_CONNECT_TO_MASTER_PID)
269+
.ok()
270+
.and_then(|s| s.parse().ok())
271+
.unwrap_or(0)
272+
}
273+
257274
fn appsec_config() -> Option<AppSecConfig> {
258275
let shared_lib_path = std::env::var_os(ENV_SIDECAR_APPSEC_SHARED_LIB_PATH)?;
259276
let socket_file_path = std::env::var_os(ENV_SIDECAR_APPSEC_SOCKET_FILE_PATH)?;

datadog-sidecar/src/entry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ pub fn daemonize(listener: IpcServer, mut cfg: Config) -> anyhow::Result<()> {
219219

220220
pub fn start_or_connect_to_sidecar(cfg: Config) -> anyhow::Result<SidecarTransport> {
221221
let liaison = match cfg.ipc_mode {
222-
config::IpcMode::Shared => setup::DefaultLiason::ipc_shared(),
223-
config::IpcMode::InstancePerProcess => setup::DefaultLiason::ipc_per_process(),
222+
config::IpcMode::Shared => setup::DefaultLiaison::ipc_shared(),
223+
config::IpcMode::InstancePerProcess => setup::DefaultLiaison::ipc_per_process(),
224224
};
225225

226226
let err = match liaison.attempt_listen() {

datadog-sidecar/src/setup/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ pub trait Liaison: Sized {
2323
fn attempt_listen(&self) -> io::Result<Option<IpcServer>>;
2424
fn ipc_shared() -> Self;
2525
fn ipc_per_process() -> Self;
26+
fn for_master_pid(master_pid: u32) -> Self;
2627
}

datadog-sidecar/src/setup/unix.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ impl Liaison for SharedDirLiaison {
8989
let liason_path = env::temp_dir().join(format!("libdatadog.{}.{pid}", *PROCESS_RANDOM_ID));
9090
Self::new(liason_path)
9191
}
92+
93+
fn for_master_pid(master_pid: u32) -> Self {
94+
Self::new(env::temp_dir().join(format!("libdatadog.{}", master_pid)))
95+
}
9296
}
9397

9498
impl SharedDirLiaison {
@@ -141,7 +145,7 @@ mod linux {
141145
pub struct AbstractUnixSocketLiaison {
142146
path: PathBuf,
143147
}
144-
pub type DefaultLiason = AbstractUnixSocketLiaison;
148+
pub type DefaultLiaison = AbstractUnixSocketLiaison;
145149

146150
impl Liaison for AbstractUnixSocketLiaison {
147151
fn connect_to_server(&self) -> io::Result<Channel> {
@@ -173,6 +177,14 @@ mod linux {
173177
));
174178
Self { path }
175179
}
180+
181+
fn for_master_pid(master_pid: u32) -> Self {
182+
let path = PathBuf::from(format!(
183+
concat!("libdatadog/", crate::sidecar_version!(), ".{}.sock"),
184+
master_pid
185+
));
186+
Self { path }
187+
}
176188
}
177189

178190
impl Default for AbstractUnixSocketLiaison {
@@ -193,7 +205,7 @@ mod linux {
193205
pub use linux::*;
194206

195207
#[cfg(target_os = "macos")]
196-
pub type DefaultLiason = SharedDirLiaison;
208+
pub type DefaultLiaison = SharedDirLiaison;
197209

198210
#[cfg(test)]
199211
mod tests {

datadog-sidecar/src/setup/windows.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,10 @@ impl Liaison for NamedPipeLiaison {
166166
fn ipc_per_process() -> Self {
167167
Self::new(format!("libdatadog_{}_", unsafe { getpid() }))
168168
}
169+
170+
fn for_master_pid(master_pid: u32) -> Self {
171+
Self::new(env::temp_dir().join(format!("libdatadog.{}", master_pid)))
172+
}
169173
}
170174

171175
impl NamedPipeLiaison {
@@ -197,7 +201,7 @@ impl Default for NamedPipeLiaison {
197201
}
198202
}
199203

200-
pub type DefaultLiason = NamedPipeLiaison;
204+
pub type DefaultLiaison = NamedPipeLiaison;
201205

202206
#[cfg(test)]
203207
mod tests {

datadog-sidecar/src/unix.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
use spawn_worker::{entrypoint, get_dl_path_raw, getpid, SpawnWorker, Stdio, TrampolineData};
55

66
use std::ffi::{CStr, CString};
7+
use crate::service::blocking::SidecarTransport;
8+
use crate::setup::{DefaultLiaison, Liaison};
79
use std::os::unix::net::UnixListener as StdUnixListener;
810

911
use crate::config::{Config, LogMethod};
@@ -23,6 +25,44 @@ use tokio::signal::unix::{signal, SignalKind};
2325
use tracing::log::warn;
2426
use tracing::{error, info};
2527

28+
use std::thread;
29+
30+
31+
pub fn start_master_listener_unix(master_pid: i32) -> io::Result<()> {
32+
let liaison = DefaultLiaison::for_master_pid(master_pid as u32);
33+
34+
// Try to acquire the listening endpoint via the liaison
35+
let std_listener = match liaison.attempt_listen()? {
36+
Some(l) => l,
37+
None => return Ok(()),
38+
};
39+
40+
let _ = thread::Builder::new()
41+
.name("dd-sidecar".into())
42+
.spawn(move || {
43+
let acquire_listener = move || -> io::Result<_> {
44+
std_listener.set_nonblocking(true)?;
45+
let listener = UnixListener::from_std(std_listener.try_clone()?)?;
46+
let cancel = {
47+
let fd = listener.as_raw_fd();
48+
move || stop_listening(fd)
49+
};
50+
Ok((move |handler| accept_socket_loop(listener, handler), cancel))
51+
};
52+
53+
enter_listener_loop(acquire_listener);
54+
})
55+
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
56+
57+
Ok(())
58+
}
59+
60+
pub fn connect_worker_unix(master_pid: i32) -> io::Result<SidecarTransport> {
61+
let liaison = DefaultLiaison::for_master_pid(master_pid as u32);
62+
let channel = liaison.connect_to_server()?;
63+
Ok(channel.into())
64+
}
65+
2666
#[no_mangle]
2767
pub extern "C" fn ddog_daemon_entry_point(trampoline_data: &TrampolineData) {
2868
#[cfg(feature = "tracing")]

0 commit comments

Comments
 (0)