-
Notifications
You must be signed in to change notification settings - Fork 2.2k
actor: add new abstraction over mailbox #10142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: actor
Are you sure you want to change the base?
Conversation
This commit introduces a new Mailbox interface that abstracts the message queue implementation for actors. Previously, actors used a direct channel for their mailbox, which limited flexibility and made it difficult to implement alternative mailbox strategies. The new Mailbox interface provides methods for sending, receiving, and draining messages, with full context support for cancellation. The Receive method leverages Go 1.23's iter.Seq pattern, providing a clean iterator-based API that allows natural for-range loops over messages. The ChannelMailbox implementation maintains the existing channel-based behavior while conforming to the new interface. It stores the actor's context internally, ensuring both caller and actor contexts are properly respected during send and receive operations. This simplifies context handling compared to complex context merging approaches. This abstraction enables future implementations such as priority mailboxes, persistent mailboxes, or bounded mailboxes with overflow strategies, without requiring changes to the actor implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @Roasbeef, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
I've implemented a significant architectural improvement to the actor system by introducing an abstraction layer over the actor's mailbox. This change decouples the actor's core logic from its message queuing mechanism, making the system more extensible and adaptable. The immediate benefit is a cleaner, more robust message processing flow within the actor, with future possibilities for advanced mailbox behaviors like persistence or flow control.
Highlights
- Mailbox Abstraction: I've introduced a new
Mailbox
interface that abstracts the underlying message queue mechanism for actors. This allows for greater flexibility, enabling different mailbox implementations such as durable on-disk queues or backpressure-aware mailboxes in the future. - ChannelMailbox Implementation with iter.Seq: The default mailbox implementation,
ChannelMailbox
, now leverages Go channels and integrates with Go'siter.Seq
for an elegant and context-aware message reception API within the actor's processing loop. - Actor Core Logic Refactoring: The core
Actor
logic and itsTell
andAsk
methods have been updated to interact with the newMailbox
interface, simplifying the code and centralizing message handling concerns within the mailbox implementation itself.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command>
or @gemini-code-assist <command>
. Below is a summary of the supported commands.
Feature | Command | Description |
---|---|---|
Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/
folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments or fill out our survey to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a valuable abstraction for the actor mailbox, which is a great step towards making the system more flexible and modular. The use of Go 1.22 iterators for message consumption is a nice, modern touch. However, the new ChannelMailbox
implementation has a critical race condition between send and close operations that could lead to panics in a concurrent environment. Additionally, some of the new tests rely on time.Sleep
for synchronization, which can make them flaky. I've provided suggestions to fix the race condition and improve the robustness of the tests.
func (m *ChannelMailbox[M, R]) Send(ctx context.Context, | ||
env envelope[M, R]) bool { | ||
if m.IsClosed() { | ||
return false | ||
} | ||
|
||
select { | ||
case m.ch <- env: | ||
return true | ||
case <-ctx.Done(): | ||
return false | ||
case <-m.actorCtx.Done(): | ||
// Actor is shutting down. | ||
return false | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a critical race condition here. A concurrent call to Close()
could close the channel m.ch
after m.IsClosed()
returns false
but before the m.ch <- env
send operation occurs. This would cause the send on the closed channel to panic.
To fix this, you should use the RWMutex
(m.mu
) to protect the send operation. By acquiring a read lock before checking IsClosed
and sending, you can prevent Close()
(which acquires a write lock) from executing between the check and the send.
func (m *ChannelMailbox[M, R]) Send(ctx context.Context, | |
env envelope[M, R]) bool { | |
if m.IsClosed() { | |
return false | |
} | |
select { | |
case m.ch <- env: | |
return true | |
case <-ctx.Done(): | |
return false | |
case <-m.actorCtx.Done(): | |
// Actor is shutting down. | |
return false | |
} | |
} | |
func (m *ChannelMailbox[M, R]) Send(ctx context.Context, | |
env envelope[M, R]) bool { | |
m.mu.RLock() | |
defer m.mu.RUnlock() | |
if m.IsClosed() { | |
return false | |
} | |
select { | |
case m.ch <- env: | |
return true | |
case <-ctx.Done(): | |
return false | |
case <-m.actorCtx.Done(): | |
// Actor is shutting down. | |
return false | |
} | |
} |
func (m *ChannelMailbox[M, R]) TrySend(env envelope[M, R]) bool { | ||
if m.IsClosed() { | ||
return false | ||
} | ||
|
||
select { | ||
case m.ch <- env: | ||
return true | ||
default: | ||
return false | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the Send
method, this function has a critical race condition. A concurrent call to Close()
can lead to a panic when attempting to send on the closed channel. You should acquire a read lock from m.mu
to ensure the channel is not closed between the IsClosed()
check and the send operation.
func (m *ChannelMailbox[M, R]) TrySend(env envelope[M, R]) bool { | |
if m.IsClosed() { | |
return false | |
} | |
select { | |
case m.ch <- env: | |
return true | |
default: | |
return false | |
} | |
} | |
func (m *ChannelMailbox[M, R]) TrySend(env envelope[M, R]) bool { | |
m.mu.RLock() | |
defer m.mu.RUnlock() | |
if m.IsClosed() { | |
return false | |
} | |
select { | |
case m.ch <- env: | |
return true | |
default: | |
return false | |
} | |
} |
go func() { | ||
defer wg.Done() | ||
for env := range mailbox.Receive(ctx) { | ||
received = append(received, env.message.Value) | ||
if len(received) >= 3 { | ||
break | ||
} | ||
} | ||
}() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test uses time.Sleep
for synchronization, which can lead to flaky tests. A more robust approach is to rely on deterministic synchronization primitives.
Instead of sleeping, you can close the mailbox immediately after sending the messages. The mailbox.Receive
iterator will then naturally terminate when the channel is drained and closed. Waiting on the WaitGroup
after closing the channel will ensure the receiver goroutine has processed all messages without relying on arbitrary sleep durations.
You should also remove the if len(received) >= 3 { break }
condition inside the goroutine to ensure it processes all messages until the channel is closed.
go func() {
defer wg.Done()
for env := range mailbox.Receive(ctx) {
received = append(received, env.message.Value)
}
}()
go func() { | ||
defer receiverWg.Done() | ||
for env := range mailbox.Receive(ctx) { | ||
received = append(received, env.message.Value) | ||
if len(received) >= numSenders*messagesPerSender { | ||
break | ||
} | ||
} | ||
}() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This concurrent test relies on time.Sleep
to ensure the receiver has time to process messages, which can make the test flaky. A better approach is to use deterministic synchronization.
After waiting for all sender goroutines to finish with wg.Wait()
, you can immediately call mailbox.Close()
. This signals to the receiver's range
loop that no more messages will arrive, and it will exit gracefully after processing all buffered messages. Then, receiverWg.Wait()
will block until the receiver has actually finished, making the test robust.
The break
condition inside the receiver goroutine should also be removed to allow it to drain the channel completely.
go func() {
defer receiverWg.Done()
for env := range mailbox.Receive(ctx) {
received = append(received, env.message.Value)
}
}()
This commit adds thorough test coverage for the new Mailbox interface and ChannelMailbox implementation. The tests verify correct behavior across various scenarios including successful sends, context cancellation, mailbox closure, and concurrent operations. The test suite specifically validates that the mailbox respects both the caller's context and the actor's context during send and receive operations. This ensures that actors properly shut down when their context is cancelled, and that callers can cancel operations without affecting the actor's lifecycle. Additional tests cover edge cases such as zero-capacity mailboxes (which default to a capacity of 1), draining messages after closure, and concurrent sends from multiple goroutines. The concurrent test uses 10 senders each sending 100 messages to verify thread-safety and proper message ordering. All tests pass with the race detector enabled, confirming the implementation is free from data races.
This commit refactors the Actor implementation to use the new Mailbox interface instead of directly managing a channel. This change significantly simplifies the actor's message processing loop and improves separation of concerns. The main changes include replacing the direct channel field with a Mailbox interface, updating NewActor to create a ChannelMailbox instance, and refactoring the process method to use the iterator pattern provided by mailbox.Receive. The new implementation uses a clean for-range loop over the mailbox's message iterator, eliminating the complex select statement that previously handled both message reception and context cancellation. The Tell and Ask methods in actorRefImpl have been simplified to use the mailbox's Send method, which internally handles both the caller's context and the actor's context. This eliminates the need for complex select statements in these methods and ensures consistent context handling throughout the actor system. Message draining during shutdown is now handled through the mailbox's Drain method, providing a cleaner separation between normal message processing and cleanup operations. The actor still properly sends unprocessed messages to the Dead Letter Office and completes pending promises with appropriate errors during shutdown.
This commit updates the example test function names to follow Go's convention for example functions. The functions were renamed from ExampleBasicActor and ExampleStructActor to Example_basicActor and Example_structActor respectively. This naming convention with an underscore ensures that the examples are still recognized as valid example functions by the Go testing framework while avoiding potential naming conflicts with actual types in the package.
e465137
to
99f3f9e
Compare
In this PR, we add an abstraction over the mailbox w/ the default actor impl. This is useful, as this allows us to slot in a different mailbox. Examples include a durable on disk queue, a mailbox with backpressure awareness, etc.
We then make a default implementation that uses Go channels, then use that in place.
The new interface also uses an
iter.Seq
to allow for an easy iteration API that still is aware of context cancellations.