Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions opentelemetry-etw-logs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

## vNext

- Added a `with_resource_attributes` method to the processor builder, allowing
users to specify which resource attribute keys are exported with each log
record.
- By default, the Resource attributes `"service.name"` and
`"service.instance.id"` continue to be exported as `cloud.roleName` and
`cloud.roleInstance`.
- This feature enables exporting additional resource attributes beyond the
defaults.

## v0.9.1

- Added `Processor::builder_etw_compat_only()` method that builds a processor using a provider name that is fully compatible with ETW requirements (dropping UserEvents provider name compatibility) by allowing hyphens (`-`).
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-etw-logs/src/exporter/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ pub(crate) mod test_utils {

use crate::exporter::options::Options;
use crate::exporter::ETWExporter;
use std::collections::HashSet;

pub(crate) fn new_etw_exporter() -> ETWExporter {
ETWExporter::new(test_options())
ETWExporter::new(test_options(), HashSet::new())
}

pub(crate) fn new_instrumentation_scope() -> opentelemetry::InstrumentationScope {
Expand Down
55 changes: 45 additions & 10 deletions opentelemetry-etw-logs/src/exporter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::HashSet;
use std::fmt::Debug;
use std::pin::Pin;
use std::sync::Arc;

use tracelogging_dynamic as tld;

use opentelemetry::logs::Severity;
use opentelemetry::Key;
use opentelemetry::logs::{AnyValue, Severity};
use opentelemetry::{Key, Value};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};

pub(crate) mod common;
Expand All @@ -26,12 +28,14 @@ thread_local! {
struct Resource {
pub cloud_role: Option<String>,
pub cloud_role_instance: Option<String>,
pub attributes_from_resource: Vec<(Key, AnyValue)>,
}

pub(crate) struct ETWExporter {
provider: Pin<Arc<tld::Provider>>,
resource: Resource,
options: Options,
resource_attribute_keys: HashSet<Cow<'static, str>>,
}

fn enabled_callback_noop(
Expand All @@ -49,7 +53,10 @@ fn enabled_callback_noop(
impl ETWExporter {
const KEYWORD: u64 = 1;

pub(crate) fn new(options: Options) -> Self {
pub(crate) fn new(
options: Options,
resource_attribute_keys: HashSet<Cow<'static, str>>,
) -> Self {
let mut provider_options = tld::Provider::options();

provider_options.callback(enabled_callback_noop, 0x0);
Expand All @@ -69,6 +76,7 @@ impl ETWExporter {
provider,
resource: Default::default(),
options,
resource_attribute_keys,
}
}

Expand Down Expand Up @@ -110,7 +118,12 @@ impl ETWExporter {

part_a::populate_part_a(event, &self.resource, log_record, field_tag);

let event_id = part_c::populate_part_c(event, log_record, field_tag);
let event_id = part_c::populate_part_c(
event,
log_record,
&self.resource.attributes_from_resource,
field_tag,
);

part_b::populate_part_b(event, log_record, otel_level, event_id);

Expand Down Expand Up @@ -150,12 +163,24 @@ impl opentelemetry_sdk::logs::LogExporter for ETWExporter {
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource.cloud_role = resource
.get(&Key::from_static_str("service.name"))
.map(|v| v.to_string());
self.resource.cloud_role_instance = resource
.get(&Key::from_static_str("service.instance.id"))
.map(|v| v.to_string());
// Clear previous resource attributes
self.resource.attributes_from_resource.clear();

// Add attributes from resource to the attributes_from_resource
for (key, value) in resource.iter() {
// Special handling for cloud role and instance
// as they are used in PartA of the Common Schema format.
if key.as_str() == "service.name" {
self.resource.cloud_role = Some(value.to_string());
} else if key.as_str() == "service.instance.id" {
self.resource.cloud_role_instance = Some(value.to_string());
} else if self.resource_attribute_keys.contains(key.as_str()) {
self.resource
.attributes_from_resource
.push((key.clone(), val_to_any_value(value)));
}
// Other attributes are ignored
}
}

fn shutdown(&self) -> OTelSdkResult {
Expand All @@ -169,6 +194,16 @@ impl opentelemetry_sdk::logs::LogExporter for ETWExporter {
}
}

fn val_to_any_value(val: &Value) -> AnyValue {
match val {
Value::Bool(b) => AnyValue::Boolean(*b),
Value::I64(i) => AnyValue::Int(*i),
Value::F64(f) => AnyValue::Double(*f),
Value::String(s) => AnyValue::String(s.clone()),
_ => AnyValue::String("".into()),
}
}

#[cfg(test)]
mod tests {
use opentelemetry_sdk::logs::LogExporter;
Expand Down
13 changes: 12 additions & 1 deletion opentelemetry-etw-logs/src/exporter/part_c.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use opentelemetry::logs::AnyValue;
use opentelemetry::Key;
use tracelogging_dynamic as tld;

pub(crate) const EVENT_ID: &str = "event_id";

pub(crate) fn populate_part_c(
event: &mut tld::EventBuilder,
log_record: &opentelemetry_sdk::logs::SdkLogRecord,
resource_attributes: &[(Key, AnyValue)],
field_tag: u32,
) -> Option<i64> {
//populate CS PartC
Expand All @@ -25,10 +27,14 @@ pub(crate) fn populate_part_c(
}
}

// Add count of resource attributes
cs_c_count += resource_attributes.len() as u16;

// If there are additional PartC attributes, add them to the event
if cs_c_count > 0 {
event.add_struct("PartC", cs_c_count, field_tag);
event.add_struct("PartC", cs_c_count as u8, field_tag);

// Add log record attributes first
// TODO: This 2nd iteration is not optimal, and can be optimized
for (key, value) in log_record.attributes_iter() {
match (key.as_str(), &value) {
Expand All @@ -40,6 +46,11 @@ pub(crate) fn populate_part_c(
}
}
}

// Add resource attributes
for (key, value) in resource_attributes {
super::common::add_attribute_to_event(event, key, value);
}
}
event_id
}
Expand Down
39 changes: 39 additions & 0 deletions opentelemetry-etw-logs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,44 @@
//! The ETW exporter will enable applications to use OpenTelemetry API
//! to capture the telemetry events, and write them to the ETW subsystem.
//!
//! ## Resource Attribute Handling
//!
//! **Important**: By default, resource attributes are NOT exported with log records.
//! The ETW exporter only automatically exports these specific resource attributes:
//!
//! - **`service.name`** → Exported as `cloud.roleName` in PartA of Common Schema
//! - **`service.instance.id`** → Exported as `cloud.roleInstance` in PartA of Common Schema
//!
//! All other resource attributes are ignored unless explicitly specified.
//!
//! ### Opting in to Additional Resource Attributes
//!
//! To export additional resource attributes, use the `with_resource_attributes()` method:
//!
//! ```rust
//! use opentelemetry_sdk::logs::SdkLoggerProvider;
//! use opentelemetry_sdk::Resource;
//! use opentelemetry_etw_logs::Processor;
//! use opentelemetry::KeyValue;
//!
//! let etw_processor = Processor::builder("myprovider")
//! // Only export specific resource attributes
//! .with_resource_attributes(["custom_attribute1", "custom_attribute2"])
//! .build()
//! .unwrap();
//!
//! let provider = SdkLoggerProvider::builder()
//! .with_resource(
//! Resource::builder_empty()
//! .with_service_name("example")
//! .with_attribute(KeyValue::new("custom_attribute1", "value1"))
//! .with_attribute(KeyValue::new("custom_attribute2", "value2"))
//! .with_attribute(KeyValue::new("custom_attribute3", "value3")) // This won't be exported
//! .build(),
//! )
//! .with_log_processor(etw_processor)
//! .build();
//! ```

#![warn(missing_debug_implementations, missing_docs)]

Expand Down
91 changes: 84 additions & 7 deletions opentelemetry-etw-logs/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use opentelemetry::InstrumentationScope;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::logs::{LogBatch, LogExporter, SdkLogRecord};
use opentelemetry_sdk::Resource;
use std::borrow::Cow;
use std::collections::HashSet;
use std::error::Error;
use std::fmt::Debug;

Expand Down Expand Up @@ -59,8 +61,11 @@ impl Processor {
}

/// Creates a new instance of the [`Processor`] using the given options.
pub(crate) fn new(options: Options) -> Self {
let exporter: ETWExporter = ETWExporter::new(options);
pub(crate) fn new(
options: Options,
resource_attribute_keys: HashSet<Cow<'static, str>>,
) -> Self {
let exporter: ETWExporter = ETWExporter::new(options, resource_attribute_keys);
Processor {
event_exporter: exporter,
}
Expand Down Expand Up @@ -113,6 +118,7 @@ impl opentelemetry_sdk::logs::LogProcessor for Processor {
pub struct ProcessorBuilder {
options: Options,
provider_name_compat_mode: ProviderNameCompatMode,
resource_attribute_keys: HashSet<Cow<'static, str>>,
}

impl ProcessorBuilder {
Expand All @@ -125,6 +131,7 @@ impl ProcessorBuilder {
ProcessorBuilder {
options: Options::new(provider_name.to_string()),
provider_name_compat_mode: ProviderNameCompatMode::CrossCompat,
resource_attribute_keys: HashSet::new(),
}
}

Expand All @@ -137,6 +144,7 @@ impl ProcessorBuilder {
ProcessorBuilder {
options: Options::new(provider_name.to_string()),
provider_name_compat_mode: ProviderNameCompatMode::EtwCompatOnly,
resource_attribute_keys: HashSet::new(),
}
}

Expand All @@ -153,11 +161,48 @@ impl ProcessorBuilder {
self
}

/// Sets the resource attributes for the processor.
///
/// This specifies which resource attributes should be exported with each log record.
///
/// # Performance Considerations
///
/// **Warning**: Each specified resource attribute will be serialized and sent
/// with EVERY log record. This is different from OTLP exporters where resource
/// attributes are serialized once per batch. Consider the performance impact
/// when selecting which attributes to export.
///
/// # Best Practices for ETW
///
/// **Recommendation**: Be selective about which resource attributes to export.
/// Since ETW requires a local listener/agent, the agent can often deduce many
/// resource attributes without requiring them to be sent with each log:
///
/// - **Infrastructure attributes** (datacenter, region, availability zone) can
/// be determined by the local agent.
/// - **Host attributes** (hostname, IP address, OS version) are available locally.
/// - **Deployment attributes** (environment, cluster) may be known to the agent.
///
/// Focus on attributes that are truly specific to your application instance
/// and cannot be easily determined by the local agent.
///
/// Nevertheless, if there are attributes that are fixed and must be emitted
/// with every log, modeling them as Resource attributes and using this method
/// is much more efficient than emitting them explicitly with every log.
pub fn with_resource_attributes<I, S>(mut self, attributes: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<Cow<'static, str>>,
{
self.resource_attribute_keys = attributes.into_iter().map(|s| s.into()).collect();
self
}

/// Builds the processor with given options, returning `Error` if it fails.
pub fn build(self) -> Result<Processor, Box<dyn Error>> {
self.validate()?;

Ok(Processor::new(self.options))
Ok(Processor::new(self.options, self.resource_attribute_keys))
}

fn validate(&self) -> Result<(), Box<dyn Error>> {
Expand Down Expand Up @@ -214,21 +259,21 @@ mod tests {

#[test]
fn test_shutdown() {
let processor = Processor::new(test_options());
let processor = Processor::new(test_options(), HashSet::new());

assert!(processor.shutdown().is_ok());
}

#[test]
fn test_force_flush() {
let processor = Processor::new(test_options());
let processor = Processor::new(test_options(), HashSet::new());

assert!(processor.force_flush().is_ok());
}

#[test]
fn test_emit() {
let processor: Processor = Processor::new(test_options());
let processor: Processor = Processor::new(test_options(), HashSet::new());

let mut record = SdkLoggerProvider::builder()
.build()
Expand All @@ -241,7 +286,7 @@ mod tests {
#[test]
#[cfg(feature = "spec_unstable_logs_enabled")]
fn test_event_enabled() {
let processor = Processor::new(test_options());
let processor = Processor::new(test_options(), HashSet::new());

// Unit test are forced to return true as there is no ETW session listening for the event
assert!(processor.event_enabled(opentelemetry::logs::Severity::Info, "test", Some("test")));
Expand Down Expand Up @@ -427,4 +472,36 @@ mod tests {
);
assert!(result.is_ok());
}

#[test]
fn test_resource_attributes() {
use opentelemetry::logs::LogRecord;
use opentelemetry::logs::Logger;
use opentelemetry::logs::LoggerProvider;
use opentelemetry::KeyValue;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::Resource;

let processor = Processor::builder("test_provider")
.with_resource_attributes(vec!["resource_attribute1", "resource_attribute2"])
.build()
.unwrap();

let logger_provider = SdkLoggerProvider::builder()
.with_resource(
Resource::builder()
.with_service_name("test_service")
.with_attribute(KeyValue::new("resource_attribute1", "value1"))
.with_attribute(KeyValue::new("resource_attribute2", "value2"))
.with_attribute(KeyValue::new("resource_attribute3", "value3")) // This should not be exported
.build(),
)
.with_log_processor(processor)
.build();

let logger = logger_provider.logger("test_logger");
let mut log_record = logger.create_log_record();
log_record.add_attribute("log_attribute", "log_value");
logger.emit(log_record);
}
}
Loading