Skip to content

Commit

Permalink
feat: add ipni checks (#66)
Browse files Browse the repository at this point in the history
* feat: add config input for IPNI indexer
* feat: query ipni indexer on ipfs-check test
* Query DHT and IPNI concurrently
* Respect total maximum results across DHT and IPNI. Report where provider found.
* Peer check determines if cid is found in IPNI or DHT
* Backend returns HTTP 400 Bad Request error if user-supplied multiaddr is not valid.

---------

Co-authored-by: gammazero <[email protected]>
  • Loading branch information
SgtPooki and gammazero authored Sep 19, 2024
1 parent 3b8d711 commit 6e3c677
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 136 deletions.
167 changes: 117 additions & 50 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

vole "github.com/ipfs-shipyard/vole/lib"
"github.com/ipfs/boxo/ipns"
"github.com/ipfs/boxo/routing/http/client"
"github.com/ipfs/boxo/routing/http/contentrouter"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
Expand Down Expand Up @@ -38,9 +40,14 @@ type daemon struct {
promRegistry *prometheus.Registry
}

// number of providers at which to stop looking for providers in the DHT
// When doing a check only with a CID
var MaxProvidersCount = 10
const (
// number of providers at which to stop looking for providers in the DHT
// When doing a check only with a CID
maxProvidersCount = 10

ipniSource = "IPNI"
dhtSource = "Amino DHT"
)

func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) {
rm, err := NewResourceManager()
Expand Down Expand Up @@ -135,27 +142,64 @@ type providerOutput struct {
Addrs []string
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
Source string
}

// runCidCheck looks up the DHT for providers of a given CID and then checks their connectivity and Bitswap availability
func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (cidCheckOutput, error) {
cid, err := cid.Decode(cidStr)
// runCidCheck finds providers of a given CID, using the DHT and IPNI
// concurrently. A check of connectivity and Bitswap availability is performed
// for each provider found.
func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string) (cidCheckOutput, error) {
crClient, err := client.New(ipniURL, client.WithStreamResultsRequired())
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create content router client: %w", err)
}
routerClient := contentrouter.NewContentRoutingClient(crClient)

out := make([]providerOutput, 0, MaxProvidersCount)
queryCtx, cancelQuery := context.WithCancel(ctx)
defer cancelQuery()

queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
provsCh := d.dht.FindProvidersAsync(queryCtx, cid, MaxProvidersCount)
// Find providers with DHT and IPNI concurrently.
provsCh := d.dht.FindProvidersAsync(queryCtx, cidKey, maxProvidersCount)
ipniProvsCh := routerClient.FindProvidersAsync(queryCtx, cidKey, maxProvidersCount)

out := make([]providerOutput, 0, maxProvidersCount)
var wg sync.WaitGroup
var mu sync.Mutex
var providersCount int
var done bool

for !done {
var provider peer.AddrInfo
var open bool
var source string

select {
case provider, open = <-provsCh:
if !open {
provsCh = nil
if ipniProvsCh == nil {
done = true
}
continue
}
source = dhtSource
case provider, open = <-ipniProvsCh:
if !open {
ipniProvsCh = nil
if provsCh == nil {
done = true
}
continue
}
source = ipniSource
}
providersCount++
if providersCount == maxProvidersCount {
done = true
}

for provider := range provsCh {
wg.Add(1)
go func(provider peer.AddrInfo) {
go func(provider peer.AddrInfo, src string) {
defer wg.Done()

outputAddrs := []string{}
Expand Down Expand Up @@ -183,6 +227,7 @@ func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (cidCheckOutput
ID: provider.ID.String(),
Addrs: outputAddrs,
DataAvailableOverBitswap: BitswapCheckOutput{},
Source: src,
}

testHost, err := d.createTestHost()
Expand All @@ -196,7 +241,7 @@ func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (cidCheckOutput
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15)
defer dialCancel()

testHost.Connect(dialCtx, provider)
_ = testHost.Connect(dialCtx, provider)
// Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714
_, connErr := testHost.NewStream(dialCtx, provider.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap")

Expand All @@ -205,7 +250,7 @@ func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (cidCheckOutput
} else {
// since we pass a libp2p host that's already connected to the peer the actual connection maddr we pass in doesn't matter
p2pAddr, _ := multiaddr.NewMultiaddr("/p2p/" + provider.ID.String())
provOutput.DataAvailableOverBitswap = checkBitswapCID(ctx, testHost, cid, p2pAddr)
provOutput.DataAvailableOverBitswap = checkBitswapCID(ctx, testHost, cidKey, p2pAddr)

for _, c := range testHost.Network().ConnsToPeer(provider.ID) {
provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, c.RemoteMultiaddr().String())
Expand All @@ -215,8 +260,9 @@ func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (cidCheckOutput
mu.Lock()
out = append(out, provOutput)
mu.Unlock()
}(provider)
}(provider, source)
}
cancelQuery()

// Wait for all goroutines to finish
wg.Wait()
Expand All @@ -225,44 +271,41 @@ func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (cidCheckOutput
}

type peerCheckOutput struct {
ConnectionError string
PeerFoundInDHT map[string]int
ProviderRecordFromPeerInDHT bool
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
ConnectionError string
PeerFoundInDHT map[string]int
ProviderRecordFromPeerInDHT bool
ProviderRecordFromPeerInIPNI bool
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}

// runPeerCheck checks the connectivity and Bitswap availability of a CID from a given peer (either with just peer ID or specific multiaddr)
func (d *daemon) runPeerCheck(ctx context.Context, maStr, cidStr string) (*peerCheckOutput, error) {
ma, err := multiaddr.NewMultiaddr(maStr)
if err != nil {
return nil, err
}

ai, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
return nil, err
}
func (d *daemon) runPeerCheck(ctx context.Context, ma multiaddr.Multiaddr, ai *peer.AddrInfo, c cid.Cid, ipniURL string) (*peerCheckOutput, error) {
addrMap, peerAddrDHTErr := peerAddrsInDHT(ctx, d.dht, d.dhtMessenger, ai.ID)

// User has only passed a PeerID without any maddrs
onlyPeerID := len(ai.Addrs) == 0
var inDHT, inIPNI bool
var wg sync.WaitGroup
wg.Add(2)
go func() {
inDHT = providerRecordFromPeerInDHT(ctx, d.dht, c, ai.ID)
wg.Done()
}()
go func() {
inIPNI = providerRecordFromPeerInIPNI(ctx, ipniURL, c, ai.ID)
wg.Done()
}()
wg.Wait()

c, err := cid.Decode(cidStr)
if err != nil {
return nil, err
out := &peerCheckOutput{
ProviderRecordFromPeerInDHT: inDHT,
ProviderRecordFromPeerInIPNI: inIPNI,
PeerFoundInDHT: addrMap,
}

out := &peerCheckOutput{}

connectionFailed := false

out.ProviderRecordFromPeerInDHT = ProviderRecordFromPeerInDHT(ctx, d.dht, c, ai.ID)

addrMap, peerAddrDHTErr := peerAddrsInDHT(ctx, d.dht, d.dhtMessenger, ai.ID)
out.PeerFoundInDHT = addrMap
var connectionFailed bool

// If peerID given,but no addresses check the DHT
if onlyPeerID {
if len(ai.Addrs) == 0 {
if peerAddrDHTErr != nil {
// PeerID is not resolvable via the DHT
connectionFailed = true
Expand All @@ -288,7 +331,7 @@ func (d *daemon) runPeerCheck(ctx context.Context, maStr, cidStr string) (*peerC
// Test Is the target connectable
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*120)

testHost.Connect(dialCtx, *ai)
_ = testHost.Connect(dialCtx, *ai)
// Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714
_, connErr := testHost.NewStream(dialCtx, ai.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap")
dialCancel()
Expand Down Expand Up @@ -343,9 +386,6 @@ func peerAddrsInDHT(ctx context.Context, d kademlia, messenger *dhtpb.ProtocolMe
return nil, err
}

wg := sync.WaitGroup{}
wg.Add(len(closestPeers))

resCh := make(chan *peer.AddrInfo, len(closestPeers))

numSuccessfulResponses := execOnMany(ctx, 0.3, time.Second*3, func(ctx context.Context, peerToQuery peer.ID) error {
Expand Down Expand Up @@ -380,7 +420,7 @@ func peerAddrsInDHT(ctx context.Context, d kademlia, messenger *dhtpb.ProtocolMe
return addrMap, nil
}

func ProviderRecordFromPeerInDHT(ctx context.Context, d kademlia, c cid.Cid, p peer.ID) bool {
func providerRecordFromPeerInDHT(ctx context.Context, d kademlia, c cid.Cid, p peer.ID) bool {
queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
provsCh := d.FindProvidersAsync(queryCtx, c, 0)
Expand All @@ -399,6 +439,33 @@ func ProviderRecordFromPeerInDHT(ctx context.Context, d kademlia, c cid.Cid, p p
}
}

func providerRecordFromPeerInIPNI(ctx context.Context, ipniURL string, c cid.Cid, p peer.ID) bool {
crClient, err := client.New(ipniURL, client.WithStreamResultsRequired())
if err != nil {
log.Printf("failed to creat content router client: %s\n", err)
return false
}
routerClient := contentrouter.NewContentRoutingClient(crClient)

queryCtx, cancel := context.WithCancel(ctx)
defer cancel()

provsCh := routerClient.FindProvidersAsync(queryCtx, c, 0)
for {
select {
case prov, ok := <-provsCh:
if !ok {
return false
}
if prov.ID == p {
return true
}
case <-ctx.Done():
return false
}
}
}

// Taken from the FullRT DHT client implementation
//
// execOnMany executes the given function on each of the peers, although it may only wait for a certain chunk of peers
Expand Down
37 changes: 20 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ require (
github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/libp2p/go-libp2p v0.36.2
github.com/libp2p/go-libp2p v0.36.3
github.com/libp2p/go-libp2p-kad-dht v0.26.1
github.com/libp2p/go-libp2p-mplex v0.9.0
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-libp2p-routing-helpers v0.7.4
github.com/libp2p/go-msgio v0.3.0
github.com/multiformats/go-multiaddr v0.13.0
github.com/multiformats/go-multihash v0.2.3
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_golang v1.20.0
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.27.3
)
Expand Down Expand Up @@ -55,6 +55,7 @@ require (
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand Down Expand Up @@ -95,7 +96,7 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/miekg/dns v1.1.61 // indirect
github.com/miekg/dns v1.1.62 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
Expand All @@ -110,20 +111,20 @@ require (
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/onsi/ginkgo/v2 v2.19.1 // indirect
github.com/onsi/ginkgo/v2 v2.20.0 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pion/datachannel v1.5.8 // indirect
github.com/pion/dtls/v2 v2.2.12 // indirect
github.com/pion/ice/v2 v2.3.34 // indirect
github.com/pion/interceptor v0.1.29 // indirect
github.com/pion/interceptor v0.1.30 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.12 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.14 // indirect
github.com/pion/rtp v1.8.8 // indirect
github.com/pion/sctp v1.8.20 // indirect
github.com/pion/rtp v1.8.9 // indirect
github.com/pion/sctp v1.8.33 // indirect
github.com/pion/sdp/v3 v3.0.9 // indirect
github.com/pion/srtp/v2 v2.0.20 // indirect
github.com/pion/stun v0.6.1 // indirect
Expand All @@ -137,18 +138,20 @@ require (
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/quic-go v0.45.2 // indirect
github.com/quic-go/quic-go v0.46.0 // indirect
github.com/quic-go/webtransport-go v0.8.0 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/samber/lo v1.39.0 // indirect
github.com/sanity-io/litter v1.5.5 // indirect
github.com/sergi/go-diff v1.3.1 // indirect
github.com/smartystreets/assertions v1.13.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.55.0 // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/wlynxg/anet v0.0.3 // indirect
github.com/wlynxg/anet v0.0.4 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
Expand All @@ -160,19 +163,19 @@ require (
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/fx v1.22.1 // indirect
go.uber.org/dig v1.18.0 // indirect
go.uber.org/fx v1.22.2 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/tools v0.23.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/tools v0.24.0 // indirect
gonum.org/v1/gonum v0.15.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading

0 comments on commit 6e3c677

Please sign in to comment.