Skip to content

Commit

Permalink
Implement relay call timeout
Browse files Browse the repository at this point in the history
Signed-off-by: litt3 <[email protected]>
  • Loading branch information
litt3 committed Dec 12, 2024
1 parent dd3c262 commit 88df865
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 3 deletions.
4 changes: 4 additions & 0 deletions api/clients/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2

import (
"github.com/Layr-Labs/eigenda/api/clients/codecs"
"time"
)

// VerificationMode is an enum that represents the different ways that a blob may be encoded/decoded between
Expand All @@ -26,4 +27,7 @@ type EigenDAClientConfig struct {
// If PointVerificationMode is NoIFFT, the blob must be supplied in its entirety, to perform a verification
// that any part of the data matches the KZG commitment.
PointVerificationMode VerificationMode

// The timeout duration for relay calls
RelayTimeout time.Duration
}
15 changes: 13 additions & 2 deletions api/clients/v2/eigenda_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ func (c *EigenDAClient) GetBlob(
for _, val := range indices {
relayKey := blobCertificate.RelayKeys[val]

// TODO: does this need a timeout?
data, err := c.relayClient.GetBlob(ctx, relayKey, blobKey)
data, err := c.getBlobWithTimeout(ctx, relayKey, blobKey)

// if GetBlob returned an error, try calling a different relay
if err != nil {
Expand All @@ -113,6 +112,18 @@ func (c *EigenDAClient) GetBlob(
return nil, fmt.Errorf("unable to retrieve blob from any relay. relay count: %d", relayKeyCount)
}

// getBlobWithTimeout attempts to get a blob from a given relay, and times out based on config.RelayTimeout
func (c *EigenDAClient) getBlobWithTimeout(
ctx context.Context,
relayKey core.RelayKey,
blobKey core.BlobKey) ([]byte, error) {

timeoutCtx, cancel := context.WithTimeout(ctx, c.config.RelayTimeout)
defer cancel()

return c.relayClient.GetBlob(timeoutCtx, relayKey, blobKey)
}

// GetCodec returns the codec the client uses for encoding and decoding blobs
func (c *EigenDAClient) GetCodec() codecs.BlobCodec {
return c.codec
Expand Down
58 changes: 57 additions & 1 deletion api/clients/v2/eigenda_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2

import (
"context"
"errors"
"fmt"
"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/clients/codecs"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"testing"
"time"
)

type ClientTester struct {
Expand All @@ -30,7 +32,9 @@ func (c *ClientTester) assertExpectations(t *testing.T) {
// buildClientTester sets up a client with mocks necessary for testing
func buildClientTester(t *testing.T) ClientTester {
logger := logging.NewNoopLogger()
clientConfig := &EigenDAClientConfig{}
clientConfig := &EigenDAClientConfig{
RelayTimeout: 50 * time.Millisecond,
}

mockRelayClient := clientsmock.MockRelayClient{}
mockCodec := codecsmock.BlobCodec{}
Expand Down Expand Up @@ -79,6 +83,57 @@ func TestGetBlobSuccess(t *testing.T) {
tester.assertExpectations(t)
}

// TestRelayCallTimeout verifies that calls to the relay timeout after the expected duration
func TestRelayCallTimeout(t *testing.T) {
tester := buildClientTester(t)

blobKey := core.BlobKey(tester.Random.RandomBytes(32))

relayKeys := make([]core.RelayKey, 1)
relayKeys[0] = tester.Random.Uint32()
blobCert := core.BlobCertificate{
RelayKeys: relayKeys,
}

// the timeout should occur before the panic has a chance to be triggered
tester.MockRelayClient.On("GetBlob", mock.Anything, relayKeys[0], blobKey).Return(
nil, errors.New("timeout")).Once().Run(
func(args mock.Arguments) {
ctx := args.Get(0).(context.Context)
select {
case <-ctx.Done():
// this is the expected case
return
case <-time.After(time.Second):
panic("call should have timed out first")
}
})

// the panic should be triggered, since it happens faster than the configured timout
tester.MockRelayClient.On("GetBlob", mock.Anything, relayKeys[0], blobKey).Return(
nil, errors.New("timeout")).Once().Run(
func(args mock.Arguments) {
ctx := args.Get(0).(context.Context)
select {
case <-ctx.Done():
return
case <-time.After(time.Millisecond):
// this is the expected case
panic("call should not have timed out")
}
})

assert.NotPanics(t, func() {
_, _ = tester.Client.GetBlob(context.Background(), blobKey, blobCert)
})

assert.Panics(t, func() {
_, _ = tester.Client.GetBlob(context.Background(), blobKey, blobCert)
})

tester.assertExpectations(t)
}

// TestRandomRelayRetries verifies correct behavior when some relays from the certificate do not respond with the blob,
// requiring the client to retry with other relays.
func TestRandomRelayRetries(t *testing.T) {
Expand Down Expand Up @@ -266,6 +321,7 @@ func TestBuilder(t *testing.T) {
clientConfig := &EigenDAClientConfig{
BlobEncodingVersion: codecs.DefaultBlobEncoding,
PointVerificationMode: IFFT,
RelayTimeout: 500 * time.Millisecond,
}

sockets := make(map[core.RelayKey]string)
Expand Down

0 comments on commit 88df865

Please sign in to comment.