Skip to content

chore(ci_visibility): proper payload split by size #14065

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 63 commits into from
Jul 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
cd46d89
change type of return for encode()
gnufede Jul 17, 2025
987a9fe
read from encode as a list of 1 tuple
gnufede Jul 17, 2025
72db8d2
adjust msgpack encoders
gnufede Jul 17, 2025
e530dc6
None, 0 case
gnufede Jul 17, 2025
cf5f3d9
update some places
gnufede Jul 17, 2025
8caf2ea
print riot command if tests failed
gnufede Jul 17, 2025
3c06386
fix some tests
gnufede Jul 17, 2025
73fdeb2
preserve exit status
gnufede Jul 17, 2025
32ca193
fix metadata_included test
gnufede Jul 17, 2025
f52be9c
fix other tests
gnufede Jul 17, 2025
00ef9ea
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 17, 2025
1076cb1
remove unrelated change
gnufede Jul 17, 2025
1d5a1b5
missing piece
gnufede Jul 18, 2025
e5bda2e
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 18, 2025
d696391
return empty list instead of None, 0
gnufede Jul 18, 2025
8ff147a
rename results to traces
gnufede Jul 18, 2025
84e09d9
lint, and s/results/traces
gnufede Jul 18, 2025
33b2409
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 18, 2025
eff9d15
revert change
gnufede Jul 18, 2025
2fa704a
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 18, 2025
f8af5e9
use walrus operator
gnufede Jul 18, 2025
0c6ba8d
specific empty traces case
gnufede Jul 18, 2025
d9322d4
payload splitting in ci_visibility
gnufede Jul 18, 2025
27f78b5
mypy
gnufede Jul 18, 2025
4db8ef2
fix test
gnufede Jul 18, 2025
f2ef944
iterate over the traces, add unit tests
gnufede Jul 21, 2025
c283c15
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 21, 2025
cd613f5
remove commented line
gnufede Jul 21, 2025
fc5fa2d
fix new test
gnufede Jul 21, 2025
8124f0d
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 21, 2025
40da814
Merge branch 'gnufede/encoder-returns-list' into gnufede/payload-split-1
gnufede Jul 21, 2025
a32e552
change to use indexes
gnufede Jul 21, 2025
b5928fc
fix and code remove
gnufede Jul 21, 2025
ba10fb9
disable encode_traces() in ci visibility encoder
gnufede Jul 21, 2025
439dd3b
remove redundant check
gnufede Jul 21, 2025
3eaa8b6
fix
gnufede Jul 21, 2025
715bd03
fix test
gnufede Jul 21, 2025
4a7db2f
fix tests
gnufede Jul 21, 2025
3baae61
make build_payload have the new signature
gnufede Jul 21, 2025
b37c35b
simplify logic, adjust tests
gnufede Jul 21, 2025
696ffd8
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 21, 2025
4cba8f2
Merge branch 'gnufede/encoder-returns-list' into gnufede/payload-split-1
gnufede Jul 21, 2025
fd2274a
remove unrelated change
gnufede Jul 21, 2025
6d658cd
empty list of payloads expected
gnufede Jul 22, 2025
f19fbd8
empty commit
gnufede Jul 22, 2025
d9c054e
Merge branch 'main' into gnufede/payload-split-1
gnufede Jul 22, 2025
7e95207
fix bad merge
gnufede Jul 22, 2025
6f52184
typing
gnufede Jul 22, 2025
917619d
Merge branch 'main' into gnufede/payload-split-1
gnufede Jul 22, 2025
a9bbb94
better types
gnufede Jul 22, 2025
76ff0f5
better types yet
gnufede Jul 22, 2025
404bc5e
future import
gnufede Jul 22, 2025
16a4aac
Merge branch 'main' into gnufede/payload-split-1
gnufede Jul 22, 2025
5826860
Merge branch 'main' into gnufede/payload-split-1
gnufede Jul 23, 2025
22f9e58
replace unittest mock with mock
gnufede Jul 23, 2025
64fdf3d
suggestions from code review
gnufede Jul 23, 2025
6987ce3
change optional[bytes] to bytes
gnufede Jul 24, 2025
26d52d1
keep optional[bytes]
gnufede Jul 24, 2025
aabc44d
avoid using extend unnecessary
gnufede Jul 24, 2025
0ec66c1
remove future import
gnufede Jul 24, 2025
18300a1
all the type hints now
gnufede Jul 24, 2025
ea10fe6
Revert "remove future import"
gnufede Jul 24, 2025
3d932b6
Merge branch 'main' into gnufede/payload-split-1
gnufede Jul 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 126 additions & 55 deletions ddtrace/internal/ci_visibility/encoder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from __future__ import annotations

import json
import os
import threading
from typing import TYPE_CHECKING # noqa:F401
from typing import Any # noqa:F401
from typing import Dict # noqa:F401
from typing import List # noqa:F401
from typing import Optional # noqa:F401
from typing import Tuple # noqa:F401
from uuid import uuid4

from ddtrace.ext import SpanTypes
Expand All @@ -28,12 +35,6 @@
log = get_logger(__name__)

if TYPE_CHECKING: # pragma: no cover
from typing import Any # noqa:F401
from typing import Dict # noqa:F401
from typing import List # noqa:F401
from typing import Optional # noqa:F401
from typing import Tuple # noqa:F401

from ddtrace._trace.span import Span # noqa:F401


Expand All @@ -43,79 +44,153 @@ class CIVisibilityEncoderV01(BufferedEncoder):
TEST_SUITE_EVENT_VERSION = 1
TEST_EVENT_VERSION = 2
ENDPOINT_TYPE = ENDPOINT.TEST_CYCLE
_MAX_PAYLOAD_SIZE = 5 * 1024 * 1024 # 5MB

def __init__(self, *args):
# DEV: args are not used here, but are used by BufferedEncoder's __cinit__() method,
# which is called implicitly by Cython.
super(CIVisibilityEncoderV01, self).__init__()
self._metadata: Dict[str, Dict[str, str]] = {}
self._lock = threading.RLock()
self._metadata = {}
self._is_xdist_worker = os.getenv("PYTEST_XDIST_WORKER") is not None
self._init_buffer()

def __len__(self):
with self._lock:
return len(self.buffer)

def set_metadata(self, event_type, metadata):
# type: (str, Dict[str, str]) -> None
def set_metadata(self, event_type: str, metadata: Dict[str, str]):
self._metadata.setdefault(event_type, {}).update(metadata)

def _init_buffer(self):
with self._lock:
self.buffer = []

def put(self, spans):
def put(self, item):
with self._lock:
self.buffer.append(spans)
self.buffer.append(item)

def encode_traces(self, traces):
return self._build_payload(traces=traces)
"""
Only used for LogWriter, not called for CI Visibility currently
"""
raise NotImplementedError()

def encode(self):
def encode(self) -> List[Tuple[Optional[bytes], int]]:
with self._lock:
if not self.buffer:
return []
payloads = []
with StopWatch() as sw:
result_payloads = self._build_payload(self.buffer)
payloads = self._build_payload(self.buffer)
record_endpoint_payload_events_serialization_time(endpoint=self.ENDPOINT_TYPE, seconds=sw.elapsed())
self._init_buffer()
return result_payloads
return payloads

def _get_parent_session(self, traces):
def _get_parent_session(self, traces: List[List[Span]]) -> int:
for trace in traces:
for span in trace:
if span.get_tag(EVENT_TYPE) == SESSION_TYPE and span.parent_id is not None and span.parent_id != 0:
return span.parent_id
return 0

def _build_payload(self, traces):
# type: (List[List[Span]]) -> List[Tuple[Optional[bytes], int]]
def _build_payload(self, traces: List[List[Span]]) -> List[Tuple[Optional[bytes], int]]:
"""
Build multiple payloads from traces, splitting when necessary to stay under size limits.
Uses index-based recursive approach to avoid copying slices.

Returns a list of (payload_bytes, trace_count) tuples, where each payload contains
as many traces as possible without exceeding _MAX_PAYLOAD_SIZE.
"""
if not traces:
return []

new_parent_session_span_id = self._get_parent_session(traces)
is_not_xdist_worker = os.getenv("PYTEST_XDIST_WORKER") is None
normalized_spans = [
self._convert_span(span, trace[0].context.dd_origin, new_parent_session_span_id)
for trace in traces
for span in trace
if (is_not_xdist_worker or span.get_tag(EVENT_TYPE) != SESSION_TYPE)
]
if not normalized_spans:
return self._build_payloads_recursive(traces, 0, len(traces), new_parent_session_span_id)

def _build_payloads_recursive(
self, traces: List[List[Span]], start_idx: int, end_idx: int, new_parent_session_span_id: int
) -> List[Tuple[Optional[bytes], int]]:
"""
Recursively build payloads using start/end indexes to avoid slice copying.

Args:
traces: Full list of traces
start_idx: Start index (inclusive)
end_idx: End index (exclusive)
new_parent_session_span_id: Parent session span ID

Returns:
List of (payload_bytes, trace_count) tuples
"""
if start_idx >= end_idx:
return []
record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(normalized_spans))

# TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size.
return [
(
CIVisibilityEncoderV01._pack_payload(
{"version": self.PAYLOAD_FORMAT_VERSION, "metadata": self._metadata, "events": normalized_spans}
),
len(traces),
)
]
trace_count = end_idx - start_idx

# Convert traces to spans with filtering (using indexes)
all_spans_with_trace_info = self._convert_traces_to_spans_indexed(
traces, start_idx, end_idx, new_parent_session_span_id
)

# Get all spans (flattened)
all_spans = [span for _, trace_spans in all_spans_with_trace_info for span in trace_spans]

if not all_spans:
log.debug("No spans to encode after filtering, skipping chunk")
return []

# Try to create payload from all spans
payload = self._create_payload_from_spans(all_spans)

if len(payload) <= self._MAX_PAYLOAD_SIZE or trace_count == 1:
# Payload fits or we can't split further (single trace)
record_endpoint_payload_events_count(endpoint=self.ENDPOINT_TYPE, count=len(all_spans))
return [(payload, trace_count)]
else:
# Payload is too large, split in half recursively
mid_idx = start_idx + (trace_count + 1) // 2

# Process both halves recursively
left_payloads = self._build_payloads_recursive(traces, start_idx, mid_idx, new_parent_session_span_id)
right_payloads = self._build_payloads_recursive(traces, mid_idx, end_idx, new_parent_session_span_id)

# Combine results
return left_payloads + right_payloads

def _convert_traces_to_spans_indexed(
self, traces: List[List[Span]], start_idx: int, end_idx: int, new_parent_session_span_id: int
) -> List[Tuple[int, List[Dict[str, Any]]]]:
"""Convert traces to spans with xdist filtering applied, using indexes to avoid slicing."""
all_spans_with_trace_info = []
for trace_idx in range(start_idx, end_idx):
trace = traces[trace_idx]
trace_spans = [
self._convert_span(span, trace[0].context.dd_origin, new_parent_session_span_id)
for span in trace
if (not self._is_xdist_worker) or (span.get_tag(EVENT_TYPE) != SESSION_TYPE)
]
all_spans_with_trace_info.append((trace_idx, trace_spans))

return all_spans_with_trace_info

def _create_payload_from_spans(self, spans: List[Dict[str, Any]]) -> bytes:
"""Create a payload from the given spans."""
return CIVisibilityEncoderV01._pack_payload(
{
"version": self.PAYLOAD_FORMAT_VERSION,
"metadata": self._metadata,
"events": spans,
}
)

@staticmethod
def _pack_payload(payload):
return msgpack_packb(payload)

def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
def _convert_span(
self, span: Span, dd_origin: Optional[str] = None, new_parent_session_span_id: int = 0
) -> Dict[str, Any]:
sp = JSONEncoderV2._span_to_dict(span)
sp = JSONEncoderV2._normalize_span(sp)
sp["type"] = span.get_tag(EVENT_TYPE) or span.span_type
Expand Down Expand Up @@ -183,18 +258,17 @@ class CIVisibilityCoverageEncoderV02(CIVisibilityEncoderV01):
def _set_itr_suite_skipping_mode(self, new_value):
self.itr_suite_skipping_mode = new_value

def put(self, spans):
def put(self, item):
spans_with_coverage = [
span
for span in spans
for span in item
if COVERAGE_TAG_NAME in span.get_tags() or span.get_struct_tag(COVERAGE_TAG_NAME) is not None
]
if not spans_with_coverage:
raise NoEncodableSpansError()
return super(CIVisibilityCoverageEncoderV02, self).put(spans_with_coverage)

def _build_coverage_attachment(self, data):
# type: (bytes) -> List[bytes]
def _build_coverage_attachment(self, data: bytes) -> List[bytes]:
return [
b"--%s" % self.boundary.encode("utf-8"),
b'Content-Disposition: form-data; name="coverage1"; filename="coverage1.msgpack"',
Expand All @@ -203,8 +277,7 @@ def _build_coverage_attachment(self, data):
data,
]

def _build_event_json_attachment(self):
# type: () -> List[bytes]
def _build_event_json_attachment(self) -> List[bytes]:
return [
b"--%s" % self.boundary.encode("utf-8"),
b'Content-Disposition: form-data; name="event"; filename="event.json"',
Expand All @@ -213,18 +286,16 @@ def _build_event_json_attachment(self):
b'{"dummy":true}',
]

def _build_body(self, data):
# type: (bytes) -> List[bytes]
def _build_body(self, data: bytes) -> List[bytes]:
return (
self._build_coverage_attachment(data)
+ self._build_event_json_attachment()
+ [b"--%s--" % self.boundary.encode("utf-8")]
)

def _build_data(self, traces):
# type: (List[List[Span]]) -> Optional[bytes]
def _build_data(self, traces: List[List[Span]]) -> Optional[bytes]:
normalized_covs = [
self._convert_span(span, "")
self._convert_span(span)
for trace in traces
for span in trace
if (COVERAGE_TAG_NAME in span.get_tags() or span.get_struct_tag(COVERAGE_TAG_NAME) is not None)
Expand All @@ -235,17 +306,17 @@ def _build_data(self, traces):
# TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size.
return msgpack_packb({"version": self.PAYLOAD_FORMAT_VERSION, "coverages": normalized_covs})

def _build_payload(self, traces):
# type: (List[List[Span]]) -> List[Tuple[Optional[bytes], int]]
def _build_payload(self, traces: List[List[Span]]) -> List[Tuple[Optional[bytes], int]]:
data = self._build_data(traces)
if not data:
return []
return [(b"\r\n".join(self._build_body(data)), len(traces))]
return [(b"\r\n".join(self._build_body(data)), len(data))]

def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
def _convert_span(
self, span: Span, dd_origin: Optional[str] = None, new_parent_session_span_id: int = 0
) -> Dict[str, Any]:
# DEV: new_parent_session_span_id is unused here, but it is used in super class
files: Dict[str, Any] = {}
files: dict[str, Any] = {}

files_struct_tag_value = span.get_struct_tag(COVERAGE_TAG_NAME)
if files_struct_tag_value is not None and "files" in files_struct_tag_value:
Expand Down
Loading
Loading