Skip to content

Commit e5726a7

Browse files
committed
multi: endpoints for onion messages
This commit creates the necessary endpoints for onion messages. Specifically, it adds the following: - `SendOnionMessage` endpoint to send onion messages. - `SubscribeOnionMessages` endpoint to subscribe to incoming onion messages. It uses the `msgmux` package to handle the onion messages.
1 parent 6f27809 commit e5726a7

File tree

9 files changed

+376
-0
lines changed

9 files changed

+376
-0
lines changed

itest/list_on_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,10 @@ var allTestCases = []*lntest.TestCase{
511511
Name: "custom message",
512512
TestFunc: testCustomMessage,
513513
},
514+
{
515+
Name: "onion message",
516+
TestFunc: testOnionMessage,
517+
},
514518
{
515519
Name: "sign verify message with addr",
516520
TestFunc: testSignVerifyMessageWithAddr,

itest/lnd_onion_message_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package itest
2+
3+
import (
4+
"time"
5+
6+
"github.com/btcsuite/btcd/btcec/v2"
7+
"github.com/lightningnetwork/lnd/lnrpc"
8+
"github.com/lightningnetwork/lnd/lntest"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
// testOnionMessage tests sending and receiving of the onion message type.
13+
func testOnionMessage(ht *lntest.HarnessTest) {
14+
alice := ht.NewNode("Alice", nil)
15+
bob := ht.NewNode("Bob", nil)
16+
17+
// Subscribe Alice to onion messages before we send any, so that we
18+
// don't miss any.
19+
msgClient, cancel := alice.RPC.SubscribeOnionMessages()
20+
defer cancel()
21+
22+
// Create a channel to receive onion messages on.
23+
messages := make(chan *lnrpc.OnionMessage)
24+
go func() {
25+
for {
26+
// If we fail to receive, just exit. The test should
27+
// fail elsewhere if it doesn't get a message that it
28+
// was expecting.
29+
msg, err := msgClient.Recv()
30+
if err != nil {
31+
return
32+
}
33+
34+
// Deliver the message into our channel or exit if the
35+
// test is shutting down.
36+
select {
37+
case messages <- msg:
38+
case <-ht.Context().Done():
39+
return
40+
}
41+
}
42+
}()
43+
44+
// Connect alice and bob so that they can exchange messages.
45+
ht.EnsureConnected(alice, bob)
46+
47+
// Create a random onion message.
48+
randomPriv, err := btcec.NewPrivateKey()
49+
require.NoError(ht.T, err)
50+
randomPub := randomPriv.PubKey()
51+
msgBlindingPoint := randomPub.SerializeCompressed()
52+
msgOnion := []byte{1, 2, 3}
53+
54+
// Send it from Bob to Alice.
55+
bobMsg := &lnrpc.SendOnionMessageRequest{
56+
Peer: alice.PubKey[:],
57+
BlindingPoint: msgBlindingPoint,
58+
Onion: msgOnion,
59+
}
60+
bob.RPC.SendOnionMessage(bobMsg)
61+
62+
// Wait for Alice to receive the message.
63+
select {
64+
case msg := <-messages:
65+
// Check our type and data and (sanity) check the peer we got
66+
// it from.
67+
require.Equal(ht, msgOnion, msg.Onion, "msg data wrong")
68+
require.Equal(ht, msgBlindingPoint, msg.BlindingPoint, "msg "+
69+
"blinding point wrong")
70+
require.Equal(ht, bob.PubKey[:], msg.Peer, "msg peer wrong")
71+
72+
case <-time.After(lntest.DefaultTimeout):
73+
ht.Fatalf("alice did not receive onion message: %v", bobMsg)
74+
}
75+
}

lntest/rpc/lnd.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,8 @@ func (h *HarnessRPC) SubscribeChannelEvents() ChannelEventsClient {
666666

667667
type CustomMessageClient lnrpc.Lightning_SubscribeCustomMessagesClient
668668

669+
type OnionMessageClient lnrpc.Lightning_SubscribeOnionMessagesClient
670+
669671
// SubscribeCustomMessages creates a subscription client for custom messages.
670672
func (h *HarnessRPC) SubscribeCustomMessages() (CustomMessageClient,
671673
context.CancelFunc) {
@@ -698,6 +700,38 @@ func (h *HarnessRPC) SendCustomMessage(
698700
return resp
699701
}
700702

703+
// SendOnionMessage makes a RPC call to the node's SendOnionMessage and
704+
// returns the response.
705+
func (h *HarnessRPC) SendOnionMessage(
706+
req *lnrpc.SendOnionMessageRequest) *lnrpc.SendOnionMessageResponse {
707+
708+
ctxt, cancel := context.WithTimeout(h.runCtx, DefaultTimeout)
709+
defer cancel()
710+
711+
resp, err := h.LN.SendOnionMessage(ctxt, req)
712+
h.NoError(err, "SendOnionMessage")
713+
714+
return resp
715+
}
716+
717+
// SubscribeOnionMessages creates a subscription client for onion messages.
718+
func (h *HarnessRPC) SubscribeOnionMessages() (OnionMessageClient,
719+
context.CancelFunc) {
720+
721+
ctxt, cancel := context.WithCancel(h.runCtx)
722+
723+
req := &lnrpc.SubscribeOnionMessagesRequest{}
724+
725+
// SubscribeCustomMessages needs to have the context alive for the
726+
// entire test case as the returned client will be used for send and
727+
// receive events stream. Thus we use runCtx here instead of a timeout
728+
// context.
729+
stream, err := h.LN.SubscribeOnionMessages(ctxt, req)
730+
h.NoError(err, "SubscribeOnionMessages")
731+
732+
return stream, cancel
733+
}
734+
701735
// GetChanInfo makes a RPC call to the node's GetChanInfo and returns the
702736
// response.
703737
func (h *HarnessRPC) GetChanInfo(

log.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/lightningnetwork/lnd/monitoring"
4747
"github.com/lightningnetwork/lnd/msgmux"
4848
"github.com/lightningnetwork/lnd/netann"
49+
"github.com/lightningnetwork/lnd/onion_message"
4950
"github.com/lightningnetwork/lnd/peer"
5051
"github.com/lightningnetwork/lnd/peernotifier"
5152
"github.com/lightningnetwork/lnd/protofsm"
@@ -207,6 +208,7 @@ func SetupLoggers(root *build.SubLoggerManager, interceptor signal.Interceptor)
207208
AddSubLogger(root, chainio.Subsystem, interceptor, chainio.UseLogger)
208209
AddSubLogger(root, msgmux.Subsystem, interceptor, msgmux.UseLogger)
209210
AddSubLogger(root, sqldb.Subsystem, interceptor, sqldb.UseLogger)
211+
AddSubLogger(root, onion_message.Subsystem, interceptor, onion_message.UseLogger)
210212
}
211213

212214
// AddSubLogger is a helper method to conveniently create and register the

onion_message/log.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package onion_message
2+
3+
import (
4+
"github.com/btcsuite/btclog/v2"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// Subsystem defines the logging code for this subsystem.
9+
const Subsystem = "OMSG"
10+
11+
// log is a logger that is initialized with no output filters. This
12+
// means the package will not perform any logging by default until the caller
13+
// requests it.
14+
var log btclog.Logger
15+
16+
// The default amount of logging is none.
17+
func init() {
18+
UseLogger(build.NewSubLogger(Subsystem, nil))
19+
}
20+
21+
// DisableLog disables all library log output. Logging output is disabled
22+
// by default until UseLogger is called.
23+
func DisableLog() {
24+
UseLogger(btclog.Disabled)
25+
}
26+
27+
// UseLogger uses a specified Logger to output package logging info.
28+
// This should be used in preference to SetLogWriter if the caller is also
29+
// using btclog.
30+
func UseLogger(logger btclog.Logger) {
31+
log = logger
32+
}

onion_message/onion_endpoint.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package onion_message
2+
3+
import (
4+
"context"
5+
6+
"github.com/lightningnetwork/lnd/lnwire"
7+
"github.com/lightningnetwork/lnd/msgmux"
8+
"github.com/lightningnetwork/lnd/subscribe"
9+
)
10+
11+
// OnionEndpoint handles incoming onion messages.
12+
type OnionEndpoint struct {
13+
// subscribe.Server is used for subscriptions to onion messages.
14+
onionMessageServer *subscribe.Server
15+
}
16+
17+
// OnionMessageUpdate is onion message update dispatched to any potential
18+
// subscriber.
19+
type OnionMessageUpdate struct {
20+
// Peer is the peer pubkey
21+
Peer [33]byte
22+
23+
// BlindingPoint is the route blinding ephemeral pubkey to be used for
24+
// the onion message.
25+
BlindingPoint []byte
26+
27+
// OnionBlob is the raw serialized mix header used to relay messages in
28+
// a privacy-preserving manner. This blob should be handled in the same
29+
// manner as onions used to route HTLCs, with the exception that it uses
30+
// blinded routes by default.
31+
OnionBlob []byte
32+
}
33+
34+
// NewOnionEndpoint creates a new OnionEndpoint.
35+
func NewOnionEndpoint(messageServer *subscribe.Server) *OnionEndpoint {
36+
return &OnionEndpoint{
37+
onionMessageServer: messageServer,
38+
}
39+
}
40+
41+
// Name returns the unique name of the endpoint.
42+
func (o *OnionEndpoint) Name() string {
43+
return "OnionMessageHandler"
44+
}
45+
46+
// CanHandle checks if the endpoint can handle the incoming message.
47+
// It returns true if the message is an lnwire.OnionMessage.
48+
func (o *OnionEndpoint) CanHandle(msg msgmux.PeerMsg) bool {
49+
_, ok := msg.Message.(*lnwire.OnionMessage)
50+
return ok
51+
}
52+
53+
// SendMessage processes the incoming onion message.
54+
// It returns true if the message was successfully processed.
55+
func (o *OnionEndpoint) SendMessage(ctx context.Context,
56+
msg msgmux.PeerMsg) bool {
57+
58+
onionMsg, ok := msg.Message.(*lnwire.OnionMessage)
59+
if !ok {
60+
return false
61+
}
62+
63+
peer := msg.PeerPub.SerializeCompressed()
64+
log.Debugf("OnionEndpoint received OnionMessage from peer %s: "+
65+
"BlindingPoint=%v, OnionPacket[:10]=%x...", peer,
66+
onionMsg.BlindingPoint, onionMsg.OnionBlob)
67+
68+
var peerArr [33]byte
69+
copy(peerArr[:], peer)
70+
err := o.onionMessageServer.SendUpdate(&OnionMessageUpdate{
71+
Peer: peerArr,
72+
BlindingPoint: onionMsg.BlindingPoint.SerializeCompressed(),
73+
OnionBlob: onionMsg.OnionBlob,
74+
})
75+
if err != nil {
76+
log.Errorf("Failed to send onion message update: %v", err)
77+
return false
78+
}
79+
80+
return true
81+
}
82+
83+
// A compile-time check to ensure OnionEndpoint implements the Endpoint
84+
// interface.
85+
var _ msgmux.Endpoint = (*OnionEndpoint)(nil)

peer/brontide.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/lightningnetwork/lnd/lnwire"
4747
"github.com/lightningnetwork/lnd/msgmux"
4848
"github.com/lightningnetwork/lnd/netann"
49+
"github.com/lightningnetwork/lnd/onion_message"
4950
"github.com/lightningnetwork/lnd/pool"
5051
"github.com/lightningnetwork/lnd/protofsm"
5152
"github.com/lightningnetwork/lnd/queue"
@@ -451,6 +452,10 @@ type Config struct {
451452
// used to modify the way the co-op close transaction is constructed.
452453
AuxChanCloser fn.Option[chancloser.AuxChanCloser]
453454

455+
// OnionMessageServer is an instance of a message server that dispatches
456+
// onion messages to subscribers.
457+
OnionMessageServer *subscribe.Server
458+
454459
// ShouldFwdExpEndorsement is a closure that indicates whether
455460
// experimental endorsement signals should be set.
456461
ShouldFwdExpEndorsement func() bool
@@ -886,6 +891,21 @@ func (p *Brontide) Start() error {
886891
return fmt.Errorf("unable to load channels: %w", err)
887892
}
888893

894+
onionMessageEndpoint := onion_message.NewOnionEndpoint(
895+
p.cfg.OnionMessageServer,
896+
)
897+
898+
// We register the onion message endpoint with the message router.
899+
err = fn.MapOptionZ(p.msgRouter, func(r msgmux.Router) error {
900+
_ = r.UnregisterEndpoint(onionMessageEndpoint.Name())
901+
902+
return r.RegisterEndpoint(onionMessageEndpoint)
903+
})
904+
if err != nil {
905+
return fmt.Errorf("unable to register endpoint for onion "+
906+
"messaging: %w", err)
907+
}
908+
889909
p.startTime = time.Now()
890910

891911
// Before launching the writeHandler goroutine, we send any channel

0 commit comments

Comments
 (0)