Skip to content

Commit

Permalink
routing/http: Fix contentrouter.go client interface. (#8)
Browse files Browse the repository at this point in the history
* routing/http: Fix contentrouter.go client interface.

After some changes, we made it incompatible with the actual HTTP client.

Signed-off-by: Antonio Navarro Perez <[email protected]>

* Change log to error and fix structure

Signed-off-by: Antonio Navarro Perez <[email protected]>
Co-authored-by: Gus Eggert <[email protected]>
  • Loading branch information
ajnavarro and guseggert authored Dec 6, 2022
1 parent 10595c9 commit 4ad41a9
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 30 deletions.
3 changes: 3 additions & 0 deletions routing/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/benbjohnson/clock"
"github.com/ipfs/go-cid"
ipns "github.com/ipfs/go-ipns"
"github.com/ipfs/go-libipfs/routing/http/contentrouter"
"github.com/ipfs/go-libipfs/routing/http/internal/drjson"
"github.com/ipfs/go-libipfs/routing/http/server"
"github.com/ipfs/go-libipfs/routing/http/types"
Expand Down Expand Up @@ -39,6 +40,8 @@ type client struct {
afterSignCallback func(req *types.WriteBitswapProviderRecord)
}

var _ contentrouter.Client = &client{}

type httpClient interface {
Do(req *http.Request) (*http.Response, error)
}
Expand Down
55 changes: 35 additions & 20 deletions routing/http/contentrouter/contentrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,30 @@ package contentrouter

import (
"context"
"reflect"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-libipfs/routing/http/internal"
"github.com/ipfs/go-libipfs/routing/http/types"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
)

var logger = logging.Logger("service/contentrouting")

const ttl = 24 * time.Hour

type client interface {
Provide(context.Context, []cid.Cid, time.Duration) (time.Duration, error)
FindProviders(context.Context, cid.Cid) ([]peer.AddrInfo, error)
Ready(context.Context) (bool, error)
type Client interface {
ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error)
FindProviders(ctx context.Context, key cid.Cid) ([]types.ProviderResponse, error)
}

type contentRouter struct {
client client
client Client
maxProvideConcurrency int
maxProvideBatchSize int
}
Expand All @@ -44,7 +46,7 @@ func WithMaxProvideBatchSize(max int) option {
}
}

func NewContentRoutingClient(c client, opts ...option) *contentRouter {
func NewContentRoutingClient(c Client, opts ...option) *contentRouter {
cr := &contentRouter{
client: c,
maxProvideConcurrency: 5,
Expand All @@ -64,7 +66,7 @@ func (c *contentRouter) Provide(ctx context.Context, key cid.Cid, announce bool)
return nil
}

_, err := c.client.Provide(ctx, []cid.Cid{key}, ttl)
_, err := c.client.ProvideBitswap(ctx, []cid.Cid{key}, ttl)
return err
}

Expand All @@ -78,7 +80,7 @@ func (c *contentRouter) ProvideMany(ctx context.Context, mhKeys []multihash.Mult
}

if len(keys) <= c.maxProvideBatchSize {
_, err := c.client.Provide(ctx, keys, ttl)
_, err := c.client.ProvideBitswap(ctx, keys, ttl)
return err
}

Expand All @@ -88,23 +90,15 @@ func (c *contentRouter) ProvideMany(ctx context.Context, mhKeys []multihash.Mult
c.maxProvideConcurrency,
keys,
func(ctx context.Context, batch []cid.Cid) error {
_, err := c.client.Provide(ctx, batch, ttl)
_, err := c.client.ProvideBitswap(ctx, batch, ttl)
return err
},
)
}

// Ready is part of the existing `ProvideMany` interface, but can be used more generally to determine if the routing client
// has a working connection.
// Ready is part of the existing `ProvideMany` interface.
func (c *contentRouter) Ready() bool {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ready, err := c.client.Ready(ctx)
if err != nil {
logger.Warnw("error checking if delegated content router is ready", "Error", err)
return false
}
return ready
return true
}

func (c *contentRouter) FindProvidersAsync(ctx context.Context, key cid.Cid, numResults int) <-chan peer.AddrInfo {
Expand All @@ -118,7 +112,28 @@ func (c *contentRouter) FindProvidersAsync(ctx context.Context, key cid.Cid, num

ch := make(chan peer.AddrInfo, len(results))
for _, r := range results {
ch <- r
if r.GetProtocol() == types.BitswapProviderID {
result, ok := r.(*types.ReadBitswapProviderRecord)
if !ok {
logger.Errorw(
"problem casting find providers result",
"ProtocolID", types.BitswapProviderID,
"Type", reflect.TypeOf(r).String(),
)
continue
}

var addrs []multiaddr.Multiaddr
for _, a := range result.Addrs {
addrs = append(addrs, a.Multiaddr)
}

ch <- peer.AddrInfo{
ID: *result.ID,
Addrs: addrs,
}
}

}
close(ch)
return ch
Expand Down
37 changes: 27 additions & 10 deletions routing/http/contentrouter/contentrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-libipfs/routing/http/types"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/assert"
Expand All @@ -16,13 +17,13 @@ import (

type mockClient struct{ mock.Mock }

func (m *mockClient) Provide(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) {
func (m *mockClient) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) {
args := m.Called(ctx, keys, ttl)
return args.Get(0).(time.Duration), args.Error(1)
}
func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error) {
func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) ([]types.ProviderResponse, error) {
args := m.Called(ctx, key)
return args.Get(0).([]peer.AddrInfo), args.Error(1)
return args.Get(0).([]types.ProviderResponse), args.Error(1)
}
func (m *mockClient) Ready(ctx context.Context) (bool, error) {
args := m.Called(ctx)
Expand Down Expand Up @@ -66,14 +67,14 @@ func TestProvide(t *testing.T) {
crc := NewContentRoutingClient(client)

if !c.expNotProvided {
client.On("Provide", ctx, []cid.Cid{key}, ttl).Return(time.Minute, nil)
client.On("ProvideBitswap", ctx, []cid.Cid{key}, ttl).Return(time.Minute, nil)
}

err := crc.Provide(ctx, key, c.announce)
assert.NoError(t, err)

if c.expNotProvided {
client.AssertNumberOfCalls(t, "Provide", 0)
client.AssertNumberOfCalls(t, "ProvideBitswap", 0)
}

})
Expand All @@ -90,7 +91,7 @@ func TestProvideMany(t *testing.T) {
client := &mockClient{}
crc := NewContentRoutingClient(client)

client.On("Provide", ctx, cids, ttl).Return(time.Minute, nil)
client.On("ProvideBitswap", ctx, cids, ttl).Return(time.Minute, nil)

err := crc.ProvideMany(ctx, mhs)
require.NoError(t, err)
Expand All @@ -102,9 +103,20 @@ func TestFindProvidersAsync(t *testing.T) {
client := &mockClient{}
crc := NewContentRoutingClient(client)

ais := []peer.AddrInfo{
{ID: peer.ID("peer1")},
{ID: peer.ID("peer2")},
p1 := peer.ID("peer1")
p2 := peer.ID("peer2")
ais := []types.ProviderResponse{
&types.ReadBitswapProviderRecord{
Protocol: types.BitswapProviderID,
ID: &p1,
},
&types.ReadBitswapProviderRecord{
Protocol: types.BitswapProviderID,
ID: &p2,
},
&types.UnknownProviderRecord{
Protocol: "UNKNOWN",
},
}

client.On("FindProviders", ctx, key).Return(ais, nil)
Expand All @@ -116,5 +128,10 @@ func TestFindProvidersAsync(t *testing.T) {
actualAIs = append(actualAIs, ai)
}

require.Equal(t, ais, actualAIs)
expected := []peer.AddrInfo{
{ID: p1},
{ID: p2},
}

require.Equal(t, expected, actualAIs)
}

0 comments on commit 4ad41a9

Please sign in to comment.