From e76d88887a17e792447a97195cee2fe45e4b454d Mon Sep 17 00:00:00 2001 From: Artur Gajowniczek Date: Mon, 22 Sep 2025 17:05:24 +0000 Subject: [PATCH] Update Google GenAI instrumentation to the latest semantic conventions. Co-authored-by: Aaron Abbott --- .../instrumentation/google_genai/flags.py | 23 +- .../google_genai/generate_content.py | 287 ++++++++++++++---- .../instrumentation/google_genai/message.py | 141 +++++++++ .../google_genai/message_models.py | 58 ++++ .../google_genai/otel_wrapper.py | 36 ++- .../google_genai/tool_call_wrapper.py | 58 ++-- 6 files changed, 500 insertions(+), 103 deletions(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/message.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/message_models.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/flags.py b/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/flags.py index 541d9ab48f..6fd404eadf 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/flags.py +++ b/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/flags.py @@ -12,12 +12,25 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os +from os import environ +from typing import Union -_CONTENT_RECORDING_ENV_VAR = ( - "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT" +from opentelemetry.instrumentation._semconv import _StabilityMode +from opentelemetry.util.genai.environment_variables import ( + OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, ) +from opentelemetry.util.genai.types import ContentCapturingMode +from opentelemetry.util.genai.utils import get_content_capturing_mode -def is_content_recording_enabled(): - return os.getenv(_CONTENT_RECORDING_ENV_VAR, "false").lower() == "true" +def is_content_recording_enabled( + mode: _StabilityMode, +) -> Union[bool, ContentCapturingMode]: + if mode == _StabilityMode.DEFAULT: + capture_content = environ.get( + OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, "false" + ) + return capture_content.lower() == "true" + if mode == _StabilityMode.GEN_AI_LATEST_EXPERIMENTAL: + return get_content_capturing_mode() + raise RuntimeError(f"{mode} mode not supported") diff --git a/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/generate_content.py b/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/generate_content.py index 7e85336e56..cf8bc7da67 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/generate_content.py +++ b/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/generate_content.py @@ -13,6 +13,7 @@ # limitations under the License.
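# A minimal sketch of how the two capture modes handled by
# is_content_recording_enabled() in flags.py above are selected at runtime.
# Treating "gen_ai_latest_experimental" as the opt-in value for
# OTEL_SEMCONV_STABILITY_OPT_IN, and the ContentCapturingMode names below,
# are assumptions inferred from _StabilityMode.GEN_AI_LATEST_EXPERIMENTAL
# and opentelemetry-util-genai; the patch does not show them directly.
import os

# Default mode: the flag stays a boolean, matching the pre-existing behavior.
os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = "true"

# Experimental mode: opt in via the stability flag; the same variable is then
# parsed by get_content_capturing_mode() into a ContentCapturingMode value
# (e.g. NO_CONTENT, SPAN_ONLY, EVENT_ONLY, SPAN_AND_EVENT).
os.environ["OTEL_SEMCONV_STABILITY_OPT_IN"] = "gen_ai_latest_experimental"
os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = "SPAN_AND_EVENT"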
import copy +import dataclasses import functools import json import logging @@ -21,6 +22,7 @@ from typing import Any, AsyncIterator, Awaitable, Iterator, Optional, Union from google.genai.models import AsyncModels, Models +from google.genai.models import t as transformers from google.genai.types import ( BlockedReason, Candidate, @@ -33,18 +35,33 @@ GenerateContentConfigOrDict, GenerateContentResponse, ) - from opentelemetry import trace +from opentelemetry._events import Event +from opentelemetry.instrumentation._semconv import ( + _OpenTelemetrySemanticConventionStability, + _OpenTelemetryStabilitySignalType, + _StabilityMode, +) from opentelemetry.semconv._incubating.attributes import ( code_attributes, gen_ai_attributes, ) from opentelemetry.semconv.attributes import error_attributes +from opentelemetry.trace.span import Span +from opentelemetry.util.genai.types import ContentCapturingMode +from opentelemetry.util.genai.upload_hook import load_upload_hook from .allowlist_util import AllowList from .custom_semconv import GCP_GENAI_OPERATION_CONFIG from .dict_util import flatten_dict from .flags import is_content_recording_enabled +from .message import ( + InputMessage, + OutputMessage, + to_input_messages, + to_output_messages, + to_system_instruction, +) from .otel_wrapper import OTelWrapper from .tool_call_wrapper import wrapped as wrapped_tool @@ -144,7 +161,7 @@ def _to_dict(value: object): def _add_request_options_to_span( - span, config: Optional[GenerateContentConfigOrDict], allow_list: AllowList + span: Span, config: Optional[GenerateContentConfigOrDict], allow_list: AllowList ): if config is None: return @@ -188,9 +205,7 @@ def _add_request_options_to_span( }, ) for key, value in attributes.items(): - if key.startswith( - GCP_GENAI_OPERATION_CONFIG - ) and not allow_list.allowed(key): + if key.startswith(GCP_GENAI_OPERATION_CONFIG) and not allow_list.allowed(key): # The allowlist is used to control inclusion of the dynamic keys. 
continue span.set_attribute(key, value) @@ -226,12 +241,42 @@ def _wrapped_config_with_tools( if not config.tools: return config result = copy.copy(config) - result.tools = [ - wrapped_tool(tool, otel_wrapper, **kwargs) for tool in config.tools - ] + result.tools = [wrapped_tool(tool, otel_wrapper, **kwargs) for tool in config.tools] return result + +def _config_to_system_instruction( + config: Union[GenerateContentConfigOrDict, None], +) -> Union[ContentUnion, None]: + if not config: + return None + + if isinstance(config, dict): + return GenerateContentConfig.model_validate(config).system_instruction + return config.system_instruction + + +def _create_completion_details_attributes( + input_messages: list[InputMessage], + output_messages: list[OutputMessage], + system_instruction: Union[InputMessage, None], +): + attributes = { + "gen_ai.input.messages": json.dumps( + [dataclasses.asdict(input_message) for input_message in input_messages] + ), + "gen_ai.output.messages": json.dumps( + [dataclasses.asdict(output_message) for output_message in output_messages] + ), + } + if system_instruction: + attributes["gen_ai.system.instructions"] = json.dumps( + dataclasses.asdict(system_instruction) + ) + + return attributes + + class _GenerateContentInstrumentationHelper: def __init__( self, @@ -248,7 +293,12 @@ def __init__( self._error_type = None self._input_tokens = 0 self._output_tokens = 0 - self._content_recording_enabled = is_content_recording_enabled() + self.sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.GEN_AI + ) + self._content_recording_enabled = is_content_recording_enabled( + self.sem_conv_opt_in_mode + ) self._response_index = 0 self._candidate_index = 0 self._generate_content_config_key_allowlist = ( @@ -268,7 +318,7 @@ def wrapped_config( def start_span_as_current_span( self, model_name, function_name, end_on_exit=True - ): + ) -> Iterator[Span]: return self._otel_wrapper.start_as_current_span( f"{_GENERATE_CONTENT_OP_NAME} {model_name}", start_time=self._start_time, @@ -281,29 +331,37 @@ def start_span_as_current_span( end_on_exit=end_on_exit, ) - def process_request( - self, - contents: Union[ContentListUnion, ContentListUnionDict], - config: Optional[GenerateContentConfigOrDict], + def add_request_options_to_span( + self, config: Optional[GenerateContentConfigOrDict] ): span = trace.get_current_span() _add_request_options_to_span( span, config, self._generate_content_config_key_allowlist ) + + def process_request( + self, + contents: Union[ContentListUnion, ContentListUnionDict], + config: Optional[GenerateContentConfigOrDict], + ): self._maybe_log_system_instruction(config=config) self._maybe_log_user_prompt(contents)
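# A sketch of the attribute payload produced by
# _create_completion_details_attributes() above and consumed by
# _maybe_log_completion_details() below. InputMessage/OutputMessage/Text are
# dataclasses from opentelemetry.util.genai.types; the exact serialized field
# names are an assumption based on dataclasses.asdict(), not spelled out in
# this patch.
import dataclasses
import json

from opentelemetry.util.genai.types import InputMessage, OutputMessage, Text

input_message = InputMessage(role="user", parts=[Text(content="What is OpenTelemetry?")])
output_message = OutputMessage(
    role="assistant", parts=[Text(content="A telemetry framework.")], finish_reason="stop"
)
attributes = {
    "gen_ai.input.messages": json.dumps([dataclasses.asdict(input_message)]),
    "gen_ai.output.messages": json.dumps([dataclasses.asdict(output_message)]),
}
# Depending on ContentCapturingMode, these attributes land on the span, on a
# "gen_ai.completion.details" event, or both; the same message lists are also
# handed to the upload hook via hook.upload(inputs=..., outputs=..., ...).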
def process_response(self, response: GenerateContentResponse): - # TODO: Determine if there are other response properties that - # need to be reflected back into the span attributes. - # - # See also: TODOS.md. - self._update_finish_reasons(response) - self._maybe_update_token_counts(response) - self._maybe_update_error_type(response) + self._update_response(response) self._maybe_log_response(response) self._response_index += 1 + def process_completion( + self, + request: Union[ContentListUnion, ContentListUnionDict], + response: GenerateContentResponse, + config: Optional[GenerateContentConfigOrDict] = None, + ): + self._update_response(response) + self._maybe_log_completion_details(request, response, config) + self._response_index += 1 + def process_error(self, e: Exception): self._error_type = str(e.__class__.__name__) @@ -322,7 +380,16 @@ def finalize_processing(self): self._record_token_usage_metric() self._record_duration_metric() - def _update_finish_reasons(self, response): + def _update_response(self, response: GenerateContentResponse): + # TODO: Determine if there are other response properties that + # need to be reflected back into the span attributes. + # + # See also: TODOS.md. + self._update_finish_reasons(response) + self._maybe_update_token_counts(response) + self._maybe_update_error_type(response) + + def _update_finish_reasons(self, response: GenerateContentResponse): if not response.candidates: return for candidate in response.candidates: @@ -373,6 +440,56 @@ def _maybe_update_error_type(self, response: GenerateContentResponse): block_reason = response.prompt_feedback.block_reason.name.upper() self._error_type = f"BLOCKED_{block_reason}" + def _maybe_log_completion_details( + self, + request: Union[ContentListUnion, ContentListUnionDict], + response: GenerateContentResponse, + config: Optional[GenerateContentConfigOrDict] = None, + ): + attributes = { + gen_ai_attributes.GEN_AI_SYSTEM: self._genai_system, + } + system_instruction = None + if system_content := _config_to_system_instruction(config): + system_instruction = to_system_instruction( + content=transformers.t_contents(system_content)[0] + ) + input_messages = to_input_messages(contents=transformers.t_contents(request)) + output_messages = to_output_messages(candidates=response.candidates or []) + + completion_details_attributes = _create_completion_details_attributes( + input_messages, output_messages, system_instruction + ) + + span = None + if self._content_recording_enabled in [ + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ]: + span = trace.get_current_span() + span.set_attributes(completion_details_attributes) + if self._content_recording_enabled in [ + ContentCapturingMode.EVENT_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ]: + attributes.update(completion_details_attributes) + event = Event(name="gen_ai.completion.details", attributes=attributes) + hook = load_upload_hook() + hook.upload( + inputs=input_messages, + outputs=output_messages, + system_instruction=( + system_instruction.parts if system_instruction else [] + ), + span=span, + log_record=event, + ) + # TODO: UploadHook does not currently expose a shutdown() method; + # hook.shutdown() + self._otel_wrapper.log_completion_details( + event=event, + ) + def _maybe_log_system_instruction( self, config: Optional[GenerateContentConfigOrDict] = None ): @@ -410,9 +527,7 @@ def _maybe_log_user_prompt( total = len(contents) index = 0 for entry in contents: - self._maybe_log_single_user_prompt( - entry, index=index, total=total - ) + self._maybe_log_single_user_prompt(entry, index=index, total=total) index += 1 else: self._maybe_log_single_user_prompt(contents) @@ -445,32 +560,6 @@ def _maybe_log_single_user_prompt( body=body, ) - def
_maybe_log_response_stats(self, response: GenerateContentResponse): - # TODO: Determine if there is a way that we can log a summary - # of the overall response in a manner that is aligned with - # Semantic Conventions. For example, it would be natural - # to report an event that looks something like: - # - # gen_ai.response.stats { - # response_index: 0, - # candidate_count: 3, - # parts_per_candidate: [ - # 3, - # 1, - # 5 - # ] - # } - # - pass - - def _maybe_log_response_safety_ratings( - self, response: GenerateContentResponse - ): - # TODO: Determine if there is a way that we can log - # the "prompt_feedback". This would be especially useful - # in the case where the response is blocked. - pass - def _maybe_log_response(self, response: GenerateContentResponse): self._maybe_log_response_stats(response) self._maybe_log_response_safety_ratings(response) @@ -526,6 +615,30 @@ def _maybe_log_response_candidate( body=body, ) + def _maybe_log_response_stats(self, response: GenerateContentResponse): + # TODO: Determine if there is a way that we can log a summary + # of the overall response in a manner that is aligned with + # Semantic Conventions. For example, it would be natural + # to report an event that looks something like: + # + # gen_ai.response.stats { + # response_index: 0, + # candidate_count: 3, + # parts_per_candidate: [ + # 3, + # 1, + # 5 + # ] + # } + # + pass + + def _maybe_log_response_safety_ratings(self, response: GenerateContentResponse): + # TODO: Determine if there is a way that we can log + # the "prompt_feedback". This would be especially useful + # in the case where the response is blocked. + pass + def _record_token_usage_metric(self): self._otel_wrapper.token_usage_metric.record( self._input_tokens, @@ -587,7 +700,9 @@ def instrumented_generate_content( with helper.start_span_as_current_span( model, "google.genai.Models.generate_content" ): - helper.process_request(contents, config) + helper.add_request_options_to_span(config) + if helper.sem_conv_opt_in_mode == _StabilityMode.DEFAULT: + helper.process_request(contents, config) try: response = wrapped_func( self, @@ -596,7 +711,17 @@ def instrumented_generate_content( config=helper.wrapped_config(config), **kwargs, ) - helper.process_response(response) + if helper.sem_conv_opt_in_mode == _StabilityMode.DEFAULT: + helper.process_response(response) + elif ( + helper.sem_conv_opt_in_mode + == _StabilityMode.GEN_AI_LATEST_EXPERIMENTAL + ): + helper.process_completion(contents, response, config) + else: + raise ValueError( + f"Sem Conv opt in mode {helper.sem_conv_opt_in_mode} not supported." 
+ ) return response except Exception as error: helper.process_error(error) @@ -632,7 +757,9 @@ def instrumented_generate_content_stream( with helper.start_span_as_current_span( model, "google.genai.Models.generate_content_stream" ): - helper.process_request(contents, config) + helper.add_request_options_to_span(config) + if helper.sem_conv_opt_in_mode == _StabilityMode.DEFAULT: + helper.process_request(contents, config) try: for response in wrapped_func( self, @@ -641,7 +768,17 @@ def instrumented_generate_content_stream( config=helper.wrapped_config(config), **kwargs, ): - helper.process_response(response) + if helper.sem_conv_opt_in_mode == _StabilityMode.DEFAULT: + helper.process_response(response) + elif ( + helper.sem_conv_opt_in_mode + == _StabilityMode.GEN_AI_LATEST_EXPERIMENTAL + ): + helper.process_completion(contents, response, config) + else: + raise ValueError( + f"Sem Conv opt in mode {helper.sem_conv_opt_in_mode} not supported." + ) yield response except Exception as error: helper.process_error(error) @@ -677,7 +814,9 @@ async def instrumented_generate_content( with helper.start_span_as_current_span( model, "google.genai.AsyncModels.generate_content" ): - helper.process_request(contents, config) + helper.add_request_options_to_span(config) + if helper.sem_conv_opt_in_mode == _StabilityMode.DEFAULT: + helper.process_request(contents, config) try: response = await wrapped_func( self, @@ -686,7 +825,17 @@ async def instrumented_generate_content( config=helper.wrapped_config(config), **kwargs, ) - helper.process_response(response) + if helper.sem_conv_opt_in_mode == _StabilityMode.DEFAULT: + helper.process_response(response) + elif ( + helper.sem_conv_opt_in_mode + == _StabilityMode.GEN_AI_LATEST_EXPERIMENTAL + ): + helper.process_completion(contents, response, config) + else: + raise ValueError( + f"Sem Conv opt in mode {helper.sem_conv_opt_in_mode} not supported." + ) return response except Exception as error: helper.process_error(error) @@ -725,6 +874,8 @@ async def instrumented_generate_content_stream( "google.genai.AsyncModels.generate_content_stream", end_on_exit=False, ) as span: + helper.add_request_options_to_span(config) + if helper.sem_conv_opt_in_mode == _StabilityMode.DEFAULT: helper.process_request(contents, config) try: response_async_generator = await wrapped_func( @@ -744,7 +895,17 @@ async def _response_async_generator_wrapper(): with trace.use_span(span, end_on_exit=True): try: async for response in response_async_generator: - helper.process_response(response) + if helper.sem_conv_opt_in_mode == _StabilityMode.DEFAULT: + helper.process_response(response) + elif ( + helper.sem_conv_opt_in_mode + == _StabilityMode.GEN_AI_LATEST_EXPERIMENTAL + ): + helper.process_completion(contents, response, config) + else: + raise ValueError( + f"Sem Conv opt in mode {helper.sem_conv_opt_in_mode} not supported." 
+ ) yield response except Exception as error: helper.process_error(error) raise @@ -782,9 +943,11 @@ def instrument_generate_content( otel_wrapper, generate_content_config_key_allowlist=generate_content_config_key_allowlist, ) - AsyncModels.generate_content_stream = _create_instrumented_async_generate_content_stream( - snapshot, - otel_wrapper, - generate_content_config_key_allowlist=generate_content_config_key_allowlist, + AsyncModels.generate_content_stream = ( + _create_instrumented_async_generate_content_stream( + snapshot, + otel_wrapper, + generate_content_config_key_allowlist=generate_content_config_key_allowlist, + ) ) return snapshot diff --git a/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/message.py b/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/message.py new file mode 100644 index 0000000000..eaf470892e --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/message.py @@ -0,0 +1,141 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import logging + +from google.genai import types as genai_types +from opentelemetry.util.genai.types import ( + FinishReason, + InputMessage, + MessagePart, + OutputMessage, + Text, + ToolCall, + ToolCallResponse, +) + +from .message_models import ( + # BlobPart, + # FileDataPart, + Role, +) + +_logger = logging.getLogger(__name__) + +def to_input_messages( + *, + contents: list[genai_types.Content], +) -> list[InputMessage]: + return [_to_input_message(content) for content in contents] + +def to_output_messages( + *, + candidates: list[genai_types.Candidate], +) -> list[OutputMessage]: + def content_to_output_message( + candidate: genai_types.Candidate, + ) -> OutputMessage | None: + if not candidate.content: + return None + + message = _to_input_message(candidate.content) + return OutputMessage( + finish_reason=_to_finish_reason(candidate.finish_reason), + role=message.role, + parts=message.parts, + ) + + messages = ( + content_to_output_message(candidate) for candidate in candidates + ) + return [message for message in messages if message is not None] + +def to_system_instruction( + *, + content: genai_types.Content, +) -> InputMessage: + return _to_input_message(content) + +def _to_input_message( + content: genai_types.Content, +) -> InputMessage: + parts = ( + _to_part(part, idx) for idx, part in enumerate(content.parts or []) + ) + return InputMessage( + role=_to_role(content.role), + # filter Nones + parts=[part for part in parts if part is not None], + ) + +def _to_part(part: genai_types.Part, idx: int) -> MessagePart | None: + def tool_call_id(name: str | None) -> str: + if name: + return f"{name}_{idx}" + return f"{idx}" + + if (text := part.text) is not None: + return Text(content=text) + + # if data := part.inline_data: # TODO ??? + # return BlobPart(mime_type=data.mime_type or "", data=data.data or b"") + + # if data := part.file_data: # TODO ??? + # return FileDataPart( + # mime_type=data.mime_type or "", file_uri=data.file_uri or "" + # ) + + if call := part.function_call: + return ToolCall( + id=call.id or tool_call_id(call.name), # TODO ??? + name=call.name or "", + arguments=call.args, + ) + + if response := part.function_response: + return ToolCallResponse( + id=response.id or tool_call_id(response.name), # TODO ??? + response=response.response, + ) + + _logger.info("Unknown part dropped from telemetry %s", part) + return None + +def _to_role(role: str | None) -> Role | str: + if role == "user": + return Role.USER + if role == "model": + return Role.ASSISTANT + return "" + + +def _to_finish_reason( + finish_reason: genai_types.FinishReason | None, +) -> FinishReason | str: + if finish_reason is None: + return "" + if ( + finish_reason is genai_types.FinishReason.FINISH_REASON_UNSPECIFIED + or finish_reason is genai_types.FinishReason.OTHER + ): + return "error" + if finish_reason is genai_types.FinishReason.STOP: + return "stop" + if finish_reason is genai_types.FinishReason.MAX_TOKENS: + return "length" + + # If there is no 1:1 mapping to an OTel preferred enum value, use the exact Google GenAI reason + return finish_reason.name
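# A quick usage sketch for the converters defined above in message.py, using
# public google.genai types; the expected result in the comment assumes the
# InputMessage/Text dataclass reprs from opentelemetry.util.genai.types.
from google.genai import types as genai_types

content = genai_types.Content(role="user", parts=[genai_types.Part(text="ping")])
messages = to_input_messages(contents=[content])
# -> [InputMessage(role=Role.USER, parts=[Text(content="ping")])]
# A function_call part would instead map to ToolCall, and a function_response
# part to ToolCallResponse, with ids synthesized as "<name>_<index>" when the
# SDK does not supply one.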
diff --git a/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/message_models.py b/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/message_models.py new file mode 100644 index 0000000000..bf5b804ba7 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/message_models.py @@ -0,0 +1,58 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
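# A sketch of the one-way base64 behavior of the models defined just below,
# assuming pydantic v2 semantics: Base64OneWayEncoder encodes on serialization
# but leaves incoming bytes untouched (decode is a no-op), so raw bytes pass
# validation unchanged and only the emitted telemetry payload is
# base64-encoded.
blob = BlobPart(mime_type="image/png", data=b"\x89PNG")
blob.model_dump_json()
# -> '{"type":"blob","mime_type":"image/png","data":"iVBORw=="}'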
+ +# Copied and adapted from +# https://gist.github.com/lmolkova/09ba0de7f68280f1eac27a6acfd9b1a6?permalink_comment_id=5578799#gistcomment-5578799 + +from enum import Enum +from typing import Annotated, Literal + +from pydantic import Base64Encoder, BaseModel, EncodedBytes + + +class Base64OneWayEncoder(Base64Encoder): + @classmethod + def decode(cls, data: bytes) -> bytes: + """NoOp""" + return data + + +Base64EncodedBytes = Annotated[ + bytes, EncodedBytes(encoder=Base64OneWayEncoder) +] + + +class Role(str, Enum): + SYSTEM = "system" + USER = "user" + ASSISTANT = "assistant" + TOOL = "tool" + + +class BlobPart(BaseModel): + type: Literal["blob"] = "blob" + mime_type: str + data: Base64EncodedBytes + + class Config: + extra = "allow" + + +class FileDataPart(BaseModel): + type: Literal["file_data"] = "file_data" + mime_type: str + file_uri: str + + class Config: + extra = "allow" diff --git a/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/otel_wrapper.py b/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/otel_wrapper.py index b7dbb5de41..3d6a5a41a2 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/otel_wrapper.py +++ b/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/otel_wrapper.py @@ -11,14 +11,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations import logging +from typing import Any import google.genai - -from opentelemetry._events import Event +from opentelemetry._events import Event, EventLogger, EventLoggerProvider +from opentelemetry.metrics import Meter, MeterProvider from opentelemetry.semconv._incubating.metrics import gen_ai_metrics from opentelemetry.semconv.schemas import Schemas +from opentelemetry.trace import Tracer, TracerProvider from .version import __version__ as _LIBRARY_VERSION @@ -36,19 +39,23 @@ class OTelWrapper: - def __init__(self, tracer, event_logger, meter): + def __init__(self, tracer: Tracer, event_logger: EventLogger, meter: Meter): self._tracer = tracer self._event_logger = event_logger self._meter = meter self._operation_duration_metric = ( gen_ai_metrics.create_gen_ai_client_operation_duration(meter) ) - self._token_usage_metric = ( - gen_ai_metrics.create_gen_ai_client_token_usage(meter) + self._token_usage_metric = gen_ai_metrics.create_gen_ai_client_token_usage( + meter ) @staticmethod - def from_providers(tracer_provider, event_logger_provider, meter_provider): + def from_providers( + tracer_provider: TracerProvider, + event_logger_provider: EventLoggerProvider, + meter_provider: MeterProvider, + ): return OTelWrapper( tracer_provider.get_tracer( _SCOPE_NAME, _LIBRARY_VERSION, _SCHEMA_URL, _SCOPE_ATTRIBUTES @@ -72,21 +79,30 @@ def operation_duration_metric(self): def token_usage_metric(self): return self._token_usage_metric - def log_system_prompt(self, attributes, body): + def log_system_prompt(self, attributes: dict[str, str], body: dict[str, Any]): _logger.debug("Recording system prompt.") event_name = "gen_ai.system.message" self._log_event(event_name, attributes, body) - def log_user_prompt(self, attributes, body): + def log_user_prompt(self, attributes: dict[str, str], body: dict[str, Any]): _logger.debug("Recording 
user prompt.") event_name = "gen_ai.user.message" self._log_event(event_name, attributes, body) - def log_response_content(self, attributes, body): + def log_response_content(self, attributes: dict[str, str], body: dict[str, Any]): _logger.debug("Recording response.") event_name = "gen_ai.choice" self._log_event(event_name, attributes, body) - def _log_event(self, event_name, attributes, body): + def _log_event( + self, event_name: str, attributes: dict[str, str], body: dict[str, Any] + ): event = Event(event_name, body=body, attributes=attributes) self._event_logger.emit(event) + + def log_completion_details( + self, + event: Event, + ) -> None: + _logger.debug("Recording completion details event.") + self._event_logger.emit(event) diff --git a/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/tool_call_wrapper.py b/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/tool_call_wrapper.py index 7b4cc1924a..80d83ea3ae 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/tool_call_wrapper.py +++ b/instrumentation-genai/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/tool_call_wrapper.py @@ -27,6 +27,13 @@ from opentelemetry.semconv._incubating.attributes import ( code_attributes, ) +from opentelemetry.instrumentation._semconv import ( + _StabilityMode, + _OpenTelemetrySemanticConventionStability, + _OpenTelemetryStabilitySignalType, +) + +from opentelemetry.util.genai.types import ContentCapturingMode from .flags import is_content_recording_enabled from .otel_wrapper import OTelWrapper @@ -45,9 +52,7 @@ def _to_otel_value(python_value): if isinstance(python_value, list): return [_to_otel_value(x) for x in python_value] if isinstance(python_value, dict): - return { - key: _to_otel_value(val) for (key, val) in python_value.items() - } + return {key: _to_otel_value(val) for (key, val) in python_value.items()} if hasattr(python_value, "model_dump"): return python_value.model_dump() if hasattr(python_value, "__dict__"): @@ -76,6 +81,21 @@ def _to_otel_attribute(python_value): return json.dumps(otel_value) +def _is_capture_content_enabled() -> bool: + mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.GEN_AI + ) + if mode == _StabilityMode.DEFAULT: + return bool(is_content_recording_enabled(mode)) + if mode == _StabilityMode.GEN_AI_LATEST_EXPERIMENTAL: + capturing_mode = is_content_recording_enabled(mode) + return capturing_mode in [ + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ] + raise RuntimeError(f"{mode} mode not supported") + + def _create_function_span_name(wrapped_function): """Constructs the span name for a given local function tool call.""" function_name = wrapped_function.__name__ @@ -100,9 +120,7 @@ def _create_function_span_attributes( return result -def _record_function_call_argument( - span, param_name, param_value, include_values -): +def _record_function_call_argument(span, param_name, param_value, include_values): attribute_prefix = f"code.function.parameters.{param_name}" type_attribute = f"{attribute_prefix}.type" span.set_attribute(type_attribute, type(param_value).__name__) @@ -115,7 +133,7 @@ def _record_function_call_arguments( otel_wrapper, wrapped_function, function_args, function_kwargs ): """Records the details about a function 
invocation as span attributes.""" - include_values = is_content_recording_enabled() + include_values = _is_capture_content_enabled() span = trace.get_current_span() signature = inspect.signature(wrapped_function) params = list(signature.parameters.values()) @@ -130,13 +148,11 @@ def _record_function_call_arguments( def _record_function_call_result(otel_wrapper, wrapped_function, result): """Records the details about a function result as span attributes.""" - include_values = is_content_recording_enabled() + include_values = _is_capture_content_enabled() span = trace.get_current_span() span.set_attribute("code.function.return.type", type(result).__name__) if include_values: - span.set_attribute( - "code.function.return.value", _to_otel_attribute(result) - ) + span.set_attribute("code.function.return.value", _to_otel_attribute(result)) def _wrap_sync_tool_function( @@ -151,12 +167,8 @@ def wrapped_function(*args, **kwargs): attributes = _create_function_span_attributes( tool_function, args, kwargs, extra_span_attributes ) - with otel_wrapper.start_as_current_span( - span_name, attributes=attributes - ): - _record_function_call_arguments( - otel_wrapper, tool_function, args, kwargs - ) + with otel_wrapper.start_as_current_span(span_name, attributes=attributes): + _record_function_call_arguments(otel_wrapper, tool_function, args, kwargs) result = tool_function(*args, **kwargs) _record_function_call_result(otel_wrapper, tool_function, result) return result @@ -176,12 +188,8 @@ async def wrapped_function(*args, **kwargs): attributes = _create_function_span_attributes( tool_function, args, kwargs, extra_span_attributes ) - with otel_wrapper.start_as_current_span( - span_name, attributes=attributes - ): - _record_function_call_arguments( - otel_wrapper, tool_function, args, kwargs - ) + with otel_wrapper.start_as_current_span(span_name, attributes=attributes): + _record_function_call_arguments(otel_wrapper, tool_function, args, kwargs) result = await tool_function(*args, **kwargs) _record_function_call_result(otel_wrapper, tool_function, result) return result @@ -207,9 +215,7 @@ def wrapped( if tool_or_tools is None: return None if isinstance(tool_or_tools, list): - return [ - wrapped(item, otel_wrapper, **kwargs) for item in tool_or_tools - ] + return [wrapped(item, otel_wrapper, **kwargs) for item in tool_or_tools] if isinstance(tool_or_tools, dict): return { key: wrapped(value, otel_wrapper, **kwargs)
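# An end-to-end sketch of the tool-call wrapping above. A plain Python
# function passed as a tool is transparently wrapped so that each invocation
# runs inside its own span carrying code.function.parameters.* and
# code.function.return.* attributes; parameter and return *values* are
# recorded only when content capture is enabled for the active semconv mode
# (see _is_capture_content_enabled above). The client call below is ordinary
# google-genai usage shown for context; the model id is illustrative.
from google import genai

def get_weather(city: str) -> str:
    """Toy tool; the instrumentation wraps it via wrapped(...)."""
    return f"sunny in {city}"

client = genai.Client()
response = client.models.generate_content(
    model="gemini-2.0-flash",
    contents="What's the weather in Paris?",
    config={"tools": [get_weather]},
)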