Skip to content

Commit c0e211e

Browse files
committed
actor: refactor Actor to use Mailbox interface
This commit refactors the Actor implementation to use the new Mailbox interface instead of directly managing a channel. This change significantly simplifies the actor's message processing loop and improves separation of concerns. The main changes include replacing the direct channel field with a Mailbox interface, updating NewActor to create a ChannelMailbox instance, and refactoring the process method to use the iterator pattern provided by mailbox.Receive. The new implementation uses a clean for-range loop over the mailbox's message iterator, eliminating the complex select statement that previously handled both message reception and context cancellation. The Tell and Ask methods in actorRefImpl have been simplified to use the mailbox's Send method, which internally handles both the caller's context and the actor's context. This eliminates the need for complex select statements in these methods and ensures consistent context handling throughout the actor system. Message draining during shutdown is now handled through the mailbox's Drain method, providing a cleaner separation between normal message processing and cleanup operations. The actor still properly sends unprocessed messages to the Dead Letter Office and completes pending promises with appropriate errors during shutdown.
1 parent 86b77cd commit c0e211e

File tree

1 file changed

+48
-71
lines changed

1 file changed

+48
-71
lines changed

actor/actor.go

Lines changed: 48 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type Actor[M Message, R any] struct {
4545
behavior ActorBehavior[M, R]
4646

4747
// mailbox is the incoming message queue for the actor.
48-
mailbox chan envelope[M, R]
48+
mailbox Mailbox[M, R]
4949

5050
// ctx is the context governing the actor's lifecycle.
5151
ctx context.Context
@@ -83,10 +83,13 @@ func NewActor[M Message, R any](cfg ActorConfig[M, R]) *Actor[M, R] {
8383
mailboxCapacity = 1
8484
}
8585

86+
// Create mailbox - could be injected via config in the future.
87+
mailbox := NewChannelMailbox[M, R](ctx, mailboxCapacity)
88+
8689
actor := &Actor[M, R]{
8790
id: cfg.ID,
8891
behavior: cfg.Behavior,
89-
mailbox: make(chan envelope[M, R], mailboxCapacity),
92+
mailbox: mailbox,
9093
ctx: ctx,
9194
cancel: cancel,
9295
dlo: cfg.DLO,
@@ -111,49 +114,32 @@ func (a *Actor[M, R]) Start() {
111114
// process is the main event loop for the actor. It continuously monitors its
112115
// mailbox for incoming messages and its context for cancellation signals.
113116
func (a *Actor[M, R]) process() {
114-
for {
115-
select {
116-
case env := <-a.mailbox:
117-
result := a.behavior.Receive(a.ctx, env.message)
118-
119-
// If a promise was provided (i.e., it was an "ask"
120-
// operation), complete the promise with the result from
121-
// the behavior.
122-
if env.promise != nil {
123-
env.promise.Complete(result)
124-
}
125-
126-
// The actor's context has been cancelled, signaling a stop
127-
// request. Exit the processing loop to terminate the actor's
128-
// goroutine. Before exiting, drain any remaining messages from
129-
// the mailbox.
130-
case <-a.ctx.Done():
131-
// Close the mailbox to prevent new incoming messages
132-
// and to allow the range operator below to terminate.
133-
close(a.mailbox)
134-
135-
// Drain any remaining messages.
136-
for env := range a.mailbox {
137-
// If a DLO is configured, send the original
138-
// message there for auditing or potential
139-
// manual reprocessing.
140-
if a.dlo != nil {
141-
a.dlo.Tell(
142-
context.Background(),
143-
env.message,
144-
)
145-
}
146-
147-
// If it was an Ask, complete the promise with
148-
// an error indicating the actor terminated.
149-
if env.promise != nil {
150-
env.promise.Complete(fn.Err[R](
151-
ErrActorTerminated),
152-
)
153-
}
154-
}
155-
156-
return
117+
// Use the new iterator pattern for receiving messages.
118+
for env := range a.mailbox.Receive(a.ctx) {
119+
result := a.behavior.Receive(a.ctx, env.message)
120+
121+
// If a promise was provided (i.e., it was an "ask"
122+
// operation), complete the promise with the result from
123+
// the behavior.
124+
if env.promise != nil {
125+
env.promise.Complete(result)
126+
}
127+
}
128+
129+
// Context was cancelled or mailbox closed, drain remaining messages.
130+
a.mailbox.Close()
131+
132+
for env := range a.mailbox.Drain() {
133+
// If a DLO is configured, send the original message there
134+
// for auditing or potential manual reprocessing.
135+
if a.dlo != nil {
136+
a.dlo.Tell(context.Background(), env.message)
137+
}
138+
139+
// If it was an Ask, complete the promise with an error
140+
// indicating the actor terminated.
141+
if env.promise != nil {
142+
env.promise.Complete(fn.Err[R](ErrActorTerminated))
157143
}
158144
}
159145
}
@@ -186,19 +172,15 @@ func (ref *actorRefImpl[M, R]) Tell(ctx context.Context, msg M) {
186172
return
187173
}
188174

189-
select {
190-
// Message successfully enqueued in the actor's mailbox.
191-
case ref.actor.mailbox <- envelope[M, R]{message: msg, promise: nil}:
175+
env := envelope[M, R]{message: msg, promise: nil}
192176

193-
// The context for the Tell operation was cancelled before the message
194-
// could be enqueued. The message is dropped.
195-
case <-ctx.Done():
196-
197-
// The actor itself has been stopped/terminated.
198-
case <-ref.actor.ctx.Done():
199-
// If the actor is terminated and has a DLO, send the message
200-
// there. Otherwise, it's dropped.
201-
ref.trySendToDLO(msg)
177+
// Use mailbox Send method which internally checks both contexts.
178+
if !ref.actor.mailbox.Send(ctx, env) {
179+
// Failed to send - check if actor terminated.
180+
if ref.actor.ctx.Err() != nil {
181+
ref.trySendToDLO(msg)
182+
}
183+
// Otherwise it was the caller's context that cancelled.
202184
}
203185
}
204186

@@ -219,21 +201,16 @@ func (ref *actorRefImpl[M, R]) Ask(ctx context.Context, msg M) Future[R] {
219201
return promise.Future()
220202
}
221203

222-
select {
223-
// Attempt to send the message along with its promise to the actor's
224-
// mailbox.
225-
case ref.actor.mailbox <- envelope[M, R]{message: msg, promise: promise}:
204+
env := envelope[M, R]{message: msg, promise: promise}
226205

227-
// The context for the Ask operation was cancelled before the message
228-
// could be enqueued. Complete the promise with the context's error to
229-
// unblock the caller.
230-
case <-ctx.Done():
231-
promise.Complete(fn.Err[R](ctx.Err()))
232-
233-
// The actor's context was cancelled (e.g., actor stopped) while this
234-
// Ask operation was attempting to send (e.g., mailbox was full).
235-
case <-ref.actor.ctx.Done():
236-
promise.Complete(fn.Err[R](ErrActorTerminated))
206+
// Use mailbox Send method which internally checks both contexts.
207+
if !ref.actor.mailbox.Send(ctx, env) {
208+
// Determine the error based on what failed.
209+
if ref.actor.ctx.Err() != nil {
210+
promise.Complete(fn.Err[R](ErrActorTerminated))
211+
} else {
212+
promise.Complete(fn.Err[R](ctx.Err()))
213+
}
237214
}
238215

239216
// Return the future associated with the promise, allowing the caller to

0 commit comments

Comments
 (0)