Skip to content

Conversation

c-gamble
Copy link
Contributor

@c-gamble c-gamble commented Sep 3, 2025

Description of changes

To address Incident #20, we are adding a blocking send with a timeout to the inter-component messaging system. This will (hopefully) allow the metering event producer to enqueue work for the consumer more reliably, instead of failing on a single check of the queue's fullness as the current try_send does.

Test plan

  • Tests pass locally with pytest for python, yarn test for js, cargo test for rust

Migration plan

Config is all backward-compatible. Necessary updates have been made where appropriate.

Observability plan

Monitor error frequency after hotfix.

Documentation Changes

N/A

Copy link

github-actions bot commented Sep 3, 2025

Reviewer Checklist

Please leverage this checklist to ensure your code review is thorough before approving

Testing, Bugs, Errors, Logs, Documentation

  • Can you think of any use case in which the code does not behave as intended? Have they been tested?
  • Can you think of any inputs or external events that could break the code? Is user input validated and safe? Have they been tested?
  • If appropriate, are there adequate property based tests?
  • If appropriate, are there adequate unit tests?
  • Should any logging, debugging, tracing information be added or removed?
  • Are error messages user-friendly?
  • Have all documentation changes needed been made?
  • Have all non-obvious changes been commented?

System Compatibility

  • Are there any potential impacts on other parts of the system or backward compatibility?
  • Does this change intersect with any items on our roadmap, and if so, is there a plan for fitting them together?

Quality

  • Is this code of a unexpectedly high quality (Readability, Modularity, Intuitiveness)

Copy link
Contributor

propel-code-bot bot commented Sep 3, 2025

Introduce send_timeout for Robust Inter-Component Messaging

This PR revamps the messaging infrastructure across component channels by transitioning from an immediate-failure, non-blocking message send (try_send) to a blocking send with a configurable timeout (send_timeout). The change is motivated by observed failures (see Incident #20) where components, particularly the metering event producer, could not reliably enqueue work due to instantaneous capacity checks. Now, message senders will block for a specified timeout, after which a clear error is returned if enqueueing still fails. Configurations are fully backward-compatible with new parameters added where necessary.

Key Changes

• Component channels now use send_timeout (blocking with timeout) instead of try_send (instant fail) for inter-component messaging.
• A new send_timeout method is required for all types implementing the Component trait, making the timeout configurable per component.
• Configuration structs (e.g., DispatcherConfig, CompactorConfig, MemberlistProviderConfig) now include new duration fields for per-channel send timeouts.
• All major system components (e.g., Dispatcher, WorkerThread, Compactor, Memberlist, Orchestrator, ClientManager, etc.) are updated to implement/send the new timeout configuration throughout their construction and operation.
• Tests and internal APIs updated to inject/send timeouts everywhere a component is instantiated or messages are sent.
• Error handling for message sends is improved: failed sends due to timeout now return detailed errors including the timeout cause.

Affected Areas

• System messaging primitives (ComponentSender, Component, ReceiverForMessage, WrappedMessage)
• Component system: Dispatcher, WorkerThread, Scheduler, Orchestrator, Compactor, MemberlistProvider, ClientManager
• Configuration for dispatcher, compactor, and memberlist providers
• Testing code for all affected subsystems
• Error handling/reporting logic for channel sends

This summary was automatically generated by @propel-code-bot

@c-gamble c-gamble requested a review from rescrv September 3, 2025 20:35
@@ -42,6 +42,7 @@ pub enum ComponentRuntime {
pub trait Component: Send + Sized + Debug + 'static {
fn get_name() -> &'static str;
fn queue_size(&self) -> usize;
fn send_timeout(&self) -> Duration;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not have a default implementation here so as to avoid implementing for every concrete type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i was mimicking the queue size - imo we shouldn't set a blanket send_timeout that gets applied to all Components by default. instead we should force implementers to define their own send_timeout based on their use case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like it is easy to shoot yourself in the foot with a wrong value. I'd advocate for a good default value in the base like < 5ms and only override it in places where we need a custom value

@@ -159,11 +160,18 @@ impl ConsumableJoinHandle {
#[derive(Debug)]
pub(crate) struct ComponentSender<C: Component> {
sender: tokio::sync::mpsc::Sender<WrappedMessage<C>>,
send_timeout: Duration,
Copy link
Contributor

@sanketkedia sanketkedia Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this need the param? If C is already a component that has a send_timeout() then why not directly use that here something like C::send_timeout()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because ComponentSender's wrap_and_send is generic over a C, it doesn't take a C reference, and send_timeout should take a C reference so that implementers can store their send_timeout in config like the queue size

@c-gamble c-gamble requested a review from sanketkedia September 4, 2025 00:41
@@ -62,6 +62,10 @@ pub trait Orchestrator: Debug + Send + Sized + 'static {
1000
}

fn send_timeout(&self) -> Duration {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed? Is this used anywhere?

@@ -443,6 +452,10 @@ mod tests {
1000
}

fn send_timeout(&self) -> Duration {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't these be < 5ms?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants