diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index f026d7f32e..a849fb42c0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -264,11 +264,11 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm val fundingTxId = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.fundingTxId log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", fundingTxId, shortChannelId, spendingTx.txid) watcher ! WatchTxConfirmed(self, spendingTx.txid, ANNOUNCEMENTS_MINCONF * 2) - stay() using d.copy(spentChannels = d.spentChannels + (spendingTx.txid -> shortChannelId)) + stay() using d.copy(spentChannels = d.spentChannels.updated(spendingTx.txid, d.spentChannels.getOrElse(spendingTx.txid, Set.empty) + shortChannelId)) case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) => d.spentChannels.get(spendingTx.txid) match { - case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelId) + case Some(shortChannelIds) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelIds) case None => stay() } @@ -771,7 +771,7 @@ object Router { excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure graphWithBalances: GraphWithBalanceEstimates, sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message - spentChannels: Map[TxId, RealShortChannelId], // transactions that spend funding txs that are not yet deeply buried + spentChannels: Map[TxId, Set[RealShortChannelId]], // transactions that spend funding txs that are not yet deeply buried ) { def resolve(scid: ShortChannelId): Option[KnownChannel] = { // let's assume this is a real scid diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index ed6dd28fdf..0e5ecb9468 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -113,18 +113,16 @@ object Validation { log.debug("validation successful for shortChannelId={}", c.shortChannelId) remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))) val capacity = tx.txOut(outputIndex).amount - d0.spentChannels.get(tx.txid) match { - case Some(parentScid) => - d0.channels.get(parentScid) match { - case Some(parentChannel) => - Some(updateSplicedPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, parentChannel)) - case None => - log.error("spent parent channel shortChannelId={} not found for splice shortChannelId={}", parentScid, c.shortChannelId) - val spendingTxs = d0.spentChannels.filter(_._2 == parentScid).keySet - spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId)) - val d1 = d0.copy(spentChannels = d0.spentChannels -- spendingTxs) - Some(addPublicChannel(d1, nodeParams, watcher, c, tx.txid, capacity, None)) - } + // A single transaction may splice multiple channels (batching), in which case we have multiple parent + // channels. We cannot know which parent channel this announcement corresponds to, but it doesn't matter. + // We only need to update one of the parent channels between the same set of nodes to correctly update + // our graph. + val parentChannel_opt = d0.spentChannels + .getOrElse(tx.txid, Set.empty) + .flatMap(d0.channels.get) + .find(parent => parent.nodeId1 == c.nodeId1 && parent.nodeId2 == c.nodeId2) + parentChannel_opt match { + case Some(parentChannel) => Some(updateSplicedPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, parentChannel)) case None => Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None)) } } @@ -191,9 +189,12 @@ object Validation { // we need to update the graph because the edge identifiers and capacity change from the parent scid to the new splice scid log.debug("updating the graph for shortChannelId={}", newPubChan.shortChannelId) val graph1 = d.graphWithBalances.updateChannel(ChannelDesc(parentChannel.shortChannelId, parentChannel.nodeId1, parentChannel.nodeId2), ann.shortChannelId, capacity) - val spendingTxs = d.spentChannels.filter(_._2 == parentChannel.shortChannelId).keySet - spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId)) - val spentChannels1 = d.spentChannels -- spendingTxs + val spentChannels1 = d.spentChannels.collect { + case (txId, parentScids) if (parentScids - parentChannel.shortChannelId).nonEmpty => + txId -> (parentScids - parentChannel.shortChannelId) + } + // No need to keep watching transactions that have been removed from spentChannels. + (d.spentChannels.keySet -- spentChannels1.keys).foreach(txId => watcher ! UnwatchTxConfirmed(txId)) d.copy( // we also add the splice scid -> channelId and remove the parent scid -> channelId mappings channels = d.channels + (newPubChan.shortChannelId -> newPubChan) - parentChannel.shortChannelId, @@ -267,34 +268,42 @@ object Validation { } else d1 } - def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { + def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelIds: Set[RealShortChannelId])(implicit ctx: ActorContext, log: LoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors - val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get - log.info("funding tx for channelId={} was spent", shortChannelId) + val lostChannels = shortChannelIds.flatMap(shortChannelId => d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId))) + log.info("funding tx for channelIds={} was spent", shortChannelIds.mkString(",")) // we need to remove nodes that aren't tied to any channels anymore - val channels1 = d.channels - shortChannelId - val prunedChannels1 = d.prunedChannels - shortChannelId - val lostNodes = Seq(lostChannel.nodeId1, lostChannel.nodeId2).filterNot(nodeId => hasChannels(nodeId, channels1.values)) + val channels1 = d.channels -- shortChannelIds + val prunedChannels1 = d.prunedChannels -- shortChannelIds + val lostNodes = lostChannels.flatMap(lostChannel => Seq(lostChannel.nodeId1, lostChannel.nodeId2).filterNot(nodeId => hasChannels(nodeId, channels1.values))) // let's clean the db and send the events - log.info("pruning shortChannelId={} (spent)", shortChannelId) - db.removeChannel(shortChannelId) // NB: this also removes channel updates + log.info("pruning shortChannelIds={} (spent)", shortChannelIds.mkString(",")) + shortChannelIds.foreach(db.removeChannel(_)) // NB: this also removes channel updates // we also need to remove updates from the graph - val graphWithBalances1 = d.graphWithBalances - .removeChannel(ChannelDesc(lostChannel.shortChannelId, lostChannel.nodeId1, lostChannel.nodeId2)) + val graphWithBalances1 = lostChannels.foldLeft(d.graphWithBalances) { (graph, lostChannel) => + graph.removeChannel(ChannelDesc(lostChannel.shortChannelId, lostChannel.nodeId1, lostChannel.nodeId2)) + } // we notify front nodes - ctx.system.eventStream.publish(ChannelLost(shortChannelId)) + shortChannelIds.foreach(shortChannelId => ctx.system.eventStream.publish(ChannelLost(shortChannelId))) lostNodes.foreach { nodeId => log.info("pruning nodeId={} (spent)", nodeId) db.removeNode(nodeId) ctx.system.eventStream.publish(NodeLost(nodeId)) } - // we no longer need to track this or alternative transactions that spent the parent channel - // either this channel was really closed, or it was spliced and the announcement was not received in time - // we will re-add a spliced channel as a new channel later when we receive the announcement - watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId)) - val spendingTxs = d.spentChannels.filter(_._2 == shortChannelId).keySet - // stop watching the spending txs that will never confirm, we already got confirmations for this spending tx + lostChannels.foreach { + lostChannel => + // we no longer need to track this or alternative transactions that spent the parent channel + // either this channel was really closed, or it was spliced and the announcement was not received in time + // we will re-add a spliced channel as a new channel later when we receive the announcement + watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId)) + } + + // We may have received RBF candidates for this splice: we can find them by looking at transactions that spend one + // of the channels we're removing (note that they may spend a slightly different set of channels). + // Those transactions cannot confirm anymore (they have been double-spent by the current one), so we should stop + // watching them. + val spendingTxs = d.spentChannels.filter(_._2.intersect(shortChannelIds).nonEmpty).keySet (spendingTxs - spendingTxId).foreach(txId => watcher ! UnwatchTxConfirmed(txId)) val spentChannels1 = d.spentChannels -- spendingTxs d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1) @@ -599,7 +608,14 @@ object Validation { log.debug("this is a known pruned local channel, processing channel_update for channelId={} scid={}", lcu.channelId, ann.shortChannelId) handleChannelUpdate(d, db, nodeParams.currentBlockHeight, Left(lcu)) case Some(ann) => - val d1 = d.spentChannels.get(ann.fundingTxId).flatMap(parentScid => d.channels.get(parentScid)) match { + // A single transaction may splice multiple channels (batching), in which case we have multiple parent + // channels. We cannot know which parent channel this announcement corresponds to, but it doesn't matter. + // We only need to update one of the parent channels between the same set of nodes to correctly update + // our graph. + val d1 = d.spentChannels + .getOrElse(ann.fundingTxId, Set.empty) + .flatMap(d.channels.get) + .find(parent => parent.nodeId1 == ann.announcement.nodeId1 && parent.nodeId2 == ann.announcement.nodeId2) match { case Some(parentChannel) => // This is a splice for which we haven't processed the (local) channel_announcement yet. log.debug("processing channel_announcement for local splice with fundingTxId={} channelId={} scid={} (previous={})", ann.fundingTxId, lcu.channelId, ann.shortChannelId, parentChannel.shortChannelId) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala index 1911f21c44..32b19ea443 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala @@ -52,13 +52,20 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience { val channels = getPeerChannels(alice, bob.nodeId) ++ getPeerChannels(bob, carol.nodeId) assert(channels.map(_.data.channelId).toSet == Set(channelId_ab, channelId_bc)) - eventually { + val scid_ab = eventually { assert(getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].commitments.latest.localFundingStatus.isInstanceOf[LocalFundingStatus.ConfirmedFundingTx]) assert(getChannelData(bob, channelId_bc).asInstanceOf[DATA_NORMAL].commitments.latest.localFundingStatus.isInstanceOf[LocalFundingStatus.ConfirmedFundingTx]) + val scid_ab = getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].commitments.latest.shortChannelId_opt.get + // Wait for Alice to receive both initial local channel updates. + inside(getRouterData(alice)) { routerData => + val channel_ab = routerData.channels(scid_ab) + val receivedUpdates = Seq(channel_ab.update_1_opt, channel_ab.update_2_opt).flatten + assert(receivedUpdates.count(_.shortChannelId == scid_ab) == 2) + } + scid_ab } // We splice in to increase the capacity of the alice->bob channel. - val scid_ab = getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].commitments.latest.shortChannelId_opt.get val spliceTxId = spliceIn(alice, channelId_ab, 100_000 sat, None).asInstanceOf[RES_SPLICE].fundingTxId // The announcement for the splice transaction and the corresponding channel updates are broadcast. @@ -73,10 +80,12 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience { val splice_scid_ab = channelData_alice.commitments.latest.shortChannelId_opt.get assert(splice_scid_ab != scid_ab) assert(channelData_bob.commitments.latest.shortChannelId_opt.contains(splice_scid_ab)) + val scid_bc = getChannelData(bob, channelId_bc).asInstanceOf[DATA_NORMAL].commitments.latest.shortChannelId_opt.get // Alice creates a channel_announcement for the splice transaction and updates the graph. val spliceAnn = inside(getRouterData(alice)) { routerData => - assert(routerData.channels.contains(splice_scid_ab)) + assert(routerData.channels.keys == Set(splice_scid_ab, scid_bc)) + assert(routerData.spentChannels.isEmpty) val channel_ab = routerData.channels(splice_scid_ab) assert(channel_ab.capacity == 200_000.sat) assert(channel_ab.update_1_opt.nonEmpty && channel_ab.update_2_opt.nonEmpty) @@ -91,6 +100,8 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience { // Bob also creates a channel_announcement for the splice transaction and updates the graph. inside(getRouterData(bob)) { routerData => + assert(routerData.channels.keys == Set(splice_scid_ab, scid_bc)) + assert(routerData.spentChannels.isEmpty) assert(routerData.channels.get(splice_scid_ab).map(_.ann).contains(spliceAnn)) routerData.channels.get(splice_scid_ab).foreach(c => { assert(c.capacity == 200_000.sat) @@ -106,6 +117,8 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience { // The channel_announcement for the splice propagates to Carol. inside(getRouterData(carol)) { routerData => + assert(routerData.channels.keys == Set(splice_scid_ab, scid_bc)) + assert(routerData.spentChannels.isEmpty) assert(routerData.channels.get(splice_scid_ab).map(_.ann).contains(spliceAnn)) routerData.channels.get(splice_scid_ab).foreach(c => { assert(c.capacity == 200_000.sat) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala index e28dfb66c0..9d21c490a4 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala @@ -231,6 +231,44 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi } } + def addChannel(router: ActorRef, watcher: TestProbe, scid: RealShortChannelId, priv1: PrivateKey, priv2: PrivateKey, priv_funding1: PrivateKey, priv_funding2: PrivateKey): (ChannelAnnouncement, ChannelUpdate, ChannelUpdate) = { + val ann = channelAnnouncement(scid, priv1, priv2, priv_funding1, priv_funding2) + val pub1 = priv1.publicKey + val pub2 = priv2.publicKey + val update1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv1, pub2, scid, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = htlcMaximum) + val update2 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv2, pub1, scid, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = htlcMaximum) + val pub_funding1 = priv_funding1.publicKey + val pub_funding2 = priv_funding2.publicKey + assert(ChannelDesc(update1, ann) == ChannelDesc(ann.shortChannelId, pub1, pub2)) + val sender1 = TestProbe() + val peerConnection = TestProbe() + peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true } + peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, ann)) + peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update1)) + peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update2)) + assert(watcher.expectMsgType[ValidateRequest].ann == ann) + watcher.send(router, ValidateResult(ann, Right((Transaction(version = 2, txIn = Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(pub_funding1, pub_funding2)))) :: Nil, lockTime = 0), UtxoStatus.Unspent)))) + assert(watcher.expectMsgType[WatchExternalChannelSpent].shortChannelId == scid) + peerConnection.expectMsgAllOf( + GossipDecision.Accepted(ann), + GossipDecision.Accepted(update1), + GossipDecision.Accepted(update2) + ) + peerConnection.expectNoMessage(100 millis) + awaitCond({ + sender1.send(router, GetNodes) + val nodes = sender1.expectMsgType[Iterable[NodeAnnouncement]] + sender1.send(router, GetChannels) + val channels = sender1.expectMsgType[Iterable[ChannelAnnouncement]].toSeq + sender1.send(router, GetChannelUpdates) + val updates = sender1.expectMsgType[Iterable[ChannelUpdate]].toSeq + nodes.exists(_.nodeId == pub1) && nodes.exists(_.nodeId == pub2) && + channels.contains(ann) && + updates.contains(update1) && updates.contains(update2) + }, max = 10 seconds, interval = 1 second) + (ann, update1, update2) + } + } object BaseRouterSpec { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala index 4736d68603..2856f0f48c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala @@ -326,10 +326,12 @@ class RouterSpec extends BaseRouterSpec { def spendingTx(node1: PublicKey, node2: PublicKey, capacity: Satoshi = publicChannelCapacity): Transaction = { val fundingScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2))) - val nextFundingTx = Transaction(version = 0, txIn = TxIn(OutPoint(fundingTx(node1, node2, capacity), 0), fundingScript, 0) :: Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0) + val nextFundingTx = Transaction(version = 2, txIn = TxIn(OutPoint(fundingTx(node1, node2, capacity), 0), fundingScript, 0) :: Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0) nextFundingTx } + def batchSpendingTx(spendingTxs: Seq[Transaction]): Transaction = Transaction(version = 2, txIn = spendingTxs.flatMap(_.txIn), txOut = spendingTxs.flatMap(_.txOut), lockTime = 0) + test("properly announce lost channels and nodes") { fixture => import fixture._ val eventListener = TestProbe() @@ -1153,12 +1155,39 @@ class RouterSpec extends BaseRouterSpec { } } + def processSpliceChannelAnnouncement(fixture: FixtureParam, parentScid: RealShortChannelId, channelAnnouncement: ChannelAnnouncement, spliceTx: Transaction, newCapacity: Satoshi): Unit = { + import fixture._ + // A splice of the channel is announced and validated. + val peerConnection = TestProbe() + peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, channelAnnouncement)) + peerConnection.expectNoMessage(100 millis) + assert(watcher.expectMsgType[ValidateRequest].ann == channelAnnouncement) + watcher.send(router, ValidateResult(channelAnnouncement, Right(spliceTx, UtxoStatus.Unspent))) + assert(watcher.expectMsgType[WatchExternalChannelSpent].shortChannelId == channelAnnouncement.shortChannelId) + watcher.expectMsgType[UnwatchExternalChannelSpent] // unwatch the parent channel + peerConnection.expectMsg(TransportHandler.ReadAck(channelAnnouncement)) + peerConnection.expectMsg(GossipDecision.Accepted(channelAnnouncement)) + assert(peerConnection.sender() == router) + + // And the graph should be updated too. + val sender = TestProbe() + sender.send(router, Router.GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => + val g = routerData.graphWithBalances.graph + val edge_ab = g.getEdge(ChannelDesc(channelAnnouncement.shortChannelId, channelAnnouncement.nodeId1, channelAnnouncement.nodeId2)).get + val edge_ba = g.getEdge(ChannelDesc(channelAnnouncement.shortChannelId, channelAnnouncement.nodeId2, channelAnnouncement.nodeId1)).get + assert(g.getEdge(ChannelDesc(parentScid, channelAnnouncement.nodeId1, channelAnnouncement.nodeId2)).isEmpty) + assert(g.getEdge(ChannelDesc(parentScid, channelAnnouncement.nodeId2, channelAnnouncement.nodeId1)).isEmpty) + assert(newCapacity == spliceTx.txOut(channelAnnouncement.shortChannelId.outputIndex).amount) + assert(edge_ab.capacity == newCapacity && edge_ba.capacity == newCapacity) + } + } + test("update an existing channel after a splice") { fixture => import fixture._ val eventListener = TestProbe() system.eventStream.subscribe(eventListener.ref, classOf[NetworkEvent]) - val peerConnection = TestProbe() // Channel ab is spent by a splice tx. val capacity1 = publicChannelCapacity - 100_000.sat @@ -1171,8 +1200,8 @@ class RouterSpec extends BaseRouterSpec { eventListener.expectNoMessage(100 millis) // Channel ab is spent and confirmed by an RBF of splice tx. - val capacity2 = publicChannelCapacity - 100_000.sat - 1000.sat - val spliceTx2 = spendingTx(funding_a, funding_b, capacity2) + val newCapacity = publicChannelCapacity - 100_000.sat - 1000.sat + val spliceTx2 = spendingTx(funding_a, funding_b, newCapacity) router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx2) inside(watcher.expectMsgType[WatchTxConfirmed]) { w => assert(w.txId == spliceTx2.txid) @@ -1183,32 +1212,9 @@ class RouterSpec extends BaseRouterSpec { // The splice of channel ab is announced. val spliceScid = RealShortChannelId(BlockHeight(450000), 1, 0) val spliceAnn = channelAnnouncement(spliceScid, priv_a, priv_b, priv_funding_a, priv_funding_b) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, spliceAnn)) - peerConnection.expectNoMessage(100 millis) - assert(watcher.expectMsgType[ValidateRequest].ann == spliceAnn) - watcher.send(router, ValidateResult(spliceAnn, Right(spliceTx2, UtxoStatus.Unspent))) - peerConnection.expectMsg(TransportHandler.ReadAck(spliceAnn)) - peerConnection.expectMsg(GossipDecision.Accepted(spliceAnn)) - assert(peerConnection.sender() == router) - - // And the graph should be updated too. - val sender = TestProbe() - sender.send(router, Router.GetRouterData) - val g = sender.expectMsgType[Data].graphWithBalances.graph - val edge_ab = g.getEdge(ChannelDesc(spliceScid, a, b)).get - val edge_ba = g.getEdge(ChannelDesc(spliceScid, b, a)).get - assert(g.getEdge(ChannelDesc(scid_ab, a, b)).isEmpty) - assert(g.getEdge(ChannelDesc(scid_ab, b, a)).isEmpty) - assert(edge_ab.capacity == capacity2 && edge_ba.capacity == capacity2) - - // The channel update for the splice is confirmed and the channel is not removed. - router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, funding_b)) - eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn, capacity2, None, None) :: Nil)) - eventListener.expectMsg(ChannelLost(scid_ab)) - peerConnection.expectNoMessage(100 millis) - eventListener.expectNoMessage(100 millis) + processSpliceChannelAnnouncement(fixture, scid_ab, spliceAnn, spliceTx2, newCapacity) - // The router no longer tracks the parent scid. + // The router no longer tracks the parent scids. val probe = TestProbe() awaitAssert({ probe.send(router, GetRouterData) @@ -1218,4 +1224,170 @@ class RouterSpec extends BaseRouterSpec { }) } + test("update multiple existing channels with a batch splice") { fixture => + import fixture._ + + // add second b-c channel + val scid_bc2 = RealShortChannelId(BlockHeight(420000), 7, 0) + addChannel(fixture.router, fixture.watcher, scid_bc2, priv_b, priv_c, priv_funding_b, priv_funding_c) + + val eventListener = TestProbe() + system.eventStream.subscribe(eventListener.ref, classOf[NetworkEvent]) + + val newCapacity_ab = publicChannelCapacity - 100_000.sat + val newCapacity_bc = publicChannelCapacity + 50_000.sat + val spliceTx_ab = spendingTx(funding_a, funding_b, newCapacity_ab) + val spliceTx_bc = spendingTx(funding_b, funding_c, newCapacity_bc) + val spliceTx_bc2 = spendingTx(funding_b, funding_c) + val batchSpliceTx = batchSpendingTx(Seq(spliceTx_ab, spliceTx_bc, spliceTx_bc2)) + + // Channel ab is spent by a splice tx. + router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx_ab) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == spliceTx_ab.txid) + assert(w.minDepth == 12) + } + + // Channel bc is spent by a splice tx. + router ! WatchExternalChannelSpentTriggered(scid_bc, spliceTx_bc) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == spliceTx_bc.txid) + assert(w.minDepth == 12) + } + + // Channel bc2 is spent by a splice tx. + router ! WatchExternalChannelSpentTriggered(scid_bc2, spliceTx_bc2) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == spliceTx_bc2.txid) + assert(w.minDepth == 12) + } + + // Channels ab, bc and bc2 are all spent by the same batch splice tx. + router ! WatchExternalChannelSpentTriggered(scid_ab, batchSpliceTx) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == batchSpliceTx.txid) + assert(w.minDepth == 12) + } + router ! WatchExternalChannelSpentTriggered(scid_bc, batchSpliceTx) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == batchSpliceTx.txid) + assert(w.minDepth == 12) + } + router ! WatchExternalChannelSpentTriggered(scid_bc2, batchSpliceTx) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == batchSpliceTx.txid) + assert(w.minDepth == 12) + } + + // Channels ab, bc and bc2 are also all spent by an RBF of the batch splice tx. + val newCapacity_ab_RBF = newCapacity_ab - 1000.sat + val batchSpliceTx_RBF = batchSpendingTx(Seq(spendingTx(funding_a, funding_b, newCapacity_ab_RBF), spliceTx_bc, spliceTx_bc2)) + router ! WatchExternalChannelSpentTriggered(scid_ab, batchSpliceTx_RBF) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == batchSpliceTx_RBF.txid) + assert(w.minDepth == 12) + } + router ! WatchExternalChannelSpentTriggered(scid_bc, batchSpliceTx_RBF) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == batchSpliceTx_RBF.txid) + assert(w.minDepth == 12) + } + router ! WatchExternalChannelSpentTriggered(scid_bc2, batchSpliceTx_RBF) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == batchSpliceTx_RBF.txid) + assert(w.minDepth == 12) + } + + // The router tracks the possible spending txs for channels ab, bc and bc2. + val sender = TestProbe() + awaitAssert({ + sender.send(router, GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => + assert(routerData.spentChannels(spliceTx_ab.txid) == Set(scid_ab)) + assert(routerData.spentChannels(spliceTx_bc.txid) == Set(scid_bc)) + assert(routerData.spentChannels(spliceTx_bc2.txid) == Set(scid_bc2)) + assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_ab, scid_bc, scid_bc2)) + assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_ab, scid_bc, scid_bc2)) + } + }) + + // The splice of channel ab is announced, verified and added to the graph; the parent channel is removed from the graph. + val spliceScid_ab = RealShortChannelId(BlockHeight(450000), 1, 0) + val spliceAnn_ab = channelAnnouncement(spliceScid_ab, priv_a, priv_b, priv_funding_a, priv_funding_b) + processSpliceChannelAnnouncement(fixture, scid_ab, spliceAnn_ab, batchSpliceTx_RBF, newCapacity_ab_RBF) + assert(watcher.expectMsgType[UnwatchTxConfirmed].txId == spliceTx_ab.txid) + + // The router still tracks the possible spending txs for channels bc and bc2. + awaitAssert({ + sender.send(router, GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => + assert(!routerData.spentChannels.contains(spliceTx_ab.txid)) + assert(routerData.spentChannels.contains(spliceTx_bc.txid)) + assert(routerData.spentChannels.contains(spliceTx_bc2.txid)) + assert(routerData.spentChannels(spliceTx_bc.txid) == Set(scid_bc)) + assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc, scid_bc2)) + assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_bc, scid_bc2)) + } + }) + + // The splice of channel bc is announced, verified and added to the graph; the parent channel is removed from the graph. + val spliceScid_bc = RealShortChannelId(BlockHeight(450000), 1, 1) + val spliceAnn_bc = channelAnnouncement(spliceScid_bc, priv_b, priv_c, priv_funding_b, priv_funding_c) + processSpliceChannelAnnouncement(fixture, scid_bc, spliceAnn_bc, batchSpliceTx_RBF, newCapacity_bc) + assert(watcher.expectMsgType[UnwatchTxConfirmed].txId == spliceTx_bc.txid) + + // The router still tracks the possible spending txs for channel bc or bc2 - either could be considered the parent of scid_bc. + awaitAssert({ + sender.send(router, GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => + if (routerData.spentChannels.contains(spliceTx_bc.txid)) { + assert(!routerData.spentChannels.contains(spliceTx_bc2.txid)) + assert(routerData.spentChannels(spliceTx_bc.txid) == Set(scid_bc)) + assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc)) + assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_bc)) + } else { + assert(routerData.spentChannels.contains(spliceTx_bc2.txid)) + assert(routerData.spentChannels(spliceTx_bc2.txid) == Set(scid_bc2)) + assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc2)) + assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_bc2)) + } + } + }) + + // Splice channel updates received for ab and bc add new channels to and remove the parent channels from the graph. + eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn_ab, newCapacity_ab_RBF, None, None) :: Nil)) + eventListener.expectMsg(ChannelLost(scid_ab)) + eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn_bc, newCapacity_bc, None, None) :: Nil)) + eventListener.expectMsg(ChannelLost(scid_bc)) + + // No splice channel update for channel bc2 was received before the batch splice tx confirms so channel bc2 is removed. + sender.send(router, GetRouterData) + val fundingTxId_bc2 = sender.expectMsgType[Data].channels(scid_bc2).fundingTxId + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, batchSpliceTx_RBF) + assert(watcher.expectMsgType[UnwatchExternalChannelSpent].txId == fundingTxId_bc2) + eventListener.expectMsg(ChannelLost(scid_bc2)) + eventListener.expectNoMessage(100 millis) + + // Alternative spending transactions in the mempool are now unspendable and need not be watched. + val unwatchedTxs = Set( + watcher.expectMsgType[UnwatchTxConfirmed].txId, + watcher.expectMsgType[UnwatchTxConfirmed].txId, + ) + assert(unwatchedTxs == Set(spliceTx_bc2.txid, batchSpliceTx.txid)) + watcher.expectNoMessage(100 millis) + + // The router no longer tracks the parent scids. + awaitAssert({ + sender.send(router, GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => + assert(routerData.spentChannels.isEmpty) + assert(!routerData.channels.contains(scid_ab)) + assert(!routerData.channels.contains(scid_bc)) + assert(!routerData.channels.contains(scid_bc2)) + assert(routerData.channels.contains(spliceScid_ab)) + assert(routerData.channels.contains(spliceScid_bc)) + } + }) + } + }