Skip to content

Commit 7a8a3aa

Browse files
authored
Added changes for rate limited sampler (azure-exporter changes) (#41954)
* Added changes for rate limited sampler azure-exporter * Added CHANGELOG entry * Fixed spell check errors * Fixed spell check in tests * CHANGELOG updated * Fixed if-else block * Revert "Fixed if-else block" This reverts commit 3f8e58e. * Refactored if-else blocks to ternary operators in rate-limited sampler * Addressed feedback * Fixed djb2 arguments * Added comments to test file * Update processor file to match main * Fixed linting errors * Updated logic for when sampling percentage is 100% * Updated DJB2 arguments * Fixed linting errors * Moved parent_context check to top of the function
1 parent 8ee6148 commit 7a8a3aa

File tree

6 files changed

+743
-1
lines changed

6 files changed

+743
-1
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
([#41950](https://github.com/Azure/azure-sdk-for-python/pull/41950))
2323
- Customer Facing Statsbeat: Added logic for retry item count
2424
([#41971](https://github.com/Azure/azure-sdk-for-python/pull/41971))
25-
25+
- Added RateLimited Sampler
26+
([#41954](https://github.com/Azure/azure-sdk-for-python/pull/41954))
2627

2728
- Support AI Foundry by Handling GEN_AI_SYSTEM Attributes with [Spec](https://github.com/aep-health-and-standards/Telemetry-Collection-Spec/blob/main/ApplicationInsights/genai_semconv_mapping.md) ([#41705](https://github.com/Azure/azure-sdk-for-python/pull/41705))
2829

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
from azure.monitor.opentelemetry.exporter.export.metrics._exporter import AzureMonitorMetricExporter
99
from azure.monitor.opentelemetry.exporter.export.trace._exporter import AzureMonitorTraceExporter
1010
from azure.monitor.opentelemetry.exporter.export.trace._sampling import ApplicationInsightsSampler
11+
from azure.monitor.opentelemetry.exporter.export.trace._rate_limited_sampling import RateLimitedSampler
1112
from ._version import VERSION
1213

1314
__all__ = [
1415
"ApplicationInsightsSampler",
16+
"RateLimitedSampler",
1517
"AzureMonitorMetricExporter",
1618
"AzureMonitorLogExporter",
1719
"AzureMonitorTraceExporter",

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99
HTTP_CLIENT_REQUEST_DURATION,
1010
HTTP_SERVER_REQUEST_DURATION,
1111
)
12+
# pylint:disable=no-name-in-module
13+
from fixedint import Int32
1214
from azure.core import CaseInsensitiveEnumMeta
1315

16+
1417
# Environment variables
1518

1619
_APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL = "APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL"
@@ -294,6 +297,9 @@ class _RP_Names(Enum):
294297
# sampleRate
295298

296299
_SAMPLE_RATE_KEY = "_MS.sampleRate"
300+
_SAMPLING_HASH = 5381
301+
_INTEGER_MAX: int = Int32.maxval
302+
_INTEGER_MIN: int = Int32.minval
297303

298304
# AAD Auth
299305

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
import math
5+
import threading
6+
import time
7+
from typing import Optional, Sequence
8+
from opentelemetry.context import Context
9+
from opentelemetry.trace import Link, SpanKind, format_trace_id
10+
from opentelemetry.sdk.trace.sampling import (
11+
Decision,
12+
Sampler,
13+
SamplingResult,
14+
_get_parent_trace_state,
15+
)
16+
from opentelemetry.trace.span import TraceState
17+
from opentelemetry.util.types import Attributes
18+
19+
from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY
20+
21+
from azure.monitor.opentelemetry.exporter.export.trace._utils import (
22+
_get_DJB2_sample_score,
23+
_round_down_to_nearest,
24+
parent_context_sampling,
25+
)
26+
27+
class _State:
28+
def __init__(self, effective_window_count: float, effective_window_nanoseconds: float, last_nano_time: int):
29+
self.effective_window_count = effective_window_count
30+
self.effective_window_nanoseconds = effective_window_nanoseconds
31+
self.last_nano_time = last_nano_time
32+
33+
class RateLimitedSamplingPercentage:
34+
def __init__(self, target_spans_per_second_limit: float, round_to_nearest: bool = True):
35+
if target_spans_per_second_limit < 0.0:
36+
raise ValueError("Limit for sampled spans per second must be nonnegative!")
37+
# Hardcoded adaptation time of 0.1 seconds for adjusting to sudden changes in telemetry volumes
38+
adaptation_time_seconds = 0.1
39+
self._inverse_adaptation_time_nanoseconds = 1e-9 / adaptation_time_seconds
40+
self._target_spans_per_nanosecond_limit = 1e-9 * target_spans_per_second_limit
41+
initial_nano_time = int(time.time_ns())
42+
self._state = _State(0.0, 0.0, initial_nano_time)
43+
self._lock = threading.Lock()
44+
self._round_to_nearest = round_to_nearest
45+
46+
def _update_state(self, old_state: _State, current_nano_time: int) -> _State:
47+
if current_nano_time <= old_state.last_nano_time:
48+
return _State(
49+
old_state.effective_window_count + 1,
50+
old_state.effective_window_nanoseconds,
51+
old_state.last_nano_time
52+
)
53+
nano_time_delta = current_nano_time - old_state.last_nano_time
54+
decay_factor = math.exp(-nano_time_delta * self._inverse_adaptation_time_nanoseconds)
55+
current_effective_window_count = old_state.effective_window_count * decay_factor + 1
56+
current_effective_window_nanoseconds = old_state.effective_window_nanoseconds * decay_factor + nano_time_delta
57+
58+
return _State(current_effective_window_count, current_effective_window_nanoseconds, current_nano_time)
59+
60+
def get(self) -> float:
61+
current_nano_time = int(time.time_ns())
62+
63+
with self._lock:
64+
old_state = self._state
65+
self._state = self._update_state(old_state, current_nano_time)
66+
current_state = self._state
67+
68+
# Calculate sampling probability based on current state
69+
if current_state.effective_window_count == 0:
70+
return 100.0
71+
72+
sampling_probability = (
73+
(current_state.effective_window_nanoseconds * self._target_spans_per_nanosecond_limit) /
74+
current_state.effective_window_count
75+
)
76+
77+
sampling_percentage = 100 * min(sampling_probability, 1.0)
78+
79+
if self._round_to_nearest:
80+
sampling_percentage = _round_down_to_nearest(sampling_percentage)
81+
82+
return sampling_percentage
83+
84+
85+
class RateLimitedSampler(Sampler):
86+
def __init__(self, target_spans_per_second_limit: float):
87+
self._sampling_percentage_generator = RateLimitedSamplingPercentage(target_spans_per_second_limit)
88+
self._description = f"RateLimitedSampler{{{target_spans_per_second_limit}}}"
89+
90+
def should_sample(
91+
self,
92+
parent_context: Optional[Context],
93+
trace_id: int,
94+
name: str,
95+
kind: Optional[SpanKind] = None,
96+
attributes: Attributes = None,
97+
links: Optional[Sequence["Link"]] = None,
98+
trace_state: Optional["TraceState"] = None,
99+
) -> "SamplingResult":
100+
101+
if parent_context is not None:
102+
parent_result = parent_context_sampling(parent_context, attributes)
103+
if parent_result is not None:
104+
return parent_result
105+
106+
sampling_percentage = self._sampling_percentage_generator.get()
107+
sampling_score = _get_DJB2_sample_score(format_trace_id(trace_id).lower()) * 100.0
108+
109+
if sampling_score < sampling_percentage:
110+
decision = Decision.RECORD_AND_SAMPLE
111+
else:
112+
decision = Decision.DROP
113+
114+
if sampling_percentage == 100.0:
115+
new_attributes = {}
116+
else:
117+
new_attributes = {} if attributes is None else dict(attributes)
118+
new_attributes[_SAMPLE_RATE_KEY] = sampling_percentage
119+
120+
return SamplingResult(
121+
decision,
122+
new_attributes,
123+
_get_parent_trace_state(parent_context),
124+
)
125+
126+
def get_description(self) -> str:
127+
return self._description

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_utils.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,35 @@
33

44
from typing import no_type_check, Optional, Tuple
55
from urllib.parse import urlparse
6+
import math
67

78
from opentelemetry.semconv.attributes import (
89
client_attributes,
910
server_attributes,
1011
url_attributes,
1112
user_agent_attributes,
1213
)
14+
from opentelemetry.context import Context
15+
from opentelemetry.trace import get_current_span
16+
from opentelemetry.sdk.trace.sampling import (
17+
Decision,
18+
SamplingResult,
19+
_get_parent_trace_state,
20+
)
1321
from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
1422
from opentelemetry.util.types import Attributes
1523

24+
# pylint:disable=no-name-in-module
25+
from fixedint import Int32
26+
27+
from azure.monitor.opentelemetry.exporter._constants import _SAMPLE_RATE_KEY
28+
29+
from azure.monitor.opentelemetry.exporter._constants import (
30+
_SAMPLING_HASH,
31+
_INTEGER_MAX,
32+
_INTEGER_MIN,
33+
)
34+
1635

1736
# pylint:disable=too-many-return-statements
1837
def _get_default_port_db(db_system: str) -> int:
@@ -320,3 +339,65 @@ def _get_url_for_http_request(attributes: Attributes) -> Optional[str]:
320339
http_target,
321340
)
322341
return url
342+
343+
def _get_DJB2_sample_score(trace_id_hex: str) -> float:
344+
# This algorithm uses 32bit integers
345+
hash_value = Int32(_SAMPLING_HASH)
346+
for char in trace_id_hex:
347+
hash_value = ((hash_value << 5) + hash_value) + ord(char)
348+
349+
if hash_value == _INTEGER_MIN:
350+
hash_value = int(_INTEGER_MAX)
351+
else:
352+
hash_value = abs(hash_value)
353+
354+
# divide by _INTEGER_MAX for value between 0 and 1 for sampling score
355+
return float(hash_value) / _INTEGER_MAX
356+
357+
def _round_down_to_nearest(sampling_percentage: float) -> float:
358+
if sampling_percentage == 0:
359+
return 0
360+
# Handle extremely small percentages that would cause overflow
361+
if sampling_percentage <= _INTEGER_MIN: # Extremely small threshold
362+
return 0.0
363+
item_count = 100.0 / sampling_percentage
364+
# Handle case where item_count is infinity or too large for math.ceil
365+
if not math.isfinite(item_count) or item_count >= _INTEGER_MAX:
366+
return 0.0
367+
return 100.0 / math.ceil(item_count)
368+
369+
def parent_context_sampling(
370+
parent_context: Optional[Context],
371+
attributes: Attributes = None
372+
) -> Optional["SamplingResult"]:
373+
374+
if parent_context is not None:
375+
parent_span = get_current_span(parent_context)
376+
parent_span_context = parent_span.get_span_context()
377+
if parent_span_context.is_valid and not parent_span_context.is_remote:
378+
if not parent_span.is_recording():
379+
# Parent was dropped, drop this child too
380+
new_attributes = {} if attributes is None else dict(attributes)
381+
new_attributes[_SAMPLE_RATE_KEY] = 0.0
382+
383+
return SamplingResult(
384+
Decision.DROP,
385+
new_attributes,
386+
_get_parent_trace_state(parent_context),
387+
)
388+
389+
parent_attributes = getattr(parent_span, 'attributes', {})
390+
parent_sample_rate = parent_attributes.get(_SAMPLE_RATE_KEY)
391+
392+
if parent_sample_rate is not None:
393+
# Honor parent's sampling rate
394+
new_attributes = {} if attributes is None else dict(attributes)
395+
new_attributes[_SAMPLE_RATE_KEY] = parent_sample_rate
396+
397+
return SamplingResult(
398+
Decision.RECORD_AND_SAMPLE,
399+
new_attributes,
400+
_get_parent_trace_state(parent_context),
401+
)
402+
return None
403+
return None

0 commit comments

Comments
 (0)