Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
api, cmd, prod, pss, swarm: get publisher and query feed for chunk re…
Browse files Browse the repository at this point in the history
…pair request (#2175)

* prod: iterate getPinners func

* storage, prod: add publisher param to recovery hook

* prod: update TestRecoveryHook for new hook

* prod: iterate handler use in recovery hook construction and invocation

* prod: iterate handler use in recovery hook construction and invocation

* feed: add FeedsHandler interface, DummyHandler struct, DummyHandler.Lookup and DummyHandler.GetContent funcs
prod: change NewRecoveryHook and getPinners funcs handler params to FeedsHandler type

* feed, prod: rename FeedsHandler interface to GenericHandler, move and comment feed package funcs

* chunk, storage, prod: refine comments

* prod, pss, trojan: extract RecoveryTopic var

* swarm: use RecoveryTopic var

* prod: add feed error vars

* prod: fix topic for feed in getPinners func

* prod: iterate tests

* prod: refine error handling in getPinners

* prod: update error var comments

* prod: iterate comments in getPinners func

* feed: small refactor to newDummyCacheEntry func

* feed: fix comment in newDummyCacheEntry func

* prod: iterate getPinners func

* trojan: add Target alias for [][]byte and apply it to functions and tests

* pss: use trojan.Targets type in func params

* prod: use trojan.Targets type in func params, add json unmarshalling for fetched content feed in getPinners

* prod: add ErrTargets var

* pss: update tests to use trojan.Targets type

* feed: add cacheEntry field to DummyHandler struct, add NewDummyHandler and SetContent funcs to DummyHandler, remove newDummyCacheEntry func

* prod: add newTestRecoveryFeedHandler helper func to create mock feed handlers which retrieve content that can be unmarshalled as a trojan.Targets type

* pss, trojan: fix typos

* prod: add comment in getPinners func

* prod: minor refactor to getPinners func

* pss: minor refactor to pss tests

* feeds: fix comment in TestHandler

* feeds: fix comment in NewTestHandler func

* feed: update DummyHandler comments

* feed: small comment update

* prod, pss: update TODOs

* storage, prod: remove publisher param from store hook

* swarm, prod, trojan: move recovery topic vars to prod package

* api, storage: Publisher added to manifest, remove hardcoded value used for testing

* pss: remove unnecessary dependency to prod package on pss test

* api,api/client,api/http,cmd/swarm,cmd/swarm-smoke,prod,storage: added publish flag for swarm up command, added header publisher for uploadTar, replace prod with context publisher, updated tests to reflect these changes

* api: remove test debug logs

* api,prod: removed unnecessary publisher default value

* api, api/http: fixed len check for publisher and also init value

* api,prod: removed ctx and clear, moved todo for fallback flag to prod

* prod: added timeout to ctx in getPinners

Co-authored-by: santicomp2014 <[email protected]>
  • Loading branch information
mortelli and santicomp2014 authored May 12, 2020
1 parent 9228d01 commit d91b839
Show file tree
Hide file tree
Showing 15 changed files with 156 additions and 43 deletions.
14 changes: 12 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,9 @@ func (a *API) Get(ctx context.Context, decrypt DecryptFunc, manifestAddr storage
}
mimeType = entry.ContentType
log.Debug("content lookup key", "key", contentAddr, "mimetype", mimeType)
if len(entry.Publisher) > 0 {
ctx = context.WithValue(ctx, "publisher", entry.Publisher)
}
reader, _ = a.fileStore.Retrieve(ctx, contentAddr)
} else {
// no entry found
Expand Down Expand Up @@ -629,10 +632,12 @@ func (a *API) Modify(ctx context.Context, addr storage.Address, path, contentHas
apiModifyFail.Inc(1)
return nil, err
}
publisher, _ := ctx.Value("publisher").(string)
if contentHash != "" {
entry := newManifestTrieEntry(&ManifestEntry{
Path: path,
ContentType: contentType,
Publisher: publisher,
}, nil)
entry.Hash = contentHash
trie.addEntry(entry, quitC)
Expand Down Expand Up @@ -667,12 +672,14 @@ func (a *API) AddFile(ctx context.Context, mhash, path, fname string, content []
path = path[1:]
}

publisher, _ := ctx.Value("publisher").(string)
entry := &ManifestEntry{
Path: filepath.Join(path, fname),
ContentType: mime.TypeByExtension(filepath.Ext(fname)),
Mode: 0700,
Size: int64(len(content)),
ModTime: time.Now(),
Publisher: publisher,
}

mw, err := a.NewManifestWriter(ctx, mkey, nil)
Expand All @@ -697,7 +704,7 @@ func (a *API) AddFile(ctx context.Context, mhash, path, fname string, content []
return fkey, newMkey.String(), nil
}

func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestPath, defaultPath string, mw *ManifestWriter) (storage.Address, error) {
func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestPath, defaultPath string, mw *ManifestWriter, publisher string) (storage.Address, error) {
apiUploadTarCount.Inc(1)
var contentKey storage.Address
tr := tar.NewReader(bodyReader)
Expand Down Expand Up @@ -730,6 +737,7 @@ func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestP
Mode: hdr.Mode,
Size: hdr.Size,
ModTime: hdr.ModTime,
Publisher: publisher,
}
contentKey, err = mw.AddEntry(ctx, tr, entry)
if err != nil {
Expand All @@ -741,14 +749,14 @@ func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestP
if contentType == "" {
contentType = mime.TypeByExtension(filepath.Ext(hdr.Name))
}

entry := &ManifestEntry{
Hash: contentKey.Hex(),
Path: "", // default entry
ContentType: contentType,
Mode: hdr.Mode,
Size: hdr.Size,
ModTime: hdr.ModTime,
Publisher: publisher,
}
contentKey, err = mw.AddEntry(ctx, nil, entry)
if err != nil {
Expand Down Expand Up @@ -863,12 +871,14 @@ func (a *API) AppendFile(ctx context.Context, mhash, path, fname string, existin
return nil, "", err
}

publisher, _ := ctx.Value("publisher").(string)
entry := &ManifestEntry{
Path: filepath.Join(path, fname),
ContentType: mime.TypeByExtension(filepath.Ext(fname)),
Mode: 0700,
Size: totalSize,
ModTime: time.Now(),
Publisher: publisher,
}

fkey, err := mw.AddEntry(ctx, io.Reader(combinedReader), entry)
Expand Down
11 changes: 6 additions & 5 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ func Open(path string) (*File, error) {
// (if the manifest argument is non-empty) or creates a new manifest containing
// the file, returning the resulting manifest hash (the file will then be
// available at bzz:/<hash>/<path>)
func (c *Client) Upload(file *File, manifest string, toEncrypt, toPin, anonymous bool) (string, error) {
func (c *Client) Upload(file *File, manifest string, toEncrypt, toPin, anonymous bool, publisher string) (string, error) {
if file.Size <= 0 {
return "", errors.New("file size must be greater than zero")
}
return c.TarUpload(manifest, &FileUploader{file}, "", toEncrypt, toPin, anonymous)
return c.TarUpload(manifest, &FileUploader{file}, "", toEncrypt, toPin, anonymous, publisher)
}

// Download downloads a file with the given path from the swarm manifest with
Expand Down Expand Up @@ -197,7 +197,7 @@ func (c *Client) Download(hash, path string) (*File, error) {
// directory will then be available at bzz:/<hash>/path/to/file), with
// the file specified in defaultPath being uploaded to the root of the manifest
// (i.e. bzz:/<hash>/)
func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt, toPin, anonymous bool) (string, error) {
func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt, toPin, anonymous bool, publisher string) (string, error) {
stat, err := os.Stat(dir)
if err != nil {
return "", err
Expand All @@ -212,7 +212,7 @@ func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt, t
return "", fmt.Errorf("default path: %v", err)
}
}
return c.TarUpload(manifest, &DirectoryUploader{dir}, defaultPath, toEncrypt, toPin, anonymous)
return c.TarUpload(manifest, &DirectoryUploader{dir}, defaultPath, toEncrypt, toPin, anonymous, publisher)
}

// DownloadDirectory downloads the files contained in a swarm manifest under
Expand Down Expand Up @@ -511,7 +511,7 @@ type UploadFn func(file *File) error

// TarUpload uses the given Uploader to upload files to swarm as a tar stream,
// returning the resulting manifest hash
func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt, toPin, anonymous bool) (string, error) {
func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt, toPin, anonymous bool, publisher string) (string, error) {
ctx, sp := spancontext.StartSpan(context.Background(), "api.client.tarupload")
defer sp.Finish()

Expand Down Expand Up @@ -539,6 +539,7 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t
transport := http.DefaultTransport

req.Header.Set("Content-Type", "application/x-tar")
req.Header.Set("publisher", publisher)
if defaultPath != "" {
q := req.URL.Query()
q.Set("defaultpath", defaultPath)
Expand Down
8 changes: 4 additions & 4 deletions api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func testClientUploadDownloadFiles(toEncrypt bool, t *testing.T) string {
Size: int64(len(data)),
},
}
hash, err := client.Upload(file, manifest, toEncrypt, toPin, true)
hash, err := client.Upload(file, manifest, toEncrypt, toPin, true, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestClientUploadDownloadDirectory(t *testing.T) {
// upload the directory
client := NewClient(srv.URL)
defaultPath := testDirFiles[0]
hash, err := client.UploadDirectory(dir, defaultPath, "", false, false, true)
hash, err := client.UploadDirectory(dir, defaultPath, "", false, false, true, "")
if err != nil {
t.Fatalf("error uploading directory: %s", err)
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func testClientFileList(toEncrypt bool, t *testing.T) {
defer os.RemoveAll(dir)

client := NewClient(srv.URL)
hash, err := client.UploadDirectory(dir, "", "", toEncrypt, false, true)
hash, err := client.UploadDirectory(dir, "", "", toEncrypt, false, true, "")
if err != nil {
t.Fatalf("error uploading directory: %s", err)
}
Expand Down Expand Up @@ -475,7 +475,7 @@ func TestClientBzzWithFeed(t *testing.T) {
}

// upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded.
manifestAddressHex, err := swarmClient.Upload(f, "", false, false, true)
manifestAddressHex, err := swarmClient.Upload(f, "", false, false, true, "")
if err != nil {
t.Fatalf("Error creating manifest: %s", err)
}
Expand Down
6 changes: 5 additions & 1 deletion api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,12 @@ func (s *Server) handleTarUpload(r *http.Request, mw *api.ManifestWriter) (stora
log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context()), "tag", sctx.GetTag(r.Context()))

defaultPath := r.URL.Query().Get("defaultpath")
var publisher string
if len(r.Header["Publisher"]) > 0 {
publisher = r.Header["Publisher"][0]
}

key, err := s.api.UploadTar(r.Context(), r.Body, GetURI(r.Context()).Path, defaultPath, mw)
key, err := s.api.UploadTar(r.Context(), r.Body, GetURI(r.Context()).Path, defaultPath, mw, publisher)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions api/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type ManifestEntry struct {
Status int `json:"status,omitempty"`
Access *AccessEntry `json:"access,omitempty"`
Feed *feed.Feed `json:"feed,omitempty"`
Publisher string `json:"publisher,omitempty"`
}

// ManifestList represents the result of listing files in a manifest
Expand Down
2 changes: 1 addition & 1 deletion chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ type Validator interface {
// with validators check.
type ValidatorStore struct {
Store
deliverCallback func(Chunk) // callback hook used to validate chunk
deliverCallback func(Chunk) // callback func to be invoked for validated chunks
validators []Validator
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/swarm-smoke/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func uploadWithTag(data []byte, endpoint string, tag string) (string, error) {
Tag: tag,
}

return swarm.TarUpload("", &client.FileUploader{File: f}, "", false, false, true)
return swarm.TarUpload("", &client.FileUploader{File: f}, "", false, false, true, "")
}

func digest(r io.Reader) ([]byte, error) {
Expand Down
4 changes: 4 additions & 0 deletions cmd/swarm/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ var (
Name: "stdin",
Usage: "reads data to be uploaded from stdin",
}
SwarmUploadPublisher = cli.StringFlag{
Name: "publisher",
Usage: "Manually specify Publisher",
}
SwarmUploadMimeType = cli.StringFlag{
Name: "mime",
Usage: "Manually specify MIME type",
Expand Down
7 changes: 4 additions & 3 deletions cmd/swarm/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
Name: "up",
Usage: "uploads a file or directory to swarm using the HTTP API",
ArgsUsage: "<file>",
Flags: []cli.Flag{SwarmEncryptedFlag, SwarmPinFlag, SwarmProgressFlag, SwarmVerboseFlag},
Flags: []cli.Flag{SwarmEncryptedFlag, SwarmPinFlag, SwarmProgressFlag, SwarmVerboseFlag, SwarmUploadPublisher},
Description: "uploads a file or directory to swarm using the HTTP API and prints the root hash",
}

Expand All @@ -73,6 +73,7 @@ func upload(ctx *cli.Context) {
defaultPath = ctx.GlobalString(SwarmUploadDefaultPath.Name)
fromStdin = ctx.GlobalBool(SwarmUpFromStdinFlag.Name)
mimeType = ctx.GlobalString(SwarmUploadMimeType.Name)
publisher = ctx.String(SwarmUploadPublisher.Name)
verbose = ctx.Bool(SwarmVerboseFlag.Name)
client = swarm.NewClient(bzzapi)
toEncrypt = ctx.Bool(SwarmEncryptedFlag.Name)
Expand Down Expand Up @@ -160,7 +161,7 @@ func upload(ctx *cli.Context) {
defaultPath = strings.TrimPrefix(absDefaultPath, absFile)
}
}
return client.UploadDirectory(file, defaultPath, "", toEncrypt, toPin, anon)
return client.UploadDirectory(file, defaultPath, "", toEncrypt, toPin, anon, publisher)
}
} else {
doUpload = func() (string, error) {
Expand All @@ -172,7 +173,7 @@ func upload(ctx *cli.Context) {
if mimeType != "" {
f.ContentType = mimeType
}
return client.Upload(f, "", toEncrypt, toPin, anon)
return client.Upload(f, "", toEncrypt, toPin, anon, publisher)
}
}
start := time.Now()
Expand Down
94 changes: 80 additions & 14 deletions prod/prod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,109 @@ package prod

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/pss"
"github.com/ethersphere/swarm/pss/trojan"
"github.com/ethersphere/swarm/storage/feed"
"github.com/ethersphere/swarm/storage/feed/lookup"
)

// RecoveryHook defines code to be executed upon trigger of failed to be retrieved chunks
// RecoveryTopicText is the string used to construct the recovery topics
const RecoveryTopicText = "RECOVERY"

// RecoveryTopic is the topic used for repairing globally pinned chunks
var RecoveryTopic = trojan.NewTopic("RECOVERY")

// ErrPublisher is returned when the publisher string cannot be decoded into bytes
var ErrPublisher = errors.New("failed to decode publisher")

// ErrPubKey is returned when the publisher bytes cannot be decompressed as a public key
var ErrPubKey = errors.New("failed to decompress public key")

// ErrFeedLookup is returned when the recovery feed cannot be successefully looked up
var ErrFeedLookup = errors.New("failed to look up recovery feed")

// ErrFeedContent is returned when there is a failure to retrieve content from the recovery feed
var ErrFeedContent = errors.New("failed to get content for recovery feed")

// ErrTargets is returned when there is a failure to unmarshal the feed content as a trojan.Targets variable
var ErrTargets = errors.New("failed to unmarshal targets in recovery feed content")

// RecoveryHook defines code to be executed upon failing to retrieve pinned chunks
type RecoveryHook func(ctx context.Context, chunkAddress chunk.Address) error

// sender is the function call for sending trojan chunks
type sender func(ctx context.Context, targets trojan.Targets, topic trojan.Topic, payload []byte) (*pss.Monitor, error)

// NewRecoveryHook returns a new RecoveryHook with the sender function defined
func NewRecoveryHook(send sender) RecoveryHook {
func NewRecoveryHook(send sender, handler feed.GenericHandler) RecoveryHook {
return func(ctx context.Context, chunkAddress chunk.Address) error {
targets, err := getPinners(chunkAddress)
targets, err := getPinners(ctx, handler)
if err != nil {
return err
}
payload := chunkAddress
topic := trojan.NewTopic("RECOVERY")

// TODO: monitor return should
if _, err := send(ctx, targets, topic, payload); err != nil {
// TODO: returned monitor should be made use of
if _, err := send(ctx, targets, RecoveryTopic, payload); err != nil {
return err
}
return nil
}
}

// TODO: refactor this method to implement feed of target pinners
// getPinners returns the specific target pinners for a corresponding chunk address
func getPinners(chunkAddress chunk.Address) (trojan.Targets, error) {
// getPinners returns the specific target pinners for a corresponding chunk
func getPinners(ctx context.Context, handler feed.GenericHandler) (trojan.Targets, error) {
// TODO: obtain fallback Publisher from cli flag if no Publisher found in Manifest
// get feed user from publisher
publisher, _ := ctx.Value("publisher").(string)
publisherBytes, err := hex.DecodeString(publisher)
if err != nil {
return nil, ErrPublisher
}
pubKey, err := crypto.DecompressPubkey(publisherBytes)
if err != nil {
return nil, ErrPubKey
}
addr := crypto.PubkeyToAddress(*pubKey)

// get feed topic from trojan recovery topic
topic, err := feed.NewTopic(RecoveryTopicText, nil)
if err != nil {
return nil, err
}

// read feed
fd := feed.Feed{
Topic: topic,
User: addr,
}
query := feed.NewQueryLatest(&fd, lookup.NoClue)
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()

_, err = handler.Lookup(ctx, query)
// feed should still be queried even if there are no updates
if err != nil && err.Error() != "no feed updates found" {
return nil, ErrFeedLookup
}

_, content, err := handler.GetContent(&fd)
if err != nil {
return nil, ErrFeedContent
}

// extract targets from feed content
targets := new(trojan.Targets)
if err := json.Unmarshal(content, targets); err != nil {
return nil, ErrTargets
}

// TODO: dummy targets for now
t1 := trojan.Target([]byte{57, 120})
t2 := trojan.Target([]byte{209, 156})
t3 := trojan.Target([]byte{156, 38})
return trojan.Targets([]trojan.Target{t1, t2, t3}), nil
return *targets, nil
}
Loading

0 comments on commit d91b839

Please sign in to comment.