Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -872,10 +872,12 @@ def _on_azure_functions_service_bus_trigger_span_modifier(
span = ctx.span
_set_azure_function_tags(span, azure_functions_config, function_name, trigger, span_kind)
span.set_tag_str(MESSAGING_DESTINATION_NAME, entity_name)
span.set_tag_str(MESSAGING_MESSAGE_ID, message_id)
span.set_tag_str(MESSAGING_OPERATION, "receive")
span.set_tag_str(MESSAGING_SYSTEM, azure_servicebusx.SERVICE)

if message_id is not None:
span.set_tag_str(MESSAGING_MESSAGE_ID, message_id)


def _on_azure_servicebus_send_message_modifier(ctx, azure_servicebus_config, entity_name, fully_qualified_namespace):
span = ctx.span
Expand Down
35 changes: 23 additions & 12 deletions ddtrace/contrib/internal/azure_functions/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ddtrace.trace import Pin

from .utils import create_context
from .utils import message_list_has_single_context
from .utils import wrap_function_with_tracing


Expand Down Expand Up @@ -97,24 +98,34 @@ def _wrap_service_bus_trigger(pin, func, function_name, trigger_arg_name, trigge
def context_factory(kwargs):
resource_name = f"{trigger_type} {function_name}"
msg = kwargs.get(trigger_arg_name)
return create_context(
"azure.functions.patched_service_bus", pin, resource_name, headers=msg.application_properties
)

# Reparent trace if single message or list of messages all with same context
if isinstance(msg, azure_functions.ServiceBusMessage):
application_properties = msg.application_properties
elif (
isinstance(msg, list)
and msg
and isinstance(msg[0], azure_functions.ServiceBusMessage)
and message_list_has_single_context(msg)
):
application_properties = msg[0].application_properties
else:
application_properties = None

return create_context("azure.functions.patched_service_bus", pin, resource_name, headers=application_properties)

def pre_dispatch(ctx, kwargs):
msg = kwargs.get(trigger_arg_name)

if isinstance(msg, azure_functions.ServiceBusMessage):
message_id = msg.message_id
else:
message_id = None

entity_name = trigger_details.get("topicName") or trigger_details.get("queueName")
return (
"azure.functions.service_bus_trigger_modifier",
(
ctx,
config.azure_functions,
function_name,
trigger_type,
SpanKind.CONSUMER,
entity_name,
msg.message_id,
),
(ctx, config.azure_functions, function_name, trigger_type, SpanKind.CONSUMER, entity_name, message_id),
)

return wrap_function_with_tracing(func, context_factory, pre_dispatch=pre_dispatch)
Expand Down
14 changes: 14 additions & 0 deletions ddtrace/contrib/internal/azure_functions/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import functools
import inspect
from typing import List

import azure.functions as azure_functions

from ddtrace import config
from ddtrace.contrib.internal.trace_utils import int_service
from ddtrace.ext import SpanTypes
from ddtrace.internal import core
from ddtrace.internal.schema import schematize_cloud_faas_operation
from ddtrace.propagation.http import HTTPPropagator


def create_context(context_name, pin, resource=None, headers=None):
Expand Down Expand Up @@ -59,3 +63,13 @@ def wrapper(*args, **kwargs):
core.dispatch(*post_dispatch(ctx, res))

return wrapper


def message_list_has_single_context(msg_list: List[azure_functions.ServiceBusMessage]):
first_context = HTTPPropagator.extract(msg_list[0].application_properties)
for message in msg_list[1:]:
context = HTTPPropagator.extract(message.application_properties)
if first_context != context:
return False

return True
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fixes:
- |
azure_functions: This fix resolves an issue where a function that consumes a list of service bus messages throws an exception when instrumented.
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,39 @@ def http_post_root_servicebus(req: func.HttpRequest) -> func.HttpResponse:
conn_str=os.getenv("CONNECTION_STRING", "")
) as servicebus_client:
with servicebus_client.get_queue_sender(queue_name="queue.1") as queue_sender:
queue_sender.send_messages(azure_servicebus.ServiceBusMessage("test message"))
queue_sender.send_messages(azure_servicebus.ServiceBusMessage('{"body":"test message"}'))
with servicebus_client.get_topic_sender(topic_name="topic.1") as topic_sender:
topic_sender.send_messages(azure_servicebus.ServiceBusMessage("test message"))
topic_sender.send_messages([azure_servicebus.ServiceBusMessage('{"body":"test message"}')])
return func.HttpResponse("Hello Datadog!")


@app.route(
route="httppostrootservicebusmanysamecontext", auth_level=func.AuthLevel.ANONYMOUS, methods=[func.HttpMethod.POST]
)
def http_post_root_servicebus_many_same_context(req: func.HttpRequest) -> func.HttpResponse:
with azure_servicebus.ServiceBusClient.from_connection_string(
conn_str=os.getenv("CONNECTION_STRING", "")
) as servicebus_client:
with servicebus_client.get_topic_sender(topic_name="topic.1") as topic_sender:
topic_sender.send_messages(
[
azure_servicebus.ServiceBusMessage('{"body":"test message 1"}'),
azure_servicebus.ServiceBusMessage('{"body":"test message 2"}'),
]
)
return func.HttpResponse("Hello Datadog!")


@app.route(
route="httppostrootservicebusmanydiffcontext", auth_level=func.AuthLevel.ANONYMOUS, methods=[func.HttpMethod.POST]
)
def http_post_root_servicebus_many_diff_context(req: func.HttpRequest) -> func.HttpResponse:
with azure_servicebus.ServiceBusClient.from_connection_string(
conn_str=os.getenv("CONNECTION_STRING", "")
) as servicebus_client:
with servicebus_client.get_topic_sender(topic_name="topic.1") as topic_sender:
topic_sender.send_messages([azure_servicebus.ServiceBusMessage('{"body":"test message 1"}')])
topic_sender.send_messages([azure_servicebus.ServiceBusMessage('{"body":"test message 2"}')])
return func.HttpResponse("Hello Datadog!")


Expand All @@ -98,7 +128,11 @@ def service_bus_queue(msg: func.ServiceBusMessage):

@app.function_name(name="servicebustopic")
@app.service_bus_topic_trigger(
arg_name="msg", topic_name="topic.1", connection="CONNECTION_STRING", subscription_name="subscription.3"
arg_name="msg",
topic_name="topic.1",
connection="CONNECTION_STRING",
subscription_name="subscription.3",
cardinality=os.getenv("CARDINALITY", "one"),
)
def service_bus_topic(msg: func.ServiceBusMessage):
pass
Expand Down
32 changes: 32 additions & 0 deletions tests/contrib/azure_functions/test_azure_functions_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
from tests.webclient import Client


CARDINALITY_MANY_PARAMS = {
"CARDINALITY": "many",
}

DEFAULT_HEADERS = {
"User-Agent": "python-httpx/x.xx.x",
}
Expand Down Expand Up @@ -138,6 +142,34 @@ def test_service_bus_distributed_tracing(azure_functions_client: Client) -> None
assert azure_functions_client.post("/api/httppostrootservicebus", headers=DEFAULT_HEADERS).status_code == 200


@pytest.mark.parametrize(
"azure_functions_client",
[CARDINALITY_MANY_PARAMS],
ids=["many"],
indirect=True,
)
@pytest.mark.snapshot()
def test_service_bus_consume_same_context(azure_functions_client: Client) -> None:
assert (
azure_functions_client.post("/api/httppostrootservicebusmanysamecontext", headers=DEFAULT_HEADERS).status_code
== 200
)


@pytest.mark.parametrize(
"azure_functions_client",
[CARDINALITY_MANY_PARAMS],
ids=["many"],
indirect=True,
)
@pytest.mark.snapshot()
def test_service_bus_consume_diff_context(azure_functions_client: Client) -> None:
assert (
azure_functions_client.post("/api/httppostrootservicebusmanydiffcontext", headers=DEFAULT_HEADERS).status_code
== 200
)


@pytest.mark.snapshot
def test_timer(azure_functions_client: Client) -> None:
assert (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
[[
{
"name": "azure.functions.invoke",
"service": "test-func",
"resource": "POST /api/httppostrootservicebusmanydiffcontext",
"trace_id": 0,
"span_id": 1,
"parent_id": 0,
"type": "serverless",
"meta": {
"_dd.p.dm": "-0",
"_dd.p.tid": "687ab41600000000",
"aas.function.name": "http_post_root_servicebus_many_diff_context",
"aas.function.trigger": "Http",
"component": "azure_functions",
"http.method": "POST",
"http.route": "/api/httppostrootservicebusmanydiffcontext",
"http.status_code": "200",
"http.url": "http://0.0.0.0:7071/api/httppostrootservicebusmanydiffcontext",
"http.useragent": "python-httpx/x.xx.x",
"language": "python",
"runtime-id": "582a094714874ab1ac856c53c66d13ab",
"span.kind": "server"
},
"metrics": {
"_dd.top_level": 1,
"_dd.tracer_kr": 1.0,
"_sampling_priority_v1": 1,
"process_id": 31098
},
"duration": 1029446917,
"start": 1752871958551887096
},
{
"name": "azure.servicebus.send",
"service": "azure_servicebus",
"resource": "topic.1",
"trace_id": 0,
"span_id": 2,
"parent_id": 1,
"type": "worker",
"meta": {
"_dd.base_service": "test-func",
"component": "azure_servicebus",
"messaging.destination.name": "topic.1",
"messaging.operation": "send",
"messaging.system": "servicebus",
"network.destination.name": "localhost",
"span.kind": "producer"
},
"metrics": {
"_dd.top_level": 1
},
"duration": 10415875,
"start": 1752871959065671596
},
{
"name": "azure.servicebus.send",
"service": "azure_servicebus",
"resource": "topic.1",
"trace_id": 0,
"span_id": 3,
"parent_id": 1,
"type": "worker",
"meta": {
"_dd.base_service": "test-func",
"_dd.p.tid": "687ab41600000000",
"component": "azure_servicebus",
"messaging.destination.name": "topic.1",
"messaging.operation": "send",
"messaging.system": "servicebus",
"network.destination.name": "localhost",
"span.kind": "producer"
},
"metrics": {
"_dd.top_level": 1
},
"duration": 6302958,
"start": 1752871959077427680
}],
[
{
"name": "azure.functions.invoke",
"service": "test-func",
"resource": "ServiceBus servicebustopic",
"trace_id": 1,
"span_id": 1,
"parent_id": 0,
"type": "serverless",
"meta": {
"_dd.p.dm": "-0",
"_dd.p.tid": "687ab41700000000",
"aas.function.name": "servicebustopic",
"aas.function.trigger": "ServiceBus",
"component": "azure_functions",
"language": "python",
"messaging.destination.name": "topic.1",
"messaging.operation": "receive",
"messaging.system": "servicebus",
"runtime-id": "582a094714874ab1ac856c53c66d13ab",
"span.kind": "consumer"
},
"metrics": {
"_dd.top_level": 1,
"_dd.tracer_kr": 1.0,
"_sampling_priority_v1": 1,
"process_id": 31098
},
"duration": 58792,
"start": 1752871959272796763
}]]
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
[[
{
"name": "azure.functions.invoke",
"service": "test-func",
"resource": "POST /api/httppostrootservicebusmanysamecontext",
"trace_id": 0,
"span_id": 1,
"parent_id": 0,
"type": "serverless",
"meta": {
"_dd.p.dm": "-0",
"_dd.p.tid": "687ab37a00000000",
"aas.function.name": "http_post_root_servicebus_many_same_context",
"aas.function.trigger": "Http",
"component": "azure_functions",
"http.method": "POST",
"http.route": "/api/httppostrootservicebusmanysamecontext",
"http.status_code": "200",
"http.url": "http://0.0.0.0:7071/api/httppostrootservicebusmanysamecontext",
"http.useragent": "python-httpx/x.xx.x",
"language": "python",
"runtime-id": "7719a316c6f64aecb8ea198a9a951639",
"span.kind": "server"
},
"metrics": {
"_dd.top_level": 1,
"_dd.tracer_kr": 1.0,
"_sampling_priority_v1": 1,
"process_id": 29901
},
"duration": 1027642209,
"start": 1752871802527607218
},
{
"name": "azure.servicebus.send",
"service": "azure_servicebus",
"resource": "topic.1",
"trace_id": 0,
"span_id": 2,
"parent_id": 1,
"type": "worker",
"meta": {
"_dd.base_service": "test-func",
"component": "azure_servicebus",
"messaging.destination.name": "topic.1",
"messaging.operation": "send",
"messaging.system": "servicebus",
"network.destination.name": "localhost",
"span.kind": "producer"
},
"metrics": {
"_dd.top_level": 1
},
"duration": 10803292,
"start": 1752871803035560635
},
{
"name": "azure.functions.invoke",
"service": "test-func",
"resource": "ServiceBus servicebustopic",
"trace_id": 0,
"span_id": 3,
"parent_id": 2,
"type": "serverless",
"meta": {
"_dd.p.tid": "687ab37a00000000",
"aas.function.name": "servicebustopic",
"aas.function.trigger": "ServiceBus",
"component": "azure_functions",
"messaging.destination.name": "topic.1",
"messaging.operation": "receive",
"messaging.system": "servicebus",
"runtime-id": "7719a316c6f64aecb8ea198a9a951639",
"span.kind": "consumer"
},
"metrics": {
"_dd.top_level": 1,
"process_id": 29901
},
"duration": 71209,
"start": 1752871803228843135
}]]
Loading