Skip to content
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
cd46d89
change type of return for encode()
gnufede Jul 17, 2025
987a9fe
read from encode as a list of 1 tuple
gnufede Jul 17, 2025
72db8d2
adjust msgpack encoders
gnufede Jul 17, 2025
e530dc6
None, 0 case
gnufede Jul 17, 2025
cf5f3d9
update some places
gnufede Jul 17, 2025
8caf2ea
print riot command if tests failed
gnufede Jul 17, 2025
3c06386
fix some tests
gnufede Jul 17, 2025
73fdeb2
preserve exit status
gnufede Jul 17, 2025
32ca193
fix metadata_included test
gnufede Jul 17, 2025
f52be9c
fix other tests
gnufede Jul 17, 2025
00ef9ea
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 17, 2025
1076cb1
remove unrelated change
gnufede Jul 17, 2025
1d5a1b5
missing piece
gnufede Jul 18, 2025
e5bda2e
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 18, 2025
d696391
return empty list instead of None, 0
gnufede Jul 18, 2025
8ff147a
rename results to traces
gnufede Jul 18, 2025
84e09d9
lint, and s/results/traces
gnufede Jul 18, 2025
33b2409
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 18, 2025
eff9d15
revert change
gnufede Jul 18, 2025
2fa704a
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 18, 2025
f8af5e9
use walrus operator
gnufede Jul 18, 2025
0c6ba8d
specific empty traces case
gnufede Jul 18, 2025
f2ef944
iterate over the traces, add unit tests
gnufede Jul 21, 2025
c283c15
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 21, 2025
cd613f5
remove commented line
gnufede Jul 21, 2025
fc5fa2d
fix new test
gnufede Jul 21, 2025
8124f0d
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 21, 2025
3baae61
make build_payload have the new signature
gnufede Jul 21, 2025
b37c35b
simplify logic, adjust tests
gnufede Jul 21, 2025
696ffd8
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 21, 2025
cb0a5f3
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 21, 2025
906e5a0
wrap exception for gzip compress
gnufede Jul 21, 2025
48448e1
add regression test
gnufede Jul 21, 2025
6d5f66d
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 21, 2025
7b4e6a5
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 21, 2025
50ac7e8
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 22, 2025
a2cc920
empty commit
gnufede Jul 22, 2025
13d8f8d
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 22, 2025
46d3dca
Merge branch 'main' into gnufede/encoder-returns-list
gnufede Jul 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddtrace/internal/_encoding.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class BufferedEncoder(object):
def __init__(self, max_size: int, max_item_size: int) -> None: ...
def __len__(self) -> int: ...
def put(self, item: Any) -> None: ...
def encode(self) -> Tuple[Optional[bytes], int]: ...
def encode(self) -> List[Tuple[Optional[bytes], int]]: ...
@property
def size(self) -> int: ...

Expand Down
6 changes: 3 additions & 3 deletions ddtrace/internal/_encoding.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ cdef class MsgpackEncoderBase(BufferedEncoder):
cpdef encode(self):
with self._lock:
if not self._count:
return None, 0
return []

return self.flush()

Expand Down Expand Up @@ -575,7 +575,7 @@ cdef class MsgpackEncoderV04(MsgpackEncoderBase):
cpdef flush(self):
with self._lock:
try:
return self.get_bytes(), len(self)
return [(self.get_bytes(), len(self))]
finally:
self._reset_buffer()

Expand Down Expand Up @@ -994,7 +994,7 @@ cdef class MsgpackEncoderV05(MsgpackEncoderBase):
PyLong_FromLong(<long> self.get_buffer()),
<Py_ssize_t> super(MsgpackEncoderV05, self).size,
)
return self._st.flush(), len(self)
return [(self._st.flush(), len(self))]
finally:
self._reset_buffer()

Expand Down
30 changes: 18 additions & 12 deletions ddtrace/internal/ci_visibility/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from typing import Dict # noqa:F401
from typing import List # noqa:F401
from typing import Optional # noqa:F401
from typing import Tuple # noqa:F401

from ddtrace._trace.span import Span # noqa:F401

Expand Down Expand Up @@ -73,11 +74,10 @@ def encode_traces(self, traces):
def encode(self):
with self._lock:
with StopWatch() as sw:
payload = self._build_payload(self.buffer)
result_payloads = self._build_payload(self.buffer)
record_endpoint_payload_events_serialization_time(endpoint=self.ENDPOINT_TYPE, seconds=sw.elapsed())
buffer_size = len(self.buffer)
self._init_buffer()
return payload, buffer_size
return result_payloads

def _get_parent_session(self, traces):
for trace in traces:
Expand All @@ -87,6 +87,7 @@ def _get_parent_session(self, traces):
return 0

def _build_payload(self, traces):
# type: (List[List[Span]]) -> List[Tuple[Optional[bytes], int]]
new_parent_session_span_id = self._get_parent_session(traces)
is_not_xdist_worker = os.getenv("PYTEST_XDIST_WORKER") is None
normalized_spans = [
Expand All @@ -96,20 +97,25 @@ def _build_payload(self, traces):
if (is_not_xdist_worker or span.get_tag(EVENT_TYPE) != SESSION_TYPE)
]
if not normalized_spans:
return None
return []
record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(normalized_spans))

# TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size.
return CIVisibilityEncoderV01._pack_payload(
{"version": self.PAYLOAD_FORMAT_VERSION, "metadata": self._metadata, "events": normalized_spans}
)
return [
(
CIVisibilityEncoderV01._pack_payload(
{"version": self.PAYLOAD_FORMAT_VERSION, "metadata": self._metadata, "events": normalized_spans}
),
len(traces),
)
]

@staticmethod
def _pack_payload(payload):
return msgpack_packb(payload)

def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
# type: (Span, str, Optional[int]) -> Dict[str, Any]
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
sp = JSONEncoderV2._span_to_dict(span)
sp = JSONEncoderV2._normalize_span(sp)
sp["type"] = span.get_tag(EVENT_TYPE) or span.span_type
Expand Down Expand Up @@ -230,14 +236,14 @@ def _build_data(self, traces):
return msgpack_packb({"version": self.PAYLOAD_FORMAT_VERSION, "coverages": normalized_covs})

def _build_payload(self, traces):
# type: (List[List[Span]]) -> Optional[bytes]
# type: (List[List[Span]]) -> List[Tuple[Optional[bytes], int]]
data = self._build_data(traces)
if not data:
return None
return b"\r\n".join(self._build_body(data))
return []
return [(b"\r\n".join(self._build_body(data)), len(traces))]

def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
# type: (Span, str, Optional[int]) -> Dict[str, Any]
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
# DEV: new_parent_session_span_id is unused here, but it is used in super class
files: Dict[str, Any] = {}

Expand Down
34 changes: 21 additions & 13 deletions ddtrace/internal/writer/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,27 +385,35 @@ def flush_queue(self, raise_exc: bool = False):
def _flush_queue_with_client(self, client: WriterClientBase, raise_exc: bool = False) -> None:
n_traces = len(client.encoder)
try:
encoded, n_traces = client.encoder.encode()

if encoded is None:
if not (encoded_traces := client.encoder.encode()):
return

# Should gzip the payload if intake accepts it
if self._intake_accepts_gzip:
original_size = len(encoded)
# Replace the value to send with the gzipped the value
encoded = gzip.compress(encoded, compresslevel=6)
log.debug("Original size in bytes: %s, Compressed size: %s", original_size, len(encoded))

# And add the header
self._headers["Content-Encoding"] = "gzip"

except Exception:
# FIXME(munir): if client.encoder raises an Exception n_traces may not be accurate due to race conditions
log.error("failed to encode trace with encoder %r", client.encoder, exc_info=True)
self._metrics_dist("encoder.dropped.traces", n_traces)
return

for payload in encoded_traces:
encoded_data, n_traces = payload
self._flush_single_payload(encoded_data, n_traces, client=client, raise_exc=raise_exc)

def _flush_single_payload(
self, encoded: Optional[bytes], n_traces: int, client: WriterClientBase, raise_exc: bool = False
) -> None:
if encoded is None:
return

# Should gzip the payload if intake accepts it
if self._intake_accepts_gzip:
original_size = len(encoded)
# Replace the value to send with the gzipped the value
encoded = gzip.compress(encoded, compresslevel=6)
log.debug("Original size in bytes: %s, Compressed size: %s", original_size, len(encoded))

# And add the header
self._headers["Content-Encoding"] = "gzip"

try:
self._send_payload_with_backoff(encoded, n_traces, client)
except Exception:
Expand Down
8 changes: 6 additions & 2 deletions tests/ci_visibility/test_ci_visibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -1417,7 +1417,9 @@ def tearDown(self):
def assert_test_session_name(self, name):
"""Check that the payload metadata contains the test session name attributes."""
payload = msgpack.loads(
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[
0
][0]
)
assert payload["metadata"]["test_session_end"] == {"test_session.name": name}
assert payload["metadata"]["test_suite_end"] == {"test_session.name": name}
Expand Down Expand Up @@ -1493,7 +1495,9 @@ def test_set_library_capabilities(self):
)

payload = msgpack.loads(
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[
0
][0]
)
assert payload["metadata"]["test"] == {
"_dd.library_capabilities.early_flake_detection": "1",
Expand Down
57 changes: 38 additions & 19 deletions tests/ci_visibility/test_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ def test_encode_traces_civisibility_v0():
encoder.set_metadata("*", {"language": "python"})
for trace in traces:
encoder.put(trace)
payload, num_traces = encoder.encode()
encoded_traces = encoder.encode()
assert encoded_traces, "Expected encoded traces but got empty list"
[(payload, num_traces)] = encoded_traces
assert num_traces == 3
assert isinstance(payload, bytes)
decoded = msgpack.unpackb(payload, raw=True, strict_map_key=False)
Expand Down Expand Up @@ -103,23 +105,22 @@ def test_encode_traces_civisibility_v0():
assert expected_event == received_event


def test_encode_traces_civisibility_v0_no_traces():
def test_encode_traces_civisibility_v01_no_traces():
encoder = CIVisibilityEncoderV01(0, 0)
encoder.set_metadata("*", {"language": "python"})
payload, _ = encoder.encode()
assert payload is None
encoded_traces = encoder.encode()
assert encoded_traces == [], "Expected empty list when no traces"


def test_encode_traces_civisibility_v0_empty_traces():
def test_encode_traces_civisibility_v01_empty_traces():
traces = [[], []]

encoder = CIVisibilityEncoderV01(0, 0)
encoder.set_metadata("*", {"language": "python"})
for trace in traces:
encoder.put(trace)
payload, size = encoder.encode()
assert size == 2
assert payload is None
encoded_traces = encoder.encode()
assert encoded_traces == [], "Expected empty list when no content"


def test_encode_traces_civisibility_v2_coverage_per_test():
Expand Down Expand Up @@ -160,7 +161,9 @@ def test_encode_traces_civisibility_v2_coverage_per_test():
}
assert expected_cov == received_covs[0]

complete_payload, _ = encoder.encode()
encoded_traces = encoder.encode()
assert encoded_traces, "Expected encoded traces but got empty list"
[(complete_payload, _)] = encoded_traces
assert isinstance(complete_payload, bytes)
payload_per_line = complete_payload.split(b"\r\n")
assert len(payload_per_line) == 11
Expand Down Expand Up @@ -200,7 +203,9 @@ def test_encode_traces_civisibility_v2_coverage_per_suite():
encoder.put(trace)

payload = encoder._build_data(traces)
complete_payload, _ = encoder.encode()
encoded_traces = encoder.encode()
assert encoded_traces, "Expected encoded traces but got empty list"
[(complete_payload, _)] = encoded_traces
assert isinstance(payload, bytes)
decoded = msgpack.unpackb(payload, raw=True, strict_map_key=False)
assert decoded[b"version"] == 2
Expand Down Expand Up @@ -255,8 +260,8 @@ def test_encode_traces_civisibility_v2_coverage_empty_traces():
payload = encoder._build_data(traces)
assert payload is None

complete_payload, _ = encoder.encode()
assert complete_payload is None
encoded_traces = encoder.encode()
assert encoded_traces == [], "Expected empty list when payload is None"


class PytestEncodingTestCase(PytestTestCaseBase):
Expand All @@ -280,7 +285,9 @@ def test_ok():
span.set_tag(ITR_CORRELATION_ID_TAG_NAME, "encodertestcorrelationid")
ci_agentless_encoder = CIVisibilityEncoderV01(0, 0)
ci_agentless_encoder.put(spans)
event_payload, _ = ci_agentless_encoder.encode()
encoded_traces = ci_agentless_encoder.encode()
assert encoded_traces, "Expected encoded traces but got empty list"
[(event_payload, _)] = encoded_traces
decoded_event_payload = self.tracer.encoder._decode(event_payload)
given_test_span = spans[0]
given_test_event = decoded_event_payload[b"events"][0]
Expand Down Expand Up @@ -341,7 +348,9 @@ def test_ok():
span.set_tag(ITR_CORRELATION_ID_TAG_NAME, "encodertestcorrelationid")
ci_agentless_encoder = CIVisibilityEncoderV01(0, 0)
ci_agentless_encoder.put(spans)
event_payload, _ = ci_agentless_encoder.encode()
encoded_traces = ci_agentless_encoder.encode()
assert encoded_traces, "Expected encoded traces but got empty list"
[(event_payload, _)] = encoded_traces
decoded_event_payload = self.tracer.encoder._decode(event_payload)
given_test_suite_span = spans[3]
assert given_test_suite_span.get_tag("type") == "test_suite_end"
Expand Down Expand Up @@ -397,7 +406,9 @@ def test_module_event_payload(self):
spans = self.pop_spans()
ci_agentless_encoder = CIVisibilityEncoderV01(0, 0)
ci_agentless_encoder.put(spans)
event_payload, _ = ci_agentless_encoder.encode()
encoded_traces = ci_agentless_encoder.encode()
assert encoded_traces, "Expected encoded traces but got empty list"
[(event_payload, _)] = encoded_traces
decoded_event_payload = self.tracer.encoder._decode(event_payload)
given_test_module_span = spans[2]
given_test_module_event = decoded_event_payload[b"events"][2]
Expand Down Expand Up @@ -448,7 +459,9 @@ def test_ok():
spans = self.pop_spans()
ci_agentless_encoder = CIVisibilityEncoderV01(0, 0)
ci_agentless_encoder.put(spans)
event_payload, _ = ci_agentless_encoder.encode()
encoded_traces = ci_agentless_encoder.encode()
assert encoded_traces, "Expected encoded traces but got empty list"
[(event_payload, _)] = encoded_traces
decoded_event_payload = self.tracer.encoder._decode(event_payload)
given_test_session_span = spans[1]
given_test_session_event = decoded_event_payload[b"events"][1]
Expand Down Expand Up @@ -543,7 +556,9 @@ def test_xdist_worker_session_filtering(mock_xdist_worker_env):

for trace in traces:
encoder.put(trace)
payload, num_traces = encoder.encode()
encoded_traces = encoder.encode()
assert encoded_traces, "Expected encoded traces but got empty list"
[(payload, num_traces)] = encoded_traces

assert num_traces == 1
assert isinstance(payload, bytes)
Expand All @@ -570,7 +585,9 @@ def test_xdist_non_worker_includes_session(mock_no_xdist_worker_env):

for trace in traces:
encoder.put(trace)
payload, num_traces = encoder.encode()
encoded_traces = encoder.encode()
assert encoded_traces, "Expected encoded traces but got empty list"
[(payload, num_traces)] = encoded_traces

assert num_traces == 1
assert isinstance(payload, bytes)
Expand Down Expand Up @@ -652,7 +669,9 @@ def test_full_encoding_with_parent_session_override():

for trace in traces:
encoder.put(trace)
payload, num_traces = encoder.encode()
encoded_traces = encoder.encode()
assert encoded_traces, "Expected encoded traces but got empty list"
[(payload, num_traces)] = encoded_traces

assert num_traces == 1
assert isinstance(payload, bytes)
Expand Down
4 changes: 3 additions & 1 deletion tests/ci_visibility/test_is_user_provided_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ def test_is_user_provided_service_false(self):
class IsUserProvidedServiceTestTagTestCase(SubprocessTestCase):
def assert_is_user_provided_service_equals(self, value):
payload = msgpack.loads(
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[
0
][0]
)
assert payload["metadata"]["*"]["_dd.test.is_user_provided_service"] == value

Expand Down
8 changes: 6 additions & 2 deletions tests/contrib/pytest/test_pytest.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,15 +777,19 @@ def test_service(ddtracer):
# Check if spans tagged with dd_origin after encoding and decoding as the tagging occurs at encode time
encoder = self.tracer.encoder
encoder.put(spans)
trace, _ = encoder.encode()
encoded_results = encoder.encode()
assert encoded_results, "Expected encoded traces but got empty list"
[(trace, _)] = encoded_results
(decoded_trace,) = self.tracer.encoder._decode(trace)
assert len(decoded_trace) == 7
for span in decoded_trace:
assert span[b"meta"][b"_dd.origin"] == b"ciapp-test"

ci_agentless_encoder = CIVisibilityEncoderV01(0, 0)
ci_agentless_encoder.put(spans)
event_payload, _ = ci_agentless_encoder.encode()
encoded_results = ci_agentless_encoder.encode()
assert encoded_results, "Expected encoded traces but got empty list"
[(event_payload, _)] = encoded_results
decoded_event_payload = self.tracer.encoder._decode(event_payload)
assert len(decoded_event_payload[b"events"]) == 7
for event in decoded_event_payload[b"events"]:
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ def test_trace_with_non_bytes_payload_logs_payload_when_LOG_ERROR_PAYLOADS():

class NonBytesBadEncoder(BadEncoder):
def encode(self):
return "bad_payload", 1
return [("bad_payload", 1)]

def encode_traces(self, traces):
return "bad_payload"
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def put(self, trace):
pass

def encode(self):
return b"bad_payload", 0
return [(b"bad_payload", 0)]

def encode_traces(self, traces):
return b"bad_payload"
Expand Down
Loading
Loading