Skip to content

Commit b01b53f

Browse files
gnufedebrettlangdon
authored andcommitted
refactor: encoder.encode() to return a list of tuples (#14042)
1 parent 0a3f2c3 commit b01b53f

File tree

14 files changed

+490
-73
lines changed

14 files changed

+490
-73
lines changed

ddtrace/internal/_encoding.pyi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class BufferedEncoder(object):
2323
def __init__(self, max_size: int, max_item_size: int) -> None: ...
2424
def __len__(self) -> int: ...
2525
def put(self, item: Any) -> None: ...
26-
def encode(self) -> Tuple[Optional[bytes], int]: ...
26+
def encode(self) -> List[Tuple[Optional[bytes], int]]: ...
2727
@property
2828
def size(self) -> int: ...
2929

ddtrace/internal/_encoding.pyx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ cdef class MsgpackEncoderBase(BufferedEncoder):
466466
cpdef encode(self):
467467
with self._lock:
468468
if not self._count:
469-
return None, 0
469+
return []
470470

471471
return self.flush()
472472

@@ -590,7 +590,7 @@ cdef class MsgpackEncoderV04(MsgpackEncoderBase):
590590
cpdef flush(self):
591591
with self._lock:
592592
try:
593-
return self.get_bytes(), len(self)
593+
return [(self.get_bytes(), len(self))]
594594
finally:
595595
self._reset_buffer()
596596

@@ -1009,7 +1009,7 @@ cdef class MsgpackEncoderV05(MsgpackEncoderBase):
10091009
PyLong_FromLong(<long> self.get_buffer()),
10101010
<Py_ssize_t> super(MsgpackEncoderV05, self).size,
10111011
)
1012-
return self._st.flush(), len(self)
1012+
return [(self._st.flush(), len(self))]
10131013
finally:
10141014
self._reset_buffer()
10151015

ddtrace/internal/ci_visibility/encoder.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from typing import Dict # noqa:F401
3333
from typing import List # noqa:F401
3434
from typing import Optional # noqa:F401
35+
from typing import Tuple # noqa:F401
3536

3637
from ddtrace._trace.span import Span # noqa:F401
3738

@@ -73,11 +74,10 @@ def encode_traces(self, traces):
7374
def encode(self):
7475
with self._lock:
7576
with StopWatch() as sw:
76-
payload = self._build_payload(self.buffer)
77+
result_payloads = self._build_payload(self.buffer)
7778
record_endpoint_payload_events_serialization_time(endpoint=self.ENDPOINT_TYPE, seconds=sw.elapsed())
78-
buffer_size = len(self.buffer)
7979
self._init_buffer()
80-
return payload, buffer_size
80+
return result_payloads
8181

8282
def _get_parent_session(self, traces):
8383
for trace in traces:
@@ -87,6 +87,7 @@ def _get_parent_session(self, traces):
8787
return 0
8888

8989
def _build_payload(self, traces):
90+
# type: (List[List[Span]]) -> List[Tuple[Optional[bytes], int]]
9091
new_parent_session_span_id = self._get_parent_session(traces)
9192
is_not_xdist_worker = os.getenv("PYTEST_XDIST_WORKER") is None
9293
normalized_spans = [
@@ -96,20 +97,25 @@ def _build_payload(self, traces):
9697
if (is_not_xdist_worker or span.get_tag(EVENT_TYPE) != SESSION_TYPE)
9798
]
9899
if not normalized_spans:
99-
return None
100+
return []
100101
record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(normalized_spans))
101102

102103
# TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size.
103-
return CIVisibilityEncoderV01._pack_payload(
104-
{"version": self.PAYLOAD_FORMAT_VERSION, "metadata": self._metadata, "events": normalized_spans}
105-
)
104+
return [
105+
(
106+
CIVisibilityEncoderV01._pack_payload(
107+
{"version": self.PAYLOAD_FORMAT_VERSION, "metadata": self._metadata, "events": normalized_spans}
108+
),
109+
len(traces),
110+
)
111+
]
106112

107113
@staticmethod
108114
def _pack_payload(payload):
109115
return msgpack_packb(payload)
110116

111117
def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
112-
# type: (Span, str, Optional[int]) -> Dict[str, Any]
118+
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
113119
sp = JSONEncoderV2._span_to_dict(span)
114120
sp = JSONEncoderV2._normalize_span(sp)
115121
sp["type"] = span.get_tag(EVENT_TYPE) or span.span_type
@@ -230,14 +236,14 @@ def _build_data(self, traces):
230236
return msgpack_packb({"version": self.PAYLOAD_FORMAT_VERSION, "coverages": normalized_covs})
231237

232238
def _build_payload(self, traces):
233-
# type: (List[List[Span]]) -> Optional[bytes]
239+
# type: (List[List[Span]]) -> List[Tuple[Optional[bytes], int]]
234240
data = self._build_data(traces)
235241
if not data:
236-
return None
237-
return b"\r\n".join(self._build_body(data))
242+
return []
243+
return [(b"\r\n".join(self._build_body(data)), len(traces))]
238244

239245
def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
240-
# type: (Span, str, Optional[int]) -> Dict[str, Any]
246+
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
241247
# DEV: new_parent_session_span_id is unused here, but it is used in super class
242248
files: Dict[str, Any] = {}
243249

ddtrace/internal/writer/writer.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -385,26 +385,39 @@ def flush_queue(self, raise_exc: bool = False):
385385
def _flush_queue_with_client(self, client: WriterClientBase, raise_exc: bool = False) -> None:
386386
n_traces = len(client.encoder)
387387
try:
388-
encoded, n_traces = client.encoder.encode()
389-
390-
if encoded is None:
388+
if not (encoded_traces := client.encoder.encode()):
391389
return
392390

393-
# Should gzip the payload if intake accepts it
394-
if self._intake_accepts_gzip:
391+
except Exception:
392+
# FIXME(munir): if client.encoder raises an Exception n_traces may not be accurate due to race conditions
393+
log.error("failed to encode trace with encoder %r", client.encoder, exc_info=True)
394+
self._metrics_dist("encoder.dropped.traces", n_traces)
395+
return
396+
397+
for payload in encoded_traces:
398+
encoded_data, n_traces = payload
399+
self._flush_single_payload(encoded_data, n_traces, client=client, raise_exc=raise_exc)
400+
401+
def _flush_single_payload(
402+
self, encoded: Optional[bytes], n_traces: int, client: WriterClientBase, raise_exc: bool = False
403+
) -> None:
404+
if encoded is None:
405+
return
406+
407+
# Should gzip the payload if intake accepts it
408+
if self._intake_accepts_gzip:
409+
try:
395410
original_size = len(encoded)
396411
# Replace the value to send with the gzipped the value
397412
encoded = gzip.compress(encoded, compresslevel=6)
398413
log.debug("Original size in bytes: %s, Compressed size: %s", original_size, len(encoded))
399414

400415
# And add the header
401416
self._headers["Content-Encoding"] = "gzip"
402-
403-
except Exception:
404-
# FIXME(munir): if client.encoder raises an Exception n_traces may not be accurate due to race conditions
405-
log.error("failed to encode trace with encoder %r", client.encoder, exc_info=True)
406-
self._metrics_dist("encoder.dropped.traces", n_traces)
407-
return
417+
except Exception:
418+
log.error("failed to compress traces with encoder %r", client.encoder, exc_info=True)
419+
self._metrics_dist("encoder.dropped.traces", n_traces)
420+
return
408421

409422
try:
410423
self._send_payload_with_backoff(encoded, n_traces, client)

tests/ci_visibility/test_ci_visibility.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1417,7 +1417,9 @@ def tearDown(self):
14171417
def assert_test_session_name(self, name):
14181418
"""Check that the payload metadata contains the test session name attributes."""
14191419
payload = msgpack.loads(
1420-
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])
1420+
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[
1421+
0
1422+
][0]
14211423
)
14221424
assert payload["metadata"]["test_session_end"] == {"test_session.name": name}
14231425
assert payload["metadata"]["test_suite_end"] == {"test_session.name": name}
@@ -1493,7 +1495,9 @@ def test_set_library_capabilities(self):
14931495
)
14941496

14951497
payload = msgpack.loads(
1496-
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])
1498+
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[
1499+
0
1500+
][0]
14971501
)
14981502
assert payload["metadata"]["test"] == {
14991503
"_dd.library_capabilities.early_flake_detection": "1",

tests/ci_visibility/test_encoder.py

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ def test_encode_traces_civisibility_v0():
6464
encoder.set_metadata("*", {"language": "python"})
6565
for trace in traces:
6666
encoder.put(trace)
67-
payload, num_traces = encoder.encode()
67+
encoded_traces = encoder.encode()
68+
assert encoded_traces, "Expected encoded traces but got empty list"
69+
[(payload, num_traces)] = encoded_traces
6870
assert num_traces == 3
6971
assert isinstance(payload, bytes)
7072
decoded = msgpack.unpackb(payload, raw=True, strict_map_key=False)
@@ -103,23 +105,22 @@ def test_encode_traces_civisibility_v0():
103105
assert expected_event == received_event
104106

105107

106-
def test_encode_traces_civisibility_v0_no_traces():
108+
def test_encode_traces_civisibility_v01_no_traces():
107109
encoder = CIVisibilityEncoderV01(0, 0)
108110
encoder.set_metadata("*", {"language": "python"})
109-
payload, _ = encoder.encode()
110-
assert payload is None
111+
encoded_traces = encoder.encode()
112+
assert encoded_traces == [], "Expected empty list when no traces"
111113

112114

113-
def test_encode_traces_civisibility_v0_empty_traces():
115+
def test_encode_traces_civisibility_v01_empty_traces():
114116
traces = [[], []]
115117

116118
encoder = CIVisibilityEncoderV01(0, 0)
117119
encoder.set_metadata("*", {"language": "python"})
118120
for trace in traces:
119121
encoder.put(trace)
120-
payload, size = encoder.encode()
121-
assert size == 2
122-
assert payload is None
122+
encoded_traces = encoder.encode()
123+
assert encoded_traces == [], "Expected empty list when no content"
123124

124125

125126
def test_encode_traces_civisibility_v2_coverage_per_test():
@@ -160,7 +161,9 @@ def test_encode_traces_civisibility_v2_coverage_per_test():
160161
}
161162
assert expected_cov == received_covs[0]
162163

163-
complete_payload, _ = encoder.encode()
164+
encoded_traces = encoder.encode()
165+
assert encoded_traces, "Expected encoded traces but got empty list"
166+
[(complete_payload, _)] = encoded_traces
164167
assert isinstance(complete_payload, bytes)
165168
payload_per_line = complete_payload.split(b"\r\n")
166169
assert len(payload_per_line) == 11
@@ -200,7 +203,9 @@ def test_encode_traces_civisibility_v2_coverage_per_suite():
200203
encoder.put(trace)
201204

202205
payload = encoder._build_data(traces)
203-
complete_payload, _ = encoder.encode()
206+
encoded_traces = encoder.encode()
207+
assert encoded_traces, "Expected encoded traces but got empty list"
208+
[(complete_payload, _)] = encoded_traces
204209
assert isinstance(payload, bytes)
205210
decoded = msgpack.unpackb(payload, raw=True, strict_map_key=False)
206211
assert decoded[b"version"] == 2
@@ -255,8 +260,8 @@ def test_encode_traces_civisibility_v2_coverage_empty_traces():
255260
payload = encoder._build_data(traces)
256261
assert payload is None
257262

258-
complete_payload, _ = encoder.encode()
259-
assert complete_payload is None
263+
encoded_traces = encoder.encode()
264+
assert encoded_traces == [], "Expected empty list when payload is None"
260265

261266

262267
class PytestEncodingTestCase(PytestTestCaseBase):
@@ -280,7 +285,9 @@ def test_ok():
280285
span.set_tag(ITR_CORRELATION_ID_TAG_NAME, "encodertestcorrelationid")
281286
ci_agentless_encoder = CIVisibilityEncoderV01(0, 0)
282287
ci_agentless_encoder.put(spans)
283-
event_payload, _ = ci_agentless_encoder.encode()
288+
encoded_traces = ci_agentless_encoder.encode()
289+
assert encoded_traces, "Expected encoded traces but got empty list"
290+
[(event_payload, _)] = encoded_traces
284291
decoded_event_payload = self.tracer.encoder._decode(event_payload)
285292
given_test_span = spans[0]
286293
given_test_event = decoded_event_payload[b"events"][0]
@@ -341,7 +348,9 @@ def test_ok():
341348
span.set_tag(ITR_CORRELATION_ID_TAG_NAME, "encodertestcorrelationid")
342349
ci_agentless_encoder = CIVisibilityEncoderV01(0, 0)
343350
ci_agentless_encoder.put(spans)
344-
event_payload, _ = ci_agentless_encoder.encode()
351+
encoded_traces = ci_agentless_encoder.encode()
352+
assert encoded_traces, "Expected encoded traces but got empty list"
353+
[(event_payload, _)] = encoded_traces
345354
decoded_event_payload = self.tracer.encoder._decode(event_payload)
346355
given_test_suite_span = spans[3]
347356
assert given_test_suite_span.get_tag("type") == "test_suite_end"
@@ -397,7 +406,9 @@ def test_module_event_payload(self):
397406
spans = self.pop_spans()
398407
ci_agentless_encoder = CIVisibilityEncoderV01(0, 0)
399408
ci_agentless_encoder.put(spans)
400-
event_payload, _ = ci_agentless_encoder.encode()
409+
encoded_traces = ci_agentless_encoder.encode()
410+
assert encoded_traces, "Expected encoded traces but got empty list"
411+
[(event_payload, _)] = encoded_traces
401412
decoded_event_payload = self.tracer.encoder._decode(event_payload)
402413
given_test_module_span = spans[2]
403414
given_test_module_event = decoded_event_payload[b"events"][2]
@@ -448,7 +459,9 @@ def test_ok():
448459
spans = self.pop_spans()
449460
ci_agentless_encoder = CIVisibilityEncoderV01(0, 0)
450461
ci_agentless_encoder.put(spans)
451-
event_payload, _ = ci_agentless_encoder.encode()
462+
encoded_traces = ci_agentless_encoder.encode()
463+
assert encoded_traces, "Expected encoded traces but got empty list"
464+
[(event_payload, _)] = encoded_traces
452465
decoded_event_payload = self.tracer.encoder._decode(event_payload)
453466
given_test_session_span = spans[1]
454467
given_test_session_event = decoded_event_payload[b"events"][1]
@@ -543,7 +556,9 @@ def test_xdist_worker_session_filtering(mock_xdist_worker_env):
543556

544557
for trace in traces:
545558
encoder.put(trace)
546-
payload, num_traces = encoder.encode()
559+
encoded_traces = encoder.encode()
560+
assert encoded_traces, "Expected encoded traces but got empty list"
561+
[(payload, num_traces)] = encoded_traces
547562

548563
assert num_traces == 1
549564
assert isinstance(payload, bytes)
@@ -570,7 +585,9 @@ def test_xdist_non_worker_includes_session(mock_no_xdist_worker_env):
570585

571586
for trace in traces:
572587
encoder.put(trace)
573-
payload, num_traces = encoder.encode()
588+
encoded_traces = encoder.encode()
589+
assert encoded_traces, "Expected encoded traces but got empty list"
590+
[(payload, num_traces)] = encoded_traces
574591

575592
assert num_traces == 1
576593
assert isinstance(payload, bytes)
@@ -652,7 +669,9 @@ def test_full_encoding_with_parent_session_override():
652669

653670
for trace in traces:
654671
encoder.put(trace)
655-
payload, num_traces = encoder.encode()
672+
encoded_traces = encoder.encode()
673+
assert encoded_traces, "Expected encoded traces but got empty list"
674+
[(payload, num_traces)] = encoded_traces
656675

657676
assert num_traces == 1
658677
assert isinstance(payload, bytes)

tests/ci_visibility/test_is_user_provided_service.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ def test_is_user_provided_service_false(self):
2727
class IsUserProvidedServiceTestTagTestCase(SubprocessTestCase):
2828
def assert_is_user_provided_service_equals(self, value):
2929
payload = msgpack.loads(
30-
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])
30+
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[
31+
0
32+
][0]
3133
)
3234
assert payload["metadata"]["*"]["_dd.test.is_user_provided_service"] == value
3335

tests/contrib/pytest/test_pytest.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -777,15 +777,19 @@ def test_service(ddtracer):
777777
# Check if spans tagged with dd_origin after encoding and decoding as the tagging occurs at encode time
778778
encoder = self.tracer.encoder
779779
encoder.put(spans)
780-
trace, _ = encoder.encode()
780+
encoded_results = encoder.encode()
781+
assert encoded_results, "Expected encoded traces but got empty list"
782+
[(trace, _)] = encoded_results
781783
(decoded_trace,) = self.tracer.encoder._decode(trace)
782784
assert len(decoded_trace) == 7
783785
for span in decoded_trace:
784786
assert span[b"meta"][b"_dd.origin"] == b"ciapp-test"
785787

786788
ci_agentless_encoder = CIVisibilityEncoderV01(0, 0)
787789
ci_agentless_encoder.put(spans)
788-
event_payload, _ = ci_agentless_encoder.encode()
790+
encoded_results = ci_agentless_encoder.encode()
791+
assert encoded_results, "Expected encoded traces but got empty list"
792+
[(event_payload, _)] = encoded_results
789793
decoded_event_payload = self.tracer.encoder._decode(event_payload)
790794
assert len(decoded_event_payload[b"events"]) == 7
791795
for event in decoded_event_payload[b"events"]:

tests/integration/test_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ def test_trace_with_non_bytes_payload_logs_payload_when_LOG_ERROR_PAYLOADS():
507507

508508
class NonBytesBadEncoder(BadEncoder):
509509
def encode(self):
510-
return "bad_payload", 1
510+
return [("bad_payload", 1)]
511511

512512
def encode_traces(self, traces):
513513
return "bad_payload"

tests/integration/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def put(self, trace):
1919
pass
2020

2121
def encode(self):
22-
return b"bad_payload", 0
22+
return [(b"bad_payload", 0)]
2323

2424
def encode_traces(self, traces):
2525
return b"bad_payload"

0 commit comments

Comments
 (0)