Skip to content

Commit bec708b

Browse files
authored
Send splice commit_sigs as a batch (#787)
We introduce a `CommitSigBatch` class to group `commit_sig` messages when splice transactions are pending. We use this class to ensure that all the `commit_sig` messages in the batch are sent together to our peer, without any other messages in-between. When we receive `commit_sig` messages that contain the `batch` TLV, we group them directly in the `PeerConnection` before relaying them as a batch to the channel.
1 parent a74a915 commit bec708b

File tree

9 files changed

+157
-148
lines changed

9 files changed

+157
-148
lines changed

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/Commitments.kt

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -762,7 +762,7 @@ data class Commitments(
762762
return failure?.let { Either.Left(it) } ?: Either.Right(copy(changes = changes1))
763763
}
764764

765-
fun sendCommit(channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either<ChannelException, Pair<Commitments, List<CommitSig>>> {
765+
fun sendCommit(channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either<ChannelException, Pair<Commitments, CommitSigs>> {
766766
val remoteNextPerCommitmentPoint = remoteNextCommitInfo.right ?: return Either.Left(CannotSignBeforeRevocation(channelId))
767767
if (!changes.localHasChanges()) return Either.Left(CannotSignWithoutChanges(channelId))
768768
val (active1, sigs) = active.map { it.sendCommit(channelKeys, params, changes, remoteNextPerCommitmentPoint, active.size, log) }.unzip()
@@ -774,18 +774,22 @@ data class Commitments(
774774
remoteChanges = changes.remoteChanges.copy(acked = emptyList(), signed = changes.remoteChanges.acked)
775775
)
776776
)
777-
return Either.Right(Pair(commitments1, sigs))
777+
return Either.Right(Pair(commitments1, CommitSigs.fromSigs(sigs)))
778778
}
779779

780-
fun receiveCommit(commits: List<CommitSig>, channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either<ChannelException, Pair<Commitments, RevokeAndAck>> {
780+
fun receiveCommit(commits: CommitSigs, channelKeys: KeyManager.ChannelKeys, log: MDCLogger): Either<ChannelException, Pair<Commitments, RevokeAndAck>> {
781781
// We may receive more commit_sig than the number of active commitments, because there can be a race where we send splice_locked
782782
// while our peer is sending us a batch of commit_sig. When that happens, we simply need to discard the commit_sig that belong
783783
// to commitments we deactivated.
784-
if (commits.size < active.size) {
785-
return Either.Left(CommitSigCountMismatch(channelId, active.size, commits.size))
784+
val sigs = when (commits) {
785+
is CommitSigBatch -> commits.messages
786+
is CommitSig -> listOf(commits)
787+
}
788+
if (sigs.size < active.size) {
789+
return Either.Left(CommitSigCountMismatch(channelId, active.size, sigs.size))
786790
}
787791
// Signatures are sent in order (most recent first), calling `zip` will drop trailing sigs that are for deactivated/pruned commitments.
788-
val active1 = active.zip(commits).map {
792+
val active1 = active.zip(sigs).map {
789793
when (val commitment1 = it.first.receiveCommit(channelKeys, params, changes, it.second, log)) {
790794
is Either.Left -> return Either.Left(commitment1.value)
791795
is Either.Right -> commitment1.value

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Channel.kt

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -674,22 +674,6 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() {
674674
}
675675
}
676676
}
677-
678-
// in Normal and Shutdown we aggregate sigs for splices before processing
679-
var sigStash = emptyList<CommitSig>()
680-
681-
/** For splices we will send one commit_sig per active commitments. */
682-
internal fun ChannelContext.aggregateSigs(commit: CommitSig): List<CommitSig>? {
683-
sigStash = sigStash + commit
684-
logger.debug { "received sig for batch of size=${commit.batchSize}" }
685-
return if (sigStash.size == commit.batchSize) {
686-
val sigs = sigStash
687-
sigStash = emptyList()
688-
sigs
689-
} else {
690-
null
691-
}
692-
}
693677
}
694678

695679
object Channel {

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Normal.kt

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ data class Normal(
8080
val actions = buildList {
8181
add(ChannelAction.Storage.StoreHtlcInfos(htlcInfos))
8282
add(ChannelAction.Storage.StoreState(nextState))
83-
addAll(result.value.second.map { ChannelAction.Message.Send(it) })
83+
add(ChannelAction.Message.Send(result.value.second))
8484
}
8585
Pair(nextState, actions)
8686
}
@@ -173,12 +173,12 @@ data class Normal(
173173
is Either.Left -> handleLocalError(cmd, result.value)
174174
is Either.Right -> Pair(this@Normal.copy(commitments = result.value), listOf())
175175
}
176-
is CommitSig -> when {
176+
is CommitSigs -> when {
177177
spliceStatus == SpliceStatus.Aborted -> {
178178
logger.warning { "received commit_sig after sending tx_abort, they probably sent it before receiving our tx_abort, ignoring..." }
179179
Pair(this@Normal, listOf())
180180
}
181-
spliceStatus is SpliceStatus.WaitingForSigs -> {
181+
spliceStatus is SpliceStatus.WaitingForSigs && cmd.message is CommitSig -> {
182182
val (signingSession1, action) = spliceStatus.session.receiveCommitSig(channelKeys(), commitments.params, cmd.message, currentBlockHeight.toLong(), logger)
183183
when (action) {
184184
is InteractiveTxSigningSessionAction.AbortFundingAttempt -> {
@@ -193,7 +193,7 @@ data class Normal(
193193
is InteractiveTxSigningSessionAction.SendTxSigs -> sendSpliceTxSigs(spliceStatus.origins, action, spliceStatus.liquidityPurchase)
194194
}
195195
}
196-
ignoreRetransmittedCommitSig(cmd.message) -> {
196+
cmd.message is CommitSig && ignoreRetransmittedCommitSig(cmd.message) -> {
197197
// We haven't received our peer's tx_signatures for the latest funding transaction and asked them to resend it on reconnection.
198198
// They also resend their corresponding commit_sig, but we have already received it so we should ignore it.
199199
// Note that the funding transaction may have confirmed while we were offline.
@@ -202,34 +202,29 @@ data class Normal(
202202
}
203203
// NB: in all other cases we process the commit_sig normally. We could do a full pattern matching on all splice statuses, but it would force us to handle
204204
// corner cases like race condition between splice_init and a non-splice commit_sig
205-
else -> {
206-
when (val sigs = aggregateSigs(cmd.message)) {
207-
is List<CommitSig> -> when (val result = commitments.receiveCommit(sigs, channelKeys(), logger)) {
208-
is Either.Left -> handleLocalError(cmd, result.value)
209-
is Either.Right -> {
210-
val commitments1 = result.value.first
211-
val spliceStatus1 = when {
212-
spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> SpliceStatus.InitiatorQuiescent(spliceStatus.command)
213-
spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> SpliceStatus.NonInitiatorQuiescent
214-
else -> spliceStatus
215-
}
216-
val nextState = this@Normal.copy(commitments = commitments1, spliceStatus = spliceStatus1)
217-
val actions = mutableListOf<ChannelAction>()
218-
actions.add(ChannelAction.Storage.StoreState(nextState))
219-
actions.add(ChannelAction.Message.Send(result.value.second))
220-
if (commitments1.changes.localHasChanges()) {
221-
actions.add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign))
222-
}
223-
// If we're now quiescent, we may send our stfu message.
224-
when {
225-
spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = true)))
226-
spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = false)))
227-
else -> {}
228-
}
229-
Pair(nextState, actions)
230-
}
205+
else -> when (val result = commitments.receiveCommit(cmd.message, channelKeys(), logger)) {
206+
is Either.Left -> handleLocalError(cmd, result.value)
207+
is Either.Right -> {
208+
val commitments1 = result.value.first
209+
val spliceStatus1 = when {
210+
spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> SpliceStatus.InitiatorQuiescent(spliceStatus.command)
211+
spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> SpliceStatus.NonInitiatorQuiescent
212+
else -> spliceStatus
213+
}
214+
val nextState = this@Normal.copy(commitments = commitments1, spliceStatus = spliceStatus1)
215+
val actions = mutableListOf<ChannelAction>()
216+
actions.add(ChannelAction.Storage.StoreState(nextState))
217+
actions.add(ChannelAction.Message.Send(result.value.second))
218+
if (commitments1.changes.localHasChanges()) {
219+
actions.add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign))
231220
}
232-
else -> Pair(this@Normal, listOf())
221+
// If we're now quiescent, we may send our stfu message.
222+
when {
223+
spliceStatus is SpliceStatus.QuiescenceRequested && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = true)))
224+
spliceStatus is SpliceStatus.ReceivedStfu && commitments1.localIsQuiescent() -> actions.add(ChannelAction.Message.Send(Stfu(channelId, initiator = false)))
225+
else -> {}
226+
}
227+
Pair(nextState, actions)
233228
}
234229
}
235230
}
@@ -822,8 +817,6 @@ data class Normal(
822817
SpliceStatus.None
823818
}
824819
}
825-
// reset the commit_sig batch
826-
sigStash = emptyList()
827820
Pair(Offline(this@Normal.copy(spliceStatus = spliceStatus1)), failedHtlcs)
828821
}
829822
is ChannelCommand.Connected -> unhandled(cmd)

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/ShuttingDown.kt

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,29 +39,26 @@ data class ShuttingDown(
3939
is Either.Left -> handleLocalError(cmd, result.value)
4040
is Either.Right -> Pair(this@ShuttingDown.copy(commitments = result.value), listOf())
4141
}
42-
is CommitSig -> when (val sigs = aggregateSigs(cmd.message)) {
43-
is List<CommitSig> -> when (val result = commitments.receiveCommit(sigs, channelKeys(), logger)) {
44-
is Either.Left -> handleLocalError(cmd, result.value)
45-
is Either.Right -> {
46-
val (commitments1, revocation) = result.value
47-
when {
48-
commitments1.hasNoPendingHtlcsOrFeeUpdate() -> startClosingNegotiation(closeCommand, commitments1, localShutdown, remoteShutdown, listOf(ChannelAction.Message.Send(revocation)))
49-
else -> {
50-
val nextState = this@ShuttingDown.copy(commitments = commitments1)
51-
val actions = buildList {
52-
add(ChannelAction.Storage.StoreState(nextState))
53-
add(ChannelAction.Message.Send(revocation))
54-
if (commitments1.changes.localHasChanges()) {
55-
// if we have newly acknowledged changes let's sign them
56-
add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign))
57-
}
42+
is CommitSigs -> when (val result = commitments.receiveCommit(cmd.message, channelKeys(), logger)) {
43+
is Either.Left -> handleLocalError(cmd, result.value)
44+
is Either.Right -> {
45+
val (commitments1, revocation) = result.value
46+
when {
47+
commitments1.hasNoPendingHtlcsOrFeeUpdate() -> startClosingNegotiation(closeCommand, commitments1, localShutdown, remoteShutdown, listOf(ChannelAction.Message.Send(revocation)))
48+
else -> {
49+
val nextState = this@ShuttingDown.copy(commitments = commitments1)
50+
val actions = buildList {
51+
add(ChannelAction.Storage.StoreState(nextState))
52+
add(ChannelAction.Message.Send(revocation))
53+
if (commitments1.changes.localHasChanges()) {
54+
// if we have newly acknowledged changes let's sign them
55+
add(ChannelAction.Message.SendToSelf(ChannelCommand.Commitment.Sign))
5856
}
59-
Pair(nextState, actions)
6057
}
58+
Pair(nextState, actions)
6159
}
6260
}
6361
}
64-
else -> Pair(this@ShuttingDown, listOf())
6562
}
6663
is RevokeAndAck -> when (val result = commitments.receiveRevocation(cmd.message)) {
6764
is Either.Left -> handleLocalError(cmd, result.value)
@@ -128,7 +125,7 @@ data class ShuttingDown(
128125
val actions = buildList {
129126
add(ChannelAction.Storage.StoreHtlcInfos(htlcInfos))
130127
add(ChannelAction.Storage.StoreState(nextState))
131-
addAll(result.value.second.map { ChannelAction.Message.Send(it) })
128+
add(ChannelAction.Message.Send(result.value.second))
132129
}
133130
Pair(nextState, actions)
134131
}
@@ -164,11 +161,7 @@ data class ShuttingDown(
164161
is WatchSpentTriggered -> handlePotentialForceClose(watch)
165162
}
166163
is ChannelCommand.Commitment.CheckHtlcTimeout -> checkHtlcTimeout()
167-
is ChannelCommand.Disconnected -> {
168-
// reset the commit_sig batch
169-
sigStash = emptyList()
170-
Pair(Offline(this@ShuttingDown), listOf())
171-
}
164+
is ChannelCommand.Disconnected -> Pair(Offline(this@ShuttingDown), listOf())
172165
else -> unhandled(cmd)
173166
}
174167
}

modules/core/src/commonMain/kotlin/fr/acinq/lightning/channel/states/Syncing.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -413,29 +413,29 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent:
413413
val signedUpdates = commitments.changes.localChanges.signed
414414
val channelParams = commitments.params
415415
val batchSize = commitments.active.size
416-
val commitSigs = commitments.active.mapNotNull { c ->
416+
val commitSigs = CommitSigs.fromSigs(commitments.active.mapNotNull { c ->
417417
val commitInput = c.commitInput
418418
// Note that we ignore errors and simply skip failures to sign: we've already signed those updates before
419419
// the disconnection, so we don't expect any error here unless our peer sends an invalid nonce. In that
420420
// case, we simply won't send back our commit_sig until they fix their node.
421421
c.nextRemoteCommit?.commit?.sign(channelKeys, channelParams, c.fundingTxIndex, c.remoteFundingPubkey, commitInput, batchSize)
422-
}
422+
})
423423
val retransmit = when (retransmitRevocation) {
424424
null -> buildList {
425425
addAll(signedUpdates)
426-
addAll(commitSigs)
426+
add(commitSigs)
427427
}
428428
else -> if (commitments.localCommitIndex > rnci.value.sentAfterLocalCommitIndex) {
429429
buildList {
430430
addAll(signedUpdates)
431-
addAll(commitSigs)
431+
add(commitSigs)
432432
add(retransmitRevocation)
433433
}
434434
} else {
435435
buildList {
436436
add(retransmitRevocation)
437437
addAll(signedUpdates)
438-
addAll(commitSigs)
438+
add(commitSigs)
439439
}
440440
}
441441
}

0 commit comments

Comments
 (0)