Skip to content

Commit

Permalink
Respond to PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: litt3 <[email protected]>
  • Loading branch information
litt3 committed Dec 10, 2024
1 parent f07e820 commit 885c131
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 72 deletions.
124 changes: 67 additions & 57 deletions api/clients/eigenda_client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,66 +6,58 @@ import (
"github.com/Layr-Labs/eigenda/api/clients/codecs"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/hashicorp/go-multierror"
"github.com/cockroachdb/errors/join"
"math/rand"
)

// EigenDAClientV2 provides the ability to get blobs from the relay subsystem, and to send new blobs to the disperser.
type EigenDAClientV2 interface {
// GetBlob iteratively attempts to retrieve a given blob with key blobKey from the relays listed in the blobCertificate.
GetBlob(ctx context.Context, blobKey corev2.BlobKey, blobCertificate corev2.BlobCertificate) ([]byte, error)
GetCodec() codecs.BlobCodec
Close() error
type EigenDAClientV2 struct {
log logging.Logger
// doesn't need to be cryptographically secure, as it's only used to distribute load across relays
random *rand.Rand
config *EigenDAClientConfig
codec codecs.BlobCodec
relayClient RelayClient
}

type eigenDAClientV2 struct {
clientConfig EigenDAClientConfig
log logging.Logger
relayClient RelayClient
codec codecs.BlobCodec
}

var _ EigenDAClientV2 = &eigenDAClientV2{}

// BuildEigenDAClientV2 builds an EigenDAClientV2 from config structs.
func BuildEigenDAClientV2(
log logging.Logger,
clientConfig EigenDAClientConfig,
relayClientConfig RelayClientConfig) (EigenDAClientV2, error) {
config *EigenDAClientConfig,
relayClientConfig *RelayClientConfig) (*EigenDAClientV2, error) {

err := clientConfig.CheckAndSetDefaults()
err := config.CheckAndSetDefaults()
if err != nil {
return nil, err
return nil, fmt.Errorf("check and set client config defaults: %w", err)
}

relayClient, err := NewRelayClient(&relayClientConfig, log)

lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(clientConfig.PutBlobEncodingVersion)
relayClient, err := NewRelayClient(relayClientConfig, log)
if err != nil {
return nil, fmt.Errorf("create low level codec: %w", err)
return nil, fmt.Errorf("new relay client: %w", err)
}
var codec codecs.BlobCodec
if clientConfig.DisablePointVerificationMode {
codec = codecs.NewNoIFFTCodec(lowLevelCodec)
} else {
codec = codecs.NewIFFTCodec(lowLevelCodec)

codec, err := createCodec(config)
if err != nil {
return nil, err
}

return NewEigenDAClientV2(log, clientConfig, relayClient, codec)
return NewEigenDAClientV2(log, rand.New(rand.NewSource(rand.Int63())), config, relayClient, codec)
}

// NewEigenDAClientV2 assembles an EigenDAClientV2 from subcomponents that have already been constructed and initialized.
func NewEigenDAClientV2(
log logging.Logger,
clientConfig EigenDAClientConfig,
random *rand.Rand,
config *EigenDAClientConfig,
relayClient RelayClient,
codec codecs.BlobCodec) (EigenDAClientV2, error) {

return &eigenDAClientV2{
clientConfig: clientConfig,
log: log,
relayClient: relayClient,
codec: codec,
codec codecs.BlobCodec) (*EigenDAClientV2, error) {

return &EigenDAClientV2{
log: log,
random: random,
config: config,
codec: codec,
relayClient: relayClient,
}, nil
}

Expand All @@ -74,15 +66,23 @@ func NewEigenDAClientV2(
// The relays are attempted in random order.
//
// The returned blob is decoded.
func (c *eigenDAClientV2) GetBlob(ctx context.Context, blobKey corev2.BlobKey, blobCertificate corev2.BlobCertificate) ([]byte, error) {
// create a randomized array of indices, so that it isn't always the first relay in the list which gets hit
random := rand.New(rand.NewSource(rand.Int63()))
func (c *EigenDAClientV2) GetBlob(
ctx context.Context,
blobKey corev2.BlobKey,
blobCertificate corev2.BlobCertificate) ([]byte, error) {

relayKeyCount := len(blobCertificate.RelayKeys)

if relayKeyCount == 0 {
return nil, fmt.Errorf("relay key count is zero")
}

var indices []int
// create a randomized array of indices, so that it isn't always the first relay in the list which gets hit
for i := 0; i < relayKeyCount; i++ {
indices = append(indices, i)
}
random.Shuffle(len(indices), func(i int, j int) {
c.random.Shuffle(len(indices), func(i int, j int) {
indices[i], indices[j] = indices[j], indices[i]
})

Expand All @@ -96,22 +96,20 @@ func (c *eigenDAClientV2) GetBlob(ctx context.Context, blobKey corev2.BlobKey, b

// if GetBlob returned an error, try calling a different relay
if err != nil {
// TODO: should this log type be downgraded to debug to avoid log spam? I'm not sure how frequent retrieval
// from a relay will fail in practice (?)
c.log.Info("blob couldn't be retrieved from relay", "blobKey", blobKey, "relayKey", relayKey)
c.log.Warn("blob couldn't be retrieved from relay", "blobKey", blobKey, "relayKey", relayKey, "error", err)
continue
}

// An honest relay should never send an empty blob
if len(data) == 0 {
c.log.Warn("blob received from relay had length 0", "blobKey", blobKey, "relayKey", relayKey)
c.log.Warn("blob received from relay had length 0", "blobKey", blobKey, "relayKey", relayKey, "error", err)
continue
}

// An honest relay should never send a blob which cannot be decoded
decodedData, err := c.codec.DecodeBlob(data)
if err != nil {
c.log.Warn("error decoding blob", "blobKey", blobKey, "relayKey", relayKey)
c.log.Warn("error decoding blob", "blobKey", blobKey, "relayKey", relayKey, "error", err)
continue
}

Expand All @@ -121,22 +119,34 @@ func (c *eigenDAClientV2) GetBlob(ctx context.Context, blobKey corev2.BlobKey, b
return nil, fmt.Errorf("unable to retrieve blob from any relay")
}

func (c *eigenDAClientV2) GetCodec() codecs.BlobCodec {
// GetCodec returns the codec the client uses for encoding and decoding blobs
func (c *EigenDAClientV2) GetCodec() codecs.BlobCodec {
return c.codec
}

func (c *eigenDAClientV2) Close() error {
var errList *multierror.Error

// TODO: this is using a multierror, since there will be more subcomponents requiring closing after adding PUT functionality
// Close is responsible for calling close on all internal clients. This method will do its best to close all internal
// clients, even if some closes fail.
//
// Any and all errors returned from closing internal clients will be joined and returned.
//
// This method should only be called once.
func (c *EigenDAClientV2) Close() error {
relayClientErr := c.relayClient.Close()
if relayClientErr != nil {
errList = multierror.Append(errList, relayClientErr)
}

if errList != nil {
return errList.ErrorOrNil()
// TODO: this is using join, since there will be more subcomponents requiring closing after adding PUT functionality
return join.Join(relayClientErr)
}

// createCodec creates the codec based on client config values
func createCodec(config *EigenDAClientConfig) (codecs.BlobCodec, error) {
lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(config.PutBlobEncodingVersion)
if err != nil {
return nil, fmt.Errorf("create low level codec: %w", err)
}

return nil
if config.DisablePointVerificationMode {
return codecs.NewNoIFFTCodec(lowLevelCodec), nil
} else {
return codecs.NewIFFTCodec(lowLevelCodec), nil
}
}
24 changes: 14 additions & 10 deletions api/clients/eigenda_client_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/exp/rand"
"math/rand"
"testing"
"time"
)

type ClientV2Tester struct {
ClientV2 clients.EigenDAClientV2
ClientV2 *clients.EigenDAClientV2
MockRelayClient *clientsmock.MockRelayClient
MockCodec *codecsmock.BlobCodec
}
Expand All @@ -32,13 +32,17 @@ func (c *ClientV2Tester) assertExpectations(t *testing.T) {
func buildClientV2Tester(t *testing.T) ClientV2Tester {
tu.InitializeRandom()
logger := logging.NewNoopLogger()
clientConfig := clients.EigenDAClientConfig{}
clientConfig := &clients.EigenDAClientConfig{}

mockRelayClient := clientsmock.MockRelayClient{}
mockCodec := codecsmock.BlobCodec{}

// TODO (litt3): use TestRandom once the PR merges https://github.com/Layr-Labs/eigenda/pull/976
random := rand.New(rand.NewSource(rand.Int63()))

client, err := clients.NewEigenDAClientV2(
logger,
random,
clientConfig,
&mockRelayClient,
&mockCodec)
Expand All @@ -61,7 +65,7 @@ func TestGetBlobSuccess(t *testing.T) {
blobBytes := tu.RandomBytes(100)

relayKeys := make([]v2.RelayKey, 1)
relayKeys[0] = tu.RandomUint16()
relayKeys[0] = rand.Uint32()
blobCert := v2.BlobCertificate{
RelayKeys: relayKeys,
}
Expand All @@ -88,7 +92,7 @@ func TestRandomRelayRetries(t *testing.T) {
relayCount := 100
relayKeys := make([]v2.RelayKey, relayCount)
for i := 0; i < relayCount; i++ {
relayKeys[i] = tu.RandomUint16()
relayKeys[i] = rand.Uint32()
}
blobCert := v2.BlobCertificate{
RelayKeys: relayKeys,
Expand Down Expand Up @@ -135,7 +139,7 @@ func TestNoRelayResponse(t *testing.T) {
relayCount := 10
relayKeys := make([]v2.RelayKey, relayCount)
for i := 0; i < relayCount; i++ {
relayKeys[i] = tu.RandomUint16()
relayKeys[i] = rand.Uint32()
}
blobCert := v2.BlobCertificate{
RelayKeys: relayKeys,
Expand Down Expand Up @@ -177,7 +181,7 @@ func TestGetBlobReturns0Len(t *testing.T) {
relayCount := 10
relayKeys := make([]v2.RelayKey, relayCount)
for i := 0; i < relayCount; i++ {
relayKeys[i] = tu.RandomUint16()
relayKeys[i] = rand.Uint32()
}
blobCert := v2.BlobCertificate{
RelayKeys: relayKeys,
Expand Down Expand Up @@ -207,7 +211,7 @@ func TestFailedDecoding(t *testing.T) {
relayCount := 10
relayKeys := make([]v2.RelayKey, relayCount)
for i := 0; i < relayCount; i++ {
relayKeys[i] = tu.RandomUint16()
relayKeys[i] = rand.Uint32()
}
blobCert := v2.BlobCertificate{
RelayKeys: relayKeys,
Expand Down Expand Up @@ -261,7 +265,7 @@ func TestGetCodec(t *testing.T) {

// TestBuilder tests that the method that builds the client from config doesn't throw any obvious errors
func TestBuilder(t *testing.T) {
clientConfig := clients.EigenDAClientConfig{
clientConfig := &clients.EigenDAClientConfig{
StatusQueryTimeout: 10 * time.Minute,
StatusQueryRetryInterval: 50 * time.Millisecond,
ResponseTimeout: 10 * time.Second,
Expand All @@ -277,7 +281,7 @@ func TestBuilder(t *testing.T) {
SvcManagerAddr: "0x1234567890123456789012345678901234567890",
}

relayClientConfig := clients.RelayClientConfig{
relayClientConfig := &clients.RelayClientConfig{
Sockets: make(map[v2.RelayKey]string),
UseSecureGrpcFlag: true,
}
Expand Down
5 changes: 0 additions & 5 deletions common/testutils/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,3 @@ func RandomString(length int) string {
}
return string(b)
}

// RandomUint16 generates a random uint16
func RandomUint16() uint16 {
return uint16(rand.Uint32())
}

0 comments on commit 885c131

Please sign in to comment.