Skip to content

Commit 985d29f

Browse files
authored
Merge pull request #10134 from lightningnetwork/0-19-3-branch-rc1
release: create v0.19.3-rc1 branch
2 parents a839456 + eccdd3c commit 985d29f

33 files changed

+1691
-184
lines changed

build/version.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ const (
4747
AppMinor uint = 19
4848

4949
// AppPatch defines the application patch for this binary.
50-
AppPatch uint = 2
50+
AppPatch uint = 3
5151

5252
// AppPreRelease MUST only contain characters from semanticAlphabet per
5353
// the semantic versioning spec.
54-
AppPreRelease = "beta"
54+
AppPreRelease = "beta.rc1"
5555
)
5656

5757
func init() {

config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,7 @@ func DefaultConfig() Config {
719719
AnnouncementConf: discovery.DefaultProofMatureDelta,
720720
MsgRateBytes: discovery.DefaultMsgBytesPerSecond,
721721
MsgBurstBytes: discovery.DefaultMsgBytesBurst,
722+
FilterConcurrency: discovery.DefaultFilterConcurrency,
722723
},
723724
Invoices: &lncfg.Invoices{
724725
HoldExpiryDelta: lncfg.DefaultHoldInvoiceExpiryDelta,

contractcourt/anchor_resolver.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,10 @@ func (c *anchorResolver) Launch() error {
202202
// an output that we want to sweep only if it is economical to do so.
203203
//
204204
// An exclusive group is not necessary anymore, because we know that
205-
// this is the only anchor that can be swept.
205+
// this is the only anchor that can be swept. However, to avoid this
206+
// anchor input being group with other inputs, we still keep the
207+
// exclusive group here such that the anchor will be swept
208+
// independently.
206209
//
207210
// We also clear the parent tx information for cpfp, because the
208211
// commitment tx is confirmed.
@@ -222,6 +225,8 @@ func (c *anchorResolver) Launch() error {
222225
c.broadcastHeight, nil,
223226
)
224227

228+
exclusiveGroup := c.ShortChanID.ToUint64()
229+
225230
resultChan, err := c.Sweeper.SweepInput(
226231
&anchorInput,
227232
sweep.Params{
@@ -233,6 +238,10 @@ func (c *anchorResolver) Launch() error {
233238
// There's no rush to sweep the anchor, so we use a nil
234239
// deadline here.
235240
DeadlineHeight: fn.None[int32](),
241+
242+
// Use the chan id as the exclusive group. This prevents
243+
// any of the anchors from being batched together.
244+
ExclusiveGroup: &exclusiveGroup,
236245
},
237246
)
238247

contractcourt/chain_arbitrator.go

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,11 @@ type ChainArbitrator struct {
270270
// beat is the current best known blockbeat.
271271
beat chainio.Blockbeat
272272

273+
// resolvedChan is used to signal that the given channel outpoint has
274+
// been resolved onchain. Once received, chain arbitrator will perform
275+
// cleanups.
276+
resolvedChan chan wire.OutPoint
277+
273278
quit chan struct{}
274279

275280
wg sync.WaitGroup
@@ -286,6 +291,7 @@ func NewChainArbitrator(cfg ChainArbitratorConfig,
286291
activeWatchers: make(map[wire.OutPoint]*chainWatcher),
287292
chanSource: db,
288293
quit: make(chan struct{}),
294+
resolvedChan: make(chan wire.OutPoint),
289295
}
290296

291297
// Mount the block consumer.
@@ -459,6 +465,9 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
459465
channel.ShortChanID(), htlc,
460466
)
461467
},
468+
NotifyChannelResolved: func() {
469+
c.notifyChannelResolved(chanPoint)
470+
},
462471
}
463472

464473
// The final component needed is an arbitrator log that the arbitrator
@@ -474,14 +483,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
474483
return nil, err
475484
}
476485

477-
arbCfg.MarkChannelResolved = func() error {
478-
if c.cfg.NotifyFullyResolvedChannel != nil {
479-
c.cfg.NotifyFullyResolvedChannel(chanPoint)
480-
}
481-
482-
return c.ResolveContract(chanPoint)
483-
}
484-
485486
// Finally, we'll need to construct a series of htlc Sets based on all
486487
// currently known valid commitments.
487488
htlcSets := make(map[HtlcSetKey]htlcSet)
@@ -578,6 +579,17 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
578579
// Set the current beat.
579580
c.beat = beat
580581

582+
// Start the goroutine which listens for signals to mark the channel as
583+
// resolved.
584+
//
585+
// NOTE: We must start this goroutine here we won't block the following
586+
// channel loading.
587+
c.wg.Add(1)
588+
go func() {
589+
defer c.wg.Done()
590+
c.resolveContracts()
591+
}()
592+
581593
// First, we'll fetch all the channels that are still open, in order to
582594
// collect them within our set of active contracts.
583595
if err := c.loadOpenChannels(); err != nil {
@@ -697,6 +709,32 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
697709
return nil
698710
}
699711

712+
// resolveContracts listens to the `resolvedChan` to mark a given channel as
713+
// fully resolved.
714+
func (c *ChainArbitrator) resolveContracts() {
715+
for {
716+
select {
717+
// The channel arbitrator signals that a given channel has been
718+
// resolved, we now update chain arbitrator's internal state for
719+
// this channel.
720+
case cp := <-c.resolvedChan:
721+
if c.cfg.NotifyFullyResolvedChannel != nil {
722+
c.cfg.NotifyFullyResolvedChannel(cp)
723+
}
724+
725+
err := c.ResolveContract(cp)
726+
if err != nil {
727+
log.Errorf("Failed to resolve contract for "+
728+
"channel %v", cp)
729+
}
730+
731+
// Exit if the chain arbitrator is shutting down.
732+
case <-c.quit:
733+
return
734+
}
735+
}
736+
}
737+
700738
// dispatchBlocks consumes a block epoch notification stream and dispatches
701739
// blocks to each of the chain arb's active channel arbitrators. This function
702740
// must be run in a goroutine.
@@ -762,6 +800,16 @@ func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) {
762800
c.NotifyBlockProcessed(beat, err)
763801
}
764802

803+
// notifyChannelResolved is used by the channel arbitrator to signal that a
804+
// given channel has been resolved.
805+
func (c *ChainArbitrator) notifyChannelResolved(cp wire.OutPoint) {
806+
select {
807+
case c.resolvedChan <- cp:
808+
case <-c.quit:
809+
return
810+
}
811+
}
812+
765813
// republishClosingTxs will load any stored cooperative or unilateral closing
766814
// transactions and republish them. This helps ensure propagation of the
767815
// transactions in the event that prior publications failed.
@@ -1346,20 +1394,16 @@ func (c *ChainArbitrator) loadPendingCloseChannels() error {
13461394
closeChanInfo.ShortChanID, htlc,
13471395
)
13481396
},
1397+
NotifyChannelResolved: func() {
1398+
c.notifyChannelResolved(chanPoint)
1399+
},
13491400
}
13501401
chanLog, err := newBoltArbitratorLog(
13511402
c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
13521403
)
13531404
if err != nil {
13541405
return err
13551406
}
1356-
arbCfg.MarkChannelResolved = func() error {
1357-
if c.cfg.NotifyFullyResolvedChannel != nil {
1358-
c.cfg.NotifyFullyResolvedChannel(chanPoint)
1359-
}
1360-
1361-
return c.ResolveContract(chanPoint)
1362-
}
13631407

13641408
// We create an empty map of HTLC's here since it's possible
13651409
// that the channel is in StateDefault and updateActiveHTLCs is

contractcourt/channel_arbitrator.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -153,13 +153,9 @@ type ChannelArbitratorConfig struct {
153153
// true. Otherwise this value is unset.
154154
CloseType channeldb.ClosureType
155155

156-
// MarkChannelResolved is a function closure that serves to mark a
157-
// channel as "fully resolved". A channel itself can be considered
158-
// fully resolved once all active contracts have individually been
159-
// fully resolved.
160-
//
161-
// TODO(roasbeef): need RPC's to combine for pendingchannels RPC
162-
MarkChannelResolved func() error
156+
// NotifyChannelResolved is used by the channel arbitrator to signal
157+
// that a given channel has been resolved.
158+
NotifyChannelResolved func()
163159

164160
// PutResolverReport records a resolver report for the channel. If the
165161
// transaction provided is nil, the function should write the report
@@ -1397,10 +1393,7 @@ func (c *ChannelArbitrator) stateStep(
13971393
log.Infof("ChannelPoint(%v) has been fully resolved "+
13981394
"on-chain at height=%v", c.cfg.ChanPoint, triggerHeight)
13991395

1400-
if err := c.cfg.MarkChannelResolved(); err != nil {
1401-
log.Errorf("unable to mark channel resolved: %v", err)
1402-
return StateError, closeTx, err
1403-
}
1396+
c.cfg.NotifyChannelResolved()
14041397
}
14051398

14061399
log.Tracef("ChannelArbitrator(%v): next_state=%v", c.cfg.ChanPoint,

contractcourt/channel_arbitrator_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -417,17 +417,16 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
417417
}
418418

419419
// We'll use the resolvedChan to synchronize on call to
420-
// MarkChannelResolved.
420+
// NotifyChannelResolved.
421421
resolvedChan := make(chan struct{}, 1)
422422

423423
// Next we'll create the matching configuration struct that contains
424424
// all interfaces and methods the arbitrator needs to do its job.
425425
arbCfg := &ChannelArbitratorConfig{
426426
ChanPoint: chanPoint,
427427
ShortChanID: shortChanID,
428-
MarkChannelResolved: func() error {
428+
NotifyChannelResolved: func() {
429429
resolvedChan <- struct{}{}
430-
return nil
431430
},
432431
MarkCommitmentBroadcasted: func(_ *wire.MsgTx,
433432
_ lntypes.ChannelParty) error {
@@ -547,7 +546,7 @@ func TestChannelArbitratorCooperativeClose(t *testing.T) {
547546
}
548547

549548
// Cooperative close should do trigger a MarkChannelClosed +
550-
// MarkChannelResolved.
549+
// NotifyChannelResolved.
551550
closeInfo := &CooperativeCloseInfo{
552551
&channeldb.ChannelCloseSummary{},
553552
}

discovery/gossiper.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,10 @@ type Config struct {
399399
// MsgBurstBytes is the allotted burst amount in bytes. This is the
400400
// number of starting tokens in our token bucket algorithm.
401401
MsgBurstBytes uint64
402+
403+
// FilterConcurrency is the maximum number of concurrent gossip filter
404+
// applications that can be processed.
405+
FilterConcurrency int
402406
}
403407

404408
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
@@ -600,6 +604,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
600604
IsStillZombieChannel: cfg.IsStillZombieChannel,
601605
AllotedMsgBytesPerSecond: cfg.MsgRateBytes,
602606
AllotedMsgBytesBurst: cfg.MsgBurstBytes,
607+
FilterConcurrency: cfg.FilterConcurrency,
603608
})
604609

605610
gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
@@ -907,13 +912,16 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context,
907912
return errChan
908913
}
909914

910-
// If we've found the message target, then we'll dispatch the
911-
// message directly to it.
912-
if err := syncer.ApplyGossipFilter(ctx, m); err != nil {
913-
log.Warnf("Unable to apply gossip filter for peer=%x: "+
914-
"%v", peer.PubKey(), err)
915+
// Queue the message for asynchronous processing to prevent
916+
// blocking the gossiper when rate limiting is active.
917+
if !syncer.QueueTimestampRange(m) {
918+
log.Warnf("Unable to queue gossip filter for peer=%x: "+
919+
"queue full", peer.PubKey())
915920

916-
errChan <- err
921+
// Return nil to indicate we've handled the message,
922+
// even though it was dropped. This prevents the peer
923+
// from being disconnected.
924+
errChan <- nil
917925
return errChan
918926
}
919927

discovery/sync_manager.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,20 @@ const (
2525
// network as possible.
2626
DefaultHistoricalSyncInterval = time.Hour
2727

28-
// filterSemaSize is the capacity of gossipFilterSema.
29-
filterSemaSize = 5
28+
// DefaultFilterConcurrency is the default maximum number of concurrent
29+
// gossip filter applications that can be processed.
30+
DefaultFilterConcurrency = 5
3031

3132
// DefaultMsgBytesBurst is the allotted burst in bytes we'll permit.
3233
// This is the most that can be sent in a given go. Requests beyond
3334
// this, will block indefinitely. Once tokens (bytes are depleted),
3435
// they'll be refilled at the DefaultMsgBytesPerSecond rate.
35-
DefaultMsgBytesBurst = 2 * 100 * 1_024
36+
DefaultMsgBytesBurst = 2 * 1000 * 1_024
3637

3738
// DefaultMsgBytesPerSecond is the max bytes/s we'll permit for outgoing
3839
// messages. Once tokens (bytes) have been taken from the bucket,
3940
// they'll be refilled at this rate.
40-
DefaultMsgBytesPerSecond = 100 * 1_024
41+
DefaultMsgBytesPerSecond = 1000 * 1_024
4142

4243
// assumedMsgSize is the assumed size of a message if we can't compute
4344
// its serialized size. This comes out to 1 KB.
@@ -136,6 +137,10 @@ type SyncManagerCfg struct {
136137
// AllotedMsgBytesBurst is the amount of burst bytes we'll permit, if
137138
// we've exceeded the hard upper limit.
138139
AllotedMsgBytesBurst uint64
140+
141+
// FilterConcurrency is the maximum number of concurrent gossip filter
142+
// applications that can be processed. If not set, defaults to 5.
143+
FilterConcurrency int
139144
}
140145

141146
// SyncManager is a subsystem of the gossiper that manages the gossip syncers
@@ -207,8 +212,13 @@ type SyncManager struct {
207212
// newSyncManager constructs a new SyncManager backed by the given config.
208213
func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
209214

210-
filterSema := make(chan struct{}, filterSemaSize)
211-
for i := 0; i < filterSemaSize; i++ {
215+
filterConcurrency := cfg.FilterConcurrency
216+
if filterConcurrency == 0 {
217+
filterConcurrency = DefaultFilterConcurrency
218+
}
219+
220+
filterSema := make(chan struct{}, filterConcurrency)
221+
for i := 0; i < filterConcurrency; i++ {
212222
filterSema <- struct{}{}
213223
}
214224

0 commit comments

Comments
 (0)