From 5df68221e3fe3737e9741a5c8fb7621d1b5ec1f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Eriksson?= Date: Sun, 6 Apr 2025 12:31:55 +0200 Subject: [PATCH] wip trace logging rewrite --- Cargo.lock | 11 +++++- runtimes/core/Cargo.toml | 1 + runtimes/core/src/trace/log.rs | 69 +++++++++++++++++++--------------- 3 files changed, 49 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 73e835d631..7d96ebfcad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,6 +1617,7 @@ dependencies = [ "sha2", "sha3", "subtle", + "sync-cell", "thiserror 1.0.69", "tokio", "tokio-nsq", @@ -3883,7 +3884,7 @@ dependencies = [ "pingora-http", "pingora-ketama", "pingora-runtime", - "rand 0.7.3", + "rand 0.8.5", "tokio", ] @@ -3896,7 +3897,7 @@ dependencies = [ "arrayvec", "hashbrown 0.15.2", "parking_lot", - "rand 0.7.3", + "rand 0.8.5", ] [[package]] @@ -5657,6 +5658,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync-cell" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03f753958cb8997f0356c66d3daa382cab71bf797ced914d31405cd9f018e10a" + [[package]] name = "sync_wrapper" version = "0.1.2" diff --git a/runtimes/core/Cargo.toml b/runtimes/core/Cargo.toml index 7f6d769797..3052ebef72 100644 --- a/runtimes/core/Cargo.toml +++ b/runtimes/core/Cargo.toml @@ -96,6 +96,7 @@ percent-encoding = "2.3.1" aws-credential-types = "1.2.1" regex = "1.11.1" email_address = "0.2.9" +sync-cell = "0.2.0" [build-dependencies] prost-build = "0.12.3" diff --git a/runtimes/core/src/trace/log.rs b/runtimes/core/src/trace/log.rs index 34d4c2223c..e42b9f1327 100644 --- a/runtimes/core/src/trace/log.rs +++ b/runtimes/core/src/trace/log.rs @@ -1,6 +1,6 @@ use std::convert::Infallible; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use crate::api::reqauth::platform; @@ -58,10 +58,8 @@ pub(super) struct TraceEvent { impl Reporter { pub async fn start_reporting(self) { - let mut inner = Box::new(InnerTraceEventStream { - rx: self.rx, - anchor: self.anchor.clone(), - current: None, + let inner = Arc::new(InnerTraceEventStream { + rx: sync_cell::SyncCell::new(self.rx), }); let trace_time_anchor = self.anchor.trace_header(); @@ -95,16 +93,16 @@ impl Reporter { loop { // Wait for at least one entry on rx before we open an HTTP request. - { - let Some(event) = inner.rx.recv().await else { + let event = { + let Some(event) = inner.lock().unwrap().rx.recv().await else { // The stream is closed. This only happens if all senders have been dropped, // which should never happen in regular use. return; }; - inner.current = Some(StreamingTraceEvent { + StreamingTraceEvent { event, next: EventStreamState::Header, - }); + } }; // Construct the body stream. @@ -112,7 +110,9 @@ impl Reporter { no_data_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let stream = TraceEventStream { - inner: inner.as_mut() as *mut InnerTraceEventStream, + anchor: self.anchor.clone(), + current: Some(event), + inner: inner.clone(), num_events_this_tick: 1, no_data_ticker, }; @@ -158,53 +158,58 @@ impl Reporter { } struct TraceEventStream { - inner: *mut InnerTraceEventStream, + inner: InnerTraceEventStream, + anchor: TimeAnchor, // The number of events received since the last tick. num_events_this_tick: usize, /// Ticks to detect when there is no data to close the stream. no_data_ticker: tokio::time::Interval, + + /// Current item received from rx and being streamed. + current: Option, } // Safety: the TraceEventStream only contains `poll_next` which requires a mutable reference // to self. Therefore it is never called concurrently. The lifetime of inner is guaranteed // to exceed the lifetime of the stream. -unsafe impl Send for TraceEventStream {} -unsafe impl Sync for TraceEventStream {} +// unsafe impl Send for TraceEventStream {} +// unsafe impl Sync for TraceEventStream {} struct InnerTraceEventStream { - rx: tokio::sync::mpsc::UnboundedReceiver, - anchor: TimeAnchor, + rx: sync_cell::SyncCell>>, +} - /// Current item received from rx and being streamed. - current: Option, +impl InnerTraceEventStream { + fn poll_recv(&self, cx: &mut Context) -> Poll> { + let mut rx = self.rx.replace(None).unwrap(); + let poll = rx.poll_recv(cx); + self.rx.replace(Some(rx)); + poll + } } impl futures_core::stream::Stream for TraceEventStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - // Safety: the inner pointer is boxed and never moved, and is kept alive - // by the start_reporting method for the lifetime of the stream. - let inner = unsafe { &mut *self.inner }; - { // If we have a current item, return it. - if inner.current.is_some() { - let next = inner.current.as_ref().unwrap().next; + if self.current.is_some() { + let next = self.current.as_ref().unwrap().next; return match next { EventStreamState::Header => { - inner.current.as_mut().unwrap().next = EventStreamState::Data; - Poll::Ready(Some(Ok(inner + self.current.as_mut().unwrap().next = EventStreamState::Data; + Poll::Ready(Some(Ok(self .current .as_ref() .unwrap() - .header(&inner.anchor)))) + .header(&self.anchor)))) } EventStreamState::Data => { - let data = inner.current.as_ref().unwrap().event.data.clone(); // cheap clone - inner.current = None; + let data = self.current.as_ref().unwrap().event.data.clone(); // cheap clone + self.current = None; Poll::Ready(Some(Ok(data))) } }; @@ -228,10 +233,14 @@ impl futures_core::stream::Stream for TraceEventStream { // If we have no current item, poll the receiver for a new trace event. { - match inner.rx.poll_recv(cx) { + let mut rx = self.inner.poll_recv(cx); + let poll = rx.poll_recv(cx); + self.inner.rx.replace(Some(rx)); + + match poll { Poll::Ready(Some(event)) => { self.num_events_this_tick += 1; - inner.current = Some(StreamingTraceEvent { + self.current = Some(StreamingTraceEvent { event, next: EventStreamState::Header, });