|
| 1 | +// pulse - bitdrift's observability proxy |
| 2 | +// Copyright Bitdrift, Inc. All rights reserved. |
| 3 | +// |
| 4 | +// Use of this source code is governed by a source available license that can be found in the |
| 5 | +// LICENSE file or at: |
| 6 | +// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt |
| 7 | + |
| 8 | +#[cfg(test)] |
| 9 | +mod test; |
| 10 | + |
| 11 | +use anyhow::{anyhow, bail}; |
| 12 | +use bd_server_stats::stats::Collector; |
| 13 | +use config::bootstrap::v1::bootstrap::Config; |
| 14 | +use config::processor::v1::processor::processor_config::Processor_type; |
| 15 | +use drop::DropConfig; |
| 16 | +use drop::drop_processor_config::Config_source; |
| 17 | +use protobuf::Message; |
| 18 | +use pulse_common::proto::yaml_to_proto; |
| 19 | +use pulse_metrics::pipeline::processor::drop::TranslatedDropConfig; |
| 20 | +use pulse_metrics::protos::metric::{DownstreamId, MetricSource, ParsedMetric}; |
| 21 | +use pulse_metrics::protos::statsd; |
| 22 | +use pulse_protobuf::protos::pulse::config; |
| 23 | +use pulse_protobuf::protos::pulse::config::common::v1::common::wire_protocol::StatsD; |
| 24 | +use pulse_protobuf::protos::pulse::config::processor::v1::drop; |
| 25 | +use pulse_protobuf::protos::pulse::config::processor::v1::processor::ProcessorConfig; |
| 26 | +use pulse_protobuf::protos::pulse::drop_tester::v1::drop_tester::drop_test_case::Config_type; |
| 27 | +use pulse_protobuf::protos::pulse::drop_tester::v1::drop_tester::{DropTestCase, DropTesterConfig}; |
| 28 | +use std::time::Instant; |
| 29 | + |
| 30 | +#[ctor::ctor] |
| 31 | +fn global_init() { |
| 32 | + bd_log::SwapLogger::initialize(); |
| 33 | +} |
| 34 | + |
| 35 | +fn run_test_case(test_case: DropTestCase, proxy_config: Option<&Config>) -> anyhow::Result<usize> { |
| 36 | + fn extract_from_config( |
| 37 | + field_name: &str, |
| 38 | + proxy_config: Option<&Config>, |
| 39 | + processor_name: &str, |
| 40 | + extract: impl Fn(&ProcessorConfig) -> Option<DropConfig>, |
| 41 | + ) -> anyhow::Result<DropConfig> { |
| 42 | + let Some(config) = proxy_config else { |
| 43 | + bail!("{field_name} requires passing a proxy config via --proxy-config"); |
| 44 | + }; |
| 45 | + config |
| 46 | + .pipeline() |
| 47 | + .processors |
| 48 | + .iter() |
| 49 | + .find_map(|(name, value)| { |
| 50 | + if name.as_str() == processor_name { |
| 51 | + return extract(value); |
| 52 | + } |
| 53 | + None |
| 54 | + }) |
| 55 | + .ok_or_else(|| anyhow!("no processor named '{processor_name} found in proxy config")) |
| 56 | + } |
| 57 | + |
| 58 | + let drop_config: DropConfig = match test_case.config_type.as_ref().expect("pgv") { |
| 59 | + Config_type::Config(config) => Ok(config.clone()), |
| 60 | + Config_type::DropProcessorName(processor_name) => extract_from_config( |
| 61 | + "mutate_processor_name", |
| 62 | + proxy_config, |
| 63 | + processor_name, |
| 64 | + |value| { |
| 65 | + if let Some(Processor_type::Drop(drop)) = &value.processor_type { |
| 66 | + return Some(match drop.config_source.as_ref().expect("pgv") { |
| 67 | + Config_source::Inline(config) => config.clone(), |
| 68 | + Config_source::FileSource(_) => { |
| 69 | + // TODO(mattklein123): Support file source if needed. |
| 70 | + return None; |
| 71 | + }, |
| 72 | + }); |
| 73 | + } |
| 74 | + |
| 75 | + None |
| 76 | + }, |
| 77 | + ), |
| 78 | + }?; |
| 79 | + |
| 80 | + let drop_config = TranslatedDropConfig::new(&drop_config, &Collector::default().scope("test"))?; |
| 81 | + |
| 82 | + let mut num_metrics = 0; |
| 83 | + for metric in test_case.metrics { |
| 84 | + num_metrics += 1; |
| 85 | + |
| 86 | + // TODO(mattklein123): Support parsing other formats. Probably a limited PromQL query of the |
| 87 | + // metric? |
| 88 | + let mut input = statsd::parse( |
| 89 | + &metric.input.clone().into_bytes(), |
| 90 | + StatsD::default_instance(), |
| 91 | + ) |
| 92 | + .map_err(|e| anyhow!("unable to parse input '{}' as statsd: {e}", metric.input))?; |
| 93 | + log::debug!("parsed input metric: {input}"); |
| 94 | + input.timestamp = 0; |
| 95 | + let parsed_input = ParsedMetric::new( |
| 96 | + input, |
| 97 | + MetricSource::PromRemoteWrite, |
| 98 | + Instant::now(), |
| 99 | + DownstreamId::LocalOrigin, |
| 100 | + ); |
| 101 | + |
| 102 | + let dropped_by = drop_config.drop_sample(&parsed_input).unwrap_or(""); |
| 103 | + if metric.dropped_by.as_str() != dropped_by { |
| 104 | + bail!( |
| 105 | + "expected metric '{}' to be dropped by '{}' but actually dropped by '{}'", |
| 106 | + metric.input, |
| 107 | + metric.dropped_by, |
| 108 | + dropped_by |
| 109 | + ); |
| 110 | + } |
| 111 | + } |
| 112 | + |
| 113 | + Ok(num_metrics) |
| 114 | +} |
| 115 | + |
| 116 | +pub fn run(config: &str, proxy_config: Option<&str>) -> anyhow::Result<()> { |
| 117 | + let config: DropTesterConfig = yaml_to_proto(config)?; |
| 118 | + let proxy_config: Option<Config> = proxy_config.map(yaml_to_proto).transpose()?; |
| 119 | + |
| 120 | + let num_test_cases = config.test_cases.len(); |
| 121 | + let mut num_metrics = 0; |
| 122 | + for test_case in config.test_cases { |
| 123 | + num_metrics += run_test_case(test_case, proxy_config.as_ref())?; |
| 124 | + } |
| 125 | + log::info!("processed {num_test_cases} test case(s) and {num_metrics} test metrics(s)"); |
| 126 | + |
| 127 | + Ok(()) |
| 128 | +} |
0 commit comments