Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add router support for batched splices #2989

Merged
merged 5 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,11 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
val fundingTxId = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.fundingTxId
log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", fundingTxId, shortChannelId, spendingTx.txid)
watcher ! WatchTxConfirmed(self, spendingTx.txid, ANNOUNCEMENTS_MINCONF * 2)
stay() using d.copy(spentChannels = d.spentChannels + (spendingTx.txid -> shortChannelId))
stay() using d.copy(spentChannels = d.spentChannels.updated(spendingTx.txid, d.spentChannels.getOrElse(spendingTx.txid, Set.empty) + shortChannelId))

case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) =>
d.spentChannels.get(spendingTx.txid) match {
case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelId)
case Some(shortChannelIds) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelIds)
case None => stay()
}

Expand Down Expand Up @@ -771,7 +771,7 @@ object Router {
excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graphWithBalances: GraphWithBalanceEstimates,
sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
spentChannels: Map[TxId, RealShortChannelId], // transactions that spend funding txs that are not yet deeply buried
spentChannels: Map[TxId, Set[RealShortChannelId]], // transactions that spend funding txs that are not yet deeply buried
) {
def resolve(scid: ShortChannelId): Option[KnownChannel] = {
// let's assume this is a real scid
Expand Down
82 changes: 49 additions & 33 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,16 @@ object Validation {
log.debug("validation successful for shortChannelId={}", c.shortChannelId)
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c)))
val capacity = tx.txOut(outputIndex).amount
d0.spentChannels.get(tx.txid) match {
case Some(parentScid) =>
d0.channels.get(parentScid) match {
case Some(parentChannel) =>
Some(updateSplicedPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, parentChannel))
case None =>
log.error("spent parent channel shortChannelId={} not found for splice shortChannelId={}", parentScid, c.shortChannelId)
val spendingTxs = d0.spentChannels.filter(_._2 == parentScid).keySet
spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId))
val d1 = d0.copy(spentChannels = d0.spentChannels -- spendingTxs)
Some(addPublicChannel(d1, nodeParams, watcher, c, tx.txid, capacity, None))
}
// A single transaction may splice multiple channels (batching), in which case we have multiple parent
// channels. We cannot know which parent channel this announcement corresponds to, but it doesn't matter.
// We only need to update one of the parent channels between the same set of nodes to correctly update
// our graph.
val parentChannel_opt = d0.spentChannels
.getOrElse(tx.txid, Set.empty)
.flatMap(d0.channels.get)
.find(parent => parent.nodeId1 == c.nodeId1 && parent.nodeId2 == c.nodeId2)
parentChannel_opt match {
case Some(parentChannel) => Some(updateSplicedPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, parentChannel))
case None => Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None))
}
}
Expand Down Expand Up @@ -191,9 +189,12 @@ object Validation {
// we need to update the graph because the edge identifiers and capacity change from the parent scid to the new splice scid
log.debug("updating the graph for shortChannelId={}", newPubChan.shortChannelId)
val graph1 = d.graphWithBalances.updateChannel(ChannelDesc(parentChannel.shortChannelId, parentChannel.nodeId1, parentChannel.nodeId2), ann.shortChannelId, capacity)
val spendingTxs = d.spentChannels.filter(_._2 == parentChannel.shortChannelId).keySet
spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId))
val spentChannels1 = d.spentChannels -- spendingTxs
val spentChannels1 = d.spentChannels.collect {
case (txId, parentScids) if (parentScids - parentChannel.shortChannelId).nonEmpty =>
txId -> (parentScids - parentChannel.shortChannelId)
}
// No need to keep watching transactions that have been removed from spentChannels.
(d.spentChannels.keySet -- spentChannels1.keys).foreach(txId => watcher ! UnwatchTxConfirmed(txId))
d.copy(
// we also add the splice scid -> channelId and remove the parent scid -> channelId mappings
channels = d.channels + (newPubChan.shortChannelId -> newPubChan) - parentChannel.shortChannelId,
Expand Down Expand Up @@ -267,34 +268,42 @@ object Validation {
} else d1
}

def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelIds: Set[RealShortChannelId])(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get
log.info("funding tx for channelId={} was spent", shortChannelId)
val lostChannels = shortChannelIds.flatMap(shortChannelId => d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)))
log.info("funding tx for channelIds={} was spent", shortChannelIds.mkString(","))
// we need to remove nodes that aren't tied to any channels anymore
val channels1 = d.channels - shortChannelId
val prunedChannels1 = d.prunedChannels - shortChannelId
val lostNodes = Seq(lostChannel.nodeId1, lostChannel.nodeId2).filterNot(nodeId => hasChannels(nodeId, channels1.values))
val channels1 = d.channels -- shortChannelIds
val prunedChannels1 = d.prunedChannels -- shortChannelIds
val lostNodes = lostChannels.flatMap(lostChannel => Seq(lostChannel.nodeId1, lostChannel.nodeId2).filterNot(nodeId => hasChannels(nodeId, channels1.values)))
// let's clean the db and send the events
log.info("pruning shortChannelId={} (spent)", shortChannelId)
db.removeChannel(shortChannelId) // NB: this also removes channel updates
log.info("pruning shortChannelIds={} (spent)", shortChannelIds.mkString(","))
shortChannelIds.foreach(db.removeChannel(_)) // NB: this also removes channel updates
// we also need to remove updates from the graph
val graphWithBalances1 = d.graphWithBalances
.removeChannel(ChannelDesc(lostChannel.shortChannelId, lostChannel.nodeId1, lostChannel.nodeId2))
val graphWithBalances1 = lostChannels.foldLeft(d.graphWithBalances) { (graph, lostChannel) =>
graph.removeChannel(ChannelDesc(lostChannel.shortChannelId, lostChannel.nodeId1, lostChannel.nodeId2))
}
// we notify front nodes
ctx.system.eventStream.publish(ChannelLost(shortChannelId))
shortChannelIds.foreach(shortChannelId => ctx.system.eventStream.publish(ChannelLost(shortChannelId)))
lostNodes.foreach {
nodeId =>
log.info("pruning nodeId={} (spent)", nodeId)
db.removeNode(nodeId)
ctx.system.eventStream.publish(NodeLost(nodeId))
}
// we no longer need to track this or alternative transactions that spent the parent channel
// either this channel was really closed, or it was spliced and the announcement was not received in time
// we will re-add a spliced channel as a new channel later when we receive the announcement
watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId))
val spendingTxs = d.spentChannels.filter(_._2 == shortChannelId).keySet
// stop watching the spending txs that will never confirm, we already got confirmations for this spending tx
lostChannels.foreach {
lostChannel =>
// we no longer need to track this or alternative transactions that spent the parent channel
// either this channel was really closed, or it was spliced and the announcement was not received in time
// we will re-add a spliced channel as a new channel later when we receive the announcement
watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId))
}

// We may have received RBF candidates for this splice: we can find them by looking at transactions that spend one
// of the channels we're removing (note that they may spend a slightly different set of channels).
// Those transactions cannot confirm anymore (they have been double-spent by the current one), so we should stop
// watching them.
val spendingTxs = d.spentChannels.filter(_._2.intersect(shortChannelIds).nonEmpty).keySet
(spendingTxs - spendingTxId).foreach(txId => watcher ! UnwatchTxConfirmed(txId))
val spentChannels1 = d.spentChannels -- spendingTxs
d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1)
Expand Down Expand Up @@ -599,7 +608,14 @@ object Validation {
log.debug("this is a known pruned local channel, processing channel_update for channelId={} scid={}", lcu.channelId, ann.shortChannelId)
handleChannelUpdate(d, db, nodeParams.currentBlockHeight, Left(lcu))
case Some(ann) =>
val d1 = d.spentChannels.get(ann.fundingTxId).flatMap(parentScid => d.channels.get(parentScid)) match {
// A single transaction may splice multiple channels (batching), in which case we have multiple parent
// channels. We cannot know which parent channel this announcement corresponds to, but it doesn't matter.
// We only need to update one of the parent channels between the same set of nodes to correctly update
// our graph.
val d1 = d.spentChannels
.getOrElse(ann.fundingTxId, Set.empty)
.flatMap(d.channels.get)
.find(parent => parent.nodeId1 == ann.announcement.nodeId1 && parent.nodeId2 == ann.announcement.nodeId2) match {
case Some(parentChannel) =>
// This is a splice for which we haven't processed the (local) channel_announcement yet.
log.debug("processing channel_announcement for local splice with fundingTxId={} channelId={} scid={} (previous={})", ann.fundingTxId, lcu.channelId, ann.shortChannelId, parentChannel.shortChannelId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,19 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience {
val channels = getPeerChannels(alice, bob.nodeId) ++ getPeerChannels(bob, carol.nodeId)
assert(channels.map(_.data.channelId).toSet == Set(channelId_ab, channelId_bc))

eventually {
val scid_ab = eventually {
assert(getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].commitments.latest.localFundingStatus.isInstanceOf[LocalFundingStatus.ConfirmedFundingTx])
assert(getChannelData(bob, channelId_bc).asInstanceOf[DATA_NORMAL].commitments.latest.localFundingStatus.isInstanceOf[LocalFundingStatus.ConfirmedFundingTx])
val scid_ab = getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].commitments.latest.shortChannelId_opt.get
// Wait for Alice to receive both initial local channel updates.
inside(getRouterData(alice)) { routerData =>
val channel_ab = routerData.channels(scid_ab)
Seq(channel_ab.update_1_opt, channel_ab.update_2_opt).flatten.foreach(u => assert(u.shortChannelId == scid_ab))
t-bast marked this conversation as resolved.
Show resolved Hide resolved
}
scid_ab
}

// We splice in to increase the capacity of the alice->bob channel.
val scid_ab = getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].commitments.latest.shortChannelId_opt.get
val spliceTxId = spliceIn(alice, channelId_ab, 100_000 sat, None).asInstanceOf[RES_SPLICE].fundingTxId

// The announcement for the splice transaction and the corresponding channel updates are broadcast.
Expand All @@ -73,10 +79,12 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience {
val splice_scid_ab = channelData_alice.commitments.latest.shortChannelId_opt.get
assert(splice_scid_ab != scid_ab)
assert(channelData_bob.commitments.latest.shortChannelId_opt.contains(splice_scid_ab))
val scid_bc = getChannelData(bob, channelId_bc).asInstanceOf[DATA_NORMAL].commitments.latest.shortChannelId_opt.get

// Alice creates a channel_announcement for the splice transaction and updates the graph.
val spliceAnn = inside(getRouterData(alice)) { routerData =>
assert(routerData.channels.contains(splice_scid_ab))
assert(routerData.channels.keys == Set(splice_scid_ab, scid_bc))
assert(routerData.spentChannels.isEmpty)
val channel_ab = routerData.channels(splice_scid_ab)
assert(channel_ab.capacity == 200_000.sat)
assert(channel_ab.update_1_opt.nonEmpty && channel_ab.update_2_opt.nonEmpty)
Expand All @@ -91,6 +99,8 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience {

// Bob also creates a channel_announcement for the splice transaction and updates the graph.
inside(getRouterData(bob)) { routerData =>
assert(routerData.channels.keys == Set(splice_scid_ab, scid_bc))
assert(routerData.spentChannels.isEmpty)
assert(routerData.channels.get(splice_scid_ab).map(_.ann).contains(spliceAnn))
routerData.channels.get(splice_scid_ab).foreach(c => {
assert(c.capacity == 200_000.sat)
Expand All @@ -106,6 +116,8 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience {

// The channel_announcement for the splice propagates to Carol.
inside(getRouterData(carol)) { routerData =>
assert(routerData.channels.keys == Set(splice_scid_ab, scid_bc))
assert(routerData.spentChannels.isEmpty)
assert(routerData.channels.get(splice_scid_ab).map(_.ann).contains(spliceAnn))
routerData.channels.get(splice_scid_ab).foreach(c => {
assert(c.capacity == 200_000.sat)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,44 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi
}
}

def addChannel(router: ActorRef, watcher: TestProbe, scid: RealShortChannelId, priv1: PrivateKey, priv2: PrivateKey, priv_funding1: PrivateKey, priv_funding2: PrivateKey): (ChannelAnnouncement, ChannelUpdate, ChannelUpdate) = {
val ann = channelAnnouncement(scid, priv1, priv2, priv_funding1, priv_funding2)
val pub1 = priv1.publicKey
val pub2 = priv2.publicKey
val update1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv1, pub2, scid, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = htlcMaximum)
val update2 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv2, pub1, scid, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = htlcMaximum)
val pub_funding1 = priv_funding1.publicKey
val pub_funding2 = priv_funding2.publicKey
assert(ChannelDesc(update1, ann) == ChannelDesc(ann.shortChannelId, pub1, pub2))
val sender1 = TestProbe()
val peerConnection = TestProbe()
peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true }
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, ann))
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update1))
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update2))
assert(watcher.expectMsgType[ValidateRequest].ann == ann)
watcher.send(router, ValidateResult(ann, Right((Transaction(version = 2, txIn = Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(pub_funding1, pub_funding2)))) :: Nil, lockTime = 0), UtxoStatus.Unspent))))
assert(watcher.expectMsgType[WatchExternalChannelSpent].shortChannelId == scid)
peerConnection.expectMsgAllOf(
GossipDecision.Accepted(ann),
GossipDecision.Accepted(update1),
GossipDecision.Accepted(update2)
)
peerConnection.expectNoMessage(100 millis)
awaitCond({
sender1.send(router, GetNodes)
val nodes = sender1.expectMsgType[Iterable[NodeAnnouncement]]
sender1.send(router, GetChannels)
val channels = sender1.expectMsgType[Iterable[ChannelAnnouncement]].toSeq
sender1.send(router, GetChannelUpdates)
val updates = sender1.expectMsgType[Iterable[ChannelUpdate]].toSeq
nodes.exists(_.nodeId == pub1) && nodes.exists(_.nodeId == pub2) &&
channels.contains(ann) &&
updates.contains(update1) && updates.contains(update2)
}, max = 10 seconds, interval = 1 second)
(ann, update1, update2)
}

}

object BaseRouterSpec {
Expand Down
Loading