Skip to content

Commit 9549248

Browse files
committed
multi: endpoints for onion messages
This commit creates the necessary endpoints for onion messages. Specifically, it adds the following: - `SendOnionMessage` endpoint to send onion messages. - `SubscribeOnionMessages` endpoint to subscribe to incoming onion messages. It uses the `msgmux` package to handle the onion messages.
1 parent ded4cfb commit 9549248

File tree

9 files changed

+376
-0
lines changed

9 files changed

+376
-0
lines changed

itest/list_on_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,10 @@ var allTestCases = []*lntest.TestCase{
523523
Name: "custom message",
524524
TestFunc: testCustomMessage,
525525
},
526+
{
527+
Name: "onion message",
528+
TestFunc: testOnionMessage,
529+
},
526530
{
527531
Name: "sign verify message with addr",
528532
TestFunc: testSignVerifyMessageWithAddr,

itest/lnd_onion_message_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package itest
2+
3+
import (
4+
"time"
5+
6+
"github.com/btcsuite/btcd/btcec/v2"
7+
"github.com/lightningnetwork/lnd/lnrpc"
8+
"github.com/lightningnetwork/lnd/lntest"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
// testOnionMessage tests sending and receiving of the onion message type.
13+
func testOnionMessage(ht *lntest.HarnessTest) {
14+
alice := ht.NewNode("Alice", nil)
15+
bob := ht.NewNode("Bob", nil)
16+
17+
// Subscribe Alice to onion messages before we send any, so that we
18+
// don't miss any.
19+
msgClient, cancel := alice.RPC.SubscribeOnionMessages()
20+
defer cancel()
21+
22+
// Create a channel to receive onion messages on.
23+
messages := make(chan *lnrpc.OnionMessage)
24+
go func() {
25+
for {
26+
// If we fail to receive, just exit. The test should
27+
// fail elsewhere if it doesn't get a message that it
28+
// was expecting.
29+
msg, err := msgClient.Recv()
30+
if err != nil {
31+
return
32+
}
33+
34+
// Deliver the message into our channel or exit if the
35+
// test is shutting down.
36+
select {
37+
case messages <- msg:
38+
case <-ht.Context().Done():
39+
return
40+
}
41+
}
42+
}()
43+
44+
// Connect alice and bob so that they can exchange messages.
45+
ht.EnsureConnected(alice, bob)
46+
47+
// Create a random onion message.
48+
randomPriv, err := btcec.NewPrivateKey()
49+
require.NoError(ht.T, err)
50+
randomPub := randomPriv.PubKey()
51+
msgBlindingPoint := randomPub.SerializeCompressed()
52+
msgOnion := []byte{1, 2, 3}
53+
54+
// Send it from Bob to Alice.
55+
bobMsg := &lnrpc.SendOnionMessageRequest{
56+
Peer: alice.PubKey[:],
57+
BlindingPoint: msgBlindingPoint,
58+
Onion: msgOnion,
59+
}
60+
bob.RPC.SendOnionMessage(bobMsg)
61+
62+
// Wait for Alice to receive the message.
63+
select {
64+
case msg := <-messages:
65+
// Check our type and data and (sanity) check the peer we got
66+
// it from.
67+
require.Equal(ht, msgOnion, msg.Onion, "msg data wrong")
68+
require.Equal(ht, msgBlindingPoint, msg.BlindingPoint, "msg "+
69+
"blinding point wrong")
70+
require.Equal(ht, bob.PubKey[:], msg.Peer, "msg peer wrong")
71+
72+
case <-time.After(lntest.DefaultTimeout):
73+
ht.Fatalf("alice did not receive onion message: %v", bobMsg)
74+
}
75+
}

lntest/rpc/lnd.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,8 @@ func (h *HarnessRPC) SubscribeChannelEvents() ChannelEventsClient {
726726

727727
type CustomMessageClient lnrpc.Lightning_SubscribeCustomMessagesClient
728728

729+
type OnionMessageClient lnrpc.Lightning_SubscribeOnionMessagesClient
730+
729731
// SubscribeCustomMessages creates a subscription client for custom messages.
730732
func (h *HarnessRPC) SubscribeCustomMessages() (CustomMessageClient,
731733
context.CancelFunc) {
@@ -758,6 +760,38 @@ func (h *HarnessRPC) SendCustomMessage(
758760
return resp
759761
}
760762

763+
// SendOnionMessage makes a RPC call to the node's SendOnionMessage and
764+
// returns the response.
765+
func (h *HarnessRPC) SendOnionMessage(
766+
req *lnrpc.SendOnionMessageRequest) *lnrpc.SendOnionMessageResponse {
767+
768+
ctxt, cancel := context.WithTimeout(h.runCtx, DefaultTimeout)
769+
defer cancel()
770+
771+
resp, err := h.LN.SendOnionMessage(ctxt, req)
772+
h.NoError(err, "SendOnionMessage")
773+
774+
return resp
775+
}
776+
777+
// SubscribeOnionMessages creates a subscription client for onion messages.
778+
func (h *HarnessRPC) SubscribeOnionMessages() (OnionMessageClient,
779+
context.CancelFunc) {
780+
781+
ctxt, cancel := context.WithCancel(h.runCtx)
782+
783+
req := &lnrpc.SubscribeOnionMessagesRequest{}
784+
785+
// SubscribeCustomMessages needs to have the context alive for the
786+
// entire test case as the returned client will be used for send and
787+
// receive events stream. Thus we use runCtx here instead of a timeout
788+
// context.
789+
stream, err := h.LN.SubscribeOnionMessages(ctxt, req)
790+
h.NoError(err, "SubscribeOnionMessages")
791+
792+
return stream, cancel
793+
}
794+
761795
// GetChanInfo makes a RPC call to the node's GetChanInfo and returns the
762796
// response.
763797
func (h *HarnessRPC) GetChanInfo(

log.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/lightningnetwork/lnd/monitoring"
4747
"github.com/lightningnetwork/lnd/msgmux"
4848
"github.com/lightningnetwork/lnd/netann"
49+
"github.com/lightningnetwork/lnd/onion_message"
4950
paymentsdb "github.com/lightningnetwork/lnd/payments/db"
5051
"github.com/lightningnetwork/lnd/peer"
5152
"github.com/lightningnetwork/lnd/peernotifier"
@@ -212,6 +213,7 @@ func SetupLoggers(root *build.SubLoggerManager, interceptor signal.Interceptor)
212213
root, paymentsdb.Subsystem, interceptor, paymentsdb.UseLogger,
213214
)
214215

216+
AddSubLogger(root, onion_message.Subsystem, interceptor, onion_message.UseLogger)
215217
}
216218

217219
// AddSubLogger is a helper method to conveniently create and register the

onion_message/log.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package onion_message
2+
3+
import (
4+
"github.com/btcsuite/btclog/v2"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// Subsystem defines the logging code for this subsystem.
9+
const Subsystem = "OMSG"
10+
11+
// log is a logger that is initialized with no output filters. This
12+
// means the package will not perform any logging by default until the caller
13+
// requests it.
14+
var log btclog.Logger
15+
16+
// The default amount of logging is none.
17+
func init() {
18+
UseLogger(build.NewSubLogger(Subsystem, nil))
19+
}
20+
21+
// DisableLog disables all library log output. Logging output is disabled
22+
// by default until UseLogger is called.
23+
func DisableLog() {
24+
UseLogger(btclog.Disabled)
25+
}
26+
27+
// UseLogger uses a specified Logger to output package logging info.
28+
// This should be used in preference to SetLogWriter if the caller is also
29+
// using btclog.
30+
func UseLogger(logger btclog.Logger) {
31+
log = logger
32+
}

onion_message/onion_endpoint.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package onion_message
2+
3+
import (
4+
"context"
5+
6+
"github.com/lightningnetwork/lnd/lnwire"
7+
"github.com/lightningnetwork/lnd/msgmux"
8+
"github.com/lightningnetwork/lnd/subscribe"
9+
)
10+
11+
// OnionEndpoint handles incoming onion messages.
12+
type OnionEndpoint struct {
13+
// subscribe.Server is used for subscriptions to onion messages.
14+
onionMessageServer *subscribe.Server
15+
}
16+
17+
// OnionMessageUpdate is onion message update dispatched to any potential
18+
// subscriber.
19+
type OnionMessageUpdate struct {
20+
// Peer is the peer pubkey
21+
Peer [33]byte
22+
23+
// BlindingPoint is the route blinding ephemeral pubkey to be used for
24+
// the onion message.
25+
BlindingPoint []byte
26+
27+
// OnionBlob is the raw serialized mix header used to relay messages in
28+
// a privacy-preserving manner. This blob should be handled in the same
29+
// manner as onions used to route HTLCs, with the exception that it uses
30+
// blinded routes by default.
31+
OnionBlob []byte
32+
}
33+
34+
// NewOnionEndpoint creates a new OnionEndpoint.
35+
func NewOnionEndpoint(messageServer *subscribe.Server) *OnionEndpoint {
36+
return &OnionEndpoint{
37+
onionMessageServer: messageServer,
38+
}
39+
}
40+
41+
// Name returns the unique name of the endpoint.
42+
func (o *OnionEndpoint) Name() string {
43+
return "OnionMessageHandler"
44+
}
45+
46+
// CanHandle checks if the endpoint can handle the incoming message.
47+
// It returns true if the message is an lnwire.OnionMessage.
48+
func (o *OnionEndpoint) CanHandle(msg msgmux.PeerMsg) bool {
49+
_, ok := msg.Message.(*lnwire.OnionMessage)
50+
return ok
51+
}
52+
53+
// SendMessage processes the incoming onion message.
54+
// It returns true if the message was successfully processed.
55+
func (o *OnionEndpoint) SendMessage(ctx context.Context,
56+
msg msgmux.PeerMsg) bool {
57+
58+
onionMsg, ok := msg.Message.(*lnwire.OnionMessage)
59+
if !ok {
60+
return false
61+
}
62+
63+
peer := msg.PeerPub.SerializeCompressed()
64+
log.Debugf("OnionEndpoint received OnionMessage from peer %s: "+
65+
"BlindingPoint=%v, OnionPacket[:10]=%x...", peer,
66+
onionMsg.BlindingPoint, onionMsg.OnionBlob)
67+
68+
var peerArr [33]byte
69+
copy(peerArr[:], peer)
70+
err := o.onionMessageServer.SendUpdate(&OnionMessageUpdate{
71+
Peer: peerArr,
72+
BlindingPoint: onionMsg.BlindingPoint.SerializeCompressed(),
73+
OnionBlob: onionMsg.OnionBlob,
74+
})
75+
if err != nil {
76+
log.Errorf("Failed to send onion message update: %v", err)
77+
return false
78+
}
79+
80+
return true
81+
}
82+
83+
// A compile-time check to ensure OnionEndpoint implements the Endpoint
84+
// interface.
85+
var _ msgmux.Endpoint = (*OnionEndpoint)(nil)

peer/brontide.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/lightningnetwork/lnd/lnwire"
4747
"github.com/lightningnetwork/lnd/msgmux"
4848
"github.com/lightningnetwork/lnd/netann"
49+
"github.com/lightningnetwork/lnd/onion_message"
4950
"github.com/lightningnetwork/lnd/pool"
5051
"github.com/lightningnetwork/lnd/protofsm"
5152
"github.com/lightningnetwork/lnd/queue"
@@ -457,6 +458,10 @@ type Config struct {
457458
// used to modify the way the co-op close transaction is constructed.
458459
AuxChanCloser fn.Option[chancloser.AuxChanCloser]
459460

461+
// OnionMessageServer is an instance of a message server that dispatches
462+
// onion messages to subscribers.
463+
OnionMessageServer *subscribe.Server
464+
460465
// ShouldFwdExpEndorsement is a closure that indicates whether
461466
// experimental endorsement signals should be set.
462467
ShouldFwdExpEndorsement func() bool
@@ -892,6 +897,21 @@ func (p *Brontide) Start() error {
892897
return fmt.Errorf("unable to load channels: %w", err)
893898
}
894899

900+
onionMessageEndpoint := onion_message.NewOnionEndpoint(
901+
p.cfg.OnionMessageServer,
902+
)
903+
904+
// We register the onion message endpoint with the message router.
905+
err = fn.MapOptionZ(p.msgRouter, func(r msgmux.Router) error {
906+
_ = r.UnregisterEndpoint(onionMessageEndpoint.Name())
907+
908+
return r.RegisterEndpoint(onionMessageEndpoint)
909+
})
910+
if err != nil {
911+
return fmt.Errorf("unable to register endpoint for onion "+
912+
"messaging: %w", err)
913+
}
914+
895915
p.startTime = time.Now()
896916

897917
// Before launching the writeHandler goroutine, we send any channel

0 commit comments

Comments
 (0)