diff --git a/.github/workflows/gateway-conformance.yml b/.github/workflows/gateway-conformance.yml index 25608dc..a2a2160 100644 --- a/.github/workflows/gateway-conformance.yml +++ b/.github/workflows/gateway-conformance.yml @@ -74,7 +74,7 @@ jobs: kuboNodeMultiaddr=$(ipfs --api=/ip4/127.0.0.1/tcp/5001 swarm addrs local --id | head -n 1) # run gw - ./rainbow --routing=http://127.0.0.1:8080 --peering=$kuboNodeMultiaddr & + ./rainbow --routing-v1-endpoints=http://127.0.0.1:8080 --dht-routing=false --peering=$kuboNodeMultiaddr & working-directory: rainbow # 6. Run the gateway-conformance tests diff --git a/handler_test.go b/handler_test.go index 4e6ce42..c0d61ce 100644 --- a/handler_test.go +++ b/handler_test.go @@ -22,6 +22,8 @@ import ( func mustTestNode(t *testing.T, cfg Config) *Node { cfg.DataDir = t.TempDir() cfg.BlockstoreType = "flatfs" + cfg.DHTRouting = true + cfg.RoutingV1Endpoints = []string{cidContactEndpoint} ctx := context.Background() diff --git a/main.go b/main.go index cc931fd..96f800b 100644 --- a/main.go +++ b/main.go @@ -93,13 +93,13 @@ Generate an identity seed and launch a gateway: Name: "seeds-peering", Value: false, EnvVars: []string{"RAINBOW_SEEDS_PEERING"}, - Usage: "TODO (needs --seed and --seed-index). Automatically enables --dht-shared-host, incompatible with --routing", + Usage: "Automatic peering with peers with the same seed (requires --seed and --seed-index). Automatically enables --dht-routing and --dht-shared-host", }, &cli.UintFlag{ Name: "seeds-peering-max-index", Value: 100, EnvVars: []string{"RAINBOW_SEEDS_PEERING_MAX_INDEX"}, - Usage: "TODO", + Usage: "Largest index to derive automatic peering peer IDs for", }, &cli.StringSliceFlag{ Name: "gateway-domains", @@ -179,10 +179,17 @@ Generate an identity seed and launch a gateway: EnvVars: []string{"RAINBOW_MAX_FD"}, Usage: "Maximum number of file descriptors. Defaults to 50% of the process' limit", }, - &cli.StringFlag{ - Name: "routing", - Value: "", - Usage: "RoutingV1 Endpoint (otherwise Amino DHT and cid.contact is used)", + &cli.StringSliceFlag{ + Name: "routing-v1-endpoints", + Value: cli.NewStringSlice(cidContactEndpoint), + EnvVars: []string{"RAINBOW_ROUTING_V1_ENDPOINTS"}, + Usage: "Routing V1 endpoints to use for routing (comma-separated)", + }, + &cli.BoolFlag{ + Name: "dht-routing", + Value: true, + EnvVars: []string{"RAINBOW_DHT_ROUTING"}, + Usage: "Use the Amino DHT for routing", }, &cli.BoolFlag{ Name: "dht-shared-host", @@ -308,14 +315,14 @@ share the same seed as long as the indexes are different. return errors.New("--seed and --seed-index must be explicitly defined when --seeds-peering is enabled") } - if cctx.String("routing") != "" { - return errors.New("--routing is incompatible with --seeds-peering: DHT needs to run in order to be able to find peers") + // If seeds-peering is enabled, automatically turn on DHT routing, as well + // as sharing the libp2p hosts. This allows the peer information (id and + // addresses) to be discoverable via the DHT without having to create a + // new DHT client instance just for this purpose. + err = cctx.Set("dht-routing", "true") + if err != nil { + return err } - - // If seeds-peering is enabled, automatically use the same libp2p host - // for the DHT. This allows the peer information (id and addresses) to - // be discoverable via the DHT without having to create a new DHT client - // instance just for this purpose. err = cctx.Set("dht-shared-host", "true") if err != nil { return err @@ -346,7 +353,8 @@ share the same seed as long as the indexes are different. MaxMemory: cctx.Uint64("max-memory"), MaxFD: cctx.Int("max-fd"), InMemBlockCache: cctx.Int64("inmem-block-cache"), - RoutingV1: cctx.String("routing"), + RoutingV1Endpoints: cctx.StringSlice("routing-v1-endpoints"), + DHTRouting: cctx.Bool("dht-routing"), DHTSharedHost: cctx.Bool("dht-shared-host"), AcceleratedDHT: cctx.Bool("accelerated-dht"), IpnsMaxCacheTTL: cctx.Duration("ipns-max-cache-ttl"), diff --git a/setup.go b/setup.go index b1da504..453daab 100644 --- a/setup.go +++ b/setup.go @@ -63,7 +63,7 @@ func init() { identify.ActivationThresh = 1 } -const ipniFallbackEndpoint = "https://cid.contact" +const cidContactEndpoint = "https://cid.contact" type Node struct { vs routing.ValueStore @@ -100,7 +100,8 @@ type Config struct { GatewayDomains []string SubdomainGatewayDomains []string TrustlessGatewayDomains []string - RoutingV1 string + RoutingV1Endpoints []string + DHTRouting bool DHTSharedHost bool AcceleratedDHT bool IpnsMaxCacheTTL time.Duration @@ -159,9 +160,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached ) blkst = blockstore.NewIdStore(blkst) - var pr routing.PeerRouting - var vs routing.ValueStore - var cr routing.ContentRouting + var router routing.Routing // Increase per-host connection pool since we are making lots of concurrent requests. httpClient := &http.Client{ @@ -184,17 +183,21 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached } opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { - if cfg.RoutingV1 != "" { - routingClient, err := delegatedHTTPContentRouter(cfg.RoutingV1, routingv1client.WithStreamResultsRequired(), routingv1client.WithHTTPClient(httpClient)) + var routingV1Routers []routing.Routing + for _, endpoint := range cfg.RoutingV1Endpoints { + rv1Opts := []routingv1client.Option{routingv1client.WithHTTPClient(httpClient)} + if endpoint != cidContactEndpoint { + rv1Opts = append(rv1Opts, routingv1client.WithStreamResultsRequired()) + } + httpClient, err := delegatedHTTPContentRouter(endpoint, rv1Opts...) if err != nil { return nil, err } - pr = routingClient - vs = routingClient - cr = routingClient - } else { - // If there are no delegated routing endpoints run an accelerated Amino DHT client and send IPNI requests to cid.contact + routingV1Routers = append(routingV1Routers, httpClient) + } + var dhtRouter routing.Routing + if cfg.DHTRouting { var dhtHost host.Host if cfg.DHTSharedHost { dhtHost = h @@ -211,8 +214,6 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached } } - var dhtRouter routing.Routing - standardClient, err := dht.New(ctx, dhtHost, dht.Datastore(ds), dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), @@ -243,13 +244,15 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached } else { dhtRouter = standardClient } + } - // we want to also use the default HTTP routers, so wrap the FullRT client - // in a parallel router that calls them in parallel - httpRouters, err := delegatedHTTPContentRouter(ipniFallbackEndpoint, routingv1client.WithHTTPClient(httpClient)) - if err != nil { - return nil, err - } + if len(routingV1Routers) == 0 && dhtRouter == nil { + return nil, errors.New("no routers available") + } + + if len(routingV1Routers) == 0 { + router = dhtRouter + } else { routers := []*routinghelpers.ParallelRouter{ { Router: dhtRouter, @@ -257,22 +260,22 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached DoNotWaitForSearchValue: true, IgnoreError: false, }, - { + } + + for _, routingV1Router := range routingV1Routers { + routers = append(routers, &routinghelpers.ParallelRouter{ Timeout: 15 * time.Second, - Router: httpRouters, + Router: routingV1Router, ExecuteAfter: 0, DoNotWaitForSearchValue: true, IgnoreError: true, - }, + }) } - router := routinghelpers.NewComposableParallel(routers) - pr = router - vs = router - cr = router + router = routinghelpers.NewComposableParallel(routers) } - return pr, nil + return router, nil })) h, err := libp2p.New(opts...) if err != nil { @@ -291,7 +294,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached } bsctx := metri.CtxScope(ctx, "ipfs_bitswap") - bn := bsnet.NewFromIpfsHost(h, cr) + bn := bsnet.NewFromIpfsHost(h, router) bswap := bsclient.New(bsctx, bn, blkst, // default is 1 minute to search for a random live-want (1 // CID). I think we want to search for random live-wants more @@ -346,7 +349,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached if cfg.IpnsMaxCacheTTL > 0 { nsOptions = append(nsOptions, namesys.WithMaxCacheTTL(cfg.IpnsMaxCacheTTL)) } - ns, err := namesys.NewNameSystem(vs, nsOptions...) + ns, err := namesys.NewNameSystem(router, nsOptions...) if err != nil { return nil, err } @@ -366,7 +369,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached bsClient: bswap, ns: ns, ps: ps, - vs: vs, + vs: router, bsrv: bsrv, resolver: r, bwc: bwc, diff --git a/setup_test.go b/setup_test.go index 90def22..951acf3 100644 --- a/setup_test.go +++ b/setup_test.go @@ -11,6 +11,8 @@ import ( ) func testSeedPeering(t *testing.T, n int) ([]crypto.PrivKey, []peer.ID, []*Node) { + t.Parallel() + cdns := newCachedDNS(dnsCacheRefreshInterval) defer cdns.Close() @@ -39,6 +41,7 @@ func testSeedPeering(t *testing.T, n int) ([]crypto.PrivKey, []peer.ID, []*Node) BlockstoreType: "flatfs", AcceleratedDHT: false, DHTSharedHost: true, + DHTRouting: true, } // Add all remaining peers to the peering config.