Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

routing/http: Fix contentrouter.go client interface. #8

Merged
merged 2 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions routing/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/benbjohnson/clock"
"github.com/ipfs/go-cid"
ipns "github.com/ipfs/go-ipns"
"github.com/ipfs/go-libipfs/routing/http/contentrouter"
"github.com/ipfs/go-libipfs/routing/http/internal/drjson"
"github.com/ipfs/go-libipfs/routing/http/server"
"github.com/ipfs/go-libipfs/routing/http/types"
Expand Down Expand Up @@ -39,6 +40,8 @@ type client struct {
afterSignCallback func(req *types.WriteBitswapProviderRecord)
}

var _ contentrouter.Client = &client{}

type httpClient interface {
Do(req *http.Request) (*http.Response, error)
}
Expand Down
55 changes: 35 additions & 20 deletions routing/http/contentrouter/contentrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,30 @@ package contentrouter

import (
"context"
"reflect"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-libipfs/routing/http/internal"
"github.com/ipfs/go-libipfs/routing/http/types"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
)

var logger = logging.Logger("service/contentrouting")

const ttl = 24 * time.Hour

type client interface {
Provide(context.Context, []cid.Cid, time.Duration) (time.Duration, error)
FindProviders(context.Context, cid.Cid) ([]peer.AddrInfo, error)
Ready(context.Context) (bool, error)
type Client interface {
ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error)
FindProviders(ctx context.Context, key cid.Cid) ([]types.ProviderResponse, error)
}

type contentRouter struct {
client client
client Client
maxProvideConcurrency int
maxProvideBatchSize int
}
Expand All @@ -44,7 +46,7 @@ func WithMaxProvideBatchSize(max int) option {
}
}

func NewContentRoutingClient(c client, opts ...option) *contentRouter {
func NewContentRoutingClient(c Client, opts ...option) *contentRouter {
cr := &contentRouter{
client: c,
maxProvideConcurrency: 5,
Expand All @@ -64,7 +66,7 @@ func (c *contentRouter) Provide(ctx context.Context, key cid.Cid, announce bool)
return nil
}

_, err := c.client.Provide(ctx, []cid.Cid{key}, ttl)
_, err := c.client.ProvideBitswap(ctx, []cid.Cid{key}, ttl)
return err
}

Expand All @@ -78,7 +80,7 @@ func (c *contentRouter) ProvideMany(ctx context.Context, mhKeys []multihash.Mult
}

if len(keys) <= c.maxProvideBatchSize {
_, err := c.client.Provide(ctx, keys, ttl)
_, err := c.client.ProvideBitswap(ctx, keys, ttl)
return err
}

Expand All @@ -88,23 +90,15 @@ func (c *contentRouter) ProvideMany(ctx context.Context, mhKeys []multihash.Mult
c.maxProvideConcurrency,
keys,
func(ctx context.Context, batch []cid.Cid) error {
_, err := c.client.Provide(ctx, batch, ttl)
_, err := c.client.ProvideBitswap(ctx, batch, ttl)
return err
},
)
}

// Ready is part of the existing `ProvideMany` interface, but can be used more generally to determine if the routing client
// has a working connection.
// Ready is part of the existing `ProvideMany` interface.
func (c *contentRouter) Ready() bool {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ready, err := c.client.Ready(ctx)
if err != nil {
logger.Warnw("error checking if delegated content router is ready", "Error", err)
return false
}
return ready
return true
}

func (c *contentRouter) FindProvidersAsync(ctx context.Context, key cid.Cid, numResults int) <-chan peer.AddrInfo {
Expand All @@ -118,7 +112,28 @@ func (c *contentRouter) FindProvidersAsync(ctx context.Context, key cid.Cid, num

ch := make(chan peer.AddrInfo, len(results))
for _, r := range results {
ch <- r
if r.GetProtocol() == types.BitswapProviderID {
result, ok := r.(*types.ReadBitswapProviderRecord)
if !ok {
logger.Errorw(
"problem casting find providers result",
"ProtocolID", types.BitswapProviderID,
"Type", reflect.TypeOf(r).String(),
)
continue
}

var addrs []multiaddr.Multiaddr
for _, a := range result.Addrs {
addrs = append(addrs, a.Multiaddr)
}

ch <- peer.AddrInfo{
ID: *result.ID,
Addrs: addrs,
}
}

}
close(ch)
return ch
Expand Down
37 changes: 27 additions & 10 deletions routing/http/contentrouter/contentrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-libipfs/routing/http/types"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/assert"
Expand All @@ -16,13 +17,13 @@ import (

type mockClient struct{ mock.Mock }

func (m *mockClient) Provide(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) {
func (m *mockClient) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) {
args := m.Called(ctx, keys, ttl)
return args.Get(0).(time.Duration), args.Error(1)
}
func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error) {
func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) ([]types.ProviderResponse, error) {
args := m.Called(ctx, key)
return args.Get(0).([]peer.AddrInfo), args.Error(1)
return args.Get(0).([]types.ProviderResponse), args.Error(1)
}
func (m *mockClient) Ready(ctx context.Context) (bool, error) {
args := m.Called(ctx)
Expand Down Expand Up @@ -66,14 +67,14 @@ func TestProvide(t *testing.T) {
crc := NewContentRoutingClient(client)

if !c.expNotProvided {
client.On("Provide", ctx, []cid.Cid{key}, ttl).Return(time.Minute, nil)
client.On("ProvideBitswap", ctx, []cid.Cid{key}, ttl).Return(time.Minute, nil)
}

err := crc.Provide(ctx, key, c.announce)
assert.NoError(t, err)

if c.expNotProvided {
client.AssertNumberOfCalls(t, "Provide", 0)
client.AssertNumberOfCalls(t, "ProvideBitswap", 0)
}

})
Expand All @@ -90,7 +91,7 @@ func TestProvideMany(t *testing.T) {
client := &mockClient{}
crc := NewContentRoutingClient(client)

client.On("Provide", ctx, cids, ttl).Return(time.Minute, nil)
client.On("ProvideBitswap", ctx, cids, ttl).Return(time.Minute, nil)

err := crc.ProvideMany(ctx, mhs)
require.NoError(t, err)
Expand All @@ -102,9 +103,20 @@ func TestFindProvidersAsync(t *testing.T) {
client := &mockClient{}
crc := NewContentRoutingClient(client)

ais := []peer.AddrInfo{
{ID: peer.ID("peer1")},
{ID: peer.ID("peer2")},
p1 := peer.ID("peer1")
p2 := peer.ID("peer2")
ais := []types.ProviderResponse{
&types.ReadBitswapProviderRecord{
Protocol: types.BitswapProviderID,
ID: &p1,
},
&types.ReadBitswapProviderRecord{
Protocol: types.BitswapProviderID,
ID: &p2,
},
&types.UnknownProviderRecord{
Protocol: "UNKNOWN",
},
}

client.On("FindProviders", ctx, key).Return(ais, nil)
Expand All @@ -116,5 +128,10 @@ func TestFindProvidersAsync(t *testing.T) {
actualAIs = append(actualAIs, ai)
}

require.Equal(t, ais, actualAIs)
expected := []peer.AddrInfo{
{ID: p1},
{ID: p2},
}

require.Equal(t, expected, actualAIs)
}