Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 99 additions & 22 deletions src/ops/py_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ impl PyFunctionExecutor {
.transpose()?
.as_ref(),
)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| format!("while calling user-configured function"))?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also attach the function name?

The same for error messages below.

Ok(result.into_bound(py))
}
}
Expand Down Expand Up @@ -221,7 +222,8 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
PyTuple::new(py, args.into_iter())?,
Some(&kwargs.into_py_dict(py)?),
)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| format!("while building user-configured function"))?;
let (result_type, executor) = result
.extract::<(crate::py::Pythonized<schema::EnrichedValueType>, Py<PyAny>)>(py)?;
Ok((
Expand All @@ -244,7 +246,8 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
Python::with_gil(|py| -> anyhow::Result<_> {
let prepare_coro = executor
.call_method(py, "prepare", (), None)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| format!("while preparing user-configured function"))?;
let prepare_fut = from_py_future(
py,
&pyo3_async_runtimes::TaskLocals::new(
Expand Down Expand Up @@ -337,6 +340,7 @@ impl interface::SourceExecutor for PySourceExecutor {
py_source_executor
.call_method(py, "list_async", (pythonize(py, options)?,), None)
.to_result_with_py_trace(py)
.with_context(|| format!("while listing user-configured source"))
})?;

// Create a stream that pulls from the Python async iterator one item at a time
Expand Down Expand Up @@ -376,7 +380,13 @@ impl interface::SourceExecutor for PySourceExecutor {
),
None,
)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| {
format!(
"while fetching user-configured source for key: {:?}",
&key_clone
)
})?;
let task_locals =
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
Ok(from_py_future(
Expand Down Expand Up @@ -416,7 +426,8 @@ impl PySourceExecutor {
let next_item_coro = Python::with_gil(|py| -> Result<_> {
let coro = py_async_iter
.call_method0(py, "__anext__")
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| format!("while iterating over user-configured source"))?;
let task_locals =
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
Ok(from_py_future(py, &task_locals, coro.into_bound(py))?)
Expand Down Expand Up @@ -549,7 +560,7 @@ impl PySourceExecutor {
impl interface::SourceFactory for PySourceConnectorFactory {
async fn build(
self: Arc<Self>,
_source_name: &str,
source_name: &str,
spec: serde_json::Value,
context: Arc<interface::FlowInstanceContext>,
) -> Result<(
Expand All @@ -567,7 +578,13 @@ impl interface::SourceFactory for PySourceConnectorFactory {
let value_type_result = self
.py_source_connector
.call_method(py, "get_table_type", (), None)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| {
format!(
"while fetching table type from user-configured source `{}`",
source_name
)
})?;
let table_type: schema::EnrichedValueType =
depythonize(&value_type_result.into_bound(py))?;
Ok(table_type)
Expand Down Expand Up @@ -596,14 +613,20 @@ impl interface::SourceFactory for PySourceConnectorFactory {
table_type.typ
),
};

let source_name = source_name.to_string();
let executor_fut = async move {
// Create the executor using the async create_executor method
let create_future = Python::with_gil(|py| -> Result<_> {
let create_coro = self
.py_source_connector
.call_method(py, "create_executor", (pythonize(py, &spec)?,), None)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| {
format!(
"while constructing executor for user-configured source `{}`",
source_name
)
})?;
let task_locals =
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
let create_future = from_py_future(py, &task_locals, create_coro.into_bound(py))?;
Expand All @@ -614,13 +637,25 @@ impl interface::SourceFactory for PySourceConnectorFactory {

let (py_source_executor_context, provides_ordinal) =
Python::with_gil(|py| -> Result<_> {
let executor_context =
py_executor_context_result.to_result_with_py_trace(py)?;
let executor_context = py_executor_context_result
.to_result_with_py_trace(py)
.with_context(|| {
format!(
"while getting executor context for user-configured source `{}`",
source_name
)
})?;

// Get provides_ordinal from the executor context
let provides_ordinal = executor_context
.call_method(py, "provides_ordinal", (), None)
.to_result_with_py_trace(py)?
.to_result_with_py_trace(py)
.with_context(|| {
format!(
"while calling provides_ordinal for user-configured source `{}`",
source_name
)
})?
.extract::<bool>(py)?;

Ok((executor_context, provides_ordinal))
Expand Down Expand Up @@ -720,20 +755,38 @@ impl interface::TargetFactory for PyExportTargetFactory {
),
None,
)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| {
format!(
"while setting up export context for user-configured target `{}`",
&data_collection.name
)
})?;

// Call the `get_persistent_key` method to get the persistent key.
let persistent_key = self
.py_target_connector
.call_method(py, "get_persistent_key", (&py_export_ctx,), None)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| {
format!(
"while getting persistent key for user-configured target `{}`",
&data_collection.name
)
})?;
let persistent_key: serde_json::Value =
depythonize(&persistent_key.into_bound(py))?;

let setup_state = self
.py_target_connector
.call_method(py, "get_setup_state", (&py_export_ctx,), None)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| {
format!(
"while getting setup state for user-configured target `{}`",
&data_collection.name
)
})?;
let setup_state: serde_json::Value = depythonize(&setup_state.into_bound(py))?;

anyhow::Ok((py_export_ctx, persistent_key, setup_state))
Expand All @@ -747,7 +800,13 @@ impl interface::TargetFactory for PyExportTargetFactory {
let prepare_coro = factory
.py_target_connector
.call_method(py, "prepare_async", (&py_export_ctx,), None)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| {
format!(
"while preparing user-configured target `{}`",
&data_collection.name
)
})?;
let task_locals = pyo3_async_runtimes::TaskLocals::new(
py_exec_ctx.event_loop.bind(py).clone(),
);
Expand Down Expand Up @@ -816,7 +875,10 @@ impl interface::TargetFactory for PyExportTargetFactory {
),
None,
)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| {
format!("while calling check_state_compatibility in user-configured target")
})?;
let compatibility: SetupStateCompatibility = depythonize(&result.into_bound(py))?;
Ok(compatibility)
})?;
Expand All @@ -828,7 +890,10 @@ impl interface::TargetFactory for PyExportTargetFactory {
let result = self
.py_target_connector
.call_method(py, "describe_resource", (pythonize(py, key)?,), None)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| {
format!("while calling describe_resource in user-configured target")
})?;
let description = result.extract::<String>(py)?;
Ok(description)
})
Expand Down Expand Up @@ -885,7 +950,10 @@ impl interface::TargetFactory for PyExportTargetFactory {
(pythonize(py, &setup_changes)?,),
None,
)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| {
format!("while calling apply_setup_changes_async in user-configured target")
})?;
let task_locals =
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
Ok(from_py_future(
Expand All @@ -895,7 +963,11 @@ impl interface::TargetFactory for PyExportTargetFactory {
)?)
})?
.await;
Python::with_gil(move |py| py_result.to_result_with_py_trace(py))?;
Python::with_gil(move |py| {
py_result
.to_result_with_py_trace(py)
.with_context(|| format!("while applying setup changes in user-configured target"))
})?;

Ok(())
}
Expand Down Expand Up @@ -946,7 +1018,8 @@ impl interface::TargetFactory for PyExportTargetFactory {
let result_coro = self
.py_target_connector
.call_method(py, "mutate_async", (py_args,), None)
.to_result_with_py_trace(py)?;
.to_result_with_py_trace(py)
.with_context(|| format!("while calling mutate_async in user-configured target"))?;
let task_locals =
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
Ok(from_py_future(
Expand All @@ -957,7 +1030,11 @@ impl interface::TargetFactory for PyExportTargetFactory {
})?
.await;

Python::with_gil(move |py| py_result.to_result_with_py_trace(py))?;
Python::with_gil(move |py| {
py_result
.to_result_with_py_trace(py)
.with_context(|| format!("while applying mutations in user-configured target"))
})?;
Ok(())
}
}