Skip to content

Rate limit outgoing gossip bandwidth by peer #10103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ func DefaultConfig() Config {
MsgBurstBytes: discovery.DefaultMsgBytesBurst,
FilterConcurrency: discovery.DefaultFilterConcurrency,
BanThreshold: discovery.DefaultBanThreshold,
PeerMsgRateBytes: discovery.DefaultPeerMsgBytesPerSecond,
},
Invoices: &lncfg.Invoices{
HoldExpiryDelta: lncfg.DefaultHoldInvoiceExpiryDelta,
Expand Down
5 changes: 5 additions & 0 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,10 @@ type Config struct {
// BanThreshold is the score used to decide whether a given peer is
// banned or not.
BanThreshold uint64

// PeerMsgRateBytes is the rate limit for the number of bytes per second
// that we'll allocate to outbound gossip messages for a single peer.
PeerMsgRateBytes uint64
}

// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
Expand Down Expand Up @@ -609,6 +613,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
AllotedMsgBytesPerSecond: cfg.MsgRateBytes,
AllotedMsgBytesBurst: cfg.MsgBurstBytes,
FilterConcurrency: cfg.FilterConcurrency,
PeerMsgBytesPerSecond: cfg.PeerMsgRateBytes,
})

gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
Expand Down
48 changes: 29 additions & 19 deletions discovery/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (
// they'll be refilled at this rate.
DefaultMsgBytesPerSecond = 1000 * 1_024

// DefaultPeerMsgBytesPerSecond is the max bytes/s we'll permit for
// outgoing messages for a single peer. Once tokens (bytes) have been
// taken from the bucket, they'll be refilled at this rate.
DefaultPeerMsgBytesPerSecond = 50 * 1_024
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about scaling this based on the total global value? So something like: a peer can only use up to 5% of the global limit?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, makes sense. Though I think we may still need an absolute value, similar to how we handle the budget used in the sweeper. The idea here is to limit the per-peer bandwidth, assuming only one or two bad peers exist, as we've seen from the logs. Large nodes can have hundreds of peers, so 5% may be too large; and small nodes may have fewer than 10 peers, for which 5% can be too conservative.


// assumedMsgSize is the assumed size of a message if we can't compute
// its serialized size. This comes out to 1 KB.
assumedMsgSize = 1_024
Expand Down Expand Up @@ -141,6 +146,11 @@ type SyncManagerCfg struct {
// FilterConcurrency is the maximum number of concurrent gossip filter
// applications that can be processed. If not set, defaults to 5.
FilterConcurrency int

// PeerMsgBytesPerSecond is the allotted bandwidth rate, expressed in
// bytes/second that a single gossip syncer can consume. Once we exceed
// this rate, message sending will block until we're below the rate.
PeerMsgBytesPerSecond uint64
}

// SyncManager is a subsystem of the gossiper that manages the gossip syncers
Expand Down Expand Up @@ -200,7 +210,7 @@ type SyncManager struct {
gossipFilterSema chan struct{}

// rateLimiter dictates the frequency with which we will reply to gossip
// queries from a peer. This is used to delay responses to peers to
// queries from all peers. This is used to delay responses to peers to
// prevent DOS vulnerabilities if they are spamming with an unreasonable
// number of queries.
rateLimiter *rate.Limiter
Expand Down Expand Up @@ -554,8 +564,8 @@ func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool {

// deriveRateLimitReservation will take the current message and derive a
// reservation that can be used to wait on the rate limiter.
func (m *SyncManager) deriveRateLimitReservation(msg lnwire.Message,
) (*rate.Reservation, error) {
func deriveRateLimitReservation(rl *rate.Limiter,
msg lnwire.Message) (*rate.Reservation, error) {

var (
msgSize uint32
Expand All @@ -575,12 +585,12 @@ func (m *SyncManager) deriveRateLimitReservation(msg lnwire.Message,
msgSize = assumedMsgSize
}

return m.rateLimiter.ReserveN(time.Now(), int(msgSize)), nil
return rl.ReserveN(time.Now(), int(msgSize)), nil
}

// waitMsgDelay takes a delay, and waits until it has finished.
func (m *SyncManager) waitMsgDelay(ctx context.Context, peerPub [33]byte,
limitReservation *rate.Reservation) error {
func waitMsgDelay(ctx context.Context, peerPub [33]byte,
limitReservation *rate.Reservation, quit <-chan struct{}) error {

// If we've already replied a handful of times, we will start to delay
// responses back to the remote peer. This can help prevent DOS attacks
Expand All @@ -602,7 +612,7 @@ func (m *SyncManager) waitMsgDelay(ctx context.Context, peerPub [33]byte,

return ErrGossipSyncerExiting

case <-m.quit:
case <-quit:
limitReservation.Cancel()

return ErrGossipSyncerExiting
Expand All @@ -614,25 +624,29 @@ func (m *SyncManager) waitMsgDelay(ctx context.Context, peerPub [33]byte,

// maybeRateLimitMsg takes a message, and may wait a period of time to rate
// limit the msg.
func (m *SyncManager) maybeRateLimitMsg(ctx context.Context, peerPub [33]byte,
msg lnwire.Message) error {
func maybeRateLimitMsg(ctx context.Context, rl *rate.Limiter, peerPub [33]byte,
msg lnwire.Message, quit <-chan struct{}) error {

delay, err := m.deriveRateLimitReservation(msg)
delay, err := deriveRateLimitReservation(rl, msg)
if err != nil {
return nil
}

return m.waitMsgDelay(ctx, peerPub, delay)
return waitMsgDelay(ctx, peerPub, delay, quit)
}

// sendMessages sends a set of messages to the remote peer.
func (m *SyncManager) sendMessages(ctx context.Context, sync bool,
peer lnpeer.Peer, nodeID route.Vertex, msgs ...lnwire.Message) error {

for _, msg := range msgs {
if err := m.maybeRateLimitMsg(ctx, nodeID, msg); err != nil {
err := maybeRateLimitMsg(
ctx, m.rateLimiter, nodeID, msg, m.quit,
)
if err != nil {
return err
}

if err := peer.SendMessageLazy(sync, msg); err != nil {
return err
}
Expand All @@ -654,22 +668,18 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
encodingType: encoding,
chunkSize: encodingTypeToChunkSize[encoding],
batchSize: requestBatchSize,
sendToPeer: func(ctx context.Context,
msgs ...lnwire.Message) error {

return m.sendMessages(ctx, false, peer, nodeID, msgs...)
},
sendToPeerSync: func(ctx context.Context,
sendMsg: func(ctx context.Context, sync bool,
msgs ...lnwire.Message) error {

return m.sendMessages(ctx, true, peer, nodeID, msgs...)
return m.sendMessages(ctx, sync, peer, nodeID, msgs...)
},
ignoreHistoricalFilters: m.cfg.IgnoreHistoricalFilters,
bestHeight: m.cfg.BestHeight,
markGraphSynced: m.markGraphSynced,
maxQueryChanRangeReplies: maxQueryChanRangeReplies,
noTimestampQueryOption: m.cfg.NoTimestampQueries,
isStillZombieChannel: m.cfg.IsStillZombieChannel,
msgBytesPerSecond: m.cfg.PeerMsgBytesPerSecond,
}, m.gossipFilterSema)

// Gossip syncers are initialized by default in a PassiveSync type
Expand Down
20 changes: 12 additions & 8 deletions discovery/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ func TestDeriveRateLimitReservation(t *testing.T) {
}

// First message should have no delay as it fits within burst.
delay1, err := sm.deriveRateLimitReservation(msg)
delay1, err := deriveRateLimitReservation(sm.rateLimiter, msg)
require.NoError(t, err)
require.Equal(
t, time.Duration(0), delay1.Delay(), "first message "+
Expand All @@ -757,7 +757,7 @@ func TestDeriveRateLimitReservation(t *testing.T) {

// Second message should have a non-zero delay as the token
// bucket is now depleted.
delay2, err := sm.deriveRateLimitReservation(msg)
delay2, err := deriveRateLimitReservation(sm.rateLimiter, msg)
require.NoError(t, err)
require.True(
t, delay2.Delay() > 0, "second message should have "+
Expand All @@ -766,7 +766,7 @@ func TestDeriveRateLimitReservation(t *testing.T) {

// Third message should have an even longer delay since the
// token bucket is still refilling at a constant rate.
delay3, err := sm.deriveRateLimitReservation(msg)
delay3, err := deriveRateLimitReservation(sm.rateLimiter, msg)
require.NoError(t, err)
require.True(t, delay3.Delay() > delay2.Delay(), "third "+
"message should have longer delay than second: %s > %s",
Expand Down Expand Up @@ -798,7 +798,7 @@ func TestDeriveRateLimitReservation(t *testing.T) {

// The error should propagate through
// deriveRateLimitReservation.
_, err := sm.deriveRateLimitReservation(msg)
_, err := deriveRateLimitReservation(sm.rateLimiter, msg)
require.Error(t, err)
require.Equal(
t, errMsg, err, "Error should be propagated unchanged",
Expand All @@ -815,7 +815,7 @@ func TestDeriveRateLimitReservation(t *testing.T) {
initialMsg := &TestSizeableMessage{
size: uint32(bytesBurst),
}
_, err := sm.deriveRateLimitReservation(initialMsg)
_, err := deriveRateLimitReservation(sm.rateLimiter, initialMsg)
require.NoError(t, err)

// Now send two messages of different sizes and compare their
Expand All @@ -828,18 +828,22 @@ func TestDeriveRateLimitReservation(t *testing.T) {
}

// Send the small message first.
smallDelay, err := sm.deriveRateLimitReservation(smallMsg)
smallDelay, err := deriveRateLimitReservation(
sm.rateLimiter, smallMsg,
)
require.NoError(t, err)

// Reset the limiter to the same state, then empty the bucket.
sm.rateLimiter = rate.NewLimiter(
rate.Limit(bytesPerSec), int(bytesBurst),
)
_, err = sm.deriveRateLimitReservation(initialMsg)
_, err = deriveRateLimitReservation(sm.rateLimiter, initialMsg)
require.NoError(t, err)

// Now send the large message.
largeDelay, err := sm.deriveRateLimitReservation(largeMsg)
largeDelay, err := deriveRateLimitReservation(
sm.rateLimiter, largeMsg,
)
require.NoError(t, err)

// The large message should have a longer delay than the small
Expand Down
Loading
Loading