Skip to content

Commit 9d9542d

Browse files
esensardomalessithomasqueirozb
authored
feat(enrichment tables): add per-event ttl for memory enrichment table (#23666)
* feat(enrichment tables): add per-event ttl for memory enrichment table Adds an optional `ttl_field` configuration to memory enrichment table, to allow overriding the global default TTL per event. * Add changelog entry * Update website/cue/reference/generated/configuration.cue Co-authored-by: domalessi <[email protected]> * Update description in the code --------- Co-authored-by: domalessi <[email protected]> Co-authored-by: Thomas <[email protected]>
1 parent b8a3457 commit 9d9542d

File tree

5 files changed

+99
-15
lines changed

5 files changed

+99
-15
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Added an optional `ttl_field` configuration option to the memory enrichment table, to override the global memory table TTL on a per event basis.
2+
3+
authors: esensar Quad9DNS

src/enrichment_tables/memory/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use vector_lib::{
88
configurable::configurable_component,
99
enrichment::Table,
1010
id::ComponentKey,
11+
lookup::lookup_v2::OptionalValuePath,
1112
schema::{self},
1213
sink::VectorSink,
1314
};
@@ -64,6 +65,10 @@ pub struct MemoryConfig {
6465
#[configurable(derived)]
6566
#[serde(skip_serializing_if = "vector_lib::serde::is_default")]
6667
pub source_config: Option<MemorySourceConfig>,
68+
/// Field in the incoming value used as the TTL override.
69+
#[configurable(derived)]
70+
#[serde(default)]
71+
pub ttl_field: OptionalValuePath,
6772

6873
#[serde(skip)]
6974
memory: Arc<Mutex<Option<Box<Memory>>>>,
@@ -89,6 +94,7 @@ impl Default for MemoryConfig {
8994
log_namespace: None,
9095
source_config: None,
9196
internal_metrics: InternalMetricsConfig::default(),
97+
ttl_field: OptionalValuePath::none(),
9298
}
9399
}
94100
}

src/enrichment_tables/memory/source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl MemorySource {
9797
})
9898
.filter_map(|(k, v)| {
9999
let mut event = Event::Log(LogEvent::from_map(
100-
v.as_object_map(now, self.memory.config.ttl, k).ok()?,
100+
v.as_object_map(now, k).ok()?,
101101
EventMetadata::default(),
102102
));
103103
let log = event.as_mut_log();

src/enrichment_tables/memory/table.rs

Lines changed: 83 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use crate::{
4747
pub struct MemoryEntry {
4848
value: String,
4949
update_time: CopyValue<Instant>,
50+
ttl: u64,
5051
}
5152

5253
impl ByteSizeOf for MemoryEntry {
@@ -56,13 +57,10 @@ impl ByteSizeOf for MemoryEntry {
5657
}
5758

5859
impl MemoryEntry {
59-
pub(super) fn as_object_map(
60-
&self,
61-
now: Instant,
62-
total_ttl: u64,
63-
key: &str,
64-
) -> Result<ObjectMap, String> {
65-
let ttl = total_ttl.saturating_sub(now.duration_since(*self.update_time).as_secs());
60+
pub(super) fn as_object_map(&self, now: Instant, key: &str) -> Result<ObjectMap, String> {
61+
let ttl = self
62+
.ttl
63+
.saturating_sub(now.duration_since(*self.update_time).as_secs());
6664
Ok(ObjectMap::from([
6765
(
6866
KeyString::from("key"),
@@ -80,8 +78,8 @@ impl MemoryEntry {
8078
]))
8179
}
8280

83-
fn expired(&self, now: Instant, ttl: u64) -> bool {
84-
now.duration_since(*self.update_time).as_secs() > ttl
81+
fn expired(&self, now: Instant) -> bool {
82+
now.duration_since(*self.update_time).as_secs() > self.ttl
8583
}
8684
}
8785

@@ -128,9 +126,9 @@ impl Memory {
128126
let mut writer = self.write_handle.lock().expect("mutex poisoned");
129127
let now = Instant::now();
130128

131-
for (k, v) in value.into_iter() {
129+
for (k, value) in value.into_iter() {
132130
let new_entry_key = String::from(k);
133-
let Ok(v) = serde_json::to_string(&v) else {
131+
let Ok(v) = serde_json::to_string(&value) else {
134132
emit!(MemoryEnrichmentTableInsertFailed {
135133
key: &new_entry_key,
136134
include_key_metric_tag: self.config.internal_metrics.include_key_tag
@@ -140,6 +138,15 @@ impl Memory {
140138
let new_entry = MemoryEntry {
141139
value: v,
142140
update_time: now.into(),
141+
ttl: self
142+
.config
143+
.ttl_field
144+
.path
145+
.as_ref()
146+
.and_then(|p| value.get(p))
147+
.and_then(|v| v.as_integer())
148+
.map(|v| v as u64)
149+
.unwrap_or(self.config.ttl),
143150
};
144151
let new_entry_size = new_entry_key.size_of() + new_entry.size_of();
145152
if let Some(max_byte_size) = self.config.max_byte_size
@@ -182,7 +189,7 @@ impl Memory {
182189
if let Some(reader) = self.get_read_handle().read() {
183190
for (k, v) in reader.iter() {
184191
if let Some(entry) = v.get_one()
185-
&& entry.expired(now, self.config.ttl)
192+
&& entry.expired(now)
186193
{
187194
// Byte size is not reduced at this point, because the actual deletion
188195
// will only happen at refresh time
@@ -283,8 +290,7 @@ impl Table for Memory {
283290
key: &key,
284291
include_key_metric_tag: self.config.internal_metrics.include_key_tag
285292
});
286-
row.as_object_map(Instant::now(), self.config.ttl, &key)
287-
.map(|r| vec![r])
293+
row.as_object_map(Instant::now(), &key).map(|r| vec![r])
288294
}
289295
None => {
290296
emit!(MemoryEnrichmentTableReadFailed {
@@ -386,8 +392,10 @@ mod tests {
386392
use futures::{StreamExt, future::ready};
387393
use futures_util::stream;
388394
use tokio::time;
395+
389396
use vector_lib::{
390397
event::{EventContainer, MetricValue},
398+
lookup::lookup_v2::OptionalValuePath,
391399
metrics::Controller,
392400
sink::VectorSink,
393401
};
@@ -442,6 +450,7 @@ mod tests {
442450
MemoryEntry {
443451
value: "5".to_string(),
444452
update_time: (Instant::now() - Duration::from_secs(secs_to_subtract)).into(),
453+
ttl,
445454
},
446455
);
447456
handle.write_handle.refresh();
@@ -462,6 +471,64 @@ mod tests {
462471
);
463472
}
464473

474+
#[test]
475+
fn calculates_ttl_override() {
476+
let global_ttl = 100;
477+
let ttl_override = 10;
478+
let memory = Memory::new(build_memory_config(|c| {
479+
c.ttl = global_ttl;
480+
c.ttl_field = OptionalValuePath::new("ttl");
481+
}));
482+
memory.handle_value(ObjectMap::from([
483+
(
484+
"ttl_override".into(),
485+
Value::from(ObjectMap::from([
486+
("val".into(), Value::from(5)),
487+
("ttl".into(), Value::from(ttl_override)),
488+
])),
489+
),
490+
(
491+
"default_ttl".into(),
492+
Value::from(ObjectMap::from([("val".into(), Value::from(5))])),
493+
),
494+
]));
495+
496+
let default_condition = Condition::Equals {
497+
field: "key",
498+
value: Value::from("default_ttl"),
499+
};
500+
let override_condition = Condition::Equals {
501+
field: "key",
502+
value: Value::from("ttl_override"),
503+
};
504+
505+
assert_eq!(
506+
Ok(ObjectMap::from([
507+
("key".into(), Value::from("default_ttl")),
508+
("ttl".into(), Value::from(global_ttl)),
509+
(
510+
"value".into(),
511+
Value::from(ObjectMap::from([("val".into(), Value::from(5))]))
512+
),
513+
])),
514+
memory.find_table_row(Case::Sensitive, &[default_condition], None, None, None)
515+
);
516+
assert_eq!(
517+
Ok(ObjectMap::from([
518+
("key".into(), Value::from("ttl_override")),
519+
("ttl".into(), Value::from(ttl_override)),
520+
(
521+
"value".into(),
522+
Value::from(ObjectMap::from([
523+
("val".into(), Value::from(5)),
524+
("ttl".into(), Value::from(ttl_override))
525+
]))
526+
),
527+
])),
528+
memory.find_table_row(Case::Sensitive, &[override_condition], None, None, None)
529+
);
530+
}
531+
465532
#[test]
466533
fn removes_expired_records_on_scan_interval() {
467534
let ttl = 100;
@@ -475,6 +542,7 @@ mod tests {
475542
MemoryEntry {
476543
value: "5".to_string(),
477544
update_time: (Instant::now() - Duration::from_secs(ttl + 10)).into(),
545+
ttl,
478546
},
479547
);
480548
handle.write_handle.refresh();
@@ -542,6 +610,7 @@ mod tests {
542610
MemoryEntry {
543611
value: "5".to_string(),
544612
update_time: (Instant::now() - Duration::from_secs(ttl / 2)).into(),
613+
ttl,
545614
},
546615
);
547616
handle.write_handle.refresh();

website/cue/reference/generated/configuration.cue

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,12 @@ generated: configuration: configuration: {
207207
required: false
208208
relevant_when: "type = \"memory\""
209209
}
210+
ttl_field: {
211+
type: string: default: ""
212+
description: "Field in the incoming value used as the TTL override."
213+
required: false
214+
relevant_when: "type = \"memory\""
215+
}
210216
locale: {
211217
type: string: default: "en"
212218
description: """

0 commit comments

Comments
 (0)