Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

api, cmd, prod, pss, swarm: get publisher and query feed for chunk repair request #2175

Merged
merged 49 commits into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
8c4e6a7
Merge remote-tracking branch 'origin/master' into global-pinning-feeds
mortelli May 5, 2020
7a213dc
Merge remote-tracking branch 'origin/global-pinning' into global-pinn…
mortelli May 5, 2020
7132166
prod: iterate getPinners func
mortelli May 5, 2020
f25b463
storage, prod: add publisher param to recovery hook
mortelli May 5, 2020
64741c4
prod: update TestRecoveryHook for new hook
mortelli May 5, 2020
7b6ef91
prod: iterate handler use in recovery hook construction and invocation
mortelli May 5, 2020
ce6e347
prod: iterate handler use in recovery hook construction and invocation
mortelli May 5, 2020
ed7e691
feed: add FeedsHandler interface, DummyHandler struct, DummyHandler.L…
mortelli May 6, 2020
934ac93
feed, prod: rename FeedsHandler interface to GenericHandler, move and…
mortelli May 6, 2020
81954b3
chunk, storage, prod: refine comments
mortelli May 6, 2020
0842aff
prod, pss, trojan: extract RecoveryTopic var
mortelli May 6, 2020
2c55d03
swarm: use RecoveryTopic var
mortelli May 6, 2020
78c188b
prod: add feed error vars
mortelli May 6, 2020
467473c
prod: fix topic for feed in getPinners func
mortelli May 6, 2020
3608343
prod: iterate tests
mortelli May 6, 2020
9747df5
prod: refine error handling in getPinners
mortelli May 6, 2020
7e1b979
prod: update error var comments
mortelli May 6, 2020
52dac5b
prod: iterate comments in getPinners func
mortelli May 6, 2020
64de461
feed: small refactor to newDummyCacheEntry func
mortelli May 6, 2020
be58121
feed: fix comment in newDummyCacheEntry func
mortelli May 6, 2020
75d5d64
prod: iterate getPinners func
mortelli May 6, 2020
4c3fc93
trojan: add Target alias for [][]byte and apply it to functions and t…
mortelli May 7, 2020
b893bc3
pss: use trojan.Targets type in func params
mortelli May 7, 2020
652bc35
prod: use trojan.Targets type in func params, add json unmarshalling …
mortelli May 7, 2020
2a2b9df
prod: add ErrTargets var
mortelli May 7, 2020
9a659f9
pss: update tests to use trojan.Targets type
mortelli May 7, 2020
948a3ae
feed: add cacheEntry field to DummyHandler struct, add NewDummyHandle…
mortelli May 7, 2020
ebe8b6a
prod: add newTestRecoveryFeedHandler helper func to create mock feed …
mortelli May 7, 2020
768c063
pss, trojan: fix typos
mortelli May 7, 2020
699a28f
prod: add comment in getPinners func
mortelli May 7, 2020
16ea955
prod: minor refactor to getPinners func
mortelli May 7, 2020
d4e6e0b
pss: minor refactor to pss tests
mortelli May 7, 2020
8feeed5
feeds: fix comment in TestHandler
mortelli May 7, 2020
bdb8c02
feeds: fix comment in NewTestHandler func
mortelli May 7, 2020
0a2fca2
feed: update DummyHandler comments
mortelli May 7, 2020
faf7158
feed: small comment update
mortelli May 7, 2020
3019ebc
Merge remote-tracking branch 'origin/global-pinning' into global-pinn…
mortelli May 7, 2020
b67a103
prod, pss: update TODOs
mortelli May 7, 2020
97dc5cd
storage, prod: remove publisher param from store hook
mortelli May 8, 2020
0250a5f
swarm, prod, trojan: move recovery topic vars to prod package
mortelli May 8, 2020
6195cf7
api, storage: Publisher added to manifest, remove hardcoded value use…
santicomp2014 May 9, 2020
931b76e
pss: remove unnecessary dependency to prod package on pss test
mortelli May 11, 2020
272df48
api,api/client,api/http,cmd/swarm,cmd/swarm-smoke,prod,storage: added…
santicomp2014 May 11, 2020
7fc3df5
api: remove test debug logs
santicomp2014 May 11, 2020
a28f1ff
api,prod: removed unnecessary publisher default value
santicomp2014 May 11, 2020
e6f0099
Merge remote-tracking branch 'origin/global-pinning' into global-pinn…
mortelli May 12, 2020
470ee96
api, api/http: fixed len check for publisher and also init value
santicomp2014 May 12, 2020
5c2f9f0
api,prod: removed ctx and clear, moved todo for fallback flag to prod
santicomp2014 May 12, 2020
61c33d9
prod: added timeout to ctx in getPinners
santicomp2014 May 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ type Validator interface {
// with validators check.
type ValidatorStore struct {
Store
deliverCallback func(Chunk) // callback hook used to validate chunk
deliverCallback func(Chunk) // callback func to be invoked for validated chunks
validators []Validator
}

Expand Down
89 changes: 71 additions & 18 deletions prod/prod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,97 @@ package prod

import (
"context"
"encoding/hex"
"errors"
"fmt"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/pss"
"github.com/ethersphere/swarm/pss/trojan"
"github.com/ethersphere/swarm/storage/feed"
"github.com/ethersphere/swarm/storage/feed/lookup"
)

// RecoveryHook defines code to be executed upon trigger of failed to be retrieved chunks
type RecoveryHook func(ctx context.Context, chunkAddress chunk.Address) error
// ErrPublisher is returned when the publisher string cannot be decoded into bytes
var ErrPublisher = errors.New("failed to decode publisher")

// sender is the function call for sending trojan chunks
// ErrPubKey is returned when the publisher bytes cannot be decompressed as a public key
var ErrPubKey = errors.New("failed to decompress public key")

// ErrFeedLookup is returned when the recovery feed cannot be successefully looked up
var ErrFeedLookup = errors.New("failed to look up recovery feed")

// ErrFeedContent is returned when there is a failure to retrieve content from the recovery feed
var ErrFeedContent = errors.New("failed to get content for recovery feed")

// RecoveryHook defines code to be executed upon failing to retrieve pinned chunks
type RecoveryHook func(ctx context.Context, chunkAddress chunk.Address, publisher string) error

// sender is the function type for sending trojan chunks
type sender func(ctx context.Context, targets [][]byte, topic trojan.Topic, payload []byte) (*pss.Monitor, error)

// NewRecoveryHook returns a new RecoveryHook with the sender function defined
func NewRecoveryHook(send sender) RecoveryHook {
return func(ctx context.Context, chunkAddress chunk.Address) error {
targets, err := getPinners(chunkAddress)
func NewRecoveryHook(send sender, handler feed.GenericHandler) RecoveryHook {
return func(ctx context.Context, chunkAddress chunk.Address, publisher string) error {
targets, err := getPinners(publisher, handler)
if err != nil {
return err
}
payload := chunkAddress
topic := trojan.NewTopic("RECOVERY")

// TODO: monitor return should
if _, err := send(ctx, targets, topic, payload); err != nil {
if _, err := send(ctx, targets, trojan.RecoveryTopic, payload); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we need the monitor here, I would block this until either parent context is done or monitor shows synced request

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be done in #2171

return err
}
return nil
}
}

// TODO: refactor this method to implement feed of target pinners
// getPinners returns the specific target pinners for a corresponding chunk address
func getPinners(chunkAddress chunk.Address) ([][]byte, error) {
//this should get the feed and return correct target of pinners
return [][]byte{
{57, 120},
{209, 156},
{156, 38},
{89, 19},
{22, 129}}, nil
// getPinners returns the specific target pinners for a corresponding chunk
func getPinners(publisher string, handler feed.GenericHandler) ([][]byte, error) {
// get feed user from publisher
publisherBytes, err := hex.DecodeString(publisher)
if err != nil {
return nil, ErrPublisher
}
pubKey, err := crypto.DecompressPubkey(publisherBytes)
if err != nil {
return nil, ErrPubKey
}
addr := crypto.PubkeyToAddress(*pubKey)
Copy link
Contributor Author

@mortelli mortelli May 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not entirely sure these are the steps to go from an (assumed) byte string as the publisher, to the actual User address for the feed.

as clarification: this code was adapted from api/act.go:245

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good but this should defo be done only once instead of publisher, the derived ether address should be put in the context value

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, registered in #2181


// get feed topic from trojan recovery topic
topic, err := feed.NewTopic(trojan.RecoveryTopicText, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trojan pkg should not know anything about recovery

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved here

if err != nil {
return nil, err
}

// read feed
fd := feed.Feed{
Topic: topic,
User: addr,
}

query := feed.NewQueryLatest(&fd, lookup.NoClue)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this always lookup.NoClue? does it not store latest. state to be. used as hint?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as of now, we do not have any idea when the last update to the feed happened, so this is why NoClue is used.

but i have added a note to #2181, we can further optimize it this way i think.

thanks

// TODO: do we need this ctx.WithCancel? why?
ctx, cancel := context.WithCancel(context.Background())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you dont need cancel here but timeout and of course extending the parent context inherited from netstore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed

defer cancel()

// TODO: not exactly sure what to do with the `feed.cacheEntry` value returned here
// TODO: in fact, not even sure if we need to call `Lookup` first before calling `GetContent`
_, err = handler.Lookup(ctx, query)
// feed can still be queried even if there are no updates
if err != nil && err.Error() != "no feed updates found" {
zelig marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("%s : %s", ErrFeedLookup, err)
}

// TODO: how do we handle time-outs here?
_, content, err := handler.GetContent(&fd)
if err != nil {
return nil, fmt.Errorf("%s : %s", ErrFeedContent, err)
}

// TODO: transform content into actual list of targets
return [][]byte{content}, nil
}
13 changes: 10 additions & 3 deletions prod/prod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
psstest "github.com/ethersphere/swarm/pss/testing"
"github.com/ethersphere/swarm/pss/trojan"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/storage/feed"
)

// TestRecoveryHook tests that NewRecoveryHook has been properly invoked
Expand All @@ -44,11 +45,16 @@ func TestRecoveryHook(t *testing.T) {
hookWasCalled = true
return nil, nil
}
testHandler := &feed.DummyHandler{}

// setup recovery hook with testHook
recoverFunc := NewRecoveryHook(testHook)
recoverFunc := NewRecoveryHook(testHook, testHandler)

recoverFunc(ctx, chunk.ZeroAddr)
testChunk := "aacca8d446af47ebcab582ca2188fa73dfa871eb0a35eda798f47d4f91a575e9"
testPublisher := "0226f213613e843a413ad35b40f193910d26eb35f00154afcde9ded57479a6224a"
if err := recoverFunc(ctx, chunk.Address([]byte(testChunk)), testPublisher); err != nil {
t.Fatal(err)
}

// verify the hook has been called correctly
if hookWasCalled != true {
Expand Down Expand Up @@ -85,8 +91,9 @@ func TestSenderCall(t *testing.T) {
hookWasCalled = true
return nil, nil
}
testHandler := &feed.DummyHandler{}

recoverFunc := NewRecoveryHook(testHook)
recoverFunc := NewRecoveryHook(testHook, testHandler)
netStore.WithRecoveryCallback(recoverFunc)

c := ctest.GenerateTestRandomChunk()
Expand Down
6 changes: 2 additions & 4 deletions pss/pss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ func TestTrojanChunkRetrieval(t *testing.T) {
{89, 19},
{22, 129}}
payload := []byte("RECOVERY CHUNK")
topic := trojan.NewTopic("RECOVERY")

// call Send to store trojan chunk in localstore
if _, err = pss.Send(ctx, targets, topic, payload); err != nil {
if _, err = pss.Send(ctx, targets, trojan.RecoveryTopic, payload); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -119,14 +118,13 @@ func TestPssMonitor(t *testing.T) {
{89, 19},
{22, 129}}
payload := []byte("RECOVERY CHUNK")
topic := trojan.NewTopic("RECOVERY")

var monitor *Monitor

pss := NewPss(localStore, tags)

// call Send to store trojan chunk in localstore
if monitor, err = pss.Send(ctx, targets, topic, payload); err != nil {
if monitor, err = pss.Send(ctx, targets, trojan.RecoveryTopic, payload); err != nil {
t.Fatal(err)
}

Expand Down
6 changes: 6 additions & 0 deletions pss/trojan/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ const MaxPayloadSize = 4030

var hashFunc = storage.MakeHashFunc(storage.BMTHash)

// RecoveryTopicText is the string used to construct the RecoveryTopic var
const RecoveryTopicText = "RECOVERY"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need a separate string here as a const in order to build a feed.Topic var in the getPinners function (which is not of the trojan.Topic type)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should not be here. trojan pkg not supposed to know about recovery

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, moved this to prod


// RecoveryTopic is the topic used for repairing globally pinned chunks
var RecoveryTopic = NewTopic(RecoveryTopicText)

// ErrPayloadTooBig is returned when a given payload for a Message type is longer than the maximum amount allowed
var ErrPayloadTooBig = fmt.Errorf("message payload size cannot be greater than %d bytes", MaxPayloadSize)

Expand Down
7 changes: 7 additions & 0 deletions storage/feed/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ import (
"github.com/ethersphere/swarm/storage/feed/lookup"
)

// GenericHandler is an interface which specifies funcs any feeds handler should use
type GenericHandler interface {
Lookup(ctx context.Context, query *Query) (*cacheEntry, error)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the signature as originally defined in the feeds package.

it seems strange to me that the cacheEntry type is not exported, since it is returned in the Lookup function, which is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

GetContent(feed *Feed) (storage.Address, []byte, error)
}

// Handler is the struct to be used as the API for feeds
type Handler struct {
chunkStore *storage.NetStore
HashSize int
Expand Down
45 changes: 40 additions & 5 deletions storage/feed/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package feed

import (
"bytes"
"context"
"errors"
"path/filepath"
Expand All @@ -32,15 +33,12 @@ const (
testDbDirName = "feeds"
)

// TestHandler is a stuct to be used for testing purposes
type TestHandler struct {
*Handler
}

func (t *TestHandler) Close() {
t.chunkStore.Close()
}

// NewTestHandler creates Handler object to be used for testing purposes.
// NewTestHandler creates a new TestHandler
func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) {
path := filepath.Join(datadir, testDbDirName)
fh := NewHandler(params)
Expand All @@ -60,6 +58,7 @@ func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error)
return &TestHandler{fh}, nil
}

// NewTestHandlerWithStore creates a new TestHandler with a set chunk store
func NewTestHandlerWithStore(datadir string, db chunk.Store, params *HandlerParams) (*TestHandler, error) {
fh := NewHandler(params)
return newTestHandlerWithStore(fh, datadir, db, params)
Expand All @@ -75,3 +74,39 @@ func newTestHandlerWithStore(fh *Handler, datadir string, db chunk.Store, params
fh.SetStore(netStore)
return &TestHandler{fh}, nil
}

// Close closes the chunk store field for the test handler receiver
func (t *TestHandler) Close() {
t.chunkStore.Close()
}

// DummyHandler is a test handler with dummy Lookup and GetContent funcs
type DummyHandler struct {
}

// Lookup is the dummy func for DummyHandler structs
func (d *DummyHandler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error) {
return newDummyCacheEntry(), nil
}

// GetContent is the dummy func for DummyHandler structs
func (d *DummyHandler) GetContent(feed *Feed) (storage.Address, []byte, error) {
cacheEntry := newDummyCacheEntry()
return cacheEntry.lastKey, cacheEntry.data, nil
}

// newDummyCacheEntry returns a dummy cacheEntry pointer to be used by dummy handlers
func newDummyCacheEntry() *cacheEntry {
topic := Topic{0x1} // dummy topic
key := chunk.Address([]byte{2}) // dummy byte
data := []byte{3} // dummy data

request := NewFirstRequest(topic)
entry := &cacheEntry{}
entry.lastKey = key
entry.Update = request.Update

entry.Reader = bytes.NewReader(data)

return entry
}
7 changes: 4 additions & 3 deletions storage/netstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type NetStore struct {
requestGroup singleflight.Group
RemoteGet RemoteGetFunc
logger log.Logger
recoveryCallback func(ctx context.Context, chunkAddress chunk.Address) error // this is the function callback when a chunk failed to be retrieved
recoveryCallback func(ctx context.Context, chunkAddress chunk.Address, publisher string) error // this is the callback to be executed when a chunk fails to be retrieved
Copy link
Contributor Author

@mortelli mortelli May 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

publisher may not be a param here, but rather extracted from ctx

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it makes sense to extract from context within the recovery function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this param, expecting code to be pushed to extract this data from ctx

}

// NewNetStore creates a new NetStore using the provided chunk.Store and localID of the node.
Expand Down Expand Up @@ -167,7 +167,7 @@ func (n *NetStore) Close() error {
}

// WithRecoveryCallback allows injecting a callback func on the NetStore struct
func (n *NetStore) WithRecoveryCallback(f func(ctx context.Context, chunkAddress chunk.Address) error) *NetStore {
func (n *NetStore) WithRecoveryCallback(f func(ctx context.Context, chunkAddress chunk.Address, publisher string) error) *NetStore {
Copy link
Contributor Author

@mortelli mortelli May 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolution here

n.recoveryCallback = f
return n
}
Expand Down Expand Up @@ -201,7 +201,8 @@ func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (c
ch, err = n.RemoteFetch(ctx, req, fi)
if err != nil {
if n.recoveryCallback != nil {
n.recoveryCallback(ctx, ref)
// TODO: get actual publisher, dummy for now
n.recoveryCallback(ctx, ref, "0226f213613e843a413ad35b40f193910d26eb35f00154afcde9ded57479a6224a")
time.Sleep(500 * time.Millisecond) // TODO: view what the ideal timeout is
return n.RemoteFetch(ctx, req, fi)
}
Expand Down
4 changes: 2 additions & 2 deletions swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
chAddr := m.Payload
lstore.Set(context.Background(), chunk.ModeSetReUpload, chAddr)
}
self.pss.Register(trojan.NewTopic("RECOVERY"), repairFunc)
recoverFunc := prod.NewRecoveryHook(self.pss.Send)
self.pss.Register(trojan.RecoveryTopic, repairFunc)
recoverFunc := prod.NewRecoveryHook(self.pss.Send, feedsHandler)
self.netStore.WithRecoveryCallback(recoverFunc)
}

Expand Down