Skip to content

Commit ba206c1

Browse files
Kirill Parasotchenkopeterbourgon
authored andcommitted
Add FinalizerFunc to NATS transport (#790) (#790)
1 parent da7ac23 commit ba206c1

File tree

2 files changed

+71
-0
lines changed

2 files changed

+71
-0
lines changed

transport/nats/subscriber.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type Subscriber struct {
1818
before []RequestFunc
1919
after []SubscriberResponseFunc
2020
errorEncoder ErrorEncoder
21+
finalizer []SubscriberFinalizerFunc
2122
logger log.Logger
2223
}
2324

@@ -73,12 +74,26 @@ func SubscriberErrorLogger(logger log.Logger) SubscriberOption {
7374
return func(s *Subscriber) { s.logger = logger }
7475
}
7576

77+
// SubscriberFinalizer is executed at the end of every request from a publisher through NATS.
78+
// By default, no finalizer is registered.
79+
func SubscriberFinalizer(f ...SubscriberFinalizerFunc) SubscriberOption {
80+
return func(s *Subscriber) { s.finalizer = f }
81+
}
82+
7683
// ServeMsg provides nats.MsgHandler.
7784
func (s Subscriber) ServeMsg(nc *nats.Conn) func(msg *nats.Msg) {
7885
return func(msg *nats.Msg) {
7986
ctx, cancel := context.WithCancel(context.Background())
8087
defer cancel()
8188

89+
if len(s.finalizer) > 0 {
90+
defer func() {
91+
for _, f := range s.finalizer {
92+
f(ctx, msg)
93+
}
94+
}()
95+
}
96+
8297
for _, f := range s.before {
8398
ctx = f(ctx, msg)
8499
}
@@ -125,6 +140,11 @@ func (s Subscriber) ServeMsg(nc *nats.Conn) func(msg *nats.Msg) {
125140
// types.
126141
type ErrorEncoder func(ctx context.Context, err error, reply string, nc *nats.Conn)
127142

143+
// ServerFinalizerFunc can be used to perform work at the end of an request
144+
// from a publisher, after the response has been written to the publisher. The principal
145+
// intended use is for request logging.
146+
type SubscriberFinalizerFunc func(ctx context.Context, msg *nats.Msg)
147+
128148
// NopRequestDecoder is a DecodeRequestFunc that can be used for requests that do not
129149
// need to be decoded, and simply returns nil, nil.
130150
func NopRequestDecoder(_ context.Context, _ *nats.Msg) (interface{}, error) {

transport/nats/subscriber_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,57 @@ func TestMultipleSubscriberAfter(t *testing.T) {
289289
wg.Wait()
290290
}
291291

292+
func TestSubscriberFinalizerFunc(t *testing.T) {
293+
nc := newNatsConn(t)
294+
defer nc.Close()
295+
296+
var (
297+
response = struct{ Body string }{"go eat a fly ugly\n"}
298+
wg sync.WaitGroup
299+
done = make(chan struct{})
300+
)
301+
handler := natstransport.NewSubscriber(
302+
endpoint.Nop,
303+
func(context.Context, *nats.Msg) (interface{}, error) {
304+
return struct{}{}, nil
305+
},
306+
func(_ context.Context, reply string, nc *nats.Conn, _ interface{}) error {
307+
b, err := json.Marshal(response)
308+
if err != nil {
309+
return err
310+
}
311+
312+
return nc.Publish(reply, b)
313+
},
314+
natstransport.SubscriberFinalizer(func(ctx context.Context, _ *nats.Msg) {
315+
close(done)
316+
}),
317+
)
318+
319+
sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc))
320+
if err != nil {
321+
t.Fatal(err)
322+
}
323+
defer sub.Unsubscribe()
324+
325+
wg.Add(1)
326+
go func() {
327+
defer wg.Done()
328+
_, err := nc.Request("natstransport.test", []byte("test data"), 2*time.Second)
329+
if err != nil {
330+
t.Fatal(err)
331+
}
332+
}()
333+
334+
select {
335+
case <-done:
336+
case <-time.After(time.Second):
337+
t.Fatal("timeout waiting for finalizer")
338+
}
339+
340+
wg.Wait()
341+
}
342+
292343
func TestEncodeJSONResponse(t *testing.T) {
293344
nc := newNatsConn(t)
294345
defer nc.Close()

0 commit comments

Comments
 (0)