Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions bd-log-primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,21 @@ pub struct LogRef<'a> {
pub capture_session: Option<&'a str>,
}

impl<'a> LogRef<'a> {
pub fn into_owned(self) -> Log {
Log {
log_level: self.log_level,
log_type: self.log_type,
message: self.message.clone(),
fields: self.fields.captured_fields.clone(),
matching_fields: self.fields.matching_fields.clone(),
session_id: self.session_id.to_string(),
occurred_at: self.occurred_at,
capture_session: self.capture_session.map(|s| s.to_string()),
}
}
}

//
// FieldsRef
//
Expand Down
20 changes: 17 additions & 3 deletions bd-workflows/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use bd_client_common::file::{read_compressed, write_compressed};
use bd_client_stats::Stats;
use bd_client_stats_store::{Counter, Histogram, Scope};
use bd_error_reporter::reporter::handle_unexpected;
use bd_log_primitives::{Log, LogRef};
use bd_log_primitives::{Log, LogRef, LogType, log_level};
use bd_runtime::runtime::workflows::PersistenceWriteIntervalFlag;
use bd_runtime::runtime::{ConfigLoader, DurationWatch, IntWatch, session_capture};
use bd_stats_common::labels;
Expand Down Expand Up @@ -501,6 +501,8 @@ impl WorkflowsEngine {
// Measure duration in here even if the list of workflows is empty.
let _timer = self.stats.process_log_duration.start_timer();

let mut logs_to_inject = BTreeMap::new();

if self.state.session_id.is_empty() {
log::debug!(
"workflows engine: moving from no session to session \"{}\"",
Expand All @@ -515,6 +517,8 @@ impl WorkflowsEngine {
// all workflows in their initial states.
self.state.session_id = log.session_id.to_string();
self.stats.sessions_total.inc();

logs_to_inject.insert("new_session", Self::make_new_session_log(log));
} else if self.state.session_id != log.session_id {
log::debug!(
"workflows engine: moving from \"{}\" to new session \"{}\", cleaning workflows state",
Expand All @@ -532,6 +536,7 @@ impl WorkflowsEngine {
self.clean_state();
self.state.session_id = log.session_id.to_string();
self.stats.sessions_total.inc();
logs_to_inject.insert("new_session", Self::make_new_session_log(log));
}

// Return early if there's no work to avoid unnecessary copies.
Expand All @@ -547,12 +552,11 @@ impl WorkflowsEngine {
triggered_flushes_buffer_ids: BTreeSet::new(),
triggered_flush_buffers_action_ids: BTreeSet::new(),
capture_screenshot: false,
logs_to_inject: BTreeMap::new(),
logs_to_inject,
};
}

let mut actions: Vec<TriggeredAction<'_>> = vec![];
let mut logs_to_inject: BTreeMap<&'a str, Log> = BTreeMap::new();
for (index, workflow) in &mut self.state.workflows.iter_mut().enumerate() {
let Some(config) = self.configs.get(index) else {
continue;
Expand Down Expand Up @@ -703,6 +707,16 @@ impl WorkflowsEngine {
}
}

fn make_new_session_log(log: &LogRef<'_>) -> Log {
let mut new_session_log = log.into_owned();
new_session_log.log_level = log_level::INFO;
new_session_log.log_type = LogType::Lifecycle;
new_session_log.message = "New bitdrift session".into();
new_session_log.capture_session = None;

new_session_log
}

fn clean_state(&mut self) {
// We clear the ongoing workflows state as opposed to the whole state because:
// * pending actions (uploads) are not affected by the session change, and ongoing logs uploads
Expand Down
Loading