Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only sync with top peers #2983

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/release-notes/eclair-vnext.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ There are many organisations that package Java runtimes and development kits, fo

### Miscellaneous improvements and bug fixes

<insert changes>
#### Gossip sync limits

On reconnection, eclair now only synchronizes its routing table with a small number of top peers instead of synchronizing with every peer.
t-bast marked this conversation as resolved.
Show resolved Hide resolved
If you already use `sync-whitelist`, the default behavior has been modified and you must set `router.sync.peer-limit = 0` to keep preventing any synchronization with other nodes.
You must also use `router.sync.whitelist` instead of `sync-whitelist`.

## Verifying signatures

Expand Down
3 changes: 2 additions & 1 deletion eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ eclair {
# features { }
# }
]
sync-whitelist = [] // a list of public keys; if non-empty, we will only do the initial sync with those peers

channel {
channel-flags {
Expand Down Expand Up @@ -411,6 +410,8 @@ eclair {
request-node-announcements = true // if true we will ask for node announcements when we receive channel ids that we don't know
channel-range-chunk-size = 1500 // max number of short_channel_ids (+ timestamps + checksums) in reply_channel_range *do not change this unless you know what you are doing*
channel-query-chunk-size = 100 // max number of short_channel_ids in query_short_channel_ids *do not change this unless you know what you are doing*
peer-limit = 5 // number of peers to do the initial sync with. We limit the initial sync to the peers that have the largest capacity with us when starting the node.
whitelist = [] // a list of public keys to do the initial sync with, in addition to the top peers by capacity
}

message-path-finding {
Expand Down
19 changes: 10 additions & 9 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, Re
import fr.acinq.eclair.router.Announcements.AddressException
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, PaymentWeightRatios}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router.{Graph, PathFindingExperimentConf}
import fr.acinq.eclair.router.{Graph, PathFindingExperimentConf, Router}
import fr.acinq.eclair.tor.Socks5ProxyParams
import fr.acinq.eclair.transactions.Transactions
import fr.acinq.eclair.wire.protocol._
Expand Down Expand Up @@ -66,7 +66,6 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
torAddress_opt: Option[NodeAddress],
features: Features[Feature],
private val overrideInitFeatures: Map[PublicKey, Features[InitFeature]],
syncWhitelist: Set[PublicKey],
pluginParams: Seq[PluginParams],
channelConf: ChannelConf,
onChainFeeConf: OnChainFeeConf,
Expand Down Expand Up @@ -319,6 +318,7 @@ object NodeParams extends Logging {
"on-chain-fees.target-blocks" -> "on-chain-fees.confirmation-priority",
// v0.12.0
"channel.mindepth-blocks" -> "channel.min-depth-funding-blocks",
"sync-whitelist" -> "router.sync.whitelist",
)
deprecatedKeyPaths.foreach {
case (old, new_) => require(!config.hasPath(old), s"configuration key '$old' has been replaced by '$new_'")
Expand Down Expand Up @@ -413,8 +413,6 @@ object NodeParams extends Logging {
p -> (f.copy(unknown = f.unknown ++ pluginMessageParams.map(_.pluginFeature)): Features[InitFeature])
}.toMap

val syncWhitelist: Set[PublicKey] = config.getStringList("sync-whitelist").asScala.map(s => PublicKey(ByteVector.fromValidHex(s))).toSet

val socksProxy_opt = parseSocks5ProxyParams(config)

val publicTorAddress_opt = if (config.getBoolean("tor.publish-onion-address")) torAddress_opt else None
Expand Down Expand Up @@ -561,7 +559,6 @@ object NodeParams extends Logging {
features = coreAndPluginFeatures,
pluginParams = pluginParams,
overrideInitFeatures = overrideInitFeatures,
syncWhitelist = syncWhitelist,
channelConf = ChannelConf(
channelFlags = channelFlags,
dustLimit = dustLimitSatoshis,
Expand Down Expand Up @@ -659,10 +656,14 @@ object NodeParams extends Logging {
watchSpentWindow = watchSpentWindow,
channelExcludeDuration = FiniteDuration(config.getDuration("router.channel-exclude-duration").getSeconds, TimeUnit.SECONDS),
routerBroadcastInterval = FiniteDuration(config.getDuration("router.broadcast-interval").getSeconds, TimeUnit.SECONDS),
requestNodeAnnouncements = config.getBoolean("router.sync.request-node-announcements"),
encodingType = EncodingType.UNCOMPRESSED,
channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size"),
channelQueryChunkSize = config.getInt("router.sync.channel-query-chunk-size"),
syncConf = Router.SyncConf(
requestNodeAnnouncements = config.getBoolean("router.sync.request-node-announcements"),
encodingType = EncodingType.UNCOMPRESSED,
channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size"),
channelQueryChunkSize = config.getInt("router.sync.channel-query-chunk-size"),
peerLimit = config.getInt("router.sync.peer-limit"),
whitelist = config.getStringList("router.sync.whitelist").asScala.map(s => PublicKey(ByteVector.fromValidHex(s))).toSet
),
pathFindingExperimentConf = getPathFindingExperimentConf(config.getConfig("router.path-finding.experiments")),
messageRouteParams = getMessageRouteParams(config.getConfig("router.message-path-finding")),
balanceEstimateHalfLife = FiniteDuration(config.getDuration("router.balance-estimate-half-life").getSeconds, TimeUnit.SECONDS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private class IncomingConnectionsTracker(nodeParams: NodeParams, switchboard: Ac
Metrics.IncomingConnectionsNoChannels.withoutTags().update(incomingConnections.size)
Behaviors.receiveMessage {
case TrackIncomingConnection(remoteNodeId) =>
if (nodeParams.syncWhitelist.contains(remoteNodeId)) {
if (nodeParams.routerConf.syncConf.whitelist.contains(remoteNodeId)) {
Behaviors.same
} else {
if (incomingConnections.size >= nodeParams.peerConnectionConf.maxNoChannels) {
Expand Down
20 changes: 13 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.{ClassicActorContextOps, ClassicActorRefOps, ClassicActorSystemOps, TypedActorRefOps}
import akka.actor.{Actor, ActorContext, ActorLogging, ActorRef, OneForOneStrategy, Props, Stash, Status, SupervisorStrategy, typed}
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi}
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.blockchain.OnchainPubkeyCache
import fr.acinq.eclair.channel.Helpers.Closing
Expand Down Expand Up @@ -67,14 +67,19 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
(peersWithOnTheFlyFunding -- peersWithChannels.keySet).foreach {
case (remoteNodeId, pending) => createOrGetPeer(remoteNodeId, Set.empty, pending)
}
val peerCapacities = channels.map {
case channelData: ChannelDataWithoutCommitments => (channelData.remoteNodeId, 0L)
case channelData: ChannelDataWithCommitments => (channelData.remoteNodeId, channelData.commitments.capacity.toLong)
}.groupMapReduce[PublicKey, Long](_._1)(_._2)(_ + _)
val topCapacityPeers = peerCapacities.toSeq.sortBy(_._2).takeRight(nodeParams.routerConf.syncConf.peerLimit).map(_._1).toSet
log.info("restoring {} peer(s) with {} channel(s) and {} peers with pending on-the-fly funding", peersWithChannels.size, channels.size, (peersWithOnTheFlyFunding.keySet -- peersWithChannels.keySet).size)
unstashAll()
context.become(normal(peersWithChannels.keySet))
context.become(normal(peersWithChannels.keySet, topCapacityPeers))
case _ =>
stash()
}

def normal(peersWithChannels: Set[PublicKey]): Receive = {
def normal(peersWithChannels: Set[PublicKey], peersToSyncWith: Set[PublicKey]): Receive = {

case Peer.Connect(publicKey, _, _, _) if publicKey == nodeParams.nodeId =>
sender() ! Status.Failure(new RuntimeException("cannot open connection with oneself"))
Expand Down Expand Up @@ -110,16 +115,17 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
val peer = createOrGetPeer(authenticated.remoteNodeId, offlineChannels = Set.empty, pendingOnTheFlyFunding = Map.empty)
val features = nodeParams.initFeaturesFor(authenticated.remoteNodeId)
val hasChannels = peersWithChannels.contains(authenticated.remoteNodeId)
// if the peer is whitelisted, we sync with them, otherwise we only sync with peers with whom we have at least one channel
val doSync = nodeParams.syncWhitelist.contains(authenticated.remoteNodeId) || (nodeParams.syncWhitelist.isEmpty && hasChannels)
val doSync = peersToSyncWith.contains(authenticated.remoteNodeId) || nodeParams.routerConf.syncConf.whitelist.contains(authenticated.remoteNodeId)
authenticated.peerConnection ! PeerConnection.InitializeConnection(peer, nodeParams.chainHash, features, doSync, nodeParams.liquidityAdsConfig.rates_opt)
if (!hasChannels && !authenticated.outgoing) {
incomingConnectionsTracker ! TrackIncomingConnection(authenticated.remoteNodeId)
}

case ChannelIdAssigned(_, remoteNodeId, _, _) => context.become(normal(peersWithChannels + remoteNodeId))
case ChannelIdAssigned(_, remoteNodeId, _, _) =>
val peersToSyncWith1 = if (peersToSyncWith.size < nodeParams.routerConf.syncConf.peerLimit) peersToSyncWith + remoteNodeId else peersToSyncWith
context.become(normal(peersWithChannels + remoteNodeId, peersToSyncWith1))

case LastChannelClosed(_, remoteNodeId) => context.become(normal(peersWithChannels - remoteNodeId))
case LastChannelClosed(_, remoteNodeId) => context.become(normal(peersWithChannels - remoteNodeId, peersToSyncWith - remoteNodeId))

case GetPeers => sender() ! context.children.filterNot(_ == incomingConnectionsTracker.toClassic)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,21 @@ object EclairInternalsSerializer {
("ageFactor" | double) ::
("capacityFactor" | double)).as[Graph.MessageWeightRatios]).as[MessageRouteParams]

val routerConfCodec: Codec[RouterConf] = (
("watchSpentWindow" | finiteDurationCodec) ::
("channelExcludeDuration" | finiteDurationCodec) ::
("routerBroadcastInterval" | finiteDurationCodec) ::
("requestNodeAnnouncements" | bool(8)) ::
val syncConfCodec: Codec[Router.SyncConf] = (
("requestNodeAnnouncements" | bool(8)) ::
("encodingType" | discriminated[EncodingType].by(uint8)
.typecase(0, provide(EncodingType.UNCOMPRESSED))
.typecase(1, provide(EncodingType.COMPRESSED_ZLIB))) ::
("channelRangeChunkSize" | int32) ::
("channelQueryChunkSize" | int32) ::
("peerLimit" | int32) ::
("whitelist" | listOfN(uint16, publicKey).xmap[Set[PublicKey]](_.toSet, _.toList))).as[Router.SyncConf]

val routerConfCodec: Codec[RouterConf] = (
("watchSpentWindow" | finiteDurationCodec) ::
("channelExcludeDuration" | finiteDurationCodec) ::
("routerBroadcastInterval" | finiteDurationCodec) ::
("syncConf" | syncConfCodec) ::
("pathFindingExperimentConf" | pathFindingExperimentConfCodec) ::
("messageRouteParams" | messageRouteParamsCodec) ::
("balanceEstimateHalfLife" | finiteDurationCodec)).as[RouterConf]
Expand Down
20 changes: 12 additions & 8 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -368,19 +368,23 @@ object Router {
)
}

case class SyncConf(requestNodeAnnouncements: Boolean,
encodingType: EncodingType,
channelRangeChunkSize: Int,
channelQueryChunkSize: Int,
peerLimit: Int,
whitelist: Set[PublicKey]) {
require(channelRangeChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel range chunk size exceeds the size of a lightning message")
require(channelQueryChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel query chunk size exceeds the size of a lightning message")
}

case class RouterConf(watchSpentWindow: FiniteDuration,
channelExcludeDuration: FiniteDuration,
routerBroadcastInterval: FiniteDuration,
requestNodeAnnouncements: Boolean,
encodingType: EncodingType,
channelRangeChunkSize: Int,
channelQueryChunkSize: Int,
syncConf: SyncConf,
pathFindingExperimentConf: PathFindingExperimentConf,
messageRouteParams: MessageRouteParams,
balanceEstimateHalfLife: FiniteDuration) {
require(channelRangeChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel range chunk size exceeds the size of a lightning message")
require(channelQueryChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel query chunk size exceeds the size of a lightning message")
}
balanceEstimateHalfLife: FiniteDuration)

// @formatter:off
case class ChannelDesc private(shortChannelId: ShortChannelId, a: PublicKey, b: PublicKey){
Expand Down
8 changes: 4 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ object Sync {
// keep channel ids that are in [firstBlockNum, firstBlockNum + numberOfBlocks]
val shortChannelIds: SortedSet[RealShortChannelId] = channels.keySet.filter(keep(q.firstBlock, q.numberOfBlocks, _))
log.info("replying with {} items for range=({}, {})", shortChannelIds.size, q.firstBlock, q.numberOfBlocks)
val chunks = split(shortChannelIds, q.firstBlock, q.numberOfBlocks, routerConf.channelRangeChunkSize)
val chunks = split(shortChannelIds, q.firstBlock, q.numberOfBlocks, routerConf.syncConf.channelRangeChunkSize)
Metrics.QueryChannelRange.Replies.withoutTags().record(chunks.size)
chunks.zipWithIndex.foreach { case (chunk, i) =>
val syncComplete = i == chunks.size - 1
val reply = buildReplyChannelRange(chunk, syncComplete, q.chainHash, routerConf.encodingType, q.queryFlags_opt, channels)
val reply = buildReplyChannelRange(chunk, syncComplete, q.chainHash, routerConf.syncConf.encodingType, q.queryFlags_opt, channels)
origin.peerConnection ! reply
Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.numberOfBlocks)
Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.shortChannelIds.array.size)
Expand Down Expand Up @@ -111,7 +111,7 @@ object Sync {
ids match {
case Nil => acc.reverse
case head :: tail =>
val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, routerConf.requestNodeAnnouncements)
val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, routerConf.syncConf.requestNodeAnnouncements)
// 0 means nothing to query, just don't include it
val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc
loop(tail, timestamps.drop(1), checksums.drop(1), acc1)
Expand Down Expand Up @@ -144,7 +144,7 @@ object Sync {

// we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time)
val replies = shortChannelIdAndFlags
.grouped(routerConf.channelQueryChunkSize)
.grouped(routerConf.syncConf.channelQueryChunkSize)
.map(buildQuery)
.toList

Expand Down
28 changes: 17 additions & 11 deletions eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
import fr.acinq.eclair.router.Graph.{MessageWeightRatios, PaymentWeightRatios}
import fr.acinq.eclair.router.PathFindingExperimentConf
import fr.acinq.eclair.router.{PathFindingExperimentConf, Router}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.wire.protocol._
import org.scalatest.Tag
Expand Down Expand Up @@ -114,7 +114,6 @@ object TestConstants {
),
pluginParams = List(pluginParams),
overrideInitFeatures = Map.empty,
syncWhitelist = Set.empty,
channelConf = ChannelConf(
dustLimit = 1100 sat,
maxRemoteDustLimit = 1500 sat,
Expand Down Expand Up @@ -197,10 +196,14 @@ object TestConstants {
watchSpentWindow = 1 second,
channelExcludeDuration = 60 seconds,
routerBroadcastInterval = 1 day, // "disables" rebroadcast
requestNodeAnnouncements = true,
encodingType = EncodingType.COMPRESSED_ZLIB,
channelRangeChunkSize = 20,
channelQueryChunkSize = 5,
syncConf = Router.SyncConf(
requestNodeAnnouncements = true,
encodingType = EncodingType.COMPRESSED_ZLIB,
channelRangeChunkSize = 20,
channelQueryChunkSize = 5,
peerLimit = 10,
whitelist = Set.empty
),
pathFindingExperimentConf = PathFindingExperimentConf(Map("alice-test-experiment" -> PathFindingConf(
randomize = false,
boundaries = SearchBoundaries(
Expand Down Expand Up @@ -293,7 +296,6 @@ object TestConstants {
),
pluginParams = Nil,
overrideInitFeatures = Map.empty,
syncWhitelist = Set.empty,
channelConf = ChannelConf(
dustLimit = 1000 sat,
maxRemoteDustLimit = 1500 sat,
Expand Down Expand Up @@ -376,10 +378,14 @@ object TestConstants {
watchSpentWindow = 1 second,
channelExcludeDuration = 60 seconds,
routerBroadcastInterval = 1 day, // "disables" rebroadcast
requestNodeAnnouncements = true,
encodingType = EncodingType.UNCOMPRESSED,
channelRangeChunkSize = 20,
channelQueryChunkSize = 5,
syncConf = Router.SyncConf(
requestNodeAnnouncements = true,
encodingType = EncodingType.UNCOMPRESSED,
channelRangeChunkSize = 20,
channelQueryChunkSize = 5,
peerLimit = 20,
whitelist = Set.empty
),
pathFindingExperimentConf = PathFindingExperimentConf(Map("bob-test-experiment" -> PathFindingConf(
randomize = false,
boundaries = SearchBoundaries(
Expand Down
Loading