Skip to content

Commit 0f7e0b3

Browse files
gnufedealyshawang
authored and committed
chore(ci_visibility): proper payload split by size (#14065)
1 parent 002abb0 commit 0f7e0b3

File tree

2 files changed

+318
-56
lines changed

2 files changed

+318
-56
lines changed

ddtrace/internal/ci_visibility/encoder.py

Lines changed: 126 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
1+
from __future__ import annotations
2+
13
import json
24
import os
35
import threading
46
from typing import TYPE_CHECKING # noqa:F401
7+
from typing import Any # noqa:F401
8+
from typing import Dict # noqa:F401
9+
from typing import List # noqa:F401
10+
from typing import Optional # noqa:F401
11+
from typing import Tuple # noqa:F401
512
from uuid import uuid4
613

714
from ddtrace.ext import SpanTypes
@@ -28,12 +35,6 @@
2835
log = get_logger(__name__)
2936

3037
if TYPE_CHECKING: # pragma: no cover
31-
from typing import Any # noqa:F401
32-
from typing import Dict # noqa:F401
33-
from typing import List # noqa:F401
34-
from typing import Optional # noqa:F401
35-
from typing import Tuple # noqa:F401
36-
3738
from ddtrace._trace.span import Span # noqa:F401
3839

3940

@@ -43,79 +44,153 @@ class CIVisibilityEncoderV01(BufferedEncoder):
4344
TEST_SUITE_EVENT_VERSION = 1
4445
TEST_EVENT_VERSION = 2
4546
ENDPOINT_TYPE = ENDPOINT.TEST_CYCLE
47+
_MAX_PAYLOAD_SIZE = 5 * 1024 * 1024 # 5MB
4648

4749
def __init__(self, *args):
    """Set up the metadata store, the buffer lock, the xdist-worker flag and an empty buffer."""
    # DEV: args are not used here, but are used by BufferedEncoder's __cinit__() method,
    # which is called implicitly by Cython.
    super(CIVisibilityEncoderV01, self).__init__()
    # Metadata keyed by event type; populated through set_metadata().
    self._metadata: Dict[str, Dict[str, str]] = {}
    # Re-entrant lock taken around buffer reads and writes.
    self._lock = threading.RLock()
    # Read the environment once here rather than on every payload build.
    self._is_xdist_worker = os.getenv("PYTEST_XDIST_WORKER") is not None
    self._init_buffer()
5457

5558
def __len__(self):
    """Return the number of items currently held in the buffer."""
    with self._lock:
        return len(self.buffer)
5861

59-
def set_metadata(self, event_type: str, metadata: Dict[str, str]) -> None:
    """Merge *metadata* into the entries already stored for *event_type*.

    An entry for *event_type* is created on first use; existing keys are
    overwritten by the values in *metadata*.
    """
    self._metadata.setdefault(event_type, {}).update(metadata)
6264

6365
def _init_buffer(self):
    """Replace the buffer with a fresh empty list while holding the lock."""
    with self._lock:
        self.buffer = []
6668

67-
def put(self, item):
    """Append *item* to the buffer while holding the lock."""
    with self._lock:
        self.buffer.append(item)
7072

7173
def encode_traces(self, traces):
    """
    Only used for LogWriter, not called for CI Visibility currently

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
7378

74-
def encode(self) -> List[Tuple[Optional[bytes], int]]:
    """Serialize everything buffered so far and reset the buffer.

    Returns:
        The (payload_bytes, trace_count) tuples produced by _build_payload(),
        or an empty list when nothing is buffered.
    """
    with self._lock:
        if not self.buffer:
            return []
        # Only the payload build is timed; the elapsed time is reported afterwards.
        with StopWatch() as sw:
            payloads = self._build_payload(self.buffer)
        record_endpoint_payload_events_serialization_time(endpoint=self.ENDPOINT_TYPE, seconds=sw.elapsed())
        self._init_buffer()
        return payloads
8189

82-
def _get_parent_session(self, traces: List[List[Span]]) -> int:
    """Return the parent span ID of the first session span that has one, or 0.

    A session span counts only when its parent_id is neither None nor 0.
    """
    for trace in traces:
        for span in trace:
            if span.get_tag(EVENT_TYPE) == SESSION_TYPE and span.parent_id is not None and span.parent_id != 0:
                return span.parent_id
    return 0
8896

89-
def _build_payload(self, traces: List[List[Span]]) -> List[Tuple[Optional[bytes], int]]:
    """
    Build one or more payloads from traces, splitting when necessary to stay under size limits.
    Uses index-based recursive approach to avoid copying slices.

    Returns a list of (payload_bytes, trace_count) tuples. The trace range is halved
    until a payload fits within _MAX_PAYLOAD_SIZE; a range holding a single trace is
    emitted as-is even if it is larger. An empty input yields an empty list.
    """
    if not traces:
        return []

    # Looked up once over the whole batch and passed unchanged to every chunk.
    new_parent_session_span_id = self._get_parent_session(traces)
    return self._build_payloads_recursive(traces, 0, len(traces), new_parent_session_span_id)
110+
111+
def _build_payloads_recursive(
    self, traces: List[List[Span]], start_idx: int, end_idx: int, new_parent_session_span_id: int
) -> List[Tuple[Optional[bytes], int]]:
    """
    Encode traces[start_idx:end_idx], halving the range until each payload fits.

    Args:
        traces: Full list of traces (indexed, never sliced)
        start_idx: Start index (inclusive)
        end_idx: End index (exclusive)
        new_parent_session_span_id: Parent session span ID passed on to span conversion

    Returns:
        List of (payload_bytes, trace_count) tuples, left half before right half
    """
    if start_idx >= end_idx:
        return []

    trace_count = end_idx - start_idx

    # Convert traces to spans with filtering (using indexes)
    converted = self._convert_traces_to_spans_indexed(traces, start_idx, end_idx, new_parent_session_span_id)

    # The payload carries a single flat list of events, so drop the per-trace grouping.
    all_spans = [span for _, trace_spans in converted for span in trace_spans]

    if not all_spans:
        log.debug("No spans to encode after filtering, skipping chunk")
        return []

    payload = self._create_payload_from_spans(all_spans)

    # A single trace cannot be split any further, so it is emitted even when oversized.
    if len(payload) <= self._MAX_PAYLOAD_SIZE or trace_count == 1:
        record_endpoint_payload_events_count(endpoint=self.ENDPOINT_TYPE, count=len(all_spans))
        return [(payload, trace_count)]

    # Payload is too large: split in half, the left half taking the extra trace on odd counts.
    mid_idx = start_idx + (trace_count + 1) // 2
    left_payloads = self._build_payloads_recursive(traces, start_idx, mid_idx, new_parent_session_span_id)
    right_payloads = self._build_payloads_recursive(traces, mid_idx, end_idx, new_parent_session_span_id)
    return left_payloads + right_payloads
160+
161+
def _convert_traces_to_spans_indexed(
    self, traces: List[List[Span]], start_idx: int, end_idx: int, new_parent_session_span_id: int
) -> List[Tuple[int, List[Dict[str, Any]]]]:
    """Convert traces to spans with xdist filtering applied, using indexes to avoid slicing.

    Returns one (trace_index, converted_spans) pair per trace in [start_idx, end_idx).
    When the encoder is flagged as an xdist worker, session spans are left out.
    """
    all_spans_with_trace_info = []
    for trace_idx in range(start_idx, end_idx):
        trace = traces[trace_idx]
        # Every span is converted with the origin of the trace's first span, so look it up once.
        dd_origin = trace[0].context.dd_origin if trace else None
        trace_spans = [
            self._convert_span(span, dd_origin, new_parent_session_span_id)
            for span in trace
            if (not self._is_xdist_worker) or (span.get_tag(EVENT_TYPE) != SESSION_TYPE)
        ]
        all_spans_with_trace_info.append((trace_idx, trace_spans))

    return all_spans_with_trace_info
176+
177+
def _create_payload_from_spans(self, spans: List[Dict[str, Any]]) -> bytes:
    """Create a payload from the given spans.

    The packed document holds the payload format version, the encoder's metadata
    and *spans* under the "events" key.
    """
    return CIVisibilityEncoderV01._pack_payload(
        {
            "version": self.PAYLOAD_FORMAT_VERSION,
            "metadata": self._metadata,
            "events": spans,
        }
    )
112186

113187
@staticmethod
def _pack_payload(payload):
    """Serialize *payload* with msgpack_packb and return the result."""
    return msgpack_packb(payload)
116190

117-
def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
118-
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
191+
def _convert_span(
192+
self, span: Span, dd_origin: Optional[str] = None, new_parent_session_span_id: int = 0
193+
) -> Dict[str, Any]:
119194
sp = JSONEncoderV2._span_to_dict(span)
120195
sp = JSONEncoderV2._normalize_span(sp)
121196
sp["type"] = span.get_tag(EVENT_TYPE) or span.span_type
@@ -183,18 +258,17 @@ class CIVisibilityCoverageEncoderV02(CIVisibilityEncoderV01):
183258
def _set_itr_suite_skipping_mode(self, new_value):
    """Store *new_value* as the encoder's itr_suite_skipping_mode."""
    self.itr_suite_skipping_mode = new_value
185260

186-
def put(self, item):
    """Buffer only the spans of *item* that carry coverage data.

    A span qualifies when COVERAGE_TAG_NAME is among its tags or when it has a
    struct tag under that name.

    Raises:
        NoEncodableSpansError: if no span in *item* qualifies.
    """
    spans_with_coverage = [
        span
        for span in item
        if COVERAGE_TAG_NAME in span.get_tags() or span.get_struct_tag(COVERAGE_TAG_NAME) is not None
    ]
    if not spans_with_coverage:
        raise NoEncodableSpansError()
    return super(CIVisibilityCoverageEncoderV02, self).put(spans_with_coverage)
195270

196-
def _build_coverage_attachment(self, data):
197-
# type: (bytes) -> List[bytes]
271+
def _build_coverage_attachment(self, data: bytes) -> List[bytes]:
198272
return [
199273
b"--%s" % self.boundary.encode("utf-8"),
200274
b'Content-Disposition: form-data; name="coverage1"; filename="coverage1.msgpack"',
@@ -203,8 +277,7 @@ def _build_coverage_attachment(self, data):
203277
data,
204278
]
205279

206-
def _build_event_json_attachment(self):
207-
# type: () -> List[bytes]
280+
def _build_event_json_attachment(self) -> List[bytes]:
208281
return [
209282
b"--%s" % self.boundary.encode("utf-8"),
210283
b'Content-Disposition: form-data; name="event"; filename="event.json"',
@@ -213,18 +286,16 @@ def _build_event_json_attachment(self):
213286
b'{"dummy":true}',
214287
]
215288

216-
def _build_body(self, data: bytes) -> List[bytes]:
    """Return the body parts in order: coverage attachment, event attachment, closing boundary."""
    return (
        self._build_coverage_attachment(data)
        + self._build_event_json_attachment()
        + [b"--%s--" % self.boundary.encode("utf-8")]
    )
223295

224-
def _build_data(self, traces):
225-
# type: (List[List[Span]]) -> Optional[bytes]
296+
def _build_data(self, traces: List[List[Span]]) -> Optional[bytes]:
226297
normalized_covs = [
227-
self._convert_span(span, "")
298+
self._convert_span(span)
228299
for trace in traces
229300
for span in trace
230301
if (COVERAGE_TAG_NAME in span.get_tags() or span.get_struct_tag(COVERAGE_TAG_NAME) is not None)
@@ -235,17 +306,17 @@ def _build_data(self, traces):
235306
# TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size.
236307
return msgpack_packb({"version": self.PAYLOAD_FORMAT_VERSION, "coverages": normalized_covs})
237308

238-
def _build_payload(self, traces: List[List[Span]]) -> List[Tuple[Optional[bytes], int]]:
    """Wrap the coverage data built from *traces* in a single CRLF-joined body.

    Returns:
        A one-element list holding (body_bytes, trace_count), or an empty list
        when _build_data() produces nothing.
    """
    data = self._build_data(traces)
    if not data:
        return []
    # The second element is the number of traces, not the byte length of the encoded data.
    return [(b"\r\n".join(self._build_body(data)), len(traces))]
244314

245-
def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
246-
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
315+
def _convert_span(
316+
self, span: Span, dd_origin: Optional[str] = None, new_parent_session_span_id: int = 0
317+
) -> Dict[str, Any]:
247318
# DEV: new_parent_session_span_id is unused here, but it is used in super class
248-
files: Dict[str, Any] = {}
319+
files: dict[str, Any] = {}
249320

250321
files_struct_tag_value = span.get_struct_tag(COVERAGE_TAG_NAME)
251322
if files_struct_tag_value is not None and "files" in files_struct_tag_value:

0 commit comments

Comments
 (0)