From a5832234bfdeb9558a74f1c355c3aa91ac7f1149 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pato=20Sanda=C3=B1a?= Date: Mon, 18 Aug 2025 17:16:52 -0400 Subject: [PATCH] feat: opentelemetry-etw-logs: with resources --- opentelemetry-etw-logs/CHANGELOG.md | 9 ++ opentelemetry-etw-logs/src/exporter/common.rs | 3 +- opentelemetry-etw-logs/src/exporter/mod.rs | 55 +++++++++-- opentelemetry-etw-logs/src/exporter/part_c.rs | 13 ++- opentelemetry-etw-logs/src/lib.rs | 39 ++++++++ opentelemetry-etw-logs/src/processor.rs | 91 +++++++++++++++++-- 6 files changed, 191 insertions(+), 19 deletions(-) diff --git a/opentelemetry-etw-logs/CHANGELOG.md b/opentelemetry-etw-logs/CHANGELOG.md index 01b99c1e1..8fc3441ce 100644 --- a/opentelemetry-etw-logs/CHANGELOG.md +++ b/opentelemetry-etw-logs/CHANGELOG.md @@ -2,6 +2,15 @@ ## vNext +- Added a `with_resource_attributes` method to the processor builder, allowing + users to specify which resource attribute keys are exported with each log + record. +- By default, the Resource attributes `"service.name"` and + `"service.instance.id"` continue to be exported as `cloud.roleName` and + `cloud.roleInstance`. +- This feature enables exporting additional resource attributes beyond the + defaults. + ## v0.9.1 - Added `Processor::builder_etw_compat_only()` method that builds a processor using a provider name that is fully compatible with ETW requirements (dropping UserEvents provider name compatibility) by allowing hyphens (`-`). diff --git a/opentelemetry-etw-logs/src/exporter/common.rs b/opentelemetry-etw-logs/src/exporter/common.rs index 5e5b3744d..1ec602e20 100644 --- a/opentelemetry-etw-logs/src/exporter/common.rs +++ b/opentelemetry-etw-logs/src/exporter/common.rs @@ -83,9 +83,10 @@ pub(crate) mod test_utils { use crate::exporter::options::Options; use crate::exporter::ETWExporter; + use std::collections::HashSet; pub(crate) fn new_etw_exporter() -> ETWExporter { - ETWExporter::new(test_options()) + ETWExporter::new(test_options(), HashSet::new()) } pub(crate) fn new_instrumentation_scope() -> opentelemetry::InstrumentationScope { diff --git a/opentelemetry-etw-logs/src/exporter/mod.rs b/opentelemetry-etw-logs/src/exporter/mod.rs index 92b82b609..8f38a369e 100644 --- a/opentelemetry-etw-logs/src/exporter/mod.rs +++ b/opentelemetry-etw-logs/src/exporter/mod.rs @@ -1,12 +1,14 @@ +use std::borrow::Cow; use std::cell::RefCell; +use std::collections::HashSet; use std::fmt::Debug; use std::pin::Pin; use std::sync::Arc; use tracelogging_dynamic as tld; -use opentelemetry::logs::Severity; -use opentelemetry::Key; +use opentelemetry::logs::{AnyValue, Severity}; +use opentelemetry::{Key, Value}; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; pub(crate) mod common; @@ -26,12 +28,14 @@ thread_local! { struct Resource { pub cloud_role: Option, pub cloud_role_instance: Option, + pub attributes_from_resource: Vec<(Key, AnyValue)>, } pub(crate) struct ETWExporter { provider: Pin>, resource: Resource, options: Options, + resource_attribute_keys: HashSet>, } fn enabled_callback_noop( @@ -49,7 +53,10 @@ fn enabled_callback_noop( impl ETWExporter { const KEYWORD: u64 = 1; - pub(crate) fn new(options: Options) -> Self { + pub(crate) fn new( + options: Options, + resource_attribute_keys: HashSet>, + ) -> Self { let mut provider_options = tld::Provider::options(); provider_options.callback(enabled_callback_noop, 0x0); @@ -69,6 +76,7 @@ impl ETWExporter { provider, resource: Default::default(), options, + resource_attribute_keys, } } @@ -110,7 +118,12 @@ impl ETWExporter { part_a::populate_part_a(event, &self.resource, log_record, field_tag); - let event_id = part_c::populate_part_c(event, log_record, field_tag); + let event_id = part_c::populate_part_c( + event, + log_record, + &self.resource.attributes_from_resource, + field_tag, + ); part_b::populate_part_b(event, log_record, otel_level, event_id); @@ -150,12 +163,24 @@ impl opentelemetry_sdk::logs::LogExporter for ETWExporter { } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { - self.resource.cloud_role = resource - .get(&Key::from_static_str("service.name")) - .map(|v| v.to_string()); - self.resource.cloud_role_instance = resource - .get(&Key::from_static_str("service.instance.id")) - .map(|v| v.to_string()); + // Clear previous resource attributes + self.resource.attributes_from_resource.clear(); + + // Add attributes from resource to the attributes_from_resource + for (key, value) in resource.iter() { + // Special handling for cloud role and instance + // as they are used in PartA of the Common Schema format. + if key.as_str() == "service.name" { + self.resource.cloud_role = Some(value.to_string()); + } else if key.as_str() == "service.instance.id" { + self.resource.cloud_role_instance = Some(value.to_string()); + } else if self.resource_attribute_keys.contains(key.as_str()) { + self.resource + .attributes_from_resource + .push((key.clone(), val_to_any_value(value))); + } + // Other attributes are ignored + } } fn shutdown(&self) -> OTelSdkResult { @@ -169,6 +194,16 @@ impl opentelemetry_sdk::logs::LogExporter for ETWExporter { } } +fn val_to_any_value(val: &Value) -> AnyValue { + match val { + Value::Bool(b) => AnyValue::Boolean(*b), + Value::I64(i) => AnyValue::Int(*i), + Value::F64(f) => AnyValue::Double(*f), + Value::String(s) => AnyValue::String(s.clone()), + _ => AnyValue::String("".into()), + } +} + #[cfg(test)] mod tests { use opentelemetry_sdk::logs::LogExporter; diff --git a/opentelemetry-etw-logs/src/exporter/part_c.rs b/opentelemetry-etw-logs/src/exporter/part_c.rs index ccbdb5c4b..b1ba9eed9 100644 --- a/opentelemetry-etw-logs/src/exporter/part_c.rs +++ b/opentelemetry-etw-logs/src/exporter/part_c.rs @@ -1,4 +1,5 @@ use opentelemetry::logs::AnyValue; +use opentelemetry::Key; use tracelogging_dynamic as tld; pub(crate) const EVENT_ID: &str = "event_id"; @@ -6,6 +7,7 @@ pub(crate) const EVENT_ID: &str = "event_id"; pub(crate) fn populate_part_c( event: &mut tld::EventBuilder, log_record: &opentelemetry_sdk::logs::SdkLogRecord, + resource_attributes: &[(Key, AnyValue)], field_tag: u32, ) -> Option { //populate CS PartC @@ -25,10 +27,14 @@ pub(crate) fn populate_part_c( } } + // Add count of resource attributes + cs_c_count += resource_attributes.len() as u16; + // If there are additional PartC attributes, add them to the event if cs_c_count > 0 { - event.add_struct("PartC", cs_c_count, field_tag); + event.add_struct("PartC", cs_c_count as u8, field_tag); + // Add log record attributes first // TODO: This 2nd iteration is not optimal, and can be optimized for (key, value) in log_record.attributes_iter() { match (key.as_str(), &value) { @@ -40,6 +46,11 @@ pub(crate) fn populate_part_c( } } } + + // Add resource attributes + for (key, value) in resource_attributes { + super::common::add_attribute_to_event(event, key, value); + } } event_id } diff --git a/opentelemetry-etw-logs/src/lib.rs b/opentelemetry-etw-logs/src/lib.rs index 2fc6c3c0f..b7747e1b7 100644 --- a/opentelemetry-etw-logs/src/lib.rs +++ b/opentelemetry-etw-logs/src/lib.rs @@ -1,5 +1,44 @@ //! The ETW exporter will enable applications to use OpenTelemetry API //! to capture the telemetry events, and write them to the ETW subsystem. +//! +//! ## Resource Attribute Handling +//! +//! **Important**: By default, resource attributes are NOT exported with log records. +//! The ETW exporter only automatically exports these specific resource attributes: +//! +//! - **`service.name`** → Exported as `cloud.roleName` in PartA of Common Schema +//! - **`service.instance.id`** → Exported as `cloud.roleInstance` in PartA of Common Schema +//! +//! All other resource attributes are ignored unless explicitly specified. +//! +//! ### Opting in to Additional Resource Attributes +//! +//! To export additional resource attributes, use the `with_resource_attributes()` method: +//! +//! ```rust +//! use opentelemetry_sdk::logs::SdkLoggerProvider; +//! use opentelemetry_sdk::Resource; +//! use opentelemetry_etw_logs::Processor; +//! use opentelemetry::KeyValue; +//! +//! let etw_processor = Processor::builder("myprovider") +//! // Only export specific resource attributes +//! .with_resource_attributes(["custom_attribute1", "custom_attribute2"]) +//! .build() +//! .unwrap(); +//! +//! let provider = SdkLoggerProvider::builder() +//! .with_resource( +//! Resource::builder_empty() +//! .with_service_name("example") +//! .with_attribute(KeyValue::new("custom_attribute1", "value1")) +//! .with_attribute(KeyValue::new("custom_attribute2", "value2")) +//! .with_attribute(KeyValue::new("custom_attribute3", "value3")) // This won't be exported +//! .build(), +//! ) +//! .with_log_processor(etw_processor) +//! .build(); +//! ``` #![warn(missing_debug_implementations, missing_docs)] diff --git a/opentelemetry-etw-logs/src/processor.rs b/opentelemetry-etw-logs/src/processor.rs index 1311ef351..cd948c26e 100644 --- a/opentelemetry-etw-logs/src/processor.rs +++ b/opentelemetry-etw-logs/src/processor.rs @@ -2,6 +2,8 @@ use opentelemetry::InstrumentationScope; use opentelemetry_sdk::error::OTelSdkResult; use opentelemetry_sdk::logs::{LogBatch, LogExporter, SdkLogRecord}; use opentelemetry_sdk::Resource; +use std::borrow::Cow; +use std::collections::HashSet; use std::error::Error; use std::fmt::Debug; @@ -59,8 +61,11 @@ impl Processor { } /// Creates a new instance of the [`Processor`] using the given options. - pub(crate) fn new(options: Options) -> Self { - let exporter: ETWExporter = ETWExporter::new(options); + pub(crate) fn new( + options: Options, + resource_attribute_keys: HashSet>, + ) -> Self { + let exporter: ETWExporter = ETWExporter::new(options, resource_attribute_keys); Processor { event_exporter: exporter, } @@ -113,6 +118,7 @@ impl opentelemetry_sdk::logs::LogProcessor for Processor { pub struct ProcessorBuilder { options: Options, provider_name_compat_mode: ProviderNameCompatMode, + resource_attribute_keys: HashSet>, } impl ProcessorBuilder { @@ -125,6 +131,7 @@ impl ProcessorBuilder { ProcessorBuilder { options: Options::new(provider_name.to_string()), provider_name_compat_mode: ProviderNameCompatMode::CrossCompat, + resource_attribute_keys: HashSet::new(), } } @@ -137,6 +144,7 @@ impl ProcessorBuilder { ProcessorBuilder { options: Options::new(provider_name.to_string()), provider_name_compat_mode: ProviderNameCompatMode::EtwCompatOnly, + resource_attribute_keys: HashSet::new(), } } @@ -153,11 +161,48 @@ impl ProcessorBuilder { self } + /// Sets the resource attributes for the processor. + /// + /// This specifies which resource attributes should be exported with each log record. + /// + /// # Performance Considerations + /// + /// **Warning**: Each specified resource attribute will be serialized and sent + /// with EVERY log record. This is different from OTLP exporters where resource + /// attributes are serialized once per batch. Consider the performance impact + /// when selecting which attributes to export. + /// + /// # Best Practices for ETW + /// + /// **Recommendation**: Be selective about which resource attributes to export. + /// Since ETW requires a local listener/agent, the agent can often deduce many + /// resource attributes without requiring them to be sent with each log: + /// + /// - **Infrastructure attributes** (datacenter, region, availability zone) can + /// be determined by the local agent. + /// - **Host attributes** (hostname, IP address, OS version) are available locally. + /// - **Deployment attributes** (environment, cluster) may be known to the agent. + /// + /// Focus on attributes that are truly specific to your application instance + /// and cannot be easily determined by the local agent. + /// + /// Nevertheless, if there are attributes that are fixed and must be emitted + /// with every log, modeling them as Resource attributes and using this method + /// is much more efficient than emitting them explicitly with every log. + pub fn with_resource_attributes(mut self, attributes: I) -> Self + where + I: IntoIterator, + S: Into>, + { + self.resource_attribute_keys = attributes.into_iter().map(|s| s.into()).collect(); + self + } + /// Builds the processor with given options, returning `Error` if it fails. pub fn build(self) -> Result> { self.validate()?; - Ok(Processor::new(self.options)) + Ok(Processor::new(self.options, self.resource_attribute_keys)) } fn validate(&self) -> Result<(), Box> { @@ -214,21 +259,21 @@ mod tests { #[test] fn test_shutdown() { - let processor = Processor::new(test_options()); + let processor = Processor::new(test_options(), HashSet::new()); assert!(processor.shutdown().is_ok()); } #[test] fn test_force_flush() { - let processor = Processor::new(test_options()); + let processor = Processor::new(test_options(), HashSet::new()); assert!(processor.force_flush().is_ok()); } #[test] fn test_emit() { - let processor: Processor = Processor::new(test_options()); + let processor: Processor = Processor::new(test_options(), HashSet::new()); let mut record = SdkLoggerProvider::builder() .build() @@ -241,7 +286,7 @@ mod tests { #[test] #[cfg(feature = "spec_unstable_logs_enabled")] fn test_event_enabled() { - let processor = Processor::new(test_options()); + let processor = Processor::new(test_options(), HashSet::new()); // Unit test are forced to return true as there is no ETW session listening for the event assert!(processor.event_enabled(opentelemetry::logs::Severity::Info, "test", Some("test"))); @@ -427,4 +472,36 @@ mod tests { ); assert!(result.is_ok()); } + + #[test] + fn test_resource_attributes() { + use opentelemetry::logs::LogRecord; + use opentelemetry::logs::Logger; + use opentelemetry::logs::LoggerProvider; + use opentelemetry::KeyValue; + use opentelemetry_sdk::logs::SdkLoggerProvider; + use opentelemetry_sdk::Resource; + + let processor = Processor::builder("test_provider") + .with_resource_attributes(vec!["resource_attribute1", "resource_attribute2"]) + .build() + .unwrap(); + + let logger_provider = SdkLoggerProvider::builder() + .with_resource( + Resource::builder() + .with_service_name("test_service") + .with_attribute(KeyValue::new("resource_attribute1", "value1")) + .with_attribute(KeyValue::new("resource_attribute2", "value2")) + .with_attribute(KeyValue::new("resource_attribute3", "value3")) // This should not be exported + .build(), + ) + .with_log_processor(processor) + .build(); + + let logger = logger_provider.logger("test_logger"); + let mut log_record = logger.create_log_record(); + log_record.add_attribute("log_attribute", "log_value"); + logger.emit(log_record); + } }