Skip to content

Commit 3530535

Browse files
added evaluate_with_timeout_and_warning
1 parent 0620d5c commit 3530535

File tree

2 files changed

+57
-100
lines changed

2 files changed

+57
-100
lines changed

src/builder/analyzer.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use futures::{FutureExt, future::try_join_all};
1616
use tokio::time::Duration;
1717

1818
const TIMEOUT_THRESHOLD: u64 = 1800;
19-
const WARNING_THRESHOLD: u64 = 30;
2019

2120
#[derive(Debug)]
2221
pub(super) enum ValueTypeBuilder {

src/execution/evaluator.rs

Lines changed: 57 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,43 @@ async fn evaluate_child_op_scope(
363363
})
364364
}
365365

366+
async fn evaluate_with_timeout_and_warning<F, T>(
367+
eval_future: F,
368+
timeout_duration: Duration,
369+
warn_duration: Duration,
370+
op_kind: String,
371+
op_name: String,
372+
) -> Result<T>
373+
where
374+
F: std::future::Future<Output = Result<T>>,
375+
{
376+
let mut eval_future = Box::pin(eval_future);
377+
let mut warned = false;
378+
let timeout_future = tokio::time::sleep(timeout_duration);
379+
tokio::pin!(timeout_future);
380+
381+
loop {
382+
tokio::select! {
383+
res = &mut eval_future => {
384+
return res;
385+
}
386+
_ = &mut timeout_future => {
387+
return Err(anyhow!(
388+
"Function '{}' ({}) timed out after {} seconds",
389+
op_kind, op_name, timeout_duration.as_secs()
390+
));
391+
}
392+
_ = tokio::time::sleep(warn_duration), if !warned => {
393+
warn!(
394+
"Function '{}' ({}) is taking longer than {}s",
395+
op_kind, op_name, WARNING_THRESHOLD
396+
);
397+
warned = true;
398+
}
399+
}
400+
}
401+
}
402+
366403
async fn evaluate_op_scope(
367404
op_scope: &AnalyzedOpScope,
368405
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
@@ -409,70 +446,28 @@ async fn evaluate_op_scope(
409446
let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || {
410447
op.executor.evaluate(input_values)
411448
});
412-
let mut eval_future = Box::pin(eval_future);
413-
let mut warned = false;
414-
let timeout_future = tokio::time::sleep(timeout_duration);
415-
tokio::pin!(timeout_future);
416-
417-
let res = loop {
418-
tokio::select! {
419-
res = &mut eval_future => {
420-
break Ok(res?);
421-
}
422-
_ = &mut timeout_future => {
423-
break Err(anyhow!(
424-
"Function '{}' ({}) timed out after {} seconds",
425-
op.op_kind, op.name, timeout_duration.as_secs()
426-
));
427-
}
428-
_ = tokio::time::sleep(warn_duration), if !warned => {
429-
eprintln!(
430-
"WARNING: Function '{}' ({}) is taking longer than 30s",
431-
op_kind_for_warning, op_name_for_warning
432-
);
433-
warn!(
434-
"Function '{}' ({}) is taking longer than {}s",
435-
op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD
436-
);
437-
warned = true;
438-
}
439-
}
440-
};
441-
442-
res.and_then(|v| head_scope.define_field(&op.output, &v))
449+
let v = evaluate_with_timeout_and_warning(
450+
eval_future,
451+
timeout_duration,
452+
warn_duration,
453+
op_kind_for_warning,
454+
op_name_for_warning,
455+
)
456+
.await?;
457+
458+
head_scope.define_field(&op.output, &v)
443459
} else {
444460
let eval_future = op.executor.evaluate(input_values);
445-
let mut eval_future = Box::pin(eval_future);
446-
let mut warned = false;
447-
let timeout_future = tokio::time::sleep(timeout_duration);
448-
tokio::pin!(timeout_future);
449-
450-
let res = loop {
451-
tokio::select! {
452-
res = &mut eval_future => {
453-
break Ok(res?);
454-
}
455-
_ = &mut timeout_future => {
456-
break Err(anyhow!(
457-
"Function '{}' ({}) timed out after {} seconds",
458-
op.op_kind, op.name, timeout_duration.as_secs()
459-
));
460-
}
461-
_ = tokio::time::sleep(warn_duration), if !warned => {
462-
eprintln!(
463-
"WARNING: Function '{}' ({}) is taking longer than 30s",
464-
op_kind_for_warning, op_name_for_warning
465-
);
466-
warn!(
467-
"Function '{}' ({}) is taking longer than {}s",
468-
op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD
469-
);
470-
warned = true;
471-
}
472-
}
473-
};
474-
475-
res.and_then(|v| head_scope.define_field(&op.output, &v))
461+
let v = evaluate_with_timeout_and_warning(
462+
eval_future,
463+
timeout_duration,
464+
warn_duration,
465+
op_kind_for_warning,
466+
op_name_for_warning,
467+
)
468+
.await?;
469+
470+
head_scope.define_field(&op.output, &v)
476471
};
477472

478473
if let Some(ref op_stats) = operation_in_process_stats {
@@ -581,43 +576,6 @@ async fn evaluate_op_scope(
581576
let collector_entry = scoped_entries
582577
.headn(op.collector_ref.scope_up_level as usize)
583578
.ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?;
584-
585-
// Assemble input values
586-
let input_values: Vec<value::Value> =
587-
assemble_input_values(&op.input.fields, scoped_entries)
588-
.collect::<Result<Vec<_>>>()?;
589-
590-
// Create field_values vector for all fields in the merged schema
591-
let mut field_values = op
592-
.field_index_mapping
593-
.iter()
594-
.map(|idx| {
595-
idx.map_or(value::Value::Null, |input_idx| {
596-
input_values[input_idx].clone()
597-
})
598-
})
599-
.collect::<Vec<_>>();
600-
601-
// Handle auto_uuid_field (assumed to be at position 0 for efficiency)
602-
if op.has_auto_uuid_field {
603-
if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx {
604-
let uuid = memory.next_uuid(
605-
op.fingerprinter
606-
.clone()
607-
.with(
608-
&field_values
609-
.iter()
610-
.enumerate()
611-
.filter(|(i, _)| *i != uuid_idx)
612-
.map(|(_, v)| v)
613-
.collect::<Vec<_>>(),
614-
)?
615-
.into_fingerprint(),
616-
)?;
617-
field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid));
618-
}
619-
}
620-
621579
{
622580
let mut collected_records = collector_entry.collected_values
623581
[op.collector_ref.local.collector_idx as usize]

0 commit comments

Comments
 (0)