From ec9b54fcbd398d73af6958fc6cf02e809d8c616a Mon Sep 17 00:00:00 2001
From: Henrique Dias
Date: Wed, 17 Apr 2024 14:39:42 +0200
Subject: [PATCH] feat: seed-based automatic peering

---
 go.mod        |   1 +
 keys.go       |  29 +++-
 main.go       |  27 +++
 setup.go      | 470 +++++++++++++++++++++++++++++++-------------------
 setup_test.go |  75 +++++++-
 5 files changed, 418 insertions(+), 184 deletions(-)

diff --git a/go.mod b/go.mod
index 1190e97..7f04635 100644
--- a/go.mod
+++ b/go.mod
@@ -25,6 +25,7 @@ require (
 	github.com/libp2p/go-libp2p-kad-dht v0.25.2
 	github.com/libp2p/go-libp2p-record v0.2.0
 	github.com/libp2p/go-libp2p-routing-helpers v0.7.3
+	github.com/libp2p/go-libp2p-testing v0.12.0
 	github.com/mitchellh/go-server-timing v1.0.1
 	github.com/mr-tron/base58 v1.2.0
 	github.com/multiformats/go-multiaddr v0.12.3

diff --git a/keys.go b/keys.go
index 31c1d6d..6cbf000 100644
--- a/keys.go
+++ b/keys.go
@@ -9,6 +9,7 @@ import (
 	"io"

 	libp2p "github.com/libp2p/go-libp2p/core/crypto"
+	peer "github.com/libp2p/go-libp2p/core/peer"
 	"github.com/mr-tron/base58"
 	"golang.org/x/crypto/hkdf"
 )
@@ -25,7 +26,7 @@ func newSeed() (string, error) {
 	return base58.Encode(bs), nil
 }

-// derive derives libp2p keys from a b58-encoded seed.
+// deriveKey derives libp2p keys from a b58-encoded seed.
 func deriveKey(b58secret string, info []byte) (libp2p.PrivKey, error) {
 	secret, err := base58.Decode(b58secret)
 	if err != nil {
@@ -45,6 +46,32 @@ func deriveKey(b58secret string, info []byte) (libp2p.PrivKey, error) {
 	return libp2p.UnmarshalEd25519PrivateKey(key)
 }

+// derivePeerIDs derives the peer IDs of all peers sharing the same seed, from
+// index 0 through maxIndex inclusive, skipping our own ID (at 'ourIndex').
+func derivePeerIDs(seed string, ourIndex int, maxIndex int) ([]peer.ID, error) {
+	peerIDs := []peer.ID{}
+
+	for i := 0; i <= maxIndex; i++ {
+		if i == ourIndex {
+			continue
+		}
+
+		peerPriv, err := deriveKey(seed, deriveKeyInfo(i))
+		if err != nil {
+			return nil, err
+		}
+
+		pid, err := peer.IDFromPrivateKey(peerPriv)
+		if err != nil {
+			return nil, err
+		}
+
+		peerIDs = append(peerIDs, pid)
+	}
+
+	return peerIDs, nil
+}
+
 func deriveKeyInfo(index int) []byte {
 	return []byte(fmt.Sprintf("rainbow-%d", index))
 }

diff --git a/main.go b/main.go
index 43318d6..0995c71 100644
--- a/main.go
+++ b/main.go
@@ -91,6 +91,29 @@ Generate an identity seed and launch a gateway:
 			EnvVars: []string{"RAINBOW_SEED_INDEX"},
 			Usage:   "Index to derivate the peerID (needs --seed)",
 		},
+		&cli.BoolFlag{
+			Name:    "seed-peering",
+			Value:   false,
+			EnvVars: []string{"RAINBOW_SEED_PEERING"},
+			Usage:   "Automatic peering with peers that share the same seed (requires --seed and --seed-index). If --dht-routing is off or --dht-shared-host is disabled, a separate lightweight DHT for peer routing runs on the main host",
+			Action: func(ctx *cli.Context, b bool) error {
+				if !b {
+					return nil
+				}

+				if !ctx.IsSet("seed") || !ctx.IsSet("seed-index") {
+					return errors.New("--seed and --seed-index must be explicitly defined when --seed-peering is enabled")
+				}

+				return nil
+			},
+		},
+		&cli.IntFlag{
+			Name:    "seed-peering-max-index",
+			Value:   100,
+			EnvVars: []string{"RAINBOW_SEED_PEERING_MAX_INDEX"},
+			Usage:   "Largest index (inclusive) to derive automatic peering peer IDs for",
+		},
 		&cli.StringSliceFlag{
 			Name:    "gateway-domains",
 			Value:   cli.NewStringSlice(),
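To make the derivation concrete: a peer's key material is a pure function of (seed, index), so any node holding the seed can compute every sibling's peer ID offline. Note that this also means the seed is shared secret key material: whoever has it can derive every node's private key, so it must stay private to the cluster. A minimal sketch using the helpers added in keys.go above (assumed to live in the same package; values are illustrative, not part of the patch):

func exampleDeriveSiblings() error {
	seed, err := newSeed() // b58-encoded random seed, shared out of band
	if err != nil {
		return err
	}

	// Our own identity at index 0.
	priv, err := deriveKey(seed, deriveKeyInfo(0))
	if err != nil {
		return err
	}
	self, err := peer.IDFromPrivateKey(priv)
	if err != nil {
		return err
	}

	// Sibling peer IDs for indexes 1..100 (index 0, ours, is skipped).
	siblings, err := derivePeerIDs(seed, 0, 100)
	if err != nil {
		return err
	}

	fmt.Println(self, len(siblings)) // our ID, then 100
	return nil
}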
@@ -338,6 +361,10 @@ share the same seed as long as the indexes are different.
 		DenylistSubs:        cctx.StringSlice("denylists"),
 		Peering:             peeringAddrs,
 		PeeringCache:        cctx.Bool("peering-shared-cache"),
+		Seed:                seed,
+		SeedIndex:           index,
+		SeedPeering:         cctx.Bool("seed-peering"),
+		SeedPeeringMaxIndex: cctx.Int("seed-peering-max-index"),
 		GCInterval:          cctx.Duration("gc-interval"),
 		GCThreshold:         cctx.Float64("gc-threshold"),
 	}

diff --git a/setup.go b/setup.go
index 3ac52ea..74870bc 100644
--- a/setup.go
+++ b/setup.go
@@ -45,9 +45,11 @@ import (
 	"github.com/libp2p/go-libp2p/core/crypto"
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/metrics"
+	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/core/routing"
 	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
+	"github.com/libp2p/go-libp2p/p2p/protocol/identify"
 	"github.com/multiformats/go-multiaddr"
 	"go.opencensus.io/stats/view"
 	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
@@ -69,23 +71,23 @@ const (
 	DHTOff         DHTRouting = "off"
 )

+func init() {
+	// Lets us discover our own public address with a single observation.
+	identify.ActivationThresh = 1
+}
+
 type Node struct {
 	vs   routing.ValueStore
 	host host.Host

-	dataDir    string
-	datastore  datastore.Batching
-	blockstore blockstore.Blockstore
-	bs         *bitswap.Bitswap
-	bsrv       blockservice.BlockService
-	resolver   resolver.Resolver
-
-	ns namesys.NameSystem
-
-	bwc *metrics.BandwidthCounter
-
+	dataDir      string
+	datastore    datastore.Batching
+	blockstore   blockstore.Blockstore
+	bs           *bitswap.Bitswap
+	bsrv         blockservice.BlockService
+	resolver     resolver.Resolver
+	ns           namesys.NameSystem
 	denylistSubs []*nopfs.HTTPSubscriber
-	blocker      *nopfs.Blocker
 }

@@ -115,6 +117,11 @@ type Config struct {
 	Peering      []peer.AddrInfo
 	PeeringCache bool

+	Seed                string
+	SeedIndex           int
+	SeedPeering         bool
+	SeedPeeringMaxIndex int
+
 	GCInterval  time.Duration
 	GCThreshold float64
 }
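For reference, the programmatic equivalent of "--seed ... --seed-index 0 --seed-peering --seed-peering-max-index 3" is a Config like the one below. This is a sketch mirroring the tests at the end of this patch; it assumes seed, key, ctx, and dnsCache are already in hand, and the field values are illustrative:

cfg := Config{
	DataDir:             "/var/lib/rainbow", // hypothetical path
	BlockstoreType:      "flatfs",
	DHTRouting:          DHTStandard,
	DHTSharedHost:       false, // seed peering then runs its own light DHT client
	Seed:                seed,  // the same b58 seed on every node
	SeedIndex:           0,     // unique per node
	SeedPeering:         true,
	SeedPeeringMaxIndex: 3, // derive sibling IDs for indexes 0..3
}

node, err := Setup(ctx, cfg, key, dnsCache)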
@@ -188,182 +195,27 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
 	)
 	blkst = blockstore.NewIdStore(blkst)

-	var router routing.Routing
-
-	// Increase per-host connection pool since we are making lots of concurrent requests.
-	httpClient := &http.Client{
-		Transport: otelhttp.NewTransport(
-			&routingv1client.ResponseBodyLimitedTransport{
-				RoundTripper: &customTransport{
-					// Roundtripper with increased defaults than http.Transport such that retrieving
-					// multiple lookups concurrently is fast.
-					RoundTripper: &http.Transport{
-						MaxIdleConns:        1000,
-						MaxConnsPerHost:     100,
-						MaxIdleConnsPerHost: 100,
-						IdleConnTimeout:     90 * time.Second,
-						DialContext:         dnsCache.dialWithCachedDNS,
-						ForceAttemptHTTP2:   true,
-					},
-				},
-				LimitBytes: 1 << 20,
-			}),
-	}
+	var (
+		cr routing.ContentRouting
+		pr routing.PeerRouting
+		vs routing.ValueStore
+	)

 	opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
-		var routingV1Routers []routing.Routing
-		for _, endpoint := range cfg.RoutingV1Endpoints {
-			rv1Opts := []routingv1client.Option{routingv1client.WithHTTPClient(httpClient)}
-			if endpoint != cidContactEndpoint {
-				rv1Opts = append(rv1Opts, routingv1client.WithStreamResultsRequired())
-			}
-			httpClient, err := delegatedHTTPContentRouter(endpoint, rv1Opts...)
-			if err != nil {
-				return nil, err
-			}
-			routingV1Routers = append(routingV1Routers, httpClient)
-		}
-
-		var dhtRouter routing.Routing
-		if cfg.DHTRouting != DHTOff {
-			var dhtHost host.Host
-			if cfg.DHTSharedHost {
-				dhtHost = h
-			} else {
-				dhtHost, err = libp2p.New(
-					libp2p.NoListenAddrs,
-					libp2p.BandwidthReporter(bwc),
-					libp2p.DefaultTransports,
-					libp2p.DefaultMuxers,
-					libp2p.UserAgent("rainbow/"+buildVersion()),
-					libp2p.ResourceManager(dhtRcMgr),
-				)
-				if err != nil {
-					return nil, err
-				}
-			}
-
-			standardClient, err := dht.New(ctx, dhtHost,
-				dht.Datastore(ds),
-				dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
-				dht.Mode(dht.ModeClient),
-			)
-			if err != nil {
-				return nil, err
-			}
-
-			if cfg.DHTRouting == DHTAccelerated {
-				fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix,
-					fullrt.DHTOption(
-						dht.Validator(record.NamespacedValidator{
-							"pk":   record.PublicKeyValidator{},
-							"ipns": ipns.Validator{KeyBook: h.Peerstore()},
-						}),
-						dht.Datastore(ds),
-						dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
-						dht.BucketSize(20),
-					))
-				if err != nil {
-					return nil, err
-				}
-				dhtRouter = &bundledDHT{
-					standard: standardClient,
-					fullRT:   fullRTClient,
-				}
-			} else {
-				dhtRouter = standardClient
-			}
-		}
-
-		// Default router is no routing at all: can be especially useful during tests.
-		router = &routinghelpers.Null{}
-
-		if len(routingV1Routers) == 0 && dhtRouter != nil {
-			router = dhtRouter
-		} else {
-			var routers []*routinghelpers.ParallelRouter
-
-			if dhtRouter != nil {
-				routers = append(routers, &routinghelpers.ParallelRouter{
-					Router:                  dhtRouter,
-					ExecuteAfter:            0,
-					DoNotWaitForSearchValue: true,
-					IgnoreError:             false,
-				})
-			}
-
-			for _, routingV1Router := range routingV1Routers {
-				routers = append(routers, &routinghelpers.ParallelRouter{
-					Timeout:                 15 * time.Second,
-					Router:                  routingV1Router,
-					ExecuteAfter:            0,
-					DoNotWaitForSearchValue: true,
-					IgnoreError:             true,
-				})
-			}
-
-			if len(routers) > 0 {
-				router = routinghelpers.NewComposableParallel(routers)
-			}
-		}
-
-		return router, nil
+		cr, pr, vs, err = setupRouting(ctx, cfg, h, ds, dhtRcMgr, bwc, dnsCache)
+		return pr, err
 	}))

 	h, err := libp2p.New(opts...)
 	if err != nil {
 		return nil, err
 	}

-	if len(cfg.Peering) > 0 {
-		ps := peering.NewPeeringService(h)
-		if err := ps.Start(); err != nil {
-			return nil, err
-		}
-		for _, a := range cfg.Peering {
-			ps.AddPeer(a)
-		}
-	}
-
-	var (
-		provideEnabled         bool
-		peerBlockRequestFilter bsserver.PeerBlockRequestFilter
-	)
-	if cfg.PeeringCache && len(cfg.Peering) > 0 {
-		peers := make(map[peer.ID]struct{}, len(cfg.Peering))
-		for _, a := range cfg.Peering {
-			peers[a.ID] = struct{}{}
-		}
-
-		provideEnabled = true
-		peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
-			_, ok := peers[p]
-			return ok
-		}
-	} else {
-		provideEnabled = false
-		peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
-			return false
-		}
+	err = setupPeering(cfg, h)
+	if err != nil {
+		return nil, err
 	}

-	bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
-	bn := bsnet.NewFromIpfsHost(h, router)
-	bswap := bitswap.New(bsctx, bn, blkst,
-		// --- Client Options
-		// default is 1 minute to search for a random live-want (1
-		// CID). I think we want to search for random live-wants more
-		// often although probably it overlaps with general
-		// rebroadcasts.
-		bitswap.RebroadcastDelay(delay.Fixed(10*time.Second)),
-		// ProviderSearchDelay: default is 1 second.
-		bitswap.ProviderSearchDelay(time.Second),
-		bitswap.WithoutDuplicatedBlockStats(),
-
-		// ---- Server Options
-		bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter),
-		bitswap.ProvideEnabled(provideEnabled),
-	)
-	bn.Start(bswap)
+	bswap := setupBitswap(ctx, cfg, h, cr, blkst)

 	err = os.Mkdir(filepath.Join(cfg.DataDir, "denylists"), 0755)
 	if err != nil && !errors.Is(err, fs.ErrExist) {
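The indirection above is worth spelling out: setupRouting runs inside the libp2p.Routing constructor callback, which only lets it return a PeerRouting, so the content router and value store escape through the captured cr/pr/vs variables. A minimal sketch of the same pattern in isolation (assumed identifiers, not rainbow's actual wiring):

func newHostWithRouting(ctx context.Context) (host.Host, routing.Routing, error) {
	var r routing.Routing

	h, err := libp2p.New(
		// libp2p invokes this callback with the final host so the router
		// can be built around it; we capture the router for use after
		// libp2p.New returns.
		libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
			var err error
			r, err = dht.New(ctx, h, dht.Mode(dht.ModeClient))
			return r, err
		}),
	)
	return h, r, err
}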
@@ -407,7 +259,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
 	if cfg.IpnsMaxCacheTTL > 0 {
 		nsOptions = append(nsOptions, namesys.WithMaxCacheTTL(cfg.IpnsMaxCacheTTL))
 	}
-	ns, err := namesys.NewNameSystem(router, nsOptions...)
+	ns, err := namesys.NewNameSystem(vs, nsOptions...)
 	if err != nil {
 		return nil, err
 	}
@@ -426,11 +278,9 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
 		datastore: ds,
 		bs:        bswap,
 		ns:        ns,
-		vs:        router,
+		vs:        vs,
 		bsrv:      bsrv,
 		resolver:  r,
-		bwc:       bwc,
-		blocker:   blocker,
 		denylistSubs: denylists,
 	}, nil
 }
@@ -491,6 +341,178 @@ func setupDatastore(cfg Config) (datastore.Batching, error) {
 	}
 }

+func setupDelegatedRouting(cfg Config, dnsCache *cachedDNS) ([]routing.Routing, error) {
+	// Increase per-host connection pool since we are making lots of concurrent requests.
+	httpClient := &http.Client{
+		Transport: otelhttp.NewTransport(
+			&routingv1client.ResponseBodyLimitedTransport{
+				RoundTripper: &customTransport{
+					// RoundTripper with higher defaults than http.Transport
+					// such that retrieving multiple lookups concurrently is fast.
+					RoundTripper: &http.Transport{
+						MaxIdleConns:        1000,
+						MaxConnsPerHost:     100,
+						MaxIdleConnsPerHost: 100,
+						IdleConnTimeout:     90 * time.Second,
+						DialContext:         dnsCache.dialWithCachedDNS,
+						ForceAttemptHTTP2:   true,
+					},
+				},
+				LimitBytes: 1 << 20,
+			}),
+	}
+
+	var (
+		delegatedRouters []routing.Routing
+	)
+
+	for _, endpoint := range cfg.RoutingV1Endpoints {
+		rv1Opts := []routingv1client.Option{routingv1client.WithHTTPClient(httpClient)}
+		if endpoint != cidContactEndpoint {
+			rv1Opts = append(rv1Opts, routingv1client.WithStreamResultsRequired())
+		}
+		delegatedRouter, err := delegatedHTTPContentRouter(endpoint, rv1Opts...)
+		if err != nil {
+			return nil, err
+		}
+		delegatedRouters = append(delegatedRouters, delegatedRouter)
+	}
+
+	return delegatedRouters, nil
+}
+
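Each endpoint here speaks the Routing V1 HTTP API, so a delegated provider lookup boils down to a plain HTTP GET. A rough sketch of the request the client issues under the hood; the endpoint path follows the public spec, the real client adds streaming, response limits, and the transport tuning above (illustrative, not part of the patch):

func findProvidersRaw(ctx context.Context, endpoint, c string) ([]byte, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		endpoint+"/routing/v1/providers/"+c, nil)
	if err != nil {
		return nil, err
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// JSON (or NDJSON when streaming) records describing providers for c.
	return io.ReadAll(resp.Body)
}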
+func setupDHTRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter) (routing.Routing, error) {
+	if cfg.DHTRouting == DHTOff {
+		return nil, nil
+	}
+
+	var err error
+
+	var dhtHost host.Host
+	if cfg.DHTSharedHost {
+		dhtHost = h
+	} else {
+		dhtHost, err = libp2p.New(
+			libp2p.NoListenAddrs,
+			libp2p.BandwidthReporter(bwc),
+			libp2p.DefaultTransports,
+			libp2p.DefaultMuxers,
+			libp2p.ResourceManager(dhtRcMgr),
+		)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	standardClient, err := dht.New(ctx, dhtHost,
+		dht.Datastore(ds),
+		dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
+		dht.Mode(dht.ModeClient),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	if cfg.DHTRouting == DHTStandard {
+		return standardClient, nil
+	}
+
+	if cfg.DHTRouting == DHTAccelerated {
+		fullRTClient, err := fullrt.NewFullRT(dhtHost, dht.DefaultPrefix,
+			fullrt.DHTOption(
+				dht.Validator(record.NamespacedValidator{
+					"pk":   record.PublicKeyValidator{},
+					"ipns": ipns.Validator{KeyBook: h.Peerstore()},
+				}),
+				dht.Datastore(ds),
+				dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
+				dht.BucketSize(20),
+			))
+		if err != nil {
+			return nil, err
+		}
+		return &bundledDHT{
+			standard: standardClient,
+			fullRT:   fullRTClient,
+		}, nil
+	}
+
+	return nil, fmt.Errorf("unknown DHTRouting option: %q", cfg.DHTRouting)
+}
+
+func setupCompositeRouting(delegatedRouters []routing.Routing, dht routing.Routing) routing.Routing {
+	// Default router is no routing at all: can be especially useful during tests.
+	var router routing.Routing
+	router = &routinghelpers.Null{}
+
+	if len(delegatedRouters) == 0 && dht != nil {
+		router = dht
+	} else {
+		var routers []*routinghelpers.ParallelRouter
+
+		if dht != nil {
+			routers = append(routers, &routinghelpers.ParallelRouter{
+				Router:                  dht,
+				ExecuteAfter:            0,
+				DoNotWaitForSearchValue: true,
+				IgnoreError:             false,
+			})
+		}
+
+		for _, routingV1Router := range delegatedRouters {
+			routers = append(routers, &routinghelpers.ParallelRouter{
+				Timeout:                 15 * time.Second,
+				Router:                  routingV1Router,
+				ExecuteAfter:            0,
+				DoNotWaitForSearchValue: true,
+				IgnoreError:             true,
+			})
+		}
+
+		if len(routers) > 0 {
+			router = routinghelpers.NewComposableParallel(routers)
+		}
+	}
+
+	return router
+}
+
+func setupRouting(ctx context.Context, cfg Config, h host.Host, ds datastore.Batching, dhtRcMgr network.ResourceManager, bwc metrics.Reporter, dnsCache *cachedDNS) (routing.ContentRouting, routing.PeerRouting, routing.ValueStore, error) {
+	delegatedRouters, err := setupDelegatedRouting(cfg, dnsCache)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	dhtRouter, err := setupDHTRouting(ctx, cfg, h, ds, dhtRcMgr, bwc)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	router := setupCompositeRouting(delegatedRouters, dhtRouter)
+
+	var (
+		cr routing.ContentRouting = router
+		pr routing.PeerRouting    = router
+		vs routing.ValueStore     = router
+	)
+
+	// Seed peering requires peer routing on the main host to resolve the
+	// derived peer IDs. If the main host is not already on the Amino DHT
+	// (shared host disabled, or DHT off), run a lightweight DHT client on it.
+	if cfg.SeedPeering && (!cfg.DHTSharedHost || dhtRouter == nil) {
+		pr, err = dht.New(ctx, h,
+			dht.Datastore(ds),
+			dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
+			dht.Mode(dht.ModeClient),
+		)
+		if err != nil {
+			return nil, nil, nil, err
+		}
+	}
+
+	return cr, pr, vs, nil
+}
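Seed peering only hands the peering service bare peer IDs, so the pr returned here is what turns an ID into dialable addresses. A sketch of that resolution step, a single DHT lookup (identifiers as in setupRouting; not part of the patch):

func resolveSibling(ctx context.Context, pr routing.PeerRouting, pid peer.ID) ([]multiaddr.Multiaddr, error) {
	// FindPeer walks the DHT in client mode and returns the peer's
	// currently known addresses, if any.
	ai, err := pr.FindPeer(ctx, pid)
	if err != nil {
		return nil, err
	}
	return ai.Addrs, nil
}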
+
 type bundledDHT struct {
 	standard *dht.IpfsDHT
 	fullRT   *fullrt.FullRT
@@ -585,3 +607,87 @@ func loadOrInitPeerKey(kf string) (crypto.PrivKey, error) {
 	}
 	return crypto.UnmarshalPrivateKey(data)
 }
+
+func setupPeering(cfg Config, h host.Host) error {
+	if len(cfg.Peering) == 0 && !cfg.SeedPeering {
+		return nil
+	}
+
+	ps := peering.NewPeeringService(h)
+	if err := ps.Start(); err != nil {
+		return err
+	}
+	for _, a := range cfg.Peering {
+		ps.AddPeer(a)
+	}
+
+	if !cfg.SeedPeering {
+		return nil
+	}
+
+	if cfg.SeedIndex < 0 {
+		return fmt.Errorf("seed index must be greater than or equal to 0, but it is %d", cfg.SeedIndex)
+	}
+
+	if cfg.SeedPeeringMaxIndex < 0 {
+		return fmt.Errorf("seed peering max index must be greater than or equal to 0, but it is %d", cfg.SeedPeeringMaxIndex)
+	}
+
+	pids, err := derivePeerIDs(cfg.Seed, cfg.SeedIndex, cfg.SeedPeeringMaxIndex)
+	if err != nil {
+		return err
+	}
+
+	for _, pid := range pids {
+		// The peering module will automatically perform lookups to find the
+		// addresses of the given peers.
+		ps.AddPeer(peer.AddrInfo{ID: pid})
+	}
+
+	return nil
+}
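Note the bound is inclusive and independent of our own index: derivePeerIDs walks indexes 0 through SeedPeeringMaxIndex and only skips ours. A quick check with hypothetical values:

// maxIndex = 3, ourIndex = 1: indexes 0, 2 and 3 are derived.
pids, err := derivePeerIDs(seed, 1, 3)
if err != nil {
	panic(err) // illustration only
}
fmt.Println(len(pids)) // 3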
+
+func setupBitswap(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) *bitswap.Bitswap {
+	var (
+		provideEnabled         bool
+		peerBlockRequestFilter bsserver.PeerBlockRequestFilter
+	)
+	if cfg.PeeringCache && len(cfg.Peering) > 0 {
+		peers := make(map[peer.ID]struct{}, len(cfg.Peering))
+		for _, a := range cfg.Peering {
+			peers[a.ID] = struct{}{}
+		}
+
+		provideEnabled = true
+		peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
+			_, ok := peers[p]
+			return ok
+		}
+	} else {
+		provideEnabled = false
+		peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
+			return false
+		}
+	}
+
+	bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
+	bn := bsnet.NewFromIpfsHost(h, cr)
+	bswap := bitswap.New(bsctx, bn, bstore,
+		// --- Client Options
+		// default is 1 minute to search for a random live-want (1
+		// CID). I think we want to search for random live-wants more
+		// often although probably it overlaps with general
+		// rebroadcasts.
+		bitswap.RebroadcastDelay(delay.Fixed(10*time.Second)),
+		// ProviderSearchDelay: default is 1 second.
+		bitswap.ProviderSearchDelay(time.Second),
+		bitswap.WithoutDuplicatedBlockStats(),
+
+		// ---- Server Options
+		bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter),
+		bitswap.ProvideEnabled(provideEnabled),
+	)
+	bn.Start(bswap)
+
+	return bswap
+}

diff --git a/setup_test.go b/setup_test.go
index 4364ae2..ac852ee 100644
--- a/setup_test.go
+++ b/setup_test.go
@@ -8,6 +8,7 @@ import (
 	"time"

 	blocks "github.com/ipfs/go-block-format"
+	ci "github.com/libp2p/go-libp2p-testing/ci"
 	ic "github.com/libp2p/go-libp2p/core/crypto"
 	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/peer"
@@ -98,7 +99,6 @@ func mustPeeredNodes(t *testing.T, configuration [][]int, peeringShareCache bool
 	for i, node := range nodes {
 		for _, peer := range cfgs[i].Peering {
 			if node.host.Network().Connectedness(peer.ID) != network.Connected {
-				t.Log(node.host.Network().Connectedness(peer.ID))
 				return false
 			}
 		}
@@ -150,3 +150,76 @@ func TestPeeringCache(t *testing.T) {
 	// confirm bitswap providing is disabled by default (no peering)
 	checkBitswap(2, false)
 }
+
+func testSeedPeering(t *testing.T, n int, dhtRouting DHTRouting, dhtSharedHost bool) ([]ic.PrivKey, []peer.ID, []*Node) {
+	cdns := newCachedDNS(dnsCacheRefreshInterval)
+	defer cdns.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	seed, err := newSeed()
+	require.NoError(t, err)
+
+	keys := make([]ic.PrivKey, n)
+	pids := make([]peer.ID, n)
+
+	for i := 0; i < n; i++ {
+		keys[i], pids[i] = mustTestPeerFromSeed(t, seed, i)
+	}
+
+	cfgs := make([]Config, n)
+	nodes := make([]*Node, n)
+
+	for i := 0; i < n; i++ {
+		cfgs[i] = Config{
+			DataDir:             t.TempDir(),
+			BlockstoreType:      "flatfs",
+			DHTRouting:          dhtRouting,
+			DHTSharedHost:       dhtSharedHost,
+			Seed:                seed,
+			SeedIndex:           i,
+			SeedPeering:         true,
+			SeedPeeringMaxIndex: n,
+		}
+
+		nodes[i], err = Setup(ctx, cfgs[i], keys[i], cdns)
+		require.NoError(t, err)
+	}
+
+	require.Eventually(t, func() bool {
+		for i, node := range nodes {
+			for j, pid := range pids {
+				if i == j {
+					continue
+				}
+
+				if node.host.Network().Connectedness(pid) != network.Connected {
+					return false
+				}
+			}
+		}
+
+		return true
+	}, time.Second*120, time.Millisecond*100)
+
+	return keys, pids, nodes
+}
+
+func TestSeedPeering(t *testing.T) {
+	if ci.IsRunning() {
+		t.Skip("do not run seed peering tests in CI")
+	}
+
+	t.Run("DHT disabled", func(t *testing.T) {
+		testSeedPeering(t, 3, DHTOff, false)
+	})
+
+	t.Run("DHT enabled with shared host disabled", func(t *testing.T) {
+		testSeedPeering(t, 3, DHTStandard, false)
+	})
+
+	t.Run("DHT enabled with shared host enabled", func(t *testing.T) {
+		testSeedPeering(t, 3, DHTStandard, true)
+	})
+}
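mustTestPeerFromSeed is used by testSeedPeering but lives outside this diff. For reading the tests, a plausible reconstruction from the keys.go helpers (hypothetical, not part of the patch):

func mustTestPeerFromSeed(t *testing.T, seed string, index int) (ic.PrivKey, peer.ID) {
	// Derive the node's private key for this index, then its peer ID,
	// mirroring how rainbow derives identities at startup.
	key, err := deriveKey(seed, deriveKeyInfo(index))
	require.NoError(t, err)

	pid, err := peer.IDFromPrivateKey(key)
	require.NoError(t, err)

	return key, pid
}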