diff --git a/api/api.go b/api/api.go index 85ecb6e24f..0797700be3 100644 --- a/api/api.go +++ b/api/api.go @@ -424,6 +424,9 @@ func (a *API) Get(ctx context.Context, decrypt DecryptFunc, manifestAddr storage } mimeType = entry.ContentType log.Debug("content lookup key", "key", contentAddr, "mimetype", mimeType) + if len(entry.Publisher) > 0 { + ctx = context.WithValue(ctx, "publisher", entry.Publisher) + } reader, _ = a.fileStore.Retrieve(ctx, contentAddr) } else { // no entry found @@ -629,10 +632,12 @@ func (a *API) Modify(ctx context.Context, addr storage.Address, path, contentHas apiModifyFail.Inc(1) return nil, err } + publisher, _ := ctx.Value("publisher").(string) if contentHash != "" { entry := newManifestTrieEntry(&ManifestEntry{ Path: path, ContentType: contentType, + Publisher: publisher, }, nil) entry.Hash = contentHash trie.addEntry(entry, quitC) @@ -667,12 +672,14 @@ func (a *API) AddFile(ctx context.Context, mhash, path, fname string, content [] path = path[1:] } + publisher, _ := ctx.Value("publisher").(string) entry := &ManifestEntry{ Path: filepath.Join(path, fname), ContentType: mime.TypeByExtension(filepath.Ext(fname)), Mode: 0700, Size: int64(len(content)), ModTime: time.Now(), + Publisher: publisher, } mw, err := a.NewManifestWriter(ctx, mkey, nil) @@ -697,7 +704,7 @@ func (a *API) AddFile(ctx context.Context, mhash, path, fname string, content [] return fkey, newMkey.String(), nil } -func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestPath, defaultPath string, mw *ManifestWriter) (storage.Address, error) { +func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestPath, defaultPath string, mw *ManifestWriter, publisher string) (storage.Address, error) { apiUploadTarCount.Inc(1) var contentKey storage.Address tr := tar.NewReader(bodyReader) @@ -730,6 +737,7 @@ func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestP Mode: hdr.Mode, Size: hdr.Size, ModTime: hdr.ModTime, + Publisher: publisher, } contentKey, err = mw.AddEntry(ctx, tr, entry) if err != nil { @@ -741,7 +749,6 @@ func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestP if contentType == "" { contentType = mime.TypeByExtension(filepath.Ext(hdr.Name)) } - entry := &ManifestEntry{ Hash: contentKey.Hex(), Path: "", // default entry @@ -749,6 +756,7 @@ func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestP Mode: hdr.Mode, Size: hdr.Size, ModTime: hdr.ModTime, + Publisher: publisher, } contentKey, err = mw.AddEntry(ctx, nil, entry) if err != nil { @@ -863,12 +871,14 @@ func (a *API) AppendFile(ctx context.Context, mhash, path, fname string, existin return nil, "", err } + publisher, _ := ctx.Value("publisher").(string) entry := &ManifestEntry{ Path: filepath.Join(path, fname), ContentType: mime.TypeByExtension(filepath.Ext(fname)), Mode: 0700, Size: totalSize, ModTime: time.Now(), + Publisher: publisher, } fkey, err := mw.AddEntry(ctx, io.Reader(combinedReader), entry) diff --git a/api/client/client.go b/api/client/client.go index 726fbaa609..653df2f67e 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -163,11 +163,11 @@ func Open(path string) (*File, error) { // (if the manifest argument is non-empty) or creates a new manifest containing // the file, returning the resulting manifest hash (the file will then be // available at bzz://) -func (c *Client) Upload(file *File, manifest string, toEncrypt, toPin, anonymous bool) (string, error) { +func (c *Client) Upload(file *File, manifest string, toEncrypt, toPin, anonymous bool, publisher string) (string, error) { if file.Size <= 0 { return "", errors.New("file size must be greater than zero") } - return c.TarUpload(manifest, &FileUploader{file}, "", toEncrypt, toPin, anonymous) + return c.TarUpload(manifest, &FileUploader{file}, "", toEncrypt, toPin, anonymous, publisher) } // Download downloads a file with the given path from the swarm manifest with @@ -197,7 +197,7 @@ func (c *Client) Download(hash, path string) (*File, error) { // directory will then be available at bzz://path/to/file), with // the file specified in defaultPath being uploaded to the root of the manifest // (i.e. bzz://) -func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt, toPin, anonymous bool) (string, error) { +func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt, toPin, anonymous bool, publisher string) (string, error) { stat, err := os.Stat(dir) if err != nil { return "", err @@ -212,7 +212,7 @@ func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt, t return "", fmt.Errorf("default path: %v", err) } } - return c.TarUpload(manifest, &DirectoryUploader{dir}, defaultPath, toEncrypt, toPin, anonymous) + return c.TarUpload(manifest, &DirectoryUploader{dir}, defaultPath, toEncrypt, toPin, anonymous, publisher) } // DownloadDirectory downloads the files contained in a swarm manifest under @@ -511,7 +511,7 @@ type UploadFn func(file *File) error // TarUpload uses the given Uploader to upload files to swarm as a tar stream, // returning the resulting manifest hash -func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt, toPin, anonymous bool) (string, error) { +func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt, toPin, anonymous bool, publisher string) (string, error) { ctx, sp := spancontext.StartSpan(context.Background(), "api.client.tarupload") defer sp.Finish() @@ -539,6 +539,7 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t transport := http.DefaultTransport req.Header.Set("Content-Type", "application/x-tar") + req.Header.Set("publisher", publisher) if defaultPath != "" { q := req.URL.Query() q.Set("defaultpath", defaultPath) diff --git a/api/client/client_test.go b/api/client/client_test.go index e952f42e66..b3f5dca423 100644 --- a/api/client/client_test.go +++ b/api/client/client_test.go @@ -125,7 +125,7 @@ func testClientUploadDownloadFiles(toEncrypt bool, t *testing.T) string { Size: int64(len(data)), }, } - hash, err := client.Upload(file, manifest, toEncrypt, toPin, true) + hash, err := client.Upload(file, manifest, toEncrypt, toPin, true, "") if err != nil { t.Fatal(err) } @@ -221,7 +221,7 @@ func TestClientUploadDownloadDirectory(t *testing.T) { // upload the directory client := NewClient(srv.URL) defaultPath := testDirFiles[0] - hash, err := client.UploadDirectory(dir, defaultPath, "", false, false, true) + hash, err := client.UploadDirectory(dir, defaultPath, "", false, false, true, "") if err != nil { t.Fatalf("error uploading directory: %s", err) } @@ -289,7 +289,7 @@ func testClientFileList(toEncrypt bool, t *testing.T) { defer os.RemoveAll(dir) client := NewClient(srv.URL) - hash, err := client.UploadDirectory(dir, "", "", toEncrypt, false, true) + hash, err := client.UploadDirectory(dir, "", "", toEncrypt, false, true, "") if err != nil { t.Fatalf("error uploading directory: %s", err) } @@ -475,7 +475,7 @@ func TestClientBzzWithFeed(t *testing.T) { } // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. - manifestAddressHex, err := swarmClient.Upload(f, "", false, false, true) + manifestAddressHex, err := swarmClient.Upload(f, "", false, false, true, "") if err != nil { t.Fatalf("Error creating manifest: %s", err) } diff --git a/api/http/server.go b/api/http/server.go index 8bd9aa2676..43b2884723 100644 --- a/api/http/server.go +++ b/api/http/server.go @@ -468,8 +468,12 @@ func (s *Server) handleTarUpload(r *http.Request, mw *api.ManifestWriter) (stora log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context()), "tag", sctx.GetTag(r.Context())) defaultPath := r.URL.Query().Get("defaultpath") + var publisher string + if len(r.Header["Publisher"]) > 0 { + publisher = r.Header["Publisher"][0] + } - key, err := s.api.UploadTar(r.Context(), r.Body, GetURI(r.Context()).Path, defaultPath, mw) + key, err := s.api.UploadTar(r.Context(), r.Body, GetURI(r.Context()).Path, defaultPath, mw, publisher) if err != nil { return nil, err } diff --git a/api/manifest.go b/api/manifest.go index 33ca3038b6..10e24468e9 100644 --- a/api/manifest.go +++ b/api/manifest.go @@ -57,6 +57,7 @@ type ManifestEntry struct { Status int `json:"status,omitempty"` Access *AccessEntry `json:"access,omitempty"` Feed *feed.Feed `json:"feed,omitempty"` + Publisher string `json:"publisher,omitempty"` } // ManifestList represents the result of listing files in a manifest diff --git a/chunk/chunk.go b/chunk/chunk.go index 5fb55f0a17..cec08f8b93 100644 --- a/chunk/chunk.go +++ b/chunk/chunk.go @@ -283,7 +283,7 @@ type Validator interface { // with validators check. type ValidatorStore struct { Store - deliverCallback func(Chunk) // callback hook used to validate chunk + deliverCallback func(Chunk) // callback func to be invoked for validated chunks validators []Validator } diff --git a/cmd/swarm-smoke/util.go b/cmd/swarm-smoke/util.go index f90fba1f4d..e61490b271 100644 --- a/cmd/swarm-smoke/util.go +++ b/cmd/swarm-smoke/util.go @@ -212,7 +212,7 @@ func uploadWithTag(data []byte, endpoint string, tag string) (string, error) { Tag: tag, } - return swarm.TarUpload("", &client.FileUploader{File: f}, "", false, false, true) + return swarm.TarUpload("", &client.FileUploader{File: f}, "", false, false, true, "") } func digest(r io.Reader) ([]byte, error) { diff --git a/cmd/swarm/flags.go b/cmd/swarm/flags.go index 59e77e272f..639a7d5698 100644 --- a/cmd/swarm/flags.go +++ b/cmd/swarm/flags.go @@ -153,6 +153,10 @@ var ( Name: "stdin", Usage: "reads data to be uploaded from stdin", } + SwarmUploadPublisher = cli.StringFlag{ + Name: "publisher", + Usage: "Manually specify Publisher", + } SwarmUploadMimeType = cli.StringFlag{ Name: "mime", Usage: "Manually specify MIME type", diff --git a/cmd/swarm/upload.go b/cmd/swarm/upload.go index b7aeb4cc31..91fcf78bc4 100644 --- a/cmd/swarm/upload.go +++ b/cmd/swarm/upload.go @@ -48,7 +48,7 @@ var ( Name: "up", Usage: "uploads a file or directory to swarm using the HTTP API", ArgsUsage: "", - Flags: []cli.Flag{SwarmEncryptedFlag, SwarmPinFlag, SwarmProgressFlag, SwarmVerboseFlag}, + Flags: []cli.Flag{SwarmEncryptedFlag, SwarmPinFlag, SwarmProgressFlag, SwarmVerboseFlag, SwarmUploadPublisher}, Description: "uploads a file or directory to swarm using the HTTP API and prints the root hash", } @@ -73,6 +73,7 @@ func upload(ctx *cli.Context) { defaultPath = ctx.GlobalString(SwarmUploadDefaultPath.Name) fromStdin = ctx.GlobalBool(SwarmUpFromStdinFlag.Name) mimeType = ctx.GlobalString(SwarmUploadMimeType.Name) + publisher = ctx.String(SwarmUploadPublisher.Name) verbose = ctx.Bool(SwarmVerboseFlag.Name) client = swarm.NewClient(bzzapi) toEncrypt = ctx.Bool(SwarmEncryptedFlag.Name) @@ -160,7 +161,7 @@ func upload(ctx *cli.Context) { defaultPath = strings.TrimPrefix(absDefaultPath, absFile) } } - return client.UploadDirectory(file, defaultPath, "", toEncrypt, toPin, anon) + return client.UploadDirectory(file, defaultPath, "", toEncrypt, toPin, anon, publisher) } } else { doUpload = func() (string, error) { @@ -172,7 +173,7 @@ func upload(ctx *cli.Context) { if mimeType != "" { f.ContentType = mimeType } - return client.Upload(f, "", toEncrypt, toPin, anon) + return client.Upload(f, "", toEncrypt, toPin, anon, publisher) } } start := time.Now() diff --git a/prod/prod.go b/prod/prod.go index 6fb704b542..c8e1342c4a 100644 --- a/prod/prod.go +++ b/prod/prod.go @@ -18,43 +18,109 @@ package prod import ( "context" + "encoding/hex" + "encoding/json" + "errors" + "time" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/pss" "github.com/ethersphere/swarm/pss/trojan" + "github.com/ethersphere/swarm/storage/feed" + "github.com/ethersphere/swarm/storage/feed/lookup" ) -// RecoveryHook defines code to be executed upon trigger of failed to be retrieved chunks +// RecoveryTopicText is the string used to construct the recovery topics +const RecoveryTopicText = "RECOVERY" + +// RecoveryTopic is the topic used for repairing globally pinned chunks +var RecoveryTopic = trojan.NewTopic("RECOVERY") + +// ErrPublisher is returned when the publisher string cannot be decoded into bytes +var ErrPublisher = errors.New("failed to decode publisher") + +// ErrPubKey is returned when the publisher bytes cannot be decompressed as a public key +var ErrPubKey = errors.New("failed to decompress public key") + +// ErrFeedLookup is returned when the recovery feed cannot be successefully looked up +var ErrFeedLookup = errors.New("failed to look up recovery feed") + +// ErrFeedContent is returned when there is a failure to retrieve content from the recovery feed +var ErrFeedContent = errors.New("failed to get content for recovery feed") + +// ErrTargets is returned when there is a failure to unmarshal the feed content as a trojan.Targets variable +var ErrTargets = errors.New("failed to unmarshal targets in recovery feed content") + +// RecoveryHook defines code to be executed upon failing to retrieve pinned chunks type RecoveryHook func(ctx context.Context, chunkAddress chunk.Address) error // sender is the function call for sending trojan chunks type sender func(ctx context.Context, targets trojan.Targets, topic trojan.Topic, payload []byte) (*pss.Monitor, error) // NewRecoveryHook returns a new RecoveryHook with the sender function defined -func NewRecoveryHook(send sender) RecoveryHook { +func NewRecoveryHook(send sender, handler feed.GenericHandler) RecoveryHook { return func(ctx context.Context, chunkAddress chunk.Address) error { - targets, err := getPinners(chunkAddress) + targets, err := getPinners(ctx, handler) if err != nil { return err } payload := chunkAddress - topic := trojan.NewTopic("RECOVERY") - // TODO: monitor return should - if _, err := send(ctx, targets, topic, payload); err != nil { + // TODO: returned monitor should be made use of + if _, err := send(ctx, targets, RecoveryTopic, payload); err != nil { return err } return nil } } -// TODO: refactor this method to implement feed of target pinners -// getPinners returns the specific target pinners for a corresponding chunk address -func getPinners(chunkAddress chunk.Address) (trojan.Targets, error) { +// getPinners returns the specific target pinners for a corresponding chunk +func getPinners(ctx context.Context, handler feed.GenericHandler) (trojan.Targets, error) { + // TODO: obtain fallback Publisher from cli flag if no Publisher found in Manifest + // get feed user from publisher + publisher, _ := ctx.Value("publisher").(string) + publisherBytes, err := hex.DecodeString(publisher) + if err != nil { + return nil, ErrPublisher + } + pubKey, err := crypto.DecompressPubkey(publisherBytes) + if err != nil { + return nil, ErrPubKey + } + addr := crypto.PubkeyToAddress(*pubKey) + + // get feed topic from trojan recovery topic + topic, err := feed.NewTopic(RecoveryTopicText, nil) + if err != nil { + return nil, err + } + + // read feed + fd := feed.Feed{ + Topic: topic, + User: addr, + } + query := feed.NewQueryLatest(&fd, lookup.NoClue) + ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer cancel() + + _, err = handler.Lookup(ctx, query) + // feed should still be queried even if there are no updates + if err != nil && err.Error() != "no feed updates found" { + return nil, ErrFeedLookup + } + + _, content, err := handler.GetContent(&fd) + if err != nil { + return nil, ErrFeedContent + } + + // extract targets from feed content + targets := new(trojan.Targets) + if err := json.Unmarshal(content, targets); err != nil { + return nil, ErrTargets + } - // TODO: dummy targets for now - t1 := trojan.Target([]byte{57, 120}) - t2 := trojan.Target([]byte{209, 156}) - t3 := trojan.Target([]byte{156, 38}) - return trojan.Targets([]trojan.Target{t1, t2, t3}), nil + return *targets, nil } diff --git a/prod/prod_test.go b/prod/prod_test.go index 64746adbe1..52eb07f248 100644 --- a/prod/prod_test.go +++ b/prod/prod_test.go @@ -19,6 +19,7 @@ package prod import ( "context" "crypto/rand" + "encoding/json" "testing" "time" @@ -31,11 +32,12 @@ import ( psstest "github.com/ethersphere/swarm/pss/testing" "github.com/ethersphere/swarm/pss/trojan" "github.com/ethersphere/swarm/storage" + "github.com/ethersphere/swarm/storage/feed" ) // TestRecoveryHook tests that NewRecoveryHook has been properly invoked func TestRecoveryHook(t *testing.T) { - ctx := context.TODO() + ctx := context.WithValue(context.TODO(), "publisher", "0226f213613e843a413ad35b40f193910d26eb35f00154afcde9ded57479a6224a") hookWasCalled := false // test variable to check hook func are correctly retrieved @@ -44,11 +46,15 @@ func TestRecoveryHook(t *testing.T) { hookWasCalled = true return nil, nil } + testHandler := newTestRecoveryFeedHandler(t) // setup recovery hook with testHook - recoverFunc := NewRecoveryHook(testHook) + recoverFunc := NewRecoveryHook(testHook, testHandler) - recoverFunc(ctx, chunk.ZeroAddr) + testChunk := "aacca8d446af47ebcab582ca2188fa73dfa871eb0a35eda798f47d4f91a575e9" + if err := recoverFunc(ctx, chunk.Address([]byte(testChunk))); err != nil { + t.Fatal(err) + } // verify the hook has been called correctly if hookWasCalled != true { @@ -59,7 +65,7 @@ func TestRecoveryHook(t *testing.T) { // TestSenderCall verifies that a hook is being called correctly within the netstore func TestSenderCall(t *testing.T) { - ctx := context.TODO() + ctx := context.WithValue(context.TODO(), "publisher", "0226f213613e843a413ad35b40f193910d26eb35f00154afcde9ded57479a6224a") tags := chunk.NewTags() localStore := psstest.NewMockLocalStore(t, tags) @@ -85,8 +91,9 @@ func TestSenderCall(t *testing.T) { hookWasCalled = true return nil, nil } + testHandler := newTestRecoveryFeedHandler(t) - recoverFunc := NewRecoveryHook(testHook) + recoverFunc := NewRecoveryHook(testHook, testHandler) netStore.WithRecoveryCallback(recoverFunc) c := ctest.GenerateTestRandomChunk() @@ -112,3 +119,23 @@ func TestSenderCall(t *testing.T) { } } + +// newTestRecoveryFeedHandler returns a DummyHandler with binary content which can be correctly unmarshalled +func newTestRecoveryFeedHandler(t *testing.T) *feed.DummyHandler { + h := feed.NewDummyHandler() + + // test targets + t1 := trojan.Target([]byte{57, 120}) + t2 := trojan.Target([]byte{209, 156}) + t3 := trojan.Target([]byte{156, 38}) + targets := trojan.Targets([]trojan.Target{t1, t2, t3}) + + // marshal into bytes and set as mock feed content + b, err := json.Marshal(targets) + if err != nil { + t.Fatal(err) + } + h.SetContent(b) + + return h +} diff --git a/pss/pss_test.go b/pss/pss_test.go index 32c2fba9a0..1560be3abe 100644 --- a/pss/pss_test.go +++ b/pss/pss_test.go @@ -41,7 +41,7 @@ func TestTrojanChunkRetrieval(t *testing.T) { target := trojan.Target([]byte{1}) // arbitrary test target targets := trojan.Targets([]trojan.Target{target}) payload := []byte("RECOVERY CHUNK") - topic := trojan.NewTopic("RECOVERY") + topic := trojan.NewTopic("RECOVERY TOPIC") // call Send to store trojan chunk in localstore if _, err = pss.Send(ctx, targets, topic, payload); err != nil { @@ -110,7 +110,7 @@ func TestPssMonitor(t *testing.T) { target := trojan.Target([]byte{1}) // arbitrary test target targets := trojan.Targets([]trojan.Target{target}) payload := []byte("RECOVERY CHUNK") - topic := trojan.NewTopic("RECOVERY") + topic := trojan.NewTopic("RECOVERY TOPIC") var monitor *Monitor diff --git a/pss/testing/pss.go b/pss/testing/pss.go index f1affe592c..9316087ffb 100644 --- a/pss/testing/pss.go +++ b/pss/testing/pss.go @@ -48,5 +48,3 @@ func NewMockLocalStore(t *testing.T, tags *chunk.Tags) *localstore.DB { return localStore } - -// TODO: later test could be a simulation test for 2 nodes, localstore + netstore diff --git a/storage/netstore.go b/storage/netstore.go index b2ba7c3784..c66b4b508c 100644 --- a/storage/netstore.go +++ b/storage/netstore.go @@ -99,7 +99,7 @@ type NetStore struct { requestGroup singleflight.Group RemoteGet RemoteGetFunc logger log.Logger - recoveryCallback func(ctx context.Context, chunkAddress chunk.Address) error // this is the function callback when a chunk failed to be retrieved + recoveryCallback func(ctx context.Context, chunkAddress chunk.Address) error // this is the callback to be executed when a chunk fails to be retrieved } // NewNetStore creates a new NetStore using the provided chunk.Store and localID of the node. @@ -176,6 +176,7 @@ func (n *NetStore) WithRecoveryCallback(f func(ctx context.Context, chunkAddress // If it is not found in the LocalStore then it uses RemoteGet to fetch from the network. func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (ch Chunk, err error) { metrics.GetOrRegisterCounter("netstore/get", nil).Inc(1) + start := time.Now() ref := req.Addr diff --git a/swarm.go b/swarm.go index f66aee0493..20d7dcb4a1 100644 --- a/swarm.go +++ b/swarm.go @@ -291,8 +291,8 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e chAddr := m.Payload lstore.Set(context.Background(), chunk.ModeSetReUpload, chAddr) } - self.pss.Register(trojan.NewTopic("RECOVERY"), repairFunc) - recoverFunc := prod.NewRecoveryHook(self.pss.Send) + self.pss.Register(prod.RecoveryTopic, repairFunc) + recoverFunc := prod.NewRecoveryHook(self.pss.Send, feedsHandler) self.netStore.WithRecoveryCallback(recoverFunc) }