Skip to content

Commit

Permalink
fix(retrieval): use only full nodes except bootnodes (#430)
Browse files Browse the repository at this point in the history
* fix(retrieval): use only full nodes except bootnodes

* fix(retrieval): ensure upload and download nodes are distinct
  • Loading branch information
gacevicljubisa authored Oct 29, 2024
1 parent 81963ec commit 5e6ca78
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 30 deletions.
7 changes: 6 additions & 1 deletion pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ type Client struct {

// ClientOptions holds optional parameters for the Client.
type ClientOptions struct {
APIURL *url.URL
APIInsecureTLS bool
APIURL *url.URL
Name string
Retry int
}

Expand Down Expand Up @@ -67,6 +68,10 @@ type Addresses struct {
PSSPublicKey string
}

func (c *Client) Name() string {
return c.opts.Name
}

func (c *Client) Config() ClientOptions {
return c.opts
}
Expand Down
65 changes: 38 additions & 27 deletions pkg/check/retrieval/retrieval.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package retrieval

import (
"bytes"
"context"
"errors"
"fmt"
"time"

"github.com/ethersphere/beekeeper/pkg/bee"
"github.com/ethersphere/beekeeper/pkg/bee/api"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/beekeeper/pkg/orchestration"
"github.com/ethersphere/beekeeper/pkg/random"
test "github.com/ethersphere/beekeeper/pkg/test"
)

// Options represents check options
type Options struct {
ChunksPerNode int // number of chunks to upload per node
GasPrice string
PostageAmount int64
PostageDepth uint64
PostageLabel string
Expand All @@ -28,7 +29,6 @@ type Options struct {
func NewDefaultOptions() Options {
return Options{
ChunksPerNode: 1,
GasPrice: "",
PostageAmount: 1,
PostageLabel: "test-label",
PostageDepth: 16,
Expand Down Expand Up @@ -63,67 +63,78 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int
return fmt.Errorf("invalid options type")
}

caseOpts := test.CaseOptions{
GasPrice: o.GasPrice,
PostageAmount: o.PostageAmount,
PostageLabel: o.PostageLabel,
PostageDepth: o.PostageDepth,
Seed: o.Seed,
rnds := random.PseudoGenerators(o.Seed, o.UploadNodeCount)

overlays, err := cluster.FlattenOverlays(ctx)
if err != nil {
return err
}

checkCase, err := test.NewCheckCase(ctx, cluster, caseOpts, c.logger)
clients, err := cluster.NodesClients(ctx)
if err != nil {
return err
}

lastBee := checkCase.LastBee()
nodes := cluster.FullNodeNames()

for i := 0; i < o.UploadNodeCount; i++ {
uploader, err := checkCase.Bee(i).NewChunkUploader(ctx)
uploadNode := clients[nodes[i]]

downloadNodeIndex := (i + 1) % len(nodes) // download from the next node
downloadNode := clients[nodes[downloadNodeIndex]]

batchID, err := uploadNode.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return err
return fmt.Errorf("node %s: created batched id %w", uploadNode.Name(), err)
}
c.logger.Infof("node %s: created batched id %s", uploadNode.Name(), batchID)

for j := 0; j < o.ChunksPerNode; j++ {
// time upload
t0 := time.Now()

chunk, err := uploader.UploadRandomChunk()
chunk, err := bee.NewRandomChunk(rnds[i], c.logger)
if err != nil {
return fmt.Errorf("node %s: %w", uploadNode.Name(), err)
}

addr, err := uploadNode.UploadChunk(ctx, chunk.Data(), api.UploadOptions{BatchID: batchID})
if err != nil {
return err
return fmt.Errorf("node %s: %w", uploadNode.Name(), err)
}
c.logger.Infof("Uploaded chunk address: %s", addr.String())

d0 := time.Since(t0)

c.metrics.UploadedCounter.WithLabelValues(uploader.Overlay).Inc()
c.metrics.UploadTimeGauge.WithLabelValues(uploader.Overlay, chunk.AddrString()).Set(d0.Seconds())
c.metrics.UploadedCounter.WithLabelValues(overlays[uploadNode.Name()].String()).Inc()
c.metrics.UploadTimeGauge.WithLabelValues(overlays[uploadNode.Name()].String(), chunk.Address().String()).Set(d0.Seconds())
c.metrics.UploadTimeHistogram.Observe(d0.Seconds())

// time download
t1 := time.Now()

data, err := lastBee.DownloadChunk(ctx, chunk.Addr())
downloadData, err := downloadNode.DownloadChunk(ctx, chunk.Address(), "", nil)
if err != nil {
return fmt.Errorf("node %s: %w", lastBee.Name(), err)
return fmt.Errorf("node %s: %w", downloadNode.Name(), err)
}

d1 := time.Since(t1)

c.metrics.DownloadedCounter.WithLabelValues(uploader.Name()).Inc()
c.metrics.DownloadTimeGauge.WithLabelValues(uploader.Name(), chunk.AddrString()).Set(d1.Seconds())
c.metrics.DownloadedCounter.WithLabelValues(uploadNode.Name()).Inc()
c.metrics.DownloadTimeGauge.WithLabelValues(uploadNode.Name(), chunk.Address().String()).Set(d1.Seconds())
c.metrics.DownloadTimeHistogram.Observe(d1.Seconds())

if !chunk.Equals(data) {
c.metrics.NotRetrievedCounter.WithLabelValues(uploader.Name()).Inc()
c.logger.Infof("Node %s. Chunk %d not retrieved successfully. Uploaded size: %d Downloaded size: %d Node: %s Chunk: %s", lastBee.Name(), j, chunk.Size(), len(data), uploader.Name(), chunk.AddrString())
if chunk.Contains(data) {
if !bytes.Equal(chunk.Data(), downloadData) {
c.metrics.NotRetrievedCounter.WithLabelValues(uploadNode.Name()).Inc()
c.logger.Errorf("Chunk not retrieved successfully: DownloadNode=%s, ChunkIndex=%d, UploadedSize=%d, DownloadedSize=%d, UploadNode=%s, ChunkAddress=%s", downloadNode.Name(), j, chunk.Size(), len(downloadData), uploadNode.Name(), chunk.Address().String())
if bytes.Contains(chunk.Data(), downloadData) {
c.logger.Infof("Downloaded data is subset of the uploaded data")
}
return errRetrieval
}

c.metrics.RetrievedCounter.WithLabelValues(uploader.Name()).Inc()
c.logger.Infof("Node %s. Chunk %d retrieved successfully. Node: %s Chunk: %s", lastBee.Name(), j, uploader.Name(), chunk.AddrString())
c.metrics.RetrievedCounter.WithLabelValues(uploadNode.Name()).Inc()
c.logger.Infof("Chunk retrieved successfully: DownloadNode=%s, ChunkIndex=%d, DownloadedSize=%d, UploadNode=%s, ChunkAddress=%s", downloadNode.Name(), j, len(downloadData), uploadNode.Name(), chunk.Address().String())
}
}

Expand Down
1 change: 0 additions & 1 deletion pkg/config/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ var Checks = map[string]CheckType{
NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) {
checkOpts := new(struct {
ChunksPerNode *int `yaml:"chunks-per-node"`
GasPrice *string `yaml:"gas-price"`
PostageAmount *int64 `yaml:"postage-amount"`
PostageDepth *uint64 `yaml:"postage-depth"`
PostageLabel *string `yaml:"postage-label"`
Expand Down
1 change: 1 addition & 0 deletions pkg/orchestration/k8s/nodegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (g *NodeGroup) AddNode(ctx context.Context, name string, o orchestration.No
}

beeClientOpts := bee.ClientOptions{
Name: name,
APIURL: aURL,
APIInsecureTLS: g.clusterOpts.APIInsecureTLS,
Retry: 5,
Expand Down
2 changes: 1 addition & 1 deletion pkg/test/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ func NewCheckCase(ctx context.Context, cluster orchestration.Cluster, caseOpts C
}

rnds := random.PseudoGenerators(caseOpts.Seed, len(flatOverlays))
logger.Infof("Seed: %d", caseOpts.Seed)

var (
nodes []BeeV2
count int
)

for name, addr := range flatOverlays {
nodes = append(nodes, BeeV2{
name: name,
Expand Down

0 comments on commit 5e6ca78

Please sign in to comment.