Skip to content

Commit d9322d4

Browse files
committed
payload splitting in ci_visibility
1 parent 0c6ba8d commit d9322d4

File tree

3 files changed

+102
-53
lines changed

3 files changed

+102
-53
lines changed

ddtrace/internal/ci_visibility/encoder.py

Lines changed: 77 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22
import os
33
import threading
44
from typing import TYPE_CHECKING # noqa:F401
5+
from typing import Any # noqa:F401
6+
from typing import Dict # noqa:F401
7+
from typing import List # noqa:F401
8+
from typing import Optional # noqa:F401
9+
from typing import Tuple # noqa:F401
510
from uuid import uuid4
611

712
from ddtrace.ext import SpanTypes
@@ -28,11 +33,6 @@
2833
log = get_logger(__name__)
2934

3035
if TYPE_CHECKING: # pragma: no cover
31-
from typing import Any # noqa:F401
32-
from typing import Dict # noqa:F401
33-
from typing import List # noqa:F401
34-
from typing import Optional # noqa:F401
35-
3636
from ddtrace._trace.span import Span # noqa:F401
3737

3838

@@ -42,13 +42,15 @@ class CIVisibilityEncoderV01(BufferedEncoder):
4242
TEST_SUITE_EVENT_VERSION = 1
4343
TEST_EVENT_VERSION = 2
4444
ENDPOINT_TYPE = ENDPOINT.TEST_CYCLE
45+
_MAX_PAYLOAD_SIZE = 5 * 1024 * 1024 # 5MB
4546

4647
def __init__(self, *args):
    """Initialise the encoder state: metadata map, lock, xdist flag and buffer.

    ``*args`` is accepted but not read here; see the DEV note below.
    """
    # DEV: args are not used here, but are used by BufferedEncoder's __cinit__() method,
    # which is called implicitly by Cython.
    super(CIVisibilityEncoderV01, self).__init__()
    self._metadata = {}  # type: Dict[str, Dict[str, str]]
    self._lock = threading.RLock()
    # True when the PYTEST_XDIST_WORKER environment variable is unset; cached once
    # here so span conversion does not have to re-read the environment.
    self._is_not_xdist_worker = "PYTEST_XDIST_WORKER" not in os.environ
    self._init_buffer()
5355

5456
def __len__(self):
@@ -68,18 +70,21 @@ def put(self, spans):
6870
self.buffer.append(spans)
6971

7072
def encode_traces(self, traces):
    """Build a payload for *traces* and return only its first element (the payload)."""
    built = self._build_payload(traces=traces)
    return built[0]
7274

73-
def encode(self) -> List[Tuple[Optional[bytes], int]]:
    """Drain the buffer into a list of ``(payload, trace_count)`` tuples.

    ``_build_payload`` is called repeatedly on whatever is left in the buffer;
    each call reports how many leading traces it consumed, and those traces are
    removed before the next call. An empty buffer yields an empty list.

    A payload may be ``None`` (nothing to send for those traces); it is still
    reported so the caller knows how many traces were consumed.
    """
    with self._lock:
        payloads = []  # type: List[Tuple[Optional[bytes], int]]
        while self.buffer:
            with StopWatch() as sw:
                payload, count = self._build_payload(self.buffer)
            record_endpoint_payload_events_serialization_time(endpoint=self.ENDPOINT_TYPE, seconds=sw.elapsed())
            if not count or count < 0:
                # A builder that reports no progress would otherwise keep this loop
                # spinning forever on the same buffer. Treat the whole remaining
                # buffer as consumed, matching the pre-splitting behaviour of
                # resetting the buffer after a single build.
                log.debug("Payload builder consumed no traces; discarding %d buffered traces", len(self.buffer))
                count = len(self.buffer)
            payloads.append((payload, count))
            self.buffer = self.buffer[count:]
        return payloads
8388

8489
def _get_parent_session(self, traces):
8590
for trace in traces:
@@ -89,29 +94,65 @@ def _get_parent_session(self, traces):
8994
return 0
9095

9196
def _build_payload(self, traces):
92-
new_parent_session_span_id = self._get_parent_session(traces)
93-
is_not_xdist_worker = os.getenv("PYTEST_XDIST_WORKER") is None
94-
normalized_spans = [
95-
self._convert_span(span, trace[0].context.dd_origin, new_parent_session_span_id)
96-
for trace in traces
97-
for span in trace
98-
if (is_not_xdist_worker or span.get_tag(EVENT_TYPE) != SESSION_TYPE)
99-
]
100-
if not normalized_spans:
101-
return None
102-
record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(normalized_spans))
97+
# type: (List[List[Span]]) -> Tuple[Optional[bytes], int]
98+
if not traces:
99+
return []
103100

104-
# TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size.
101+
new_parent_session_span_id = self._get_parent_session(traces)
102+
return self._send_all_or_half_spans(traces, new_parent_session_span_id)
103+
104+
def _send_all_or_half_spans(self, traces, new_parent_session_span_id):
    # type: (List[List[Span]], Optional[int]) -> Tuple[Optional[bytes], int]
    """Encode as many leading traces as fit under ``_MAX_PAYLOAD_SIZE``.

    All traces are tried first; while the encoded payload is too large, the
    candidate prefix is halved (rounding up). A single trace is always sent,
    even if it exceeds the limit, since it cannot be split further here.

    Returns ``(payload, count)`` where ``count`` is the number of leading traces
    covered. ``payload`` is ``None`` when the candidate prefix has no spans left
    after filtering.
    """
    # Convert once up front: conversion of a trace does not depend on the other
    # traces, so each halving step can reuse the converted prefix instead of
    # re-converting every span.
    converted = self._convert_traces_to_spans(traces, new_parent_session_span_id)
    n_traces = len(traces)

    while True:
        spans = [span for _, trace_spans in converted[:n_traces] for span in trace_spans]

        if not spans:
            log.debug("No spans to encode after filtering, returning empty payload")
            return None, n_traces

        payload = self._create_payload_from_spans(spans)
        if len(payload) <= self._MAX_PAYLOAD_SIZE or n_traces <= 1:
            record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(spans))
            return payload, n_traces

        # Too large: retry with the first half of the traces (rounded up).
        n_traces = (n_traces + 1) // 2
124+
125+
def _convert_traces_to_spans(self, traces, new_parent_session_span_id):
126+
# type: (List[List[Span]], Optional[int]) -> List[Tuple[int, List[Dict[str, Any]]]]
127+
"""Convert all traces to spans with xdist filtering applied."""
128+
all_spans_with_trace_info = []
129+
for trace_idx, trace in enumerate(traces):
130+
trace_spans = [
131+
self._convert_span(span, trace[0].context.dd_origin, new_parent_session_span_id)
132+
for span in trace
133+
if self._is_not_xdist_worker or span.get_tag(EVENT_TYPE) != SESSION_TYPE
134+
]
135+
all_spans_with_trace_info.append((trace_idx, trace_spans))
136+
137+
return all_spans_with_trace_info
138+
139+
def _create_payload_from_spans(self, spans):
    # type: (List[Dict[str, Any]]) -> bytes
    """Wrap *spans* in the version/metadata/events envelope and pack it."""
    envelope = dict(
        version=self.PAYLOAD_FORMAT_VERSION,
        metadata=self._metadata,
        events=spans,
    )
    return CIVisibilityEncoderV01._pack_payload(envelope)
108149

109150
@staticmethod
def _pack_payload(payload):
    """Serialize *payload* with ``msgpack_packb`` and return the result."""
    packed = msgpack_packb(payload)
    return packed
112153

113-
def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
114-
# type: (Span, str, Optional[int]) -> Dict[str, Any]
154+
def _convert_span(self, span, dd_origin=None, new_parent_session_span_id=0):
155+
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
115156
sp = JSONEncoderV2._span_to_dict(span)
116157
sp = JSONEncoderV2._normalize_span(sp)
117158
sp["type"] = span.get_tag(EVENT_TYPE) or span.span_type
@@ -220,7 +261,7 @@ def _build_body(self, data):
220261
def _build_data(self, traces):
221262
# type: (List[List[Span]]) -> Optional[bytes]
222263
normalized_covs = [
223-
self._convert_span(span, "")
264+
self._convert_span(span)
224265
for trace in traces
225266
for span in trace
226267
if (COVERAGE_TAG_NAME in span.get_tags() or span.get_struct_tag(COVERAGE_TAG_NAME) is not None)
@@ -232,14 +273,14 @@ def _build_data(self, traces):
232273
return msgpack_packb({"version": self.PAYLOAD_FORMAT_VERSION, "coverages": normalized_covs})
233274

234275
def _build_payload(self, traces):
235-
# type: (List[List[Span]]) -> Optional[bytes]
276+
# type: (List[List[Span]]) -> Tuple[Optional[bytes], int]
236277
data = self._build_data(traces)
237278
if not data:
238-
return None
239-
return b"\r\n".join(self._build_body(data))
279+
return None, 0
280+
return b"\r\n".join(self._build_body(data)), len(data)
240281

241-
def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
242-
# type: (Span, str, Optional[int]) -> Dict[str, Any]
282+
def _convert_span(self, span, dd_origin=None, new_parent_session_span_id=0):
283+
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
243284
# DEV: new_parent_session_span_id is unused here, but it is used in super class
244285
files: Dict[str, Any] = {}
245286

ddtrace/internal/writer/writer.py

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from typing import List
1313
from typing import Optional
1414
from typing import TextIO
15+
from typing import Tuple
1516

1617
import ddtrace
1718
from ddtrace import config
@@ -378,30 +379,37 @@ def flush_queue(self, raise_exc: bool = False):
378379
def _flush_queue_with_client(self, client: WriterClientBase, raise_exc: bool = False) -> None:
    """Encode everything buffered in *client*'s encoder and send each payload.

    The encoder may return several ``(payload, n_traces)`` tuples (for example
    when it splits a large buffer); each one is handed to
    ``_flush_single_payload`` in order. If encoding raises, the failure is
    logged, the traces are counted as dropped, and nothing is sent.
    """
    # Snapshot taken before encoding; only used for the dropped-traces metric.
    n_traces = len(client.encoder)
    try:
        if not (payloads := client.encoder.encode()):
            return
    except Exception:
        # FIXME(munir): if client.encoder raises an Exception n_traces may not be accurate due to race conditions
        log.error("failed to encode trace with encoder %r", client.encoder, exc_info=True)
        self._metrics_dist("encoder.dropped.traces", n_traces)
        return

    # Do not assume a single payload here: destructuring ``payloads`` as a
    # one-element list raises as soon as the encoder splits its output.
    for encoded_data, payload_n_traces in payloads:
        self._flush_single_payload(encoded_data, payload_n_traces, client=client, raise_exc=raise_exc)
396+
397+
def _flush_single_payload(
398+
self, encoded: Optional[bytes], n_traces: int, client: WriterClientBase, raise_exc: bool = False
399+
) -> None:
400+
if encoded is None:
401+
return
402+
403+
# Should gzip the payload if intake accepts it
404+
if self._intake_accepts_gzip:
405+
original_size = len(encoded)
406+
# Replace the value to send with the gzipped the value
407+
encoded = gzip.compress(encoded, compresslevel=6)
408+
log.debug("Original size in bytes: %s, Compressed size: %s", original_size, len(encoded))
409+
410+
# And add the header
411+
self._headers["Content-Encoding"] = "gzip"
412+
405413
try:
406414
self._send_payload_with_backoff(encoded, n_traces, client)
407415
except Exception:

tests/ci_visibility/test_ci_visibility.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1417,7 +1417,7 @@ def tearDown(self):
14171417
def assert_test_session_name(self, name):
14181418
"""Check that the payload metadata contains the test session name attributes."""
14191419
payload = msgpack.loads(
1420-
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])
1420+
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[0]
14211421
)
14221422
assert payload["metadata"]["test_session_end"] == {"test_session.name": name}
14231423
assert payload["metadata"]["test_suite_end"] == {"test_session.name": name}
@@ -1493,7 +1493,7 @@ def test_set_library_capabilities(self):
14931493
)
14941494

14951495
payload = msgpack.loads(
1496-
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])
1496+
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[0]
14971497
)
14981498
assert payload["metadata"]["test"] == {
14991499
"_dd.library_capabilities.early_flake_detection": "1",

0 commit comments

Comments
 (0)