Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 54 additions & 2 deletions datadog-sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ use datadog_sidecar::service::{
};
use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryActions};
use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader};
#[cfg(unix)]
use datadog_sidecar::{
connect_worker_unix, shutdown_master_listener_unix, start_master_listener_unix,
};
#[cfg(windows)]
use datadog_sidecar::{
connect_worker_windows, shutdown_master_listener_windows, start_master_listener_windows,
transport_from_owned_handle,
};
use libc::c_char;
use libdd_common::tag::Tag;
use libdd_common::Endpoint;
Expand Down Expand Up @@ -305,8 +314,6 @@ pub extern "C" fn ddog_remote_config_reader_drop(_: Box<RemoteConfigReader>) {}
#[no_mangle]
pub extern "C" fn ddog_sidecar_transport_drop(_: Box<SidecarTransport>) {}

/// # Safety
/// Caller must ensure the process is safe to fork, at the time when this method is called
#[no_mangle]
pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) -> MaybeError {
let cfg = datadog_sidecar::config::FromEnv::config();
Expand All @@ -317,6 +324,51 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) -
MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_connect_master(master_pid: i32) -> MaybeError {
#[cfg(unix)]
{
try_c!(start_master_listener_unix(master_pid));
}
#[cfg(windows)]
{
try_c!(start_master_listener_windows(master_pid));
}
MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_connect_worker(
master_pid: i32,
connection: &mut *mut SidecarTransport,
) -> MaybeError {
#[cfg(unix)]
{
let transport = Box::new(try_c!(connect_worker_unix(master_pid)));
*connection = Box::into_raw(transport);
}
#[cfg(windows)]
{
let handle = try_c!(connect_worker_windows(master_pid));
let transport = Box::new(try_c!(transport_from_owned_handle(handle)));
*connection = Box::into_raw(transport);
}
MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError {
#[cfg(unix)]
{
try_c!(shutdown_master_listener_unix());
}
#[cfg(windows)]
{
try_c!(shutdown_master_listener_windows());
}
MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_ping(transport: &mut Box<SidecarTransport>) -> MaybeError {
try_c!(blocking::ping(transport));
Expand Down
17 changes: 17 additions & 0 deletions datadog-sidecar/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const ENV_SIDECAR_APPSEC_LOCK_FILE_PATH: &str = "_DD_SIDECAR_APPSEC_LOCK_FILE_PA
const ENV_SIDECAR_APPSEC_LOG_FILE_PATH: &str = "_DD_SIDECAR_APPSEC_LOG_FILE_PATH";
const ENV_SIDECAR_APPSEC_LOG_LEVEL: &str = "_DD_SIDECAR_APPSEC_LOG_LEVEL";

const ENV_SIDECAR_CONNECT_TO_MASTER_PID: &str = "_DD_SIDECAR_CONNECT_TO_MASTER_PID";

#[derive(Debug, Copy, Clone, Default)]
pub enum IpcMode {
#[default]
Expand Down Expand Up @@ -84,6 +86,7 @@ pub struct Config {
pub crashtracker_endpoint: Option<Endpoint>,
pub appsec_config: Option<AppSecConfig>,
pub max_memory: usize,
pub connect_to_master_pid: i32,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -128,6 +131,12 @@ impl Config {
format!("{}", self.max_memory).into(),
);
}
if self.connect_to_master_pid != 0 {
res.insert(
ENV_SIDECAR_CONNECT_TO_MASTER_PID,
format!("{}", self.connect_to_master_pid).into(),
);
}
res
}
}
Expand Down Expand Up @@ -241,9 +250,17 @@ impl FromEnv {
crashtracker_endpoint: Self::crashtracker_endpoint(),
appsec_config: Self::appsec_config(),
max_memory: Self::max_memory(),
connect_to_master_pid: Self::connect_to_master_pid(),
}
}

fn connect_to_master_pid() -> i32 {
std::env::var(ENV_SIDECAR_CONNECT_TO_MASTER_PID)
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(0)
}

fn appsec_config() -> Option<AppSecConfig> {
let shared_lib_path = std::env::var_os(ENV_SIDECAR_APPSEC_SHARED_LIB_PATH)?;
let socket_file_path = std::env::var_os(ENV_SIDECAR_APPSEC_SOCKET_FILE_PATH)?;
Expand Down
4 changes: 2 additions & 2 deletions datadog-sidecar/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ pub fn daemonize(listener: IpcServer, mut cfg: Config) -> anyhow::Result<()> {

pub fn start_or_connect_to_sidecar(cfg: Config) -> anyhow::Result<SidecarTransport> {
let liaison = match cfg.ipc_mode {
config::IpcMode::Shared => setup::DefaultLiason::ipc_shared(),
config::IpcMode::InstancePerProcess => setup::DefaultLiason::ipc_per_process(),
config::IpcMode::Shared => setup::DefaultLiaison::ipc_shared(),
config::IpcMode::InstancePerProcess => setup::DefaultLiaison::ipc_per_process(),
};

let err = match liaison.attempt_listen() {
Expand Down
1 change: 1 addition & 0 deletions datadog-sidecar/src/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ pub trait Liaison: Sized {
fn attempt_listen(&self) -> io::Result<Option<IpcServer>>;
fn ipc_shared() -> Self;
fn ipc_per_process() -> Self;
fn for_master_pid(master_pid: u32) -> Self;
}
16 changes: 14 additions & 2 deletions datadog-sidecar/src/setup/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ impl Liaison for SharedDirLiaison {
let liason_path = env::temp_dir().join(format!("libdatadog.{}.{pid}", *PROCESS_RANDOM_ID));
Self::new(liason_path)
}

fn for_master_pid(master_pid: u32) -> Self {
Self::new(env::temp_dir().join(format!("libdatadog.{}", master_pid)))
}
}

impl SharedDirLiaison {
Expand Down Expand Up @@ -141,7 +145,7 @@ mod linux {
pub struct AbstractUnixSocketLiaison {
path: PathBuf,
}
pub type DefaultLiason = AbstractUnixSocketLiaison;
pub type DefaultLiaison = AbstractUnixSocketLiaison;

impl Liaison for AbstractUnixSocketLiaison {
fn connect_to_server(&self) -> io::Result<Channel> {
Expand Down Expand Up @@ -173,6 +177,14 @@ mod linux {
));
Self { path }
}

fn for_master_pid(master_pid: u32) -> Self {
let path = PathBuf::from(format!(
concat!("libdatadog/", crate::sidecar_version!(), ".{}.sock"),
master_pid
));
Self { path }
}
}

impl Default for AbstractUnixSocketLiaison {
Expand All @@ -193,7 +205,7 @@ mod linux {
pub use linux::*;

#[cfg(target_os = "macos")]
pub type DefaultLiason = SharedDirLiaison;
pub type DefaultLiaison = SharedDirLiaison;

#[cfg(test)]
mod tests {
Expand Down
7 changes: 6 additions & 1 deletion datadog-sidecar/src/setup/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ impl Liaison for NamedPipeLiaison {
fn ipc_per_process() -> Self {
Self::new(format!("libdatadog_{}_", unsafe { getpid() }))
}

fn for_master_pid(master_pid: u32) -> Self {
let path = env::temp_dir().join(format!("libdatadog.{}", master_pid));
Self::new(path.to_string_lossy().as_ref())
}
}

impl NamedPipeLiaison {
Expand Down Expand Up @@ -197,7 +202,7 @@ impl Default for NamedPipeLiaison {
}
}

pub type DefaultLiason = NamedPipeLiaison;
pub type DefaultLiaison = NamedPipeLiaison;

#[cfg(test)]
mod tests {
Expand Down
80 changes: 80 additions & 0 deletions datadog-sidecar/src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

use spawn_worker::{getpid, SpawnWorker, Stdio, TrampolineData};

use crate::service::blocking::SidecarTransport;
use crate::setup::{DefaultLiaison, Liaison};
use std::ffi::CString;
use std::os::unix::net::UnixListener as StdUnixListener;

Expand All @@ -13,6 +15,8 @@ use nix::sys::socket::{shutdown, Shutdown};
use std::io;
use std::os::fd::RawFd;
use std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};
use std::sync::Mutex;
use std::thread;
use std::time::Instant;
use tokio::net::{UnixListener, UnixStream};
use tokio::select;
Expand All @@ -32,6 +36,82 @@ use std::ffi::CStr;
#[cfg(target_os = "linux")]
use tracing::warn;

// Global storage for the master listener thread handle and listener FD
static MASTER_LISTENER: Mutex<Option<(thread::JoinHandle<()>, RawFd)>> = Mutex::new(None);

pub fn start_master_listener_unix(master_pid: i32) -> io::Result<()> {
let liaison = DefaultLiaison::for_master_pid(master_pid as u32);

// Try to acquire the listening endpoint via the liaison
let std_listener = match liaison.attempt_listen()? {
Some(l) => l,
None => return Ok(()),
};

// Store the listener FD for later shutdown
let listener_fd = std_listener.as_raw_fd();

let handle = thread::Builder::new()
.name("dd-sidecar".into())
.spawn(move || {
let acquire_listener = move || -> io::Result<_> {
std_listener.set_nonblocking(true)?;
let listener = UnixListener::from_std(std_listener.try_clone()?)?;
let cancel = {
let fd = listener.as_raw_fd();
move || stop_listening(fd)
};
Ok((move |handler| accept_socket_loop(listener, handler), cancel))
};

let _ = enter_listener_loop(acquire_listener);
})
.map_err(io::Error::other)?;

// Store the thread handle and FD for shutdown
match MASTER_LISTENER.lock() {
Ok(mut guard) => *guard = Some((handle, listener_fd)),
Err(e) => {
error!("Failed to acquire lock for storing master listener: {}", e);
return Err(io::Error::other("Mutex poisoned"));
}
}

Ok(())
}

pub fn connect_worker_unix(master_pid: i32) -> io::Result<SidecarTransport> {
let liaison = DefaultLiaison::for_master_pid(master_pid as u32);
let channel = liaison.connect_to_server()?;
Ok(channel.into())
}

pub fn shutdown_master_listener_unix() -> io::Result<()> {
let listener_data = match MASTER_LISTENER.lock() {
Ok(mut guard) => guard.take(),
Err(e) => {
error!(
"Failed to acquire lock for shutting down master listener: {}",
e
);
return Err(io::Error::other("Mutex poisoned"));
}
};

if let Some((handle, fd)) = listener_data {
// Signal the listener to stop
stop_listening(fd);

// Join the thread to wait for cleanup
if let Err(e) = handle.join() {
error!("Failed to join master listener thread: {:?}", e);
return Err(io::Error::other("Failed to join master listener thread"));
}
}

Ok(())
}

#[no_mangle]
#[allow(unused)]
pub extern "C" fn ddog_daemon_entry_point(trampoline_data: &TrampolineData) {
Expand Down
Loading
Loading