Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

api, cmd, prod, pss, swarm: get publisher and query feed for chunk repair request #2175

Merged
merged 49 commits into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
8c4e6a7
Merge remote-tracking branch 'origin/master' into global-pinning-feeds
mortelli May 5, 2020
7a213dc
Merge remote-tracking branch 'origin/global-pinning' into global-pinn…
mortelli May 5, 2020
7132166
prod: iterate getPinners func
mortelli May 5, 2020
f25b463
storage, prod: add publisher param to recovery hook
mortelli May 5, 2020
64741c4
prod: update TestRecoveryHook for new hook
mortelli May 5, 2020
7b6ef91
prod: iterate handler use in recovery hook construction and invocation
mortelli May 5, 2020
ce6e347
prod: iterate handler use in recovery hook construction and invocation
mortelli May 5, 2020
ed7e691
feed: add FeedsHandler interface, DummyHandler struct, DummyHandler.L…
mortelli May 6, 2020
934ac93
feed, prod: rename FeedsHandler interface to GenericHandler, move and…
mortelli May 6, 2020
81954b3
chunk, storage, prod: refine comments
mortelli May 6, 2020
0842aff
prod, pss, trojan: extract RecoveryTopic var
mortelli May 6, 2020
2c55d03
swarm: use RecoveryTopic var
mortelli May 6, 2020
78c188b
prod: add feed error vars
mortelli May 6, 2020
467473c
prod: fix topic for feed in getPinners func
mortelli May 6, 2020
3608343
prod: iterate tests
mortelli May 6, 2020
9747df5
prod: refine error handling in getPinners
mortelli May 6, 2020
7e1b979
prod: update error var comments
mortelli May 6, 2020
52dac5b
prod: iterate comments in getPinners func
mortelli May 6, 2020
64de461
feed: small refactor to newDummyCacheEntry func
mortelli May 6, 2020
be58121
feed: fix comment in newDummyCacheEntry func
mortelli May 6, 2020
75d5d64
prod: iterate getPinners func
mortelli May 6, 2020
4c3fc93
trojan: add Target alias for [][]byte and apply it to functions and t…
mortelli May 7, 2020
b893bc3
pss: use trojan.Targets type in func params
mortelli May 7, 2020
652bc35
prod: use trojan.Targets type in func params, add json unmarshalling …
mortelli May 7, 2020
2a2b9df
prod: add ErrTargets var
mortelli May 7, 2020
9a659f9
pss: update tests to use trojan.Targets type
mortelli May 7, 2020
948a3ae
feed: add cacheEntry field to DummyHandler struct, add NewDummyHandle…
mortelli May 7, 2020
ebe8b6a
prod: add newTestRecoveryFeedHandler helper func to create mock feed …
mortelli May 7, 2020
768c063
pss, trojan: fix typos
mortelli May 7, 2020
699a28f
prod: add comment in getPinners func
mortelli May 7, 2020
16ea955
prod: minor refactor to getPinners func
mortelli May 7, 2020
d4e6e0b
pss: minor refactor to pss tests
mortelli May 7, 2020
8feeed5
feeds: fix comment in TestHandler
mortelli May 7, 2020
bdb8c02
feeds: fix comment in NewTestHandler func
mortelli May 7, 2020
0a2fca2
feed: update DummyHandler comments
mortelli May 7, 2020
faf7158
feed: small comment update
mortelli May 7, 2020
3019ebc
Merge remote-tracking branch 'origin/global-pinning' into global-pinn…
mortelli May 7, 2020
b67a103
prod, pss: update TODOs
mortelli May 7, 2020
97dc5cd
storage, prod: remove publisher param from store hook
mortelli May 8, 2020
0250a5f
swarm, prod, trojan: move recovery topic vars to prod package
mortelli May 8, 2020
6195cf7
api, storage: Publisher added to manifest, remove hardcoded value use…
santicomp2014 May 9, 2020
931b76e
pss: remove unnecessary dependency to prod package on pss test
mortelli May 11, 2020
272df48
api,api/client,api/http,cmd/swarm,cmd/swarm-smoke,prod,storage: added…
santicomp2014 May 11, 2020
7fc3df5
api: remove test debug logs
santicomp2014 May 11, 2020
a28f1ff
api,prod: removed unnecessary publisher default value
santicomp2014 May 11, 2020
e6f0099
Merge remote-tracking branch 'origin/global-pinning' into global-pinn…
mortelli May 12, 2020
470ee96
api, api/http: fixed len check for publisher and also init value
santicomp2014 May 12, 2020
5c2f9f0
api,prod: removed ctx and clear, moved todo for fallback flag to prod
santicomp2014 May 12, 2020
61c33d9
prod: added timeout to ctx in getPinners
santicomp2014 May 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,10 @@ func (a *API) Get(ctx context.Context, decrypt DecryptFunc, manifestAddr storage
}
mimeType = entry.ContentType
log.Debug("content lookup key", "key", contentAddr, "mimetype", mimeType)
// TODO: obtain Publisher from cli flag if no Publisher found in Manifest
mortelli marked this conversation as resolved.
Show resolved Hide resolved
if len(entry.Publisher) != 0 {
mortelli marked this conversation as resolved.
Show resolved Hide resolved
ctx = context.WithValue(ctx, "publisher", entry.Publisher)
}
reader, _ = a.fileStore.Retrieve(ctx, contentAddr)
} else {
// no entry found
Expand Down Expand Up @@ -629,10 +633,12 @@ func (a *API) Modify(ctx context.Context, addr storage.Address, path, contentHas
apiModifyFail.Inc(1)
return nil, err
}
publisher, _ := ctx.Value("publisher").(string)
if contentHash != "" {
entry := newManifestTrieEntry(&ManifestEntry{
Path: path,
ContentType: contentType,
Publisher: publisher,
}, nil)
entry.Hash = contentHash
trie.addEntry(entry, quitC)
Expand Down Expand Up @@ -667,12 +673,14 @@ func (a *API) AddFile(ctx context.Context, mhash, path, fname string, content []
path = path[1:]
}

publisher, _ := ctx.Value("publisher").(string)
entry := &ManifestEntry{
Path: filepath.Join(path, fname),
ContentType: mime.TypeByExtension(filepath.Ext(fname)),
Mode: 0700,
Size: int64(len(content)),
ModTime: time.Now(),
Publisher: publisher,
}

mw, err := a.NewManifestWriter(ctx, mkey, nil)
Expand All @@ -697,7 +705,7 @@ func (a *API) AddFile(ctx context.Context, mhash, path, fname string, content []
return fkey, newMkey.String(), nil
}

func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestPath, defaultPath string, mw *ManifestWriter) (storage.Address, error) {
func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestPath, defaultPath string, mw *ManifestWriter, publisher string) (storage.Address, error) {
apiUploadTarCount.Inc(1)
var contentKey storage.Address
tr := tar.NewReader(bodyReader)
Expand Down Expand Up @@ -730,6 +738,7 @@ func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestP
Mode: hdr.Mode,
Size: hdr.Size,
ModTime: hdr.ModTime,
Publisher: publisher,
}
contentKey, err = mw.AddEntry(ctx, tr, entry)
if err != nil {
Expand All @@ -741,14 +750,14 @@ func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestP
if contentType == "" {
contentType = mime.TypeByExtension(filepath.Ext(hdr.Name))
}

entry := &ManifestEntry{
Hash: contentKey.Hex(),
Path: "", // default entry
ContentType: contentType,
Mode: hdr.Mode,
Size: hdr.Size,
ModTime: hdr.ModTime,
Publisher: publisher,
}
contentKey, err = mw.AddEntry(ctx, nil, entry)
if err != nil {
Expand Down Expand Up @@ -863,12 +872,14 @@ func (a *API) AppendFile(ctx context.Context, mhash, path, fname string, existin
return nil, "", err
}

publisher, _ := ctx.Value("publisher").(string)
entry := &ManifestEntry{
Path: filepath.Join(path, fname),
ContentType: mime.TypeByExtension(filepath.Ext(fname)),
Mode: 0700,
Size: totalSize,
ModTime: time.Now(),
Publisher: publisher,
}

fkey, err := mw.AddEntry(ctx, io.Reader(combinedReader), entry)
Expand Down
11 changes: 6 additions & 5 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ func Open(path string) (*File, error) {
// (if the manifest argument is non-empty) or creates a new manifest containing
// the file, returning the resulting manifest hash (the file will then be
// available at bzz:/<hash>/<path>)
func (c *Client) Upload(file *File, manifest string, toEncrypt, toPin, anonymous bool) (string, error) {
func (c *Client) Upload(file *File, manifest string, toEncrypt, toPin, anonymous bool, publisher string) (string, error) {
if file.Size <= 0 {
return "", errors.New("file size must be greater than zero")
}
return c.TarUpload(manifest, &FileUploader{file}, "", toEncrypt, toPin, anonymous)
return c.TarUpload(manifest, &FileUploader{file}, "", toEncrypt, toPin, anonymous, publisher)
}

// Download downloads a file with the given path from the swarm manifest with
Expand Down Expand Up @@ -197,7 +197,7 @@ func (c *Client) Download(hash, path string) (*File, error) {
// directory will then be available at bzz:/<hash>/path/to/file), with
// the file specified in defaultPath being uploaded to the root of the manifest
// (i.e. bzz:/<hash>/)
func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt, toPin, anonymous bool) (string, error) {
func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt, toPin, anonymous bool, publisher string) (string, error) {
stat, err := os.Stat(dir)
if err != nil {
return "", err
Expand All @@ -212,7 +212,7 @@ func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt, t
return "", fmt.Errorf("default path: %v", err)
}
}
return c.TarUpload(manifest, &DirectoryUploader{dir}, defaultPath, toEncrypt, toPin, anonymous)
return c.TarUpload(manifest, &DirectoryUploader{dir}, defaultPath, toEncrypt, toPin, anonymous, publisher)
}

// DownloadDirectory downloads the files contained in a swarm manifest under
Expand Down Expand Up @@ -511,7 +511,7 @@ type UploadFn func(file *File) error

// TarUpload uses the given Uploader to upload files to swarm as a tar stream,
// returning the resulting manifest hash
func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt, toPin, anonymous bool) (string, error) {
func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt, toPin, anonymous bool, publisher string) (string, error) {
ctx, sp := spancontext.StartSpan(context.Background(), "api.client.tarupload")
defer sp.Finish()

Expand Down Expand Up @@ -539,6 +539,7 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t
transport := http.DefaultTransport

req.Header.Set("Content-Type", "application/x-tar")
req.Header.Set("publisher", publisher)
if defaultPath != "" {
q := req.URL.Query()
q.Set("defaultpath", defaultPath)
Expand Down
8 changes: 4 additions & 4 deletions api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func testClientUploadDownloadFiles(toEncrypt bool, t *testing.T) string {
Size: int64(len(data)),
},
}
hash, err := client.Upload(file, manifest, toEncrypt, toPin, true)
hash, err := client.Upload(file, manifest, toEncrypt, toPin, true, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestClientUploadDownloadDirectory(t *testing.T) {
// upload the directory
client := NewClient(srv.URL)
defaultPath := testDirFiles[0]
hash, err := client.UploadDirectory(dir, defaultPath, "", false, false, true)
hash, err := client.UploadDirectory(dir, defaultPath, "", false, false, true, "")
if err != nil {
t.Fatalf("error uploading directory: %s", err)
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func testClientFileList(toEncrypt bool, t *testing.T) {
defer os.RemoveAll(dir)

client := NewClient(srv.URL)
hash, err := client.UploadDirectory(dir, "", "", toEncrypt, false, true)
hash, err := client.UploadDirectory(dir, "", "", toEncrypt, false, true, "")
if err != nil {
t.Fatalf("error uploading directory: %s", err)
}
Expand Down Expand Up @@ -475,7 +475,7 @@ func TestClientBzzWithFeed(t *testing.T) {
}

// upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded.
manifestAddressHex, err := swarmClient.Upload(f, "", false, false, true)
manifestAddressHex, err := swarmClient.Upload(f, "", false, false, true, "")
if err != nil {
t.Fatalf("Error creating manifest: %s", err)
}
Expand Down
6 changes: 5 additions & 1 deletion api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,12 @@ func (s *Server) handleTarUpload(r *http.Request, mw *api.ManifestWriter) (stora
log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context()), "tag", sctx.GetTag(r.Context()))

defaultPath := r.URL.Query().Get("defaultpath")
publisher := ""
if len(r.Header["Publisher"]) != 0 {
publisher = r.Header["Publisher"][0]
}
mortelli marked this conversation as resolved.
Show resolved Hide resolved

key, err := s.api.UploadTar(r.Context(), r.Body, GetURI(r.Context()).Path, defaultPath, mw)
key, err := s.api.UploadTar(r.Context(), r.Body, GetURI(r.Context()).Path, defaultPath, mw, publisher)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions api/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type ManifestEntry struct {
Status int `json:"status,omitempty"`
Access *AccessEntry `json:"access,omitempty"`
Feed *feed.Feed `json:"feed,omitempty"`
Publisher string `json:"publisher,omitempty"`
}

// ManifestList represents the result of listing files in a manifest
Expand Down
2 changes: 1 addition & 1 deletion chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ type Validator interface {
// with validators check.
type ValidatorStore struct {
Store
deliverCallback func(Chunk) // callback hook used to validate chunk
deliverCallback func(Chunk) // callback func to be invoked for validated chunks
validators []Validator
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/swarm-smoke/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func uploadWithTag(data []byte, endpoint string, tag string) (string, error) {
Tag: tag,
}

return swarm.TarUpload("", &client.FileUploader{File: f}, "", false, false, true)
return swarm.TarUpload("", &client.FileUploader{File: f}, "", false, false, true, "")
}

func digest(r io.Reader) ([]byte, error) {
Expand Down
4 changes: 4 additions & 0 deletions cmd/swarm/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ var (
Name: "stdin",
Usage: "reads data to be uploaded from stdin",
}
SwarmUploadPublisher = cli.StringFlag{
Name: "publisher",
Usage: "Manually specify Publisher",
}
mortelli marked this conversation as resolved.
Show resolved Hide resolved
SwarmUploadMimeType = cli.StringFlag{
Name: "mime",
Usage: "Manually specify MIME type",
Expand Down
7 changes: 4 additions & 3 deletions cmd/swarm/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
Name: "up",
Usage: "uploads a file or directory to swarm using the HTTP API",
ArgsUsage: "<file>",
Flags: []cli.Flag{SwarmEncryptedFlag, SwarmPinFlag, SwarmProgressFlag, SwarmVerboseFlag},
Flags: []cli.Flag{SwarmEncryptedFlag, SwarmPinFlag, SwarmProgressFlag, SwarmVerboseFlag, SwarmUploadPublisher},
Description: "uploads a file or directory to swarm using the HTTP API and prints the root hash",
}

Expand All @@ -73,6 +73,7 @@ func upload(ctx *cli.Context) {
defaultPath = ctx.GlobalString(SwarmUploadDefaultPath.Name)
fromStdin = ctx.GlobalBool(SwarmUpFromStdinFlag.Name)
mimeType = ctx.GlobalString(SwarmUploadMimeType.Name)
publisher = ctx.String(SwarmUploadPublisher.Name)
verbose = ctx.Bool(SwarmVerboseFlag.Name)
client = swarm.NewClient(bzzapi)
toEncrypt = ctx.Bool(SwarmEncryptedFlag.Name)
Expand Down Expand Up @@ -160,7 +161,7 @@ func upload(ctx *cli.Context) {
defaultPath = strings.TrimPrefix(absDefaultPath, absFile)
}
}
return client.UploadDirectory(file, defaultPath, "", toEncrypt, toPin, anon)
return client.UploadDirectory(file, defaultPath, "", toEncrypt, toPin, anon, publisher)
}
} else {
doUpload = func() (string, error) {
Expand All @@ -172,7 +173,7 @@ func upload(ctx *cli.Context) {
if mimeType != "" {
f.ContentType = mimeType
}
return client.Upload(f, "", toEncrypt, toPin, anon)
return client.Upload(f, "", toEncrypt, toPin, anon, publisher)
}
}
start := time.Now()
Expand Down
100 changes: 82 additions & 18 deletions prod/prod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,108 @@ package prod

import (
"context"
"encoding/hex"
"encoding/json"
"errors"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/pss"
"github.com/ethersphere/swarm/pss/trojan"
"github.com/ethersphere/swarm/storage/feed"
"github.com/ethersphere/swarm/storage/feed/lookup"
)

// RecoveryHook defines code to be executed upon trigger of failed to be retrieved chunks
// RecoveryTopicText is the string used to construct the recovery topics
const RecoveryTopicText = "RECOVERY"

// RecoveryTopic is the topic used for repairing globally pinned chunks
var RecoveryTopic = trojan.NewTopic("RECOVERY")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here use RecoveryTopicText

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoops, this was left behind when i split the code into 2 more PRs.

fixed in the main branch, thanks


// ErrPublisher is returned when the publisher string cannot be decoded into bytes
var ErrPublisher = errors.New("failed to decode publisher")

// ErrPubKey is returned when the publisher bytes cannot be decompressed as a public key
var ErrPubKey = errors.New("failed to decompress public key")

// ErrFeedLookup is returned when the recovery feed cannot be successefully looked up
var ErrFeedLookup = errors.New("failed to look up recovery feed")

// ErrFeedContent is returned when there is a failure to retrieve content from the recovery feed
var ErrFeedContent = errors.New("failed to get content for recovery feed")

// ErrTargets is returned when there is a failure to unmarshal the feed content as a trojan.Targets variable
var ErrTargets = errors.New("failed to unmarshal targets in recovery feed content")

// RecoveryHook defines code to be executed upon failing to retrieve pinned chunks
type RecoveryHook func(ctx context.Context, chunkAddress chunk.Address) error

// sender is the function call for sending trojan chunks
type sender func(ctx context.Context, targets [][]byte, topic trojan.Topic, payload []byte) (*pss.Monitor, error)
// sender is the function type for sending trojan chunks
type sender func(ctx context.Context, targets trojan.Targets, topic trojan.Topic, payload []byte) (*pss.Monitor, error)

// NewRecoveryHook returns a new RecoveryHook with the sender function defined
func NewRecoveryHook(send sender) RecoveryHook {
func NewRecoveryHook(send sender, handler feed.GenericHandler) RecoveryHook {
return func(ctx context.Context, chunkAddress chunk.Address) error {
targets, err := getPinners(chunkAddress)
targets, err := getPinners(ctx, handler)
zelig marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
payload := chunkAddress
topic := trojan.NewTopic("RECOVERY")

// TODO: monitor return should
if _, err := send(ctx, targets, topic, payload); err != nil {
// TODO: returned monitor should be made use of
zelig marked this conversation as resolved.
Show resolved Hide resolved
if _, err := send(ctx, targets, RecoveryTopic, payload); err != nil {
return err
}
return nil
}
}

// TODO: refactor this method to implement feed of target pinners
// getPinners returns the specific target pinners for a corresponding chunk address
func getPinners(chunkAddress chunk.Address) ([][]byte, error) {
//this should get the feed and return correct target of pinners
return [][]byte{
{57, 120},
{209, 156},
{156, 38},
{89, 19},
{22, 129}}, nil
// getPinners returns the specific target pinners for a corresponding chunk
func getPinners(ctx context.Context, handler feed.GenericHandler) (trojan.Targets, error) {
// get feed user from publisher
publisher, _ := ctx.Value("publisher").(string)
zelig marked this conversation as resolved.
Show resolved Hide resolved
publisherBytes, err := hex.DecodeString(publisher)
if err != nil {
return nil, ErrPublisher
}
pubKey, err := crypto.DecompressPubkey(publisherBytes)
if err != nil {
return nil, ErrPubKey
}
addr := crypto.PubkeyToAddress(*pubKey)

// get feed topic from trojan recovery topic
topic, err := feed.NewTopic(RecoveryTopicText, nil)
if err != nil {
return nil, err
}

// read feed
fd := feed.Feed{
Topic: topic,
User: addr,
}
query := feed.NewQueryLatest(&fd, lookup.NoClue)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this always lookup.NoClue? does it not store latest. state to be. used as hint?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as of now, we do not have any idea when the last update to the feed happened, so this is why NoClue is used.

but i have added a note to #2181, we can further optimize it this way i think.

thanks

ctx, cancel := context.WithCancel(context.Background())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you dont need cancel here but timeout and of course extending the parent context inherited from netstore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed

defer cancel()

_, err = handler.Lookup(ctx, query)
// feed should still be queried even if there are no updates
if err != nil && err.Error() != "no feed updates found" {
zelig marked this conversation as resolved.
Show resolved Hide resolved
return nil, ErrFeedLookup
}

// TODO: how do we handle time-outs here?
_, content, err := handler.GetContent(&fd)
if err != nil {
return nil, ErrFeedContent
}

// extract targets from feed content
targets := new(trojan.Targets)
if err := json.Unmarshal(content, targets); err != nil {
return nil, ErrTargets
}

return *targets, nil
}
Loading