From 5e6ca7876bdf377f3bf06c5a0b3bf92e3f381555 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ljubi=C5=A1a=20Ga=C4=8Devi=C4=87?= <35105035+gacevicljubisa@users.noreply.github.com> Date: Tue, 29 Oct 2024 19:31:47 +0100 Subject: [PATCH] fix(retrieval): use only full nodes except bootnodes (#430) * fix(retrieval): use only full nodes except bootnodes * fix(retrieval): ensure upload and download nodes are distinct --- pkg/bee/client.go | 7 +++- pkg/check/retrieval/retrieval.go | 65 +++++++++++++++++------------- pkg/config/check.go | 1 - pkg/orchestration/k8s/nodegroup.go | 1 + pkg/test/case.go | 2 +- 5 files changed, 46 insertions(+), 30 deletions(-) diff --git a/pkg/bee/client.go b/pkg/bee/client.go index d63873d20..1462c27e4 100644 --- a/pkg/bee/client.go +++ b/pkg/bee/client.go @@ -33,8 +33,9 @@ type Client struct { // ClientOptions holds optional parameters for the Client. type ClientOptions struct { - APIURL *url.URL APIInsecureTLS bool + APIURL *url.URL + Name string Retry int } @@ -67,6 +68,10 @@ type Addresses struct { PSSPublicKey string } +func (c *Client) Name() string { + return c.opts.Name +} + func (c *Client) Config() ClientOptions { return c.opts } diff --git a/pkg/check/retrieval/retrieval.go b/pkg/check/retrieval/retrieval.go index c9a881953..cc2365b60 100644 --- a/pkg/check/retrieval/retrieval.go +++ b/pkg/check/retrieval/retrieval.go @@ -1,22 +1,23 @@ package retrieval import ( + "bytes" "context" "errors" "fmt" "time" + "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/bee/api" "github.com/ethersphere/beekeeper/pkg/beekeeper" "github.com/ethersphere/beekeeper/pkg/logging" "github.com/ethersphere/beekeeper/pkg/orchestration" "github.com/ethersphere/beekeeper/pkg/random" - test "github.com/ethersphere/beekeeper/pkg/test" ) // Options represents check options type Options struct { ChunksPerNode int // number of chunks to upload per node - GasPrice string PostageAmount int64 PostageDepth uint64 PostageLabel string @@ -28,7 +29,6 @@ type Options struct { func NewDefaultOptions() Options { return Options{ ChunksPerNode: 1, - GasPrice: "", PostageAmount: 1, PostageLabel: "test-label", PostageDepth: 16, @@ -63,67 +63,78 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int return fmt.Errorf("invalid options type") } - caseOpts := test.CaseOptions{ - GasPrice: o.GasPrice, - PostageAmount: o.PostageAmount, - PostageLabel: o.PostageLabel, - PostageDepth: o.PostageDepth, - Seed: o.Seed, + rnds := random.PseudoGenerators(o.Seed, o.UploadNodeCount) + + overlays, err := cluster.FlattenOverlays(ctx) + if err != nil { + return err } - checkCase, err := test.NewCheckCase(ctx, cluster, caseOpts, c.logger) + clients, err := cluster.NodesClients(ctx) if err != nil { return err } - lastBee := checkCase.LastBee() + nodes := cluster.FullNodeNames() for i := 0; i < o.UploadNodeCount; i++ { - uploader, err := checkCase.Bee(i).NewChunkUploader(ctx) + uploadNode := clients[nodes[i]] + + downloadNodeIndex := (i + 1) % len(nodes) // download from the next node + downloadNode := clients[nodes[downloadNodeIndex]] + + batchID, err := uploadNode.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel) if err != nil { - return err + return fmt.Errorf("node %s: created batched id %w", uploadNode.Name(), err) } + c.logger.Infof("node %s: created batched id %s", uploadNode.Name(), batchID) for j := 0; j < o.ChunksPerNode; j++ { // time upload t0 := time.Now() - chunk, err := uploader.UploadRandomChunk() + chunk, err := bee.NewRandomChunk(rnds[i], c.logger) + if err != nil { + return fmt.Errorf("node %s: %w", uploadNode.Name(), err) + } + + addr, err := uploadNode.UploadChunk(ctx, chunk.Data(), api.UploadOptions{BatchID: batchID}) if err != nil { - return err + return fmt.Errorf("node %s: %w", uploadNode.Name(), err) } + c.logger.Infof("Uploaded chunk address: %s", addr.String()) d0 := time.Since(t0) - c.metrics.UploadedCounter.WithLabelValues(uploader.Overlay).Inc() - c.metrics.UploadTimeGauge.WithLabelValues(uploader.Overlay, chunk.AddrString()).Set(d0.Seconds()) + c.metrics.UploadedCounter.WithLabelValues(overlays[uploadNode.Name()].String()).Inc() + c.metrics.UploadTimeGauge.WithLabelValues(overlays[uploadNode.Name()].String(), chunk.Address().String()).Set(d0.Seconds()) c.metrics.UploadTimeHistogram.Observe(d0.Seconds()) // time download t1 := time.Now() - data, err := lastBee.DownloadChunk(ctx, chunk.Addr()) + downloadData, err := downloadNode.DownloadChunk(ctx, chunk.Address(), "", nil) if err != nil { - return fmt.Errorf("node %s: %w", lastBee.Name(), err) + return fmt.Errorf("node %s: %w", downloadNode.Name(), err) } d1 := time.Since(t1) - c.metrics.DownloadedCounter.WithLabelValues(uploader.Name()).Inc() - c.metrics.DownloadTimeGauge.WithLabelValues(uploader.Name(), chunk.AddrString()).Set(d1.Seconds()) + c.metrics.DownloadedCounter.WithLabelValues(uploadNode.Name()).Inc() + c.metrics.DownloadTimeGauge.WithLabelValues(uploadNode.Name(), chunk.Address().String()).Set(d1.Seconds()) c.metrics.DownloadTimeHistogram.Observe(d1.Seconds()) - if !chunk.Equals(data) { - c.metrics.NotRetrievedCounter.WithLabelValues(uploader.Name()).Inc() - c.logger.Infof("Node %s. Chunk %d not retrieved successfully. Uploaded size: %d Downloaded size: %d Node: %s Chunk: %s", lastBee.Name(), j, chunk.Size(), len(data), uploader.Name(), chunk.AddrString()) - if chunk.Contains(data) { + if !bytes.Equal(chunk.Data(), downloadData) { + c.metrics.NotRetrievedCounter.WithLabelValues(uploadNode.Name()).Inc() + c.logger.Errorf("Chunk not retrieved successfully: DownloadNode=%s, ChunkIndex=%d, UploadedSize=%d, DownloadedSize=%d, UploadNode=%s, ChunkAddress=%s", downloadNode.Name(), j, chunk.Size(), len(downloadData), uploadNode.Name(), chunk.Address().String()) + if bytes.Contains(chunk.Data(), downloadData) { c.logger.Infof("Downloaded data is subset of the uploaded data") } return errRetrieval } - c.metrics.RetrievedCounter.WithLabelValues(uploader.Name()).Inc() - c.logger.Infof("Node %s. Chunk %d retrieved successfully. Node: %s Chunk: %s", lastBee.Name(), j, uploader.Name(), chunk.AddrString()) + c.metrics.RetrievedCounter.WithLabelValues(uploadNode.Name()).Inc() + c.logger.Infof("Chunk retrieved successfully: DownloadNode=%s, ChunkIndex=%d, DownloadedSize=%d, UploadNode=%s, ChunkAddress=%s", downloadNode.Name(), j, len(downloadData), uploadNode.Name(), chunk.Address().String()) } } diff --git a/pkg/config/check.go b/pkg/config/check.go index 4298defb7..aefe9f272 100644 --- a/pkg/config/check.go +++ b/pkg/config/check.go @@ -329,7 +329,6 @@ var Checks = map[string]CheckType{ NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) { checkOpts := new(struct { ChunksPerNode *int `yaml:"chunks-per-node"` - GasPrice *string `yaml:"gas-price"` PostageAmount *int64 `yaml:"postage-amount"` PostageDepth *uint64 `yaml:"postage-depth"` PostageLabel *string `yaml:"postage-label"` diff --git a/pkg/orchestration/k8s/nodegroup.go b/pkg/orchestration/k8s/nodegroup.go index 483e596fa..7d99638bb 100644 --- a/pkg/orchestration/k8s/nodegroup.go +++ b/pkg/orchestration/k8s/nodegroup.go @@ -66,6 +66,7 @@ func (g *NodeGroup) AddNode(ctx context.Context, name string, o orchestration.No } beeClientOpts := bee.ClientOptions{ + Name: name, APIURL: aURL, APIInsecureTLS: g.clusterOpts.APIInsecureTLS, Retry: 5, diff --git a/pkg/test/case.go b/pkg/test/case.go index 860f69614..b3eaf4458 100644 --- a/pkg/test/case.go +++ b/pkg/test/case.go @@ -54,12 +54,12 @@ func NewCheckCase(ctx context.Context, cluster orchestration.Cluster, caseOpts C } rnds := random.PseudoGenerators(caseOpts.Seed, len(flatOverlays)) - logger.Infof("Seed: %d", caseOpts.Seed) var ( nodes []BeeV2 count int ) + for name, addr := range flatOverlays { nodes = append(nodes, BeeV2{ name: name,