Skip to content

Commit

Permalink
feat!: eigenda client confirmation depth (#821)
Browse files Browse the repository at this point in the history
  • Loading branch information
samlaf authored Oct 28, 2024
1 parent 83cf627 commit 5fe3e91
Show file tree
Hide file tree
Showing 4 changed files with 332 additions and 41 deletions.
30 changes: 30 additions & 0 deletions api/clients/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clients

import (
"fmt"
"log"
"time"

"github.com/Layr-Labs/eigenda/api/clients/codecs"
Expand All @@ -23,6 +24,19 @@ type EigenDAClientConfig struct {
// The amount of time to wait between status queries of a newly dispersed blob
StatusQueryRetryInterval time.Duration

// The Ethereum RPC URL to use for querying the Ethereum blockchain.
// This is used to make sure that the blob has been confirmed on-chain.
EthRpcUrl string

// The address of the EigenDAServiceManager contract, used to make sure that the blob has been confirmed on-chain.
SvcManagerAddr string

// The number of Ethereum blocks to wait after the blob's batch has been included onchain, before returning from PutBlob calls.
// In most cases only makes sense if < 64 blocks (2 epochs). Otherwise, consider using WaitForFinalization instead.
//
// When WaitForFinalization is true, this field is ignored.
WaitForConfirmationDepth uint64

// If true, will wait for the blob to finalize, if false, will wait only for the blob to confirm.
WaitForFinalization bool

Expand Down Expand Up @@ -51,6 +65,22 @@ type EigenDAClientConfig struct {
}

func (c *EigenDAClientConfig) CheckAndSetDefaults() error {
if c.WaitForFinalization {
if c.WaitForConfirmationDepth != 0 {
log.Println("Warning: WaitForFinalization is set to true, WaitForConfirmationDepth will be ignored")
}
} else {
if c.WaitForConfirmationDepth > 64 {
log.Printf("Warning: WaitForConfirmationDepth is set to %v > 64 (2 epochs == finality). Consider setting WaitForFinalization to true instead.\n", c.WaitForConfirmationDepth)
}
}
if c.SvcManagerAddr == "" {
return fmt.Errorf("EigenDAClientConfig.SvcManagerAddr not set. Needed to verify blob confirmed on-chain.")
}
if c.EthRpcUrl == "" {
return fmt.Errorf("EigenDAClientConfig.EthRpcUrl not set. Needed to verify blob confirmed on-chain.")
}

if c.StatusQueryRetryInterval == 0 {
c.StatusQueryRetryInterval = 5 * time.Second
}
Expand Down
140 changes: 140 additions & 0 deletions api/clients/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package clients

import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)

// Claude generated tests... don't blame the copy paster.
func TestEigenDAClientConfig_CheckAndSetDefaults(t *testing.T) {
// Helper function to create a valid base config
newValidConfig := func() *EigenDAClientConfig {
return &EigenDAClientConfig{
RPC: "http://localhost:8080",
EthRpcUrl: "http://localhost:8545",
SvcManagerAddr: "0x1234567890123456789012345678901234567890",
}
}

t.Run("Valid minimal configuration", func(t *testing.T) {
config := newValidConfig()
err := config.CheckAndSetDefaults()
require.NoError(t, err)

// Check default values are set
assert.Equal(t, 5*time.Second, config.StatusQueryRetryInterval)
assert.Equal(t, 25*time.Minute, config.StatusQueryTimeout)
assert.Equal(t, 30*time.Second, config.ResponseTimeout)
})

t.Run("Missing required fields", func(t *testing.T) {
testCases := []struct {
name string
modifyConf func(*EigenDAClientConfig)
expectedErr string
}{
{
name: "Missing RPC",
modifyConf: func(c *EigenDAClientConfig) {
c.RPC = ""
},
expectedErr: "EigenDAClientConfig.RPC not set",
},
{
name: "Missing EthRpcUrl",
modifyConf: func(c *EigenDAClientConfig) {
c.EthRpcUrl = ""
},
expectedErr: "EigenDAClientConfig.EthRpcUrl not set. Needed to verify blob confirmed on-chain.",
},
{
name: "Missing SvcManagerAddr",
modifyConf: func(c *EigenDAClientConfig) {
c.SvcManagerAddr = ""
},
expectedErr: "EigenDAClientConfig.SvcManagerAddr not set. Needed to verify blob confirmed on-chain.",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
config := newValidConfig()
tc.modifyConf(config)
err := config.CheckAndSetDefaults()
assert.EqualError(t, err, tc.expectedErr)
})
}
})

t.Run("SignerPrivateKeyHex validation", func(t *testing.T) {
testCases := []struct {
name string
keyHex string
shouldError bool
}{
{
name: "Empty key (valid for read-only)",
keyHex: "",
shouldError: false,
},
{
name: "Valid length key (64 bytes)",
keyHex: "1234567890123456789012345678901234567890123456789012345678901234",
shouldError: false,
},
{
name: "Invalid length key",
keyHex: "123456",
shouldError: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
config := newValidConfig()
config.SignerPrivateKeyHex = tc.keyHex
err := config.CheckAndSetDefaults()
if tc.shouldError {
assert.Error(t, err)
assert.Contains(t, err.Error(), "SignerPrivateKeyHex")
} else {
assert.NoError(t, err)
}
})
}
})

t.Run("Custom timeouts", func(t *testing.T) {
config := newValidConfig()
customRetryInterval := 10 * time.Second
customQueryTimeout := 30 * time.Minute
customResponseTimeout := 45 * time.Second

config.StatusQueryRetryInterval = customRetryInterval
config.StatusQueryTimeout = customQueryTimeout
config.ResponseTimeout = customResponseTimeout

err := config.CheckAndSetDefaults()
require.NoError(t, err)

assert.Equal(t, customRetryInterval, config.StatusQueryRetryInterval)
assert.Equal(t, customQueryTimeout, config.StatusQueryTimeout)
assert.Equal(t, customResponseTimeout, config.ResponseTimeout)
})

t.Run("Optional fields", func(t *testing.T) {
config := newValidConfig()
config.CustomQuorumIDs = []uint{2, 3, 4}
config.DisableTLS = true
config.DisablePointVerificationMode = true

err := config.CheckAndSetDefaults()
require.NoError(t, err)

assert.Equal(t, []uint{2, 3, 4}, config.CustomQuorumIDs)
assert.True(t, config.DisableTLS)
assert.True(t, config.DisablePointVerificationMode)
})
}
109 changes: 90 additions & 19 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package clients

import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"fmt"
"math/big"
"net"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"

"github.com/Layr-Labs/eigenda/api/clients/codecs"
grpcdisperser "github.com/Layr-Labs/eigenda/api/grpc/disperser"
edasm "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/ethereum/go-ethereum/log"
)

// IEigenDAClient is a wrapper around the DisperserClient interface which
Expand All @@ -29,10 +36,12 @@ type IEigenDAClient interface {
type EigenDAClient struct {
// TODO: all of these should be private, to prevent users from using them directly,
// which breaks encapsulation and makes it hard for us to do refactors or changes
Config EigenDAClientConfig
Log log.Logger
Client DisperserClient
Codec codecs.BlobCodec
Config EigenDAClientConfig
Log log.Logger
Client DisperserClient
ethClient *ethclient.Client
edasmCaller *edasm.ContractEigenDAServiceManagerCaller
Codec codecs.BlobCodec
}

var _ IEigenDAClient = &EigenDAClient{}
Expand Down Expand Up @@ -70,6 +79,17 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
return nil, err
}

var ethClient *ethclient.Client
var edasmCaller *edasm.ContractEigenDAServiceManagerCaller
ethClient, err = ethclient.Dial(config.EthRpcUrl)
if err != nil {
return nil, fmt.Errorf("failed to dial ETH RPC node: %w", err)
}
edasmCaller, err = edasm.NewContractEigenDAServiceManagerCaller(common.HexToAddress(config.SvcManagerAddr), ethClient)
if err != nil {
return nil, fmt.Errorf("failed to create EigenDAServiceManagerCaller: %w", err)
}

host, port, err := net.SplitHostPort(config.RPC)
if err != nil {
return nil, fmt.Errorf("failed to parse EigenDA RPC: %w", err)
Expand Down Expand Up @@ -101,15 +121,15 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
}

return &EigenDAClient{
Log: log,
Config: config,
Client: disperserClient,
Codec: codec,
Log: log,
Config: config,
Client: disperserClient,
ethClient: ethClient,
edasmCaller: edasmCaller,
Codec: codec,
}, nil
}

// Deprecated: do not rely on this function. Do not use m.Codec directly either.
// These will eventually be removed and not exposed.
func (m *EigenDAClient) GetCodec() codecs.BlobCodec {
return m.Codec
}
Expand Down Expand Up @@ -143,6 +163,10 @@ func (m *EigenDAClient) GetBlob(ctx context.Context, batchHeaderHash []byte, blo
// PutBlob encodes and writes a blob to EigenDA, waiting for a desired blob status
// to be reached (guarded by WaitForFinalization config param) before returning.
// This function is resilient to transient failures and timeouts.
//
// Upon return the blob is guaranteed to be:
// - finalized onchain (if Config.WaitForFinalization is true), or
// - confirmed at a certain depth (if Config.WaitForFinalization is false, in which case Config.WaitForConfirmationDepth specifies the depth).
func (m *EigenDAClient) PutBlob(ctx context.Context, data []byte) (*grpcdisperser.BlobInfo, error) {
resultChan, errorChan := m.PutBlobAsync(ctx, data)
select { // no timeout here because we depend on the configured timeout in PutBlobAsync
Expand Down Expand Up @@ -204,7 +228,7 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
defer cancel()

alreadyWaitingForDispersal := false
alreadyWaitingForFinalization := false
alreadyWaitingForConfirmationOrFinality := false
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -235,16 +259,30 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
case grpcdisperser.BlobStatus_CONFIRMED:
if m.Config.WaitForFinalization {
// to prevent log clutter, we only log at info level once
if alreadyWaitingForFinalization {
m.Log.Debug("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID)
if alreadyWaitingForConfirmationOrFinality {
m.Log.Debug("EigenDA blob included onchain, waiting for finalization", "requestID", base64RequestID)
} else {
m.Log.Info("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID)
alreadyWaitingForFinalization = true
m.Log.Info("EigenDA blob included onchain, waiting for finalization", "requestID", base64RequestID)
alreadyWaitingForConfirmationOrFinality = true
}
} else {
m.Log.Info("EigenDA blob confirmed", "requestID", base64RequestID)
resultChan <- statusRes.Info
return
batchId := statusRes.Info.BlobVerificationProof.GetBatchId()
batchConfirmed, err := m.batchIdConfirmedAtDepth(ctx, batchId, m.Config.WaitForConfirmationDepth)
if err != nil {
m.Log.Warn("Error checking if batch ID is confirmed at depth. Will retry...", "requestID", base64RequestID, "err", err)
}
if batchConfirmed {
m.Log.Info("EigenDA blob confirmed", "requestID", base64RequestID, "confirmationDepth", m.Config.WaitForConfirmationDepth)
resultChan <- statusRes.Info
return
}
// to prevent log clutter, we only log at info level once
if alreadyWaitingForConfirmationOrFinality {
m.Log.Debug("EigenDA blob included onchain, waiting for confirmation", "requestID", base64RequestID, "confirmationDepth", m.Config.WaitForConfirmationDepth)
} else {
m.Log.Info("EigenDA blob included onchain, waiting for confirmation", "requestID", base64RequestID, "confirmationDepth", m.Config.WaitForConfirmationDepth)
alreadyWaitingForConfirmationOrFinality = true
}
}
case grpcdisperser.BlobStatus_FINALIZED:
batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash))
Expand All @@ -265,3 +303,36 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
func (c *EigenDAClient) Close() error {
return c.Client.Close()
}

// getConfDeepBlockNumber returns the block number that is `depth` blocks behind the current block number.
func (m EigenDAClient) getConfDeepBlockNumber(ctx context.Context, depth uint64) (*big.Int, error) {
curBlockNumber, err := m.ethClient.BlockNumber(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get latest block number: %w", err)
}
// If curBlock < depth, this will return the genesis block number (0),
// which would cause to accept as confirmed a block that isn't depth deep.
// TODO: there's prob a better way to deal with this, like returning a special error
if curBlockNumber < depth {
return big.NewInt(0), nil
}
return new(big.Int).SetUint64(curBlockNumber - depth), nil
}

// batchIdConfirmedAtDepth checks if a batch ID has been confirmed at a certain depth.
// It returns true if the batch ID has been confirmed at the given depth, and false otherwise,
// or returns an error if any of the network calls fail.
func (m EigenDAClient) batchIdConfirmedAtDepth(ctx context.Context, batchId uint32, depth uint64) (bool, error) {
confDeepBlockNumber, err := m.getConfDeepBlockNumber(ctx, depth)
if err != nil {
return false, fmt.Errorf("failed to get confirmation deep block number: %w", err)
}
onchainBatchMetadataHash, err := m.edasmCaller.BatchIdToBatchMetadataHash(&bind.CallOpts{BlockNumber: confDeepBlockNumber}, batchId)
if err != nil {
return false, fmt.Errorf("failed to get batch metadata hash: %w", err)
}
if bytes.Equal(onchainBatchMetadataHash[:], make([]byte, 32)) {
return false, nil
}
return true, nil
}
Loading

0 comments on commit 5fe3e91

Please sign in to comment.