Skip to content

Commit

Permalink
exchange: create a providing.Exchange which provides on NotifyNewBlocks.
Browse files Browse the repository at this point in the history
  • Loading branch information
hsanjuan committed Nov 20, 2024
1 parent 6d74a8f commit b55366d
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 39 deletions.
12 changes: 6 additions & 6 deletions bitswap/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []b

func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
for _, p := range provs {
if err := p.Blockstore().PutMany(context.Background(), blocks); err != nil {
if err := p.Blockstore.PutMany(context.Background(), blocks); err != nil {
b.Fatal(err)
}
}
Expand All @@ -453,10 +453,10 @@ func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
bill := provs[0]
jeff := provs[1]

if err := bill.Blockstore().PutMany(context.Background(), blks[:75]); err != nil {
if err := bill.Blockstore.PutMany(context.Background(), blks[:75]); err != nil {
b.Fatal(err)
}
if err := jeff.Blockstore().PutMany(context.Background(), blks[25:]); err != nil {
if err := jeff.Blockstore.PutMany(context.Background(), blks[25:]); err != nil {
b.Fatal(err)
}
}
Expand All @@ -474,12 +474,12 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
even := i%2 == 0
third := i%3 == 0
if third || even {
if err := bill.Blockstore().Put(context.Background(), blk); err != nil {
if err := bill.Blockstore.Put(context.Background(), blk); err != nil {
b.Fatal(err)
}
}
if third || !even {
if err := jeff.Blockstore().Put(context.Background(), blk); err != nil {
if err := jeff.Blockstore.Put(context.Background(), blk); err != nil {
b.Fatal(err)
}
}
Expand All @@ -491,7 +491,7 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
// but we're mostly just testing performance of the sync algorithm
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
for _, blk := range blks {
err := provs[rand.Intn(len(provs))].Blockstore().Put(context.Background(), blk)
err := provs[rand.Intn(len(provs))].Blockstore.Put(context.Background(), blk)
if err != nil {
b.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func isCI() bool {

func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) {
t.Helper()
err := inst.Blockstore().Put(ctx, blk)
err := inst.Blockstore.Put(ctx, blk)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {

doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Identity.ID(), bsMessage)

blockInStore, err := doesNotWantBlock.Blockstore().Has(ctx, block.Cid())
blockInStore, err := doesNotWantBlock.Blockstore.Has(ctx, block.Cid())
if err != nil || blockInStore {
t.Fatal("Unwanted block added to block store")
}
Expand Down
8 changes: 4 additions & 4 deletions bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func getVirtualNetwork() tn.Network {

func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) {
t.Helper()
err := inst.Blockstore().Put(ctx, blk)
err := inst.Blockstore.Put(ctx, blk)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -61,7 +61,7 @@ func TestBasicSessions(t *testing.T) {
b := inst[1]

// Add a block to Peer B
if err := b.Blockstore().Put(ctx, block); err != nil {
if err := b.Blockstore.Put(ctx, block); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -125,7 +125,7 @@ func TestSessionBetweenPeers(t *testing.T) {

// Add 101 blocks to Peer A
blks := random.BlocksOfSize(101, blockSize)
if err := inst[0].Blockstore().PutMany(ctx, blks); err != nil {
if err := inst[0].Blockstore.PutMany(ctx, blks); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -186,7 +186,7 @@ func TestSessionSplitFetch(t *testing.T) {
// Add 10 distinct blocks to each of 10 peers
blks := random.BlocksOfSize(100, blockSize)
for i := 0; i < 10; i++ {
if err := inst[i].Blockstore().PutMany(ctx, blks[i*10:(i+1)*10]); err != nil {
if err := inst[i].Blockstore.PutMany(ctx, blks[i*10:(i+1)*10]); err != nil {
t.Fatal(err)
}
}
Expand Down
15 changes: 7 additions & 8 deletions bitswap/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
tn "github.com/ipfs/boxo/bitswap/testnet"
blockstore "github.com/ipfs/boxo/blockstore"
mockrouting "github.com/ipfs/boxo/routing/mock"
"github.com/ipfs/go-datastore"
ds "github.com/ipfs/go-datastore"
delayed "github.com/ipfs/go-datastore/delayed"
ds_sync "github.com/ipfs/go-datastore/sync"
Expand Down Expand Up @@ -89,18 +90,14 @@ func ConnectInstances(instances []Instance) {
// Instance is a test instance of bitswap + dependencies for integration testing
type Instance struct {
Identity tnet.Identity
Datastore datastore.Batching
Exchange *bitswap.Bitswap
blockstore blockstore.Blockstore
Blockstore blockstore.Blockstore
Adapter bsnet.BitSwapNetwork
Routing routing.Routing
blockstoreDelay delay.D
}

// Blockstore returns the block store for this test instance
func (i *Instance) Blockstore() blockstore.Blockstore {
return i.blockstore
}

// SetBlockstoreLatency customizes the artificial delay on receiving blocks
// from a blockstore test instance.
func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
Expand All @@ -118,20 +115,22 @@ func NewInstance(ctx context.Context, net tn.Network, router routing.Routing, p
adapter := net.Adapter(p, netOptions...)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))

ds := ds_sync.MutexWrap(dstore)
bstore, err := blockstore.CachedBlockstore(ctx,
blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)),
blockstore.NewBlockstore(ds),
blockstore.DefaultCacheOpts())
if err != nil {
panic(err.Error()) // FIXME perhaps change signature and return error.
}

bs := bitswap.New(ctx, adapter, router, bstore, bsOptions...)
return Instance{
Datastore: ds,
Adapter: adapter,
Identity: p,
Exchange: bs,
Routing: router,
blockstore: bstore,
Blockstore: bstore,
blockstoreDelay: bsdelay,
}
}
2 changes: 1 addition & 1 deletion blockservice/test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func Mocks(n int, opts ...blockservice.Option) []blockservice.BlockService {

var servs []blockservice.BlockService
for _, i := range instances {
servs = append(servs, blockservice.New(i.Blockstore(),
servs = append(servs, blockservice.New(i.Blockstore,
i.Exchange, append(opts, blockservice.WithProvider(i.Routing))...))
}
return servs
Expand Down
36 changes: 36 additions & 0 deletions exchange/providing/providing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Package providing implements an exchange wrapper which
// does content providing for new blocks.
package providing

import (
"context"

"github.com/ipfs/boxo/exchange"
"github.com/ipfs/boxo/provider"
blocks "github.com/ipfs/go-block-format"
)

// Exchange is an exchange wrapper that calls ProvideMany for blocks received
// over NotifyNewBlocks.
type Exchange struct {
exchange.Interface
provider provider.Provider
}

// New creates a new providing Exchange with the given exchange and provider.
func New(base exchange.Interface, provider provider.Provider) *Exchange {
return &Exchange{
Interface: base,
provider: provider,
}
}

// NotifyNewBlocks calls provider.ProvideMany.
func (ex *Exchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
for _, b := range blocks {
if err := ex.provider.Provide(ctx, b.Cid(), true); err != nil {
return err
}
}
return nil
}
62 changes: 62 additions & 0 deletions exchange/providing/providing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package providing

import (
"context"
"testing"

testinstance "github.com/ipfs/boxo/bitswap/testinstance"
tn "github.com/ipfs/boxo/bitswap/testnet"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/provider"
mockrouting "github.com/ipfs/boxo/routing/mock"
delay "github.com/ipfs/go-ipfs-delay"
"github.com/ipfs/go-test/random"
)

func TestExchange(t *testing.T) {
ctx := context.Background()
net := tn.VirtualNetwork(delay.Fixed(0))
routing := mockrouting.NewServer()
sg := testinstance.NewTestInstanceGenerator(net, routing, nil, nil)
i := sg.Next()
provFinder := routing.Client(i.Identity)
prov, err := provider.New(i.Datastore,
provider.Online(provFinder),
)
if err != nil {
t.Fatal(err)
}
provExchange := New(i.Exchange, prov)
// write-through so that we notify when re-adding block
bs := blockservice.New(i.Blockstore, provExchange,
blockservice.WriteThrough())
block := random.BlocksOfSize(1, 10)[0]
// put it on the blockstore of the first instance
err = i.Blockstore.Put(ctx, block)
if err != nil {
t.Fatal()
}

providersChan := provFinder.FindProvidersAsync(ctx, block.Cid(), 1)
_, ok := <-providersChan
if ok {
t.Fatal("there should be no providers yet for block")
}

// Now add it via BlockService. It should trigger NotifyNewBlocks
// on the exchange and thus they should get announced.
err = bs.AddBlock(ctx, block)
if err != nil {
t.Fatal()
}
// Trigger reproviding, otherwise it's not really provided.
err = prov.Reprovide(ctx)
if err != nil {
t.Fatal(err)
}
providersChan = provFinder.FindProvidersAsync(ctx, block.Cid(), 1)
_, ok = <-providersChan
if !ok {
t.Fatal("there should be one provider for the block")
}
}
8 changes: 4 additions & 4 deletions fetcher/helpers/block_visitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ func TestFetchGraphToBlocks(t *testing.T) {
defer hasBlock.Exchange.Close()

blocks := []blocks.Block{block1, block2, block3, block4}
err := hasBlock.Blockstore().PutMany(bg, blocks)
err := hasBlock.Blockstore.PutMany(bg, blocks)
require.NoError(t, err)
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
require.NoError(t, err)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
session := fetcherConfig.NewSession(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) {
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

err := hasBlock.Blockstore().PutMany(bg, []blocks.Block{block1, block2, block3})
err := hasBlock.Blockstore.PutMany(bg, []blocks.Block{block1, block2, block3})
require.NoError(t, err)

err = hasBlock.Exchange.NotifyNewBlocks(bg, block1, block2, block3)
Expand All @@ -113,7 +113,7 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) {
wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
session := fetcherConfig.NewSession(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down
20 changes: 10 additions & 10 deletions fetcher/impl/blockservice/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestFetchIPLDPrimeNode(t *testing.T) {
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

err := hasBlock.Blockstore().Put(bg, block)
err := hasBlock.Blockstore.Put(bg, block)
require.NoError(t, err)

err = hasBlock.Exchange.NotifyNewBlocks(bg, block)
Expand All @@ -56,7 +56,7 @@ func TestFetchIPLDPrimeNode(t *testing.T) {
wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
session := fetcherConfig.NewSession(context.Background())

Expand Down Expand Up @@ -98,15 +98,15 @@ func TestFetchIPLDGraph(t *testing.T) {
defer hasBlock.Exchange.Close()

blocks := []blocks.Block{block1, block2, block3, block4}
err := hasBlock.Blockstore().PutMany(bg, blocks)
err := hasBlock.Blockstore.PutMany(bg, blocks)
require.NoError(t, err)
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
require.NoError(t, err)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
session := fetcherConfig.NewSession(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -155,15 +155,15 @@ func TestFetchIPLDPath(t *testing.T) {
defer hasBlock.Exchange.Close()

blocks := []blocks.Block{block1, block2, block3, block4, block5}
err := hasBlock.Blockstore().PutMany(bg, blocks)
err := hasBlock.Blockstore.PutMany(bg, blocks)
require.NoError(t, err)
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
require.NoError(t, err)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
session := fetcherConfig.NewSession(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -219,15 +219,15 @@ func TestHelpers(t *testing.T) {
defer hasBlock.Exchange.Close()

blocks := []blocks.Block{block1, block2, block3, block4}
err := hasBlock.Blockstore().PutMany(bg, blocks)
err := hasBlock.Blockstore.PutMany(bg, blocks)
require.NoError(t, err)
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
require.NoError(t, err)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)

t.Run("Block retrieves node", func(t *testing.T) {
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
Expand Down Expand Up @@ -334,15 +334,15 @@ func TestNodeReification(t *testing.T) {
defer hasBlock.Exchange.Close()

blocks := []blocks.Block{block2, block3, block4}
err := hasBlock.Blockstore().PutMany(bg, blocks)
err := hasBlock.Blockstore.PutMany(bg, blocks)
require.NoError(t, err)
err = hasBlock.Exchange.NotifyNewBlocks(bg, blocks...)
require.NoError(t, err)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()

wantsGetter := blockservice.New(wantsBlock.Blockstore(), wantsBlock.Exchange)
wantsGetter := blockservice.New(wantsBlock.Blockstore, wantsBlock.Exchange)
fetcherConfig := bsfetcher.NewFetcherConfig(wantsGetter)
nodeReifier := func(lnkCtx ipld.LinkContext, nd ipld.Node, ls *ipld.LinkSystem) (ipld.Node, error) {
return &selfLoader{Node: nd, ctx: lnkCtx.Ctx, ls: ls}, nil
Expand Down
Loading

0 comments on commit b55366d

Please sign in to comment.