Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/23666_memory_enrichment_ttl_field.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added an optional `ttl_field` configuration option to the memory enrichment table, to override the global memory table TTL on a per event basis.

authors: esensar Quad9DNS
6 changes: 6 additions & 0 deletions src/enrichment_tables/memory/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use vector_lib::{
configurable::configurable_component,
enrichment::Table,
id::ComponentKey,
lookup::lookup_v2::OptionalValuePath,
schema::{self},
sink::VectorSink,
};
Expand Down Expand Up @@ -64,6 +65,10 @@ pub struct MemoryConfig {
#[configurable(derived)]
#[serde(skip_serializing_if = "vector_lib::serde::is_default")]
pub source_config: Option<MemorySourceConfig>,
/// Field in the incoming value used as the TTL override.
#[configurable(derived)]
#[serde(default)]
pub ttl_field: OptionalValuePath,

#[serde(skip)]
memory: Arc<Mutex<Option<Box<Memory>>>>,
Expand All @@ -89,6 +94,7 @@ impl Default for MemoryConfig {
log_namespace: None,
source_config: None,
internal_metrics: InternalMetricsConfig::default(),
ttl_field: OptionalValuePath::none(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/enrichment_tables/memory/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl MemorySource {
})
.filter_map(|(k, v)| {
let mut event = Event::Log(LogEvent::from_map(
v.as_object_map(now, self.memory.config.ttl, k).ok()?,
v.as_object_map(now, k).ok()?,
EventMetadata::default(),
));
let log = event.as_mut_log();
Expand Down
97 changes: 83 additions & 14 deletions src/enrichment_tables/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::{
pub struct MemoryEntry {
value: String,
update_time: CopyValue<Instant>,
ttl: u64,
}

impl ByteSizeOf for MemoryEntry {
Expand All @@ -56,13 +57,10 @@ impl ByteSizeOf for MemoryEntry {
}

impl MemoryEntry {
pub(super) fn as_object_map(
&self,
now: Instant,
total_ttl: u64,
key: &str,
) -> Result<ObjectMap, String> {
let ttl = total_ttl.saturating_sub(now.duration_since(*self.update_time).as_secs());
pub(super) fn as_object_map(&self, now: Instant, key: &str) -> Result<ObjectMap, String> {
let ttl = self
.ttl
.saturating_sub(now.duration_since(*self.update_time).as_secs());
Ok(ObjectMap::from([
(
KeyString::from("key"),
Expand All @@ -80,8 +78,8 @@ impl MemoryEntry {
]))
}

fn expired(&self, now: Instant, ttl: u64) -> bool {
now.duration_since(*self.update_time).as_secs() > ttl
fn expired(&self, now: Instant) -> bool {
now.duration_since(*self.update_time).as_secs() > self.ttl
}
}

Expand Down Expand Up @@ -128,9 +126,9 @@ impl Memory {
let mut writer = self.write_handle.lock().expect("mutex poisoned");
let now = Instant::now();

for (k, v) in value.into_iter() {
for (k, value) in value.into_iter() {
let new_entry_key = String::from(k);
let Ok(v) = serde_json::to_string(&v) else {
let Ok(v) = serde_json::to_string(&value) else {
emit!(MemoryEnrichmentTableInsertFailed {
key: &new_entry_key,
include_key_metric_tag: self.config.internal_metrics.include_key_tag
Expand All @@ -140,6 +138,15 @@ impl Memory {
let new_entry = MemoryEntry {
value: v,
update_time: now.into(),
ttl: self
.config
.ttl_field
.path
.as_ref()
.and_then(|p| value.get(p))
.and_then(|v| v.as_integer())
.map(|v| v as u64)
.unwrap_or(self.config.ttl),
};
let new_entry_size = new_entry_key.size_of() + new_entry.size_of();
if let Some(max_byte_size) = self.config.max_byte_size
Expand Down Expand Up @@ -182,7 +189,7 @@ impl Memory {
if let Some(reader) = self.get_read_handle().read() {
for (k, v) in reader.iter() {
if let Some(entry) = v.get_one()
&& entry.expired(now, self.config.ttl)
&& entry.expired(now)
{
// Byte size is not reduced at this point, because the actual deletion
// will only happen at refresh time
Expand Down Expand Up @@ -283,8 +290,7 @@ impl Table for Memory {
key: &key,
include_key_metric_tag: self.config.internal_metrics.include_key_tag
});
row.as_object_map(Instant::now(), self.config.ttl, &key)
.map(|r| vec![r])
row.as_object_map(Instant::now(), &key).map(|r| vec![r])
}
None => {
emit!(MemoryEnrichmentTableReadFailed {
Expand Down Expand Up @@ -386,8 +392,10 @@ mod tests {
use futures::{StreamExt, future::ready};
use futures_util::stream;
use tokio::time;

use vector_lib::{
event::{EventContainer, MetricValue},
lookup::lookup_v2::OptionalValuePath,
metrics::Controller,
sink::VectorSink,
};
Expand Down Expand Up @@ -442,6 +450,7 @@ mod tests {
MemoryEntry {
value: "5".to_string(),
update_time: (Instant::now() - Duration::from_secs(secs_to_subtract)).into(),
ttl,
},
);
handle.write_handle.refresh();
Expand All @@ -462,6 +471,64 @@ mod tests {
);
}

#[test]
fn calculates_ttl_override() {
let global_ttl = 100;
let ttl_override = 10;
let memory = Memory::new(build_memory_config(|c| {
c.ttl = global_ttl;
c.ttl_field = OptionalValuePath::new("ttl");
}));
memory.handle_value(ObjectMap::from([
(
"ttl_override".into(),
Value::from(ObjectMap::from([
("val".into(), Value::from(5)),
("ttl".into(), Value::from(ttl_override)),
])),
),
(
"default_ttl".into(),
Value::from(ObjectMap::from([("val".into(), Value::from(5))])),
),
]));

let default_condition = Condition::Equals {
field: "key",
value: Value::from("default_ttl"),
};
let override_condition = Condition::Equals {
field: "key",
value: Value::from("ttl_override"),
};

assert_eq!(
Ok(ObjectMap::from([
("key".into(), Value::from("default_ttl")),
("ttl".into(), Value::from(global_ttl)),
(
"value".into(),
Value::from(ObjectMap::from([("val".into(), Value::from(5))]))
),
])),
memory.find_table_row(Case::Sensitive, &[default_condition], None, None, None)
);
assert_eq!(
Ok(ObjectMap::from([
("key".into(), Value::from("ttl_override")),
("ttl".into(), Value::from(ttl_override)),
(
"value".into(),
Value::from(ObjectMap::from([
("val".into(), Value::from(5)),
("ttl".into(), Value::from(ttl_override))
]))
),
])),
memory.find_table_row(Case::Sensitive, &[override_condition], None, None, None)
);
}

#[test]
fn removes_expired_records_on_scan_interval() {
let ttl = 100;
Expand All @@ -475,6 +542,7 @@ mod tests {
MemoryEntry {
value: "5".to_string(),
update_time: (Instant::now() - Duration::from_secs(ttl + 10)).into(),
ttl,
},
);
handle.write_handle.refresh();
Expand Down Expand Up @@ -542,6 +610,7 @@ mod tests {
MemoryEntry {
value: "5".to_string(),
update_time: (Instant::now() - Duration::from_secs(ttl / 2)).into(),
ttl,
},
);
handle.write_handle.refresh();
Expand Down
6 changes: 6 additions & 0 deletions website/cue/reference/generated/configuration.cue
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ generated: configuration: configuration: {
required: false
relevant_when: "type = \"memory\""
}
ttl_field: {
type: string: default: ""
description: "Field in the incoming value used as the TTL override."
required: false
relevant_when: "type = \"memory\""
}
locale: {
type: string: default: "en"
description: """
Expand Down
Loading