Skip to content

Commit bb588c7

Browse files
authored
chore(telemetry): report all configuration sources with sequence tracking [config chaining] (#14122)
## Description

This PR updates the behavior of the telemetry writer to ensure that all discovered configuration sources are submitted to the instrumentation platform. Previously, only the effective (final) configuration value was sent—typically the last value set in code within 60 seconds. Now, the telemetry writer queues and sends every configuration object it observes. Each configuration entry includes a `seq_id` field that indicates the order in which configurations were applied. This allows the telemetry intake system to reconstruct the full configuration history and understand how a given value was derived.

## Motivation

Accurate tracking of configuration state is essential for debugging, auditing, and understanding application behavior across environments. By reporting every configuration source (e.g., environment variable, fleet config, in-code setting), we provide full visibility into how and when each setting was applied. The addition of `seq_id` enables the intake pipeline to reliably determine the configuration precedence and reconstruct state transitions over time.

Note: the implementation should follow this spec: https://docs.google.com/document/d/1vhIimn2vt4tDRSxsHn6vWSc8zYHl0Lv0Fk7CQps04C4/edit?tab=t.0

## Next Steps

- Update the `get_config(...)` and `report_telemetry` methods to:
  - Queue a configuration event whenever a new configuration source is discovered.
  - Ensure that multiple sources for the same config key (e.g., environment variable, fleet config, code) are reported, each with its own `seq_id` to reflect the correct order of precedence.
## Checklist

- [ ] PR author has checked that all the criteria below are met
  - The PR description includes an overview of the change
  - The PR description articulates the motivation for the change
  - The change includes tests OR the PR description describes a testing strategy
  - The PR description notes risks associated with the change, if any
  - Newly-added code is easy to change
  - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
  - The change includes or references documentation updates if necessary
  - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist

- [ ] Reviewer has checked that all the criteria below are met
  - Title is accurate
  - All changes are related to the pull request's stated goal
  - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes
  - Testing strategy adequately addresses listed risks
  - Newly-added code is easy to change
  - Release note makes sense to a user of the library
  - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment
  - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
1 parent 46827b3 commit bb588c7

File tree

4 files changed

+96
-94
lines changed

4 files changed

+96
-94
lines changed

ddtrace/internal/telemetry/writer.py

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ class TelemetryWriter(PeriodicService):
145145
# Counter representing the number of events sent by the writer. Here we are relying on the atomicity
146146
# of `itertools.count()` which is a CPython implementation detail. The sequence field in telemetry
147147
# payloads is only used in tests and is not required to process Telemetry events.
148-
_sequence = itertools.count(1)
148+
_sequence_payloads = itertools.count(1)
149+
_sequence_configurations = itertools.count(1)
149150
_ORIGINAL_EXCEPTHOOK = staticmethod(sys.excepthook)
150151
CWD = os.getcwd()
151152

@@ -168,7 +169,7 @@ def __init__(self, is_periodic=True, agentless=None):
168169
self._logs = set() # type: Set[Dict[str, Any]]
169170
self._forked = False # type: bool
170171
self._events_queue = [] # type: List[Dict]
171-
self._configuration_queue = {} # type: Dict[str, Dict]
172+
self._configuration_queue = [] # type: List[Dict]
172173
self._imported_dependencies: Dict[str, str] = dict()
173174
self._modules_already_imported: Set[str] = set()
174175
self._product_enablement = {product.value: False for product in TELEMETRY_APM_PRODUCT}
@@ -266,7 +267,7 @@ def add_event(self, payload, payload_type):
266267
"tracer_time": int(time.time()),
267268
"runtime_id": get_runtime_id(),
268269
"api_version": "v2",
269-
"seq_id": next(self._sequence),
270+
"seq_id": next(self._sequence_payloads),
270271
"debug": self._debug,
271272
"application": get_application(config.SERVICE, config.VERSION, config.ENV),
272273
"host": get_host_info(),
@@ -388,8 +389,8 @@ def _flush_configuration_queue(self):
388389
# type: () -> List[Dict]
389390
"""Flushes and returns a list of all queued configurations"""
390391
with self._service_lock:
391-
configurations = list(self._configuration_queue.values())
392-
self._configuration_queue = {}
392+
configurations = self._configuration_queue
393+
self._configuration_queue = []
393394
return configurations
394395

395396
def _app_client_configuration_changed_event(self, configurations):
@@ -464,10 +465,6 @@ def product_activated(self, product, enabled):
464465
if self.started:
465466
self._send_product_change_updates = True
466467

467-
def remove_configuration(self, configuration_name):
468-
with self._service_lock:
469-
del self._configuration_queue[configuration_name]
470-
471468
def add_configuration(self, configuration_name, configuration_value, origin="unknown", config_id=None):
472469
# type: (str, Any, str, Optional[str]) -> None
473470
"""Creates and queues the name, origin, value of a configuration"""
@@ -488,17 +485,21 @@ def add_configuration(self, configuration_name, configuration_value, origin="unk
488485
config["config_id"] = config_id
489486

490487
with self._service_lock:
491-
self._configuration_queue[configuration_name] = config
488+
config["seq_id"] = next(self._sequence_configurations)
489+
self._configuration_queue.append(config)
492490

493491
def add_configurations(self, configuration_list):
494492
"""Creates and queues a list of configurations"""
495493
with self._service_lock:
496-
for name, value, _origin in configuration_list:
497-
self._configuration_queue[name] = {
498-
"name": name,
499-
"origin": _origin,
500-
"value": value,
501-
}
494+
for name, value, origin in configuration_list:
495+
self._configuration_queue.append(
496+
{
497+
"name": name,
498+
"origin": origin,
499+
"value": value,
500+
"seq_id": next(self._sequence_configurations),
501+
}
502+
)
502503

503504
def add_log(self, level, message, stack_trace="", tags=None):
504505
"""
@@ -701,7 +702,7 @@ def reset_queues(self):
701702
self._namespace.flush()
702703
self._logs = set()
703704
self._imported_dependencies = {}
704-
self._configuration_queue = {}
705+
self._configuration_queue = []
705706

706707
def _flush_events_queue(self):
707708
# type: () -> List[Dict]
@@ -727,7 +728,8 @@ def _fork_writer(self):
727728
self.enable()
728729

729730
def _restart_sequence(self):
730-
self._sequence = itertools.count(1)
731+
self._sequence_payloads = itertools.count(1)
732+
self._sequence_configurations = itertools.count(1)
731733

732734
def _stop_service(self, join=True, *args, **kwargs):
733735
# type: (...) -> None

tests/conftest.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,7 @@ def get_metrics(self, name=None):
621621
for series in event["payload"]["series"]:
622622
if name is None or series["metric"] == name:
623623
metrics.append(series)
624-
metrics.sort(key=lambda x: (x["metric"], x["tags"]), reverse=False)
624+
metrics.sort(key=lambda x: (x["metric"], x["tags"]))
625625
return metrics
626626

627627
def get_dependencies(self, name=None):
@@ -630,18 +630,28 @@ def get_dependencies(self, name=None):
630630
for dep in event["payload"]["dependencies"]:
631631
if name is None or dep["name"] == name:
632632
deps.append(dep)
633-
deps.sort(key=lambda x: x["name"], reverse=False)
633+
deps.sort(key=lambda x: x["name"])
634634
return deps
635635

636-
def get_configurations(self, name=None, ignores=None):
636+
def get_configurations(self, name=None, ignores=None, remove_seq_id=False, effective=False):
637637
ignores = ignores or []
638638
configurations = []
639639
events_with_configs = self.get_events("app-started") + self.get_events("app-client-configuration-change")
640640
for event in events_with_configs:
641641
for c in event["payload"]["configuration"]:
642-
if c["name"] == name or (name is None and c["name"] not in ignores):
643-
configurations.append(c)
644-
configurations.sort(key=lambda x: x["name"], reverse=False)
642+
config = c.copy()
643+
if remove_seq_id:
644+
config.pop("seq_id")
645+
if config["name"] == name or (name is None and config["name"] not in ignores):
646+
configurations.append(config)
647+
648+
if effective:
649+
config_map = {}
650+
for c in configurations:
651+
config_map[c["name"]] = c
652+
configurations = list(config_map.values())
653+
654+
configurations.sort(key=lambda x: x["name"])
645655
return configurations
646656

647657

tests/integration/test_settings.py

Lines changed: 36 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,6 @@
55
from tests.integration.utils import AGENT_VERSION
66

77

8-
def _get_telemetry_config_items(events, item_name):
9-
items = []
10-
for event in reversed(sorted(events, key=lambda e: (e["tracer_time"], e["seq_id"]))):
11-
for item in reversed(event.get("payload", {}).get("configuration", [])):
12-
if item_name == item["name"]:
13-
items.append(item)
14-
if items:
15-
return items or None
16-
17-
188
@pytest.mark.skipif(AGENT_VERSION != "testagent", reason="Tests only compatible with a testagent")
199
def test_setting_origin_environment(test_agent_session, run_python_code_in_subprocess):
2010
env = os.environ.copy()
@@ -38,30 +28,25 @@ def test_setting_origin_environment(test_agent_session, run_python_code_in_subpr
3828
)
3929
assert status == 0, err
4030

41-
events = test_agent_session.get_events(subprocess=True)
42-
events_trace_sample_rate = _get_telemetry_config_items(events, "DD_TRACE_SAMPLING_RULES")
31+
configurations = test_agent_session.get_configurations(effective=True, remove_seq_id=True)
4332

4433
assert {
4534
"name": "DD_TRACE_SAMPLING_RULES",
4635
"value": '[{"sample_rate":0.1}]',
4736
"origin": "env_var",
48-
} in events_trace_sample_rate
37+
} in configurations
4938

50-
events_logs_injection_enabled = _get_telemetry_config_items(events, "DD_LOGS_INJECTION")
51-
assert {"name": "DD_LOGS_INJECTION", "value": True, "origin": "env_var"} in events_logs_injection_enabled
39+
assert {"name": "DD_LOGS_INJECTION", "value": True, "origin": "env_var"} in configurations
5240

53-
events_trace_header_tags = _get_telemetry_config_items(events, "DD_TRACE_HEADER_TAGS")
5441
assert {
5542
"name": "DD_TRACE_HEADER_TAGS",
5643
"value": "X-Header-Tag-1:header_tag_1,X-Header-Tag-2:header_tag_2",
5744
"origin": "env_var",
58-
} in events_trace_header_tags
45+
} in configurations
5946

60-
events_trace_tags = _get_telemetry_config_items(events, "DD_TAGS")
61-
assert {"name": "DD_TAGS", "value": "team:apm,component:web", "origin": "env_var"} in events_trace_tags
47+
assert {"name": "DD_TAGS", "value": "team:apm,component:web", "origin": "env_var"} in configurations
6248

63-
events_tracing_enabled = _get_telemetry_config_items(events, "DD_TRACE_ENABLED")
64-
assert {"name": "DD_TRACE_ENABLED", "value": True, "origin": "env_var"} in events_tracing_enabled
49+
assert {"name": "DD_TRACE_ENABLED", "value": True, "origin": "env_var"} in configurations
6550

6651

6752
@pytest.mark.skipif(AGENT_VERSION != "testagent", reason="Tests only compatible with a testagent")
@@ -93,35 +78,31 @@ def test_setting_origin_code(test_agent_session, run_python_code_in_subprocess):
9378
)
9479
assert status == 0, err
9580

96-
events = test_agent_session.get_events(subprocess=True)
81+
configurations = test_agent_session.get_configurations(effective=True, remove_seq_id=True)
9782

98-
events_logs_injection_enabled = _get_telemetry_config_items(events, "DD_LOGS_INJECTION")
9983
assert {
10084
"name": "DD_LOGS_INJECTION",
10185
"value": False,
10286
"origin": "code",
103-
} in events_logs_injection_enabled
87+
} in configurations
10488

105-
events_trace_header_tags = _get_telemetry_config_items(events, "DD_TRACE_HEADER_TAGS")
10689
assert {
10790
"name": "DD_TRACE_HEADER_TAGS",
10891
"value": "header:value",
10992
"origin": "code",
110-
} in events_trace_header_tags
93+
} in configurations
11194

112-
events_trace_tags = _get_telemetry_config_items(events, "DD_TAGS")
11395
assert {
11496
"name": "DD_TAGS",
11597
"value": "header:value",
11698
"origin": "code",
117-
} in events_trace_tags
99+
} in configurations
118100

119-
events_tracing_enabled = _get_telemetry_config_items(events, "DD_TRACE_ENABLED")
120101
assert {
121102
"name": "DD_TRACE_ENABLED",
122103
"value": False,
123104
"origin": "code",
124-
} in events_tracing_enabled
105+
} in configurations
125106

126107

127108
@pytest.mark.skipif(AGENT_VERSION != "testagent", reason="Tests only compatible with a testagent")
@@ -166,9 +147,10 @@ def test_remoteconfig_sampling_rate_default(test_agent_session, ddtrace_run_pyth
166147
)
167148
assert status == 0, err
168149

169-
events = test_agent_session.get_events(subprocess=True)
170-
events_trace_sample_rate = _get_telemetry_config_items(events, "DD_TRACE_SAMPLING_RULES")
171-
assert {"name": "DD_TRACE_SAMPLING_RULES", "value": "", "origin": "default"} in events_trace_sample_rate
150+
configurations = test_agent_session.get_configurations(name="DD_TRACE_SAMPLING_RULES", effective=True)
151+
assert len(configurations) == 1
152+
assert configurations[0]["value"] == ""
153+
assert configurations[0]["origin"] == "default"
172154

173155

174156
@pytest.mark.skipif(AGENT_VERSION != "testagent", reason="Tests only compatible with a testagent")
@@ -213,14 +195,17 @@ def test_remoteconfig_sampling_rate_telemetry(test_agent_session, run_python_cod
213195
)
214196
assert status == 0, err
215197

216-
events = test_agent_session.get_events(subprocess=True)
217-
events_trace_sample_rate = _get_telemetry_config_items(events, "DD_TRACE_SAMPLING_RULES")
218-
assert {
219-
"name": "DD_TRACE_SAMPLING_RULES",
220-
"origin": "remote_config",
221-
"value": '[{"sample_rate": 0.5, "service": "*", "name": "*", "resource": "*", '
222-
'"tags": {}, "provenance": "customer"}]',
223-
} in events_trace_sample_rate
198+
configurations = test_agent_session.get_configurations(
199+
name="DD_TRACE_SAMPLING_RULES", effective=True, remove_seq_id=True
200+
)
201+
assert configurations == [
202+
{
203+
"name": "DD_TRACE_SAMPLING_RULES",
204+
"origin": "remote_config",
205+
"value": '[{"sample_rate": 0.5, "service": "*", "name": "*", "resource": "*", '
206+
'"tags": {}, "provenance": "customer"}]',
207+
}
208+
]
224209

225210

226211
@pytest.mark.skipif(AGENT_VERSION != "testagent", reason="Tests only compatible with a testagent")
@@ -258,10 +243,13 @@ def test_remoteconfig_header_tags_telemetry(test_agent_session, ddtrace_run_pyth
258243
)
259244
assert status == 0, err
260245

261-
events = test_agent_session.get_events(subprocess=True)
262-
events_trace_header_tags = _get_telemetry_config_items(events, "DD_TRACE_HEADER_TAGS")
263-
assert {
264-
"name": "DD_TRACE_HEADER_TAGS",
265-
"value": "used:header_tag_69,unused:header_tag_70,used-with-default:",
266-
"origin": "remote_config",
267-
} in events_trace_header_tags
246+
configurations = test_agent_session.get_configurations(
247+
name="DD_TRACE_HEADER_TAGS", effective=True, remove_seq_id=True
248+
)
249+
assert configurations == [
250+
{
251+
"name": "DD_TRACE_HEADER_TAGS",
252+
"value": "used:header_tag_69,unused:header_tag_70,used-with-default:",
253+
"origin": "remote_config",
254+
}
255+
]

tests/telemetry/test_writer.py

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def test_app_started_event_configuration_override_asm(
7979
_, stderr, status, _ = run_python_code_in_subprocess("import ddtrace.auto", env=env)
8080
assert status == 0, stderr
8181

82-
configuration = test_agent_session.get_configurations(name=env_var)
82+
configuration = test_agent_session.get_configurations(name=env_var, remove_seq_id=True)
8383
assert len(configuration) == 1, configuration
8484
assert configuration[0] == {"name": env_var, "origin": "env_var", "value": expected_value}
8585

@@ -296,12 +296,13 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
296296

297297
# DD_TRACE_AGENT_URL in gitlab is different from CI, to keep things simple we will
298298
# skip validating this config
299-
configurations = test_agent_session.get_configurations(ignores=["DD_TRACE_AGENT_URL"])
299+
configurations = test_agent_session.get_configurations(
300+
ignores=["DD_TRACE_AGENT_URL", "DD_AGENT_PORT", "DD_TRACE_AGENT_PORT"], remove_seq_id=True, effective=True
301+
)
300302
assert configurations
301303

302304
expected = [
303305
{"name": "DD_AGENT_HOST", "origin": "default", "value": None},
304-
{"name": "DD_AGENT_PORT", "origin": "default", "value": None},
305306
{"name": "DD_API_KEY", "origin": "default", "value": None},
306307
{"name": "DD_API_SECURITY_ENABLED", "origin": "env_var", "value": False},
307308
{"name": "DD_API_SECURITY_ENDPOINT_COLLECTION_ENABLED", "origin": "default", "value": True},
@@ -471,7 +472,6 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
471472
{"name": "DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED", "origin": "env_var", "value": True},
472473
{"name": "DD_TRACE_128_BIT_TRACEID_LOGGING_ENABLED", "origin": "default", "value": False},
473474
{"name": "DD_TRACE_AGENT_HOSTNAME", "origin": "default", "value": None},
474-
{"name": "DD_TRACE_AGENT_PORT", "origin": "default", "value": None},
475475
{"name": "DD_TRACE_AGENT_TIMEOUT_SECONDS", "origin": "default", "value": 2.0},
476476
{"name": "DD_TRACE_API_VERSION", "origin": "env_var", "value": "v0.5"},
477477
{"name": "DD_TRACE_BAGGAGE_TAG_KEYS", "origin": "default", "value": "user.id,account.id,session.id"},
@@ -743,28 +743,30 @@ def test_app_client_configuration_changed_event(telemetry_writer, test_agent_ses
743743
telemetry_writer.periodic(force_flush=True)
744744
"""asserts that queuing a configuration sends a valid telemetry request"""
745745
with override_global_config(dict()):
746-
telemetry_writer.add_configuration("appsec_enabled", True)
747-
telemetry_writer.add_configuration("DD_TRACE_PROPAGATION_STYLE_EXTRACT", "datadog")
748-
telemetry_writer.add_configuration("appsec_enabled", False, "env_var")
746+
telemetry_writer.add_configuration("appsec_enabled", True, "env_var")
747+
telemetry_writer.add_configuration("DD_TRACE_PROPAGATION_STYLE_EXTRACT", "datadog", "default")
748+
telemetry_writer.add_configuration("appsec_enabled", False, "code")
749749

750750
telemetry_writer.periodic(force_flush=True)
751751

752752
events = test_agent_session.get_events("app-client-configuration-change")
753753
received_configurations = [c for event in events for c in event["payload"]["configuration"]]
754-
received_configurations.sort(key=lambda c: c["name"])
755-
# assert the latest configuration value is send to the agent
756-
assert received_configurations == [
757-
{
758-
"name": "DD_TRACE_PROPAGATION_STYLE_EXTRACT",
759-
"origin": "unknown",
760-
"value": "datadog",
761-
},
762-
{
763-
"name": "appsec_enabled",
764-
"origin": "env_var",
765-
"value": False,
766-
},
767-
]
754+
received_configurations.sort(key=lambda c: c["seq_id"])
755+
assert (
756+
received_configurations[0]["seq_id"]
757+
< received_configurations[1]["seq_id"]
758+
< received_configurations[2]["seq_id"]
759+
)
760+
# assert that all configuration values are sent to the agent in the order they were added (by seq_id)
761+
assert received_configurations[0]["name"] == "appsec_enabled"
762+
assert received_configurations[0]["origin"] == "env_var"
763+
assert received_configurations[0]["value"] is True
764+
assert received_configurations[1]["name"] == "DD_TRACE_PROPAGATION_STYLE_EXTRACT"
765+
assert received_configurations[1]["origin"] == "default"
766+
assert received_configurations[1]["value"] == "datadog"
767+
assert received_configurations[2]["name"] == "appsec_enabled"
768+
assert received_configurations[2]["origin"] == "code"
769+
assert received_configurations[2]["value"] is False
768770

769771

770772
def test_add_integration_disabled_writer(telemetry_writer, test_agent_session):
@@ -992,7 +994,7 @@ def test_otel_config_telemetry(test_agent_session, run_python_code_in_subprocess
992994
_, stderr, status, _ = run_python_code_in_subprocess("import ddtrace", env=env)
993995
assert status == 0, stderr
994996

995-
configurations = {c["name"]: c for c in test_agent_session.get_configurations()}
997+
configurations = {c["name"]: c for c in test_agent_session.get_configurations(remove_seq_id=True)}
996998

997999
assert configurations["DD_SERVICE"] == {"name": "DD_SERVICE", "origin": "env_var", "value": "dd_service"}
9981000
assert configurations["OTEL_LOG_LEVEL"] == {"name": "OTEL_LOG_LEVEL", "origin": "env_var", "value": "debug"}

0 commit comments

Comments
 (0)