Skip to content

Commit 551c7dd

Browse files
committed
onion_message: Process onion messages async
This commit implements the BackpressureQueue in the onion message endpoint.
1 parent 95c92e3 commit 551c7dd

File tree

2 files changed

+106
-14
lines changed

2 files changed

+106
-14
lines changed

onion_message/onion_endpoint.go

Lines changed: 94 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,28 @@ import (
55
"context"
66
"errors"
77
"fmt"
8+
"sync"
89

910
"github.com/btcsuite/btcd/btcec/v2"
1011
"github.com/lightningnetwork/lnd/htlcswitch/hop"
1112
"github.com/lightningnetwork/lnd/lnwire"
1213
"github.com/lightningnetwork/lnd/msgmux"
14+
"github.com/lightningnetwork/lnd/queue"
1315
"github.com/lightningnetwork/lnd/record"
1416
"github.com/lightningnetwork/lnd/subscribe"
1517
"github.com/lightningnetwork/lnd/tlv"
1618
)
1719

20+
const (
21+
// defaultOnionMessageQueueSize is the default size of the onion
22+
// message queue.
23+
defaultOnionMessageQueueSize = 50
24+
25+
// defaultMinRedThreshold is the default minimum threshold for the Random
26+
// Early Drop queue. It is set to 80% of the queue size.
27+
defaultMinRedThreshold = 40
28+
)
29+
1830
var (
1931
// ErrBadMessage is returned when we can't process an onion message.
2032
ErrBadMessage = errors.New("onion message processing failed")
@@ -108,6 +120,16 @@ type OnionEndpoint struct {
108120

109121
// MsgSender sends a onion message to the target peer.
110122
MsgSender OnionMessageSender
123+
124+
// onionMsgQueue is a queue that contains incoming onion messages.
125+
onionMsgQueue *queue.BackpressureQueue[msgmux.PeerMsg]
126+
127+
// quit is a channel that is closed when the endpoint is shutting down.
128+
quit chan struct{}
129+
130+
// wg is a wait group that is used to wait for the message handler to
131+
// exit.
132+
wg sync.WaitGroup
111133
}
112134

113135
// A compile-time check to ensure OnionEndpoint implements the Endpoint
@@ -116,8 +138,17 @@ var _ msgmux.Endpoint = (*OnionEndpoint)(nil)
116138

117139
// NewOnionEndpoint creates a new OnionEndpoint with the given options.
118140
func NewOnionEndpoint(opts ...OnionEndpointOption) *OnionEndpoint {
141+
// By default, we will drop onion messages if the queue is full.
142+
dropPredicate := queue.RandomEarlyDrop[msgmux.PeerMsg](
143+
defaultMinRedThreshold, defaultOnionMessageQueueSize,
144+
)
145+
119146
o := &OnionEndpoint{
120147
onionMessageServer: nil,
148+
onionMsgQueue: queue.NewBackpressureQueue[msgmux.PeerMsg](
149+
defaultOnionMessageQueueSize, dropPredicate,
150+
),
151+
quit: make(chan struct{}),
121152
}
122153
for _, opt := range opts {
123154
opt(o)
@@ -137,16 +168,76 @@ func (o *OnionEndpoint) CanHandle(msg msgmux.PeerMsg) bool {
137168
return ok
138169
}
139170

140-
// SendMessage processes the incoming onion message.
141-
// It returns true if the message was successfully processed.
171+
// SendMessage processes the incoming onion message. It returns true if the
172+
// message was successfully processed.
142173
func (o *OnionEndpoint) SendMessage(ctx context.Context,
143174
msg msgmux.PeerMsg) bool {
144175

145-
onionMsg, ok := msg.Message.(*lnwire.OnionMessage)
176+
_, ok := msg.Message.(*lnwire.OnionMessage)
146177
if !ok {
147178
return false
148179
}
149180

181+
err := o.onionMsgQueue.Enqueue(ctx, msg)
182+
if err != nil {
183+
if errors.Is(err, queue.ErrQueueFullAndDropped) {
184+
log.Warnf("Onion message queue full, dropping message")
185+
} else {
186+
log.Errorf("Failed to enqueue onion message: %v", err)
187+
}
188+
}
189+
190+
return true
191+
}
192+
193+
// Start starts the onion message handler.
194+
func (o *OnionEndpoint) Start() {
195+
o.wg.Add(1)
196+
go o.messageHandler()
197+
}
198+
199+
// Stop stops the onion message handler.
200+
func (o *OnionEndpoint) Stop() {
201+
close(o.quit)
202+
o.wg.Wait()
203+
}
204+
205+
// messageHandler is the main goroutine that processes onion messages from the
206+
// queue.
207+
func (o *OnionEndpoint) messageHandler() {
208+
defer o.wg.Done()
209+
210+
ctx, cancel := context.WithCancel(context.Background())
211+
go func() {
212+
<-o.quit
213+
cancel()
214+
}()
215+
216+
for {
217+
result := o.onionMsgQueue.Dequeue(ctx)
218+
219+
if result.IsErr() {
220+
if errors.Is(result.Err(), context.Canceled) {
221+
return
222+
}
223+
224+
log.Errorf("OnionEndpoint Dequeue failed: %v",
225+
result.Err())
226+
continue
227+
}
228+
229+
result.WhenOk(o.processMessage)
230+
}
231+
}
232+
233+
// processMessage processes an onion message.
234+
func (o *OnionEndpoint) processMessage(msg msgmux.PeerMsg) {
235+
onionMsg, ok := msg.Message.(*lnwire.OnionMessage)
236+
if !ok {
237+
// This should not happen as we check it before enqueueing.
238+
return
239+
}
240+
150241
peer := msg.PeerPub.SerializeCompressed()
151242
log.Debugf("OnionEndpoint received OnionMessage from peer %x: "+
152243
"BlindingPoint=%v, OnionPacket[:10]=%10x...", peer,
@@ -176,15 +267,7 @@ func (o *OnionEndpoint) SendMessage(ctx context.Context,
176267
// Send the update to any subscribers.
177268
if sendErr := o.onionMessageServer.SendUpdate(update); sendErr != nil {
178269
log.Errorf("Failed to send onion message update: %v", sendErr)
179-
return false
180270
}
181-
182-
// If we failed to handle the onion message, we return false.
183-
if err != nil {
184-
return false
185-
}
186-
187-
return true
188271
}
189272

190273
// handleOnionMessage decodes and processes an onion message.

peer/brontide.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,9 @@ type Brontide struct {
646646

647647
// log is a peer-specific logging instance.
648648
log btclog.Logger
649+
650+
// onionMessageEndpoint is the endpoint that handles onion messages.
651+
onionMessageEndpoint *onion_message.OnionEndpoint
649652
}
650653

651654
// A compile-time check to ensure that Brontide satisfies the lnpeer.Peer
@@ -899,17 +902,18 @@ func (p *Brontide) Start() error {
899902
return fmt.Errorf("unable to load channels: %w", err)
900903
}
901904

902-
onionMessageEndpoint := onion_message.NewOnionEndpoint(
905+
p.onionMessageEndpoint = onion_message.NewOnionEndpoint(
903906
onion_message.WithMessageServer(p.cfg.OnionMessageServer),
904907
onion_message.WithOnionProcessor(p.cfg.Sphinx),
905908
onion_message.WithMessageSender(p.cfg.OnionMsgSender),
906909
)
910+
p.onionMessageEndpoint.Start()
907911

908912
// We register the onion message endpoint with the message router.
909913
err = fn.MapOptionZ(p.msgRouter, func(r msgmux.Router) error {
910-
_ = r.UnregisterEndpoint(onionMessageEndpoint.Name())
914+
_ = r.UnregisterEndpoint(p.onionMessageEndpoint.Name())
911915

912-
return r.RegisterEndpoint(onionMessageEndpoint)
916+
return r.RegisterEndpoint(p.onionMessageEndpoint)
913917
})
914918
if err != nil {
915919
return fmt.Errorf("unable to register endpoint for onion "+
@@ -1656,6 +1660,11 @@ func (p *Brontide) Disconnect(reason error) {
16561660
// Stop PingManager before closing TCP connection.
16571661
p.pingManager.Stop()
16581662

1663+
// Stop the onion message endpoint if we have one.
1664+
if p.onionMessageEndpoint != nil {
1665+
p.onionMessageEndpoint.Stop()
1666+
}
1667+
16591668
// Ensure that the TCP connection is properly closed before continuing.
16601669
p.cfg.Conn.Close()
16611670

0 commit comments

Comments
 (0)