271 changes: 271 additions & 0 deletions tests/otel/test_metrics.py
@@ -0,0 +1,271 @@
import time

from utils import context, weblog, interfaces, scenarios, irrelevant, features
from collections.abc import Callable


def validate_resource_metrics(resource_metrics_list: list) -> None:
# Assert the following protobuf structure from https://github.com/open-telemetry/opentelemetry-proto/blob/v1.7.0/opentelemetry/proto/metrics/v1/metrics.proto:
# message ResourceMetrics {
# optional opentelemetry.proto.resource.v1.Resource resource = 1;
# repeated ScopeMetrics scope_metrics = 2;
# optional string schema_url = 3;
# }

assert all(
len(resource_metrics) == 1 for resource_metrics in resource_metrics_list
), "Metrics payloads from one configured application should have one set of resource metrics"
assert any(
"scopeMetrics" in resource_metrics[0] and len(resource_metrics[0]["scopeMetrics"]) > 0
for resource_metrics in resource_metrics_list
), "Scope metrics should be present on some payloads"
assert all(
resource_metrics[0]["resource"] is not None for resource_metrics in resource_metrics_list
), "Resource metrics from an application with configured resources should have a resource field"
# Do not assert on schema_url, as it is not always present


def validate_scope_metrics(scope_metrics_list: list) -> None:
# Assert the following protobuf structure from https://github.com/open-telemetry/opentelemetry-proto/blob/v1.7.0/opentelemetry/proto/metrics/v1/metrics.proto:
# message ScopeMetrics {
# optional opentelemetry.proto.common.v1.InstrumentationScope scope = 1;
# repeated Metric metrics = 2;
# optional string schema_url = 3;
# }

assert all(
len(scope_metrics) >= 1 for scope_metrics in scope_metrics_list
), "Metrics payloads from one configured application should have one or more set of scope metrics"
assert all(
scope_metrics[0]["scope"] is not None for scope_metrics in scope_metrics_list
), "Scope metrics should have a scope field"
# Do not assert on schema_url, as it is not always present
for scope_metrics in scope_metrics_list:
# Assert values only for metrics we created, since we're asserting against specific fields
if "opentelemetry" not in scope_metrics[0]["scope"]["name"]:
for metric in scope_metrics[0]["metrics"]:
validate_metric(metric)


def validate_metric(metric: dict) -> None:
# Assert the following protobuf structure from https://github.com/open-telemetry/opentelemetry-proto/blob/v1.7.0/opentelemetry/proto/metrics/v1/metrics.proto:
# message Metric {
# optional string name = 1;
# optional string description = 2;
# optional string unit = 3;
# oneof data {
# Gauge gauge = 5;
# Sum sum = 7;
# Histogram histogram = 9;
# ExponentialHistogram exponential_histogram = 10;
# Summary summary = 11;
# }
# repeated opentelemetry.proto.common.v1.KeyValue metadata = 12;
# }

assert metric["name"] is not None, "Metrics are expected to have a name"
# assert metric["description"] is not None, "Metrics are expected to have a description"
# assert metric["unit"] is not None, "Metrics are expected to have a unit"
# assert metric["metadata"] is not None, "Metrics are expected to have metadata"

assert metric["name"].lower() in metric_to_validator, f"Metric {metric['name']} is not expected"
func, name, value, aggregation_temporality = metric_to_validator[metric["name"].lower()]
func(metric, name, value, aggregation_temporality)


def validate_counter(metric: dict, name: str, value: object, aggregation_temporality: str) -> None:
# Assert the following protobuf structure from https://github.com/open-telemetry/opentelemetry-proto/blob/v1.7.0/opentelemetry/proto/metrics/v1/metrics.proto:
# message Sum {
# repeated NumberDataPoint data_points = 1;
# optional AggregationTemporality aggregation_temporality = 2;
# optional bool is_monotonic = 3;
# }
# enum AggregationTemporality {
# AGGREGATION_TEMPORALITY_UNSPECIFIED = 0;
# AGGREGATION_TEMPORALITY_DELTA = 1;
# AGGREGATION_TEMPORALITY_CUMULATIVE = 2;
# }

assert metric["name"].lower() == name
assert "sum" in metric
assert len(metric["sum"]["dataPoints"]) == 1
assert metric["sum"]["aggregationTemporality"] == aggregation_temporality
assert metric["sum"]["isMonotonic"]
validate_number_data_point(metric["sum"]["dataPoints"][0], "asInt", value, "0")


def validate_histogram(metric: dict, name: str, value: object, aggregation_temporality: str) -> None:
# Assert the following protobuf structure from https://github.com/open-telemetry/opentelemetry-proto/blob/v1.7.0/opentelemetry/proto/metrics/v1/metrics.proto:
# message Histogram {
# repeated HistogramDataPoint data_points = 1;
# optional AggregationTemporality aggregation_temporality = 2;
# }
# message HistogramDataPoint {
# reserved 1;
# repeated opentelemetry.proto.common.v1.KeyValue attributes = 9;
# fixed64 start_time_unix_nano = 2;
# fixed64 time_unix_nano = 3;
# fixed64 count = 4;
# optional double sum = 5;
# repeated fixed64 bucket_counts = 6;
# repeated double explicit_bounds = 7;
# repeated Exemplar exemplars = 8;
# uint32 flags = 10;
# optional double min = 11;
# optional double max = 12;
# }

assert metric["name"].lower() == name
assert "histogram" in metric # This asserts the metric type is a histogram
assert len(metric["histogram"]["dataPoints"]) == 1
assert metric["histogram"]["aggregationTemporality"] == aggregation_temporality

assert metric["histogram"]["dataPoints"][0]["startTimeUnixNano"].isdecimal()
assert metric["histogram"]["dataPoints"][0]["timeUnixNano"].isdecimal()
assert metric["histogram"]["dataPoints"][0]["count"] == "1"
assert metric["histogram"]["dataPoints"][0]["sum"] == value
assert metric["histogram"]["dataPoints"][0]["min"] == value
assert metric["histogram"]["dataPoints"][0]["max"] == value


def validate_up_down_counter(metric: dict, name: str, value: object, aggregation_temporality: str) -> None:
# Assert the following protobuf structure from https://github.com/open-telemetry/opentelemetry-proto/blob/v1.7.0/opentelemetry/proto/metrics/v1/metrics.proto:
# message Sum {
# repeated NumberDataPoint data_points = 1;
# optional AggregationTemporality aggregation_temporality = 2;
# optional bool is_monotonic = 3;
# }
# enum AggregationTemporality {
# AGGREGATION_TEMPORALITY_UNSPECIFIED = 0;
# AGGREGATION_TEMPORALITY_DELTA = 1;
# AGGREGATION_TEMPORALITY_CUMULATIVE = 2;
# }

assert metric["name"].lower() == name
assert "sum" in metric
assert len(metric["sum"]["dataPoints"]) == 1
assert metric["sum"]["aggregationTemporality"] == aggregation_temporality
validate_number_data_point(metric["sum"]["dataPoints"][0], "asInt", value)


def validate_gauge(metric: dict, name: str, value: object, _: str) -> None:
# Assert the following protobuf structure from https://github.com/open-telemetry/opentelemetry-proto/blob/v1.7.0/opentelemetry/proto/metrics/v1/metrics.proto:
# message Gauge {
# repeated NumberDataPoint data_points = 1;
# }

assert metric["name"].lower() == name
assert "gauge" in metric
assert len(metric["gauge"]["dataPoints"]) == 1
validate_number_data_point(metric["gauge"]["dataPoints"][0], "asDouble", value, start_time_is_required=False)


def validate_number_data_point(
data_point: dict, value_type: object, value: object, default_value: object = None, start_time_is_required: bool = True
) -> None:
# Assert the following protobuf structure from https://github.com/open-telemetry/opentelemetry-proto/blob/v1.7.0/opentelemetry/proto/metrics/v1/metrics.proto:
# message NumberDataPoint {
# reserved 1;
# repeated opentelemetry.proto.common.v1.KeyValue attributes = 7;
# optional fixed64 start_time_unix_nano = 2;
# optional fixed64 time_unix_nano = 3;
# oneof value {
# double as_double = 4;
# sfixed64 as_int = 6;
# }
# repeated Exemplar exemplars = 5;
# uint32 flags = 8;
# }

if start_time_is_required:
assert data_point["startTimeUnixNano"].isdecimal()
assert data_point["timeUnixNano"].isdecimal()
if default_value is not None:
assert data_point[value_type] == value or data_point[value_type] == default_value
else:
assert data_point[value_type] == value
# Do not assert attributes on every data point
# Do not assert exemplars on every data point
# Do not assert flags on every data point


metric_to_validator: dict[str, tuple[Callable[[dict, str, object, str], None], str, object, str]] = {
"example.counter": (validate_counter, "example.counter", "11", "AGGREGATION_TEMPORALITY_DELTA"),
"example.async.counter": (validate_counter, "example.async.counter", "22", "AGGREGATION_TEMPORALITY_DELTA"),
"example.histogram": (validate_histogram, "example.histogram", 33.0, "AGGREGATION_TEMPORALITY_DELTA"),
"example.updowncounter": (
validate_up_down_counter,
"example.updowncounter",
"55",
"AGGREGATION_TEMPORALITY_CUMULATIVE",
),
"example.async.updowncounter": (
validate_up_down_counter,
"example.async.updowncounter",
"66",
"AGGREGATION_TEMPORALITY_CUMULATIVE",
),
"example.gauge": (validate_gauge, "example.gauge", 77.0, ""),
"example.async.gauge": (validate_gauge, "example.async.gauge", 88.0, ""),
}


@scenarios.otel_metric_e2e
@scenarios.apm_tracing_e2e_otel
@irrelevant(context.library not in ("java_otel", "dotnet", "python"))
@features.not_reported # FPD does not support otel libs
class Test_OTelMetrics:
def setup_agent_otlp_upload(self):
self.start = int(time.time())
self.r = weblog.get(path="/basic/metric")
self.expected_metrics = [
"example.counter",
"example.async.counter",
"example.gauge",
"example.async.gauge",
"example.updowncounter",
"example.async.updowncounter",
"example.histogram",
]

def test_agent_otlp_upload(self):
seen = set()

all_resource_metrics = []
all_scope_metrics = []
filtered_individual_metrics = []

for _, resource_metrics in interfaces.open_telemetry.get_metrics(host="agent"):
all_resource_metrics.append(resource_metrics)
if "scopeMetrics" not in resource_metrics[0]:
continue

scope_metrics = resource_metrics[0]["scopeMetrics"]
all_scope_metrics.append(scope_metrics)

for scope_metric in scope_metrics:
for metric in scope_metric["metrics"]:
metric_name = metric["name"].lower()
if metric_name.startswith("example"):
# Asynchronous instruments report on every metric read, so their data points repeat across payloads.
# The UpDownCounter (like the AsyncUpDownCounter) uses cumulative temporality, so it repeats as well; deduplicate both.
if metric_name.startswith("example.async") or metric_name == "example.updowncounter":
if metric_name not in seen:
filtered_individual_metrics.append(metric)
seen.add(metric_name)
else:
filtered_individual_metrics.append(metric)

# Assert resource metrics are present and valid
assert all(
len(resource_metrics) == 1 for resource_metrics in all_resource_metrics
), "Metrics payloads from one configured application should have one set of resource metrics, but in general this may be 0+"
assert all(
resource_metrics[0]["resource"] is not None for resource_metrics in all_resource_metrics
), "Resource metrics from an application with configured resources should have a resource field, but in general is optional"
validate_resource_metrics(all_resource_metrics)

# Assert scope metrics are present and valid
# Specific OTLP metric types not tested are: ExponentialHistogram (and by extension ExponentialHistogramDataPoint), Summary (and by extension SummaryDataPoint), Exemplar
assert len(filtered_individual_metrics) == len(self.expected_metrics), "The agent should forward exactly the expected example metrics"
validate_scope_metrics(all_scope_metrics)
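
A minimal sketch (an assumption, not part of this PR) of the kind of instrumentation the weblog's /basic/metric endpoint is expected to perform, written against the OpenTelemetry Python metrics API; the instrument names and recorded values mirror metric_to_validator above, and the real weblog implementations are language-specific.

# Hypothetical weblog-side instrumentation; values mirror metric_to_validator.
from opentelemetry import metrics
from opentelemetry.metrics import Observation

meter = metrics.get_meter("example-meter")

# Synchronous counter: exported with delta temporality per the scenario's exporter settings.
meter.create_counter("example.counter").add(11)

# Asynchronous counter: its callback runs on every metric read, hence the deduplication in the test.
meter.create_observable_counter("example.async.counter", callbacks=[lambda options: [Observation(22)]])

# Histogram: a single recording, so count == 1 and sum == min == max == 33.0.
meter.create_histogram("example.histogram").record(33.0)

# UpDownCounters keep cumulative temporality even under a delta temporality preference.
meter.create_up_down_counter("example.updowncounter").add(55)
meter.create_observable_up_down_counter("example.async.updowncounter", callbacks=[lambda options: [Observation(66)]])

# Gauges: one synchronous set() (requires a recent OpenTelemetry API with synchronous Gauge
# support) and one asynchronous observation.
meter.create_gauge("example.gauge").set(77.0)
meter.create_observable_gauge("example.async.gauge", callbacks=[lambda options: [Observation(88.0)]])

With OTEL_METRIC_EXPORT_INTERVAL set to 1000 ms in the scenario below, the SDK exports roughly once per second, which is why the test deduplicates asynchronous and cumulative instruments before comparing against expected_metrics.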
13 changes: 11 additions & 2 deletions utils/_context/_scenarios/__init__.py
@@ -555,9 +555,18 @@ class _Scenarios:
# APM tracing end-to-end scenarios
apm_tracing_e2e_otel = EndToEndScenario(
"APM_TRACING_E2E_OTEL",
weblog_env={"DD_TRACE_OTEL_ENABLED": "true"},
weblog_env={
"DD_TRACE_OTEL_ENABLED": "true",
"DD_TRACE_OTEL_METRICS_ENABLED": "true",
Comment on lines +559 to +560 (Contributor):
Wondering if we should remove the first config here and also change the name of the env var below to DD_METRICS_OTEL_ENABLED instead?

"OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf",
"OTEL_EXPORTER_OTLP_ENDPOINT": f"http://proxy:{ProxyPorts.open_telemetry_weblog}",
"OTEL_EXPORTER_OTLP_HEADERS": "dd-protocol=otlp,dd-otlp-path=agent",
"OTEL_METRIC_EXPORT_INTERVAL": "1000",
"OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE": "delta",
},
backend_interface_timeout=5,
require_api_key=True,
use_proxy_for_open_telemetry=True,
doc="",
)
apm_tracing_e2e_single_span = EndToEndScenario(
@@ -574,7 +583,7 @@ class _Scenarios:
)

otel_tracing_e2e = OpenTelemetryScenario("OTEL_TRACING_E2E", require_api_key=True, doc="")
otel_metric_e2e = OpenTelemetryScenario("OTEL_METRIC_E2E", require_api_key=True, doc="")
otel_metric_e2e = OpenTelemetryScenario(
"OTEL_METRIC_E2E", require_api_key=True, doc="", include_collector=False, include_intake=False
)
otel_log_e2e = OpenTelemetryScenario("OTEL_LOG_E2E", require_api_key=True, doc="")

library_conf_custom_header_tags = EndToEndScenario(
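
A hedged sketch of how the weblog_env above is consumed at runtime; the module paths are those of the Python OTel SDK packages (other weblog languages wire this up through their own SDKs, often via auto-configuration): the OTLP HTTP exporter reads OTEL_EXPORTER_OTLP_ENDPOINT, _HEADERS, _PROTOCOL and the temporality preference, and the periodic reader honours OTEL_METRIC_EXPORT_INTERVAL.

# Illustrative only: how the environment above is picked up by the Python OTel SDK.
from opentelemetry import metrics
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

# The proxy endpoint, the dd-protocol/dd-otlp-path headers, and the delta temporality
# preference all come from the OTEL_EXPORTER_OTLP_* variables set in weblog_env.
exporter = OTLPMetricExporter()

# export_interval_millis defaults to OTEL_METRIC_EXPORT_INTERVAL (1000 ms here), so the
# proxy, and then the agent, see metrics well within the scenario's short interface timeouts.
reader = PeriodicExportingMetricReader(exporter)
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))

Pointing the exporter at the proxy (use_proxy_for_open_telemetry=True) is what lets interfaces.open_telemetry capture the OTLP payloads that test_metrics.py asserts on.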
21 changes: 19 additions & 2 deletions utils/_context/_scenarios/endtoend.py
@@ -253,8 +253,10 @@ def __init__(
additional_trace_header_tags: tuple[str, ...] = (),
library_interface_timeout: int | None = None,
agent_interface_timeout: int = 5,
open_telemetry_interface_timeout: int = 5,
use_proxy_for_weblog: bool = True,
use_proxy_for_agent: bool = True,
use_proxy_for_open_telemetry: bool = True,
rc_api_enabled: bool = False,
meta_structs_disabled: bool = False,
span_events: bool = True,
@@ -285,7 +287,7 @@ def __init__(
github_workflow=github_workflow,
scenario_groups=scenario_groups,
enable_ipv6=enable_ipv6,
use_proxy=use_proxy_for_agent or use_proxy_for_weblog,
use_proxy=use_proxy_for_agent or use_proxy_for_weblog or use_proxy_for_open_telemetry,
rc_api_enabled=rc_api_enabled,
meta_structs_disabled=meta_structs_disabled,
span_events=span_events,
@@ -302,6 +304,7 @@ def __init__(

self._use_proxy_for_agent = use_proxy_for_agent
self._use_proxy_for_weblog = use_proxy_for_weblog
self._use_proxy_for_open_telemetry = use_proxy_for_open_telemetry

self._require_api_key = require_api_key

@@ -387,6 +390,7 @@ def __init__(

self.agent_interface_timeout = agent_interface_timeout
self.backend_interface_timeout = backend_interface_timeout
self.open_telemetry_interface_timeout = open_telemetry_interface_timeout
self._library_interface_timeout = library_interface_timeout

def configure(self, config: pytest.Config):
@@ -408,6 +412,7 @@ def configure(self, config: pytest.Config):
interfaces.library_dotnet_managed.configure(self.host_log_folder, replay=self.replay)
interfaces.library_stdout.configure(self.host_log_folder, replay=self.replay)
interfaces.agent_stdout.configure(self.host_log_folder, replay=self.replay)
interfaces.open_telemetry.configure(self.host_log_folder, replay=self.replay)

for container in self.buddies:
container.interface.configure(self.host_log_folder, replay=self.replay)
@@ -450,7 +455,8 @@ def _get_weblog_system_info(self):

def _start_interfaces_watchdog(self):
super().start_interfaces_watchdog(
[interfaces.library, interfaces.agent] + [container.interface for container in self.buddies]
[interfaces.library, interfaces.agent, interfaces.open_telemetry]
+ [container.interface for container in self.buddies]
)

def _set_weblog_domain(self):
@@ -506,6 +512,9 @@ def _wait_for_app_readiness(self):
raise ValueError("Datadog agent not ready")
logger.debug("Agent ready")

# Explicitly do not wait for the open telemetry interface to be ready.
# It may only be exercised during the test run, and not during the warmup.

def post_setup(self, session: pytest.Session):
# if no tests are run, skip interface timeouts
is_empty_test_run = session.config.option.skip_empty_scenario and len(session.items) == 0
@@ -532,6 +541,9 @@ def _wait_and_stop_containers(self, *, force_interface_timout_to_zero: bool):
interfaces.agent.load_data_from_logs()
interfaces.agent.check_deserialization_errors()

interfaces.open_telemetry.load_data_from_logs()
interfaces.open_telemetry.check_deserialization_errors()

interfaces.backend.load_data_from_logs()

else:
@@ -570,6 +582,11 @@ def _wait_and_stop_containers(self, *, force_interface_timout_to_zero: bool):
self._wait_interface(
interfaces.backend, 0 if force_interface_timout_to_zero else self.backend_interface_timeout
)
self._wait_interface(
interfaces.open_telemetry,
0 if force_interface_timout_to_zero else self.open_telemetry_interface_timeout,
)
interfaces.open_telemetry.check_deserialization_errors()

def _wait_interface(self, interface: ProxyBasedInterfaceValidator, timeout: int):
logger.terminal.write_sep("-", f"Wait for {interface} ({timeout}s)")