diff --git a/api/clients/config.go b/api/clients/config.go
index ae132b50c7..0f23ccb1cb 100644
--- a/api/clients/config.go
+++ b/api/clients/config.go
@@ -2,6 +2,7 @@ package clients
 
 import (
     "fmt"
+    "log"
     "time"
 
     "github.com/Layr-Labs/eigenda/api/clients/codecs"
@@ -23,6 +24,19 @@ type EigenDAClientConfig struct {
     // The amount of time to wait between status queries of a newly dispersed blob
     StatusQueryRetryInterval time.Duration
 
+    // The Ethereum RPC URL to use for querying the Ethereum blockchain.
+    // This is used to make sure that the blob has been confirmed on-chain.
+    EthRpcUrl string
+
+    // The address of the EigenDAServiceManager contract, used to make sure that the blob has been confirmed on-chain.
+    SvcManagerAddr string
+
+    // The number of Ethereum blocks to wait after the blob's batch has been included onchain, before returning from PutBlob calls.
+    // In most cases only makes sense if < 64 blocks (2 epochs). Otherwise, consider using WaitForFinalization instead.
+    //
+    // When WaitForFinalization is true, this field is ignored.
+    WaitForConfirmationDepth uint64
+
     // If true, will wait for the blob to finalize, if false, will wait only for the blob to confirm.
     WaitForFinalization bool
 
@@ -51,6 +65,22 @@ type EigenDAClientConfig struct {
 }
 
 func (c *EigenDAClientConfig) CheckAndSetDefaults() error {
+    if c.WaitForFinalization {
+        if c.WaitForConfirmationDepth != 0 {
+            log.Println("Warning: WaitForFinalization is set to true, WaitForConfirmationDepth will be ignored")
+        }
+    } else {
+        if c.WaitForConfirmationDepth > 64 {
+            log.Printf("Warning: WaitForConfirmationDepth is set to %v > 64 (2 epochs == finality). Consider setting WaitForFinalization to true instead.\n", c.WaitForConfirmationDepth)
+        }
+    }
+    if c.SvcManagerAddr == "" {
+        return fmt.Errorf("EigenDAClientConfig.SvcManagerAddr not set. Needed to verify blob confirmed on-chain.")
+    }
+    if c.EthRpcUrl == "" {
+        return fmt.Errorf("EigenDAClientConfig.EthRpcUrl not set. Needed to verify blob confirmed on-chain.")
+    }
+
     if c.StatusQueryRetryInterval == 0 {
         c.StatusQueryRetryInterval = 5 * time.Second
     }
diff --git a/api/clients/config_test.go b/api/clients/config_test.go
new file mode 100644
index 0000000000..1176d7452c
--- /dev/null
+++ b/api/clients/config_test.go
@@ -0,0 +1,140 @@
+package clients
+
+import (
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+    "testing"
+    "time"
+)
+
+// Claude generated tests... don't blame the copy paster.
+func TestEigenDAClientConfig_CheckAndSetDefaults(t *testing.T) {
+    // Helper function to create a valid base config
+    newValidConfig := func() *EigenDAClientConfig {
+        return &EigenDAClientConfig{
+            RPC: "http://localhost:8080",
+            EthRpcUrl: "http://localhost:8545",
+            SvcManagerAddr: "0x1234567890123456789012345678901234567890",
+        }
+    }
+
+    t.Run("Valid minimal configuration", func(t *testing.T) {
+        config := newValidConfig()
+        err := config.CheckAndSetDefaults()
+        require.NoError(t, err)
+
+        // Check default values are set
+        assert.Equal(t, 5*time.Second, config.StatusQueryRetryInterval)
+        assert.Equal(t, 25*time.Minute, config.StatusQueryTimeout)
+        assert.Equal(t, 30*time.Second, config.ResponseTimeout)
+    })
+
+    t.Run("Missing required fields", func(t *testing.T) {
+        testCases := []struct {
+            name        string
+            modifyConf  func(*EigenDAClientConfig)
+            expectedErr string
+        }{
+            {
+                name: "Missing RPC",
+                modifyConf: func(c *EigenDAClientConfig) {
+                    c.RPC = ""
+                },
+                expectedErr: "EigenDAClientConfig.RPC not set",
+            },
+            {
+                name: "Missing EthRpcUrl",
+                modifyConf: func(c *EigenDAClientConfig) {
+                    c.EthRpcUrl = ""
+                },
+                expectedErr: "EigenDAClientConfig.EthRpcUrl not set. Needed to verify blob confirmed on-chain.",
+            },
+            {
+                name: "Missing SvcManagerAddr",
+                modifyConf: func(c *EigenDAClientConfig) {
+                    c.SvcManagerAddr = ""
+                },
+                expectedErr: "EigenDAClientConfig.SvcManagerAddr not set. Needed to verify blob confirmed on-chain.",
+            },
+        }
+
+        for _, tc := range testCases {
+            t.Run(tc.name, func(t *testing.T) {
+                config := newValidConfig()
+                tc.modifyConf(config)
+                err := config.CheckAndSetDefaults()
+                assert.EqualError(t, err, tc.expectedErr)
+            })
+        }
+    })
+
+    t.Run("SignerPrivateKeyHex validation", func(t *testing.T) {
+        testCases := []struct {
+            name        string
+            keyHex      string
+            shouldError bool
+        }{
+            {
+                name:        "Empty key (valid for read-only)",
+                keyHex:      "",
+                shouldError: false,
+            },
+            {
+                name:        "Valid length key (64 hex chars)",
+                keyHex:      "1234567890123456789012345678901234567890123456789012345678901234",
+                shouldError: false,
+            },
+            {
+                name:        "Invalid length key",
+                keyHex:      "123456",
+                shouldError: true,
+            },
+        }
+
+        for _, tc := range testCases {
+            t.Run(tc.name, func(t *testing.T) {
+                config := newValidConfig()
+                config.SignerPrivateKeyHex = tc.keyHex
+                err := config.CheckAndSetDefaults()
+                if tc.shouldError {
+                    assert.Error(t, err)
+                    assert.Contains(t, err.Error(), "SignerPrivateKeyHex")
+                } else {
+                    assert.NoError(t, err)
+                }
+            })
+        }
+    })
+
+    t.Run("Custom timeouts", func(t *testing.T) {
+        config := newValidConfig()
+        customRetryInterval := 10 * time.Second
+        customQueryTimeout := 30 * time.Minute
+        customResponseTimeout := 45 * time.Second
+
+        config.StatusQueryRetryInterval = customRetryInterval
+        config.StatusQueryTimeout = customQueryTimeout
+        config.ResponseTimeout = customResponseTimeout
+
+        err := config.CheckAndSetDefaults()
+        require.NoError(t, err)
+
+        assert.Equal(t, customRetryInterval, config.StatusQueryRetryInterval)
+        assert.Equal(t, customQueryTimeout, config.StatusQueryTimeout)
+        assert.Equal(t, customResponseTimeout, config.ResponseTimeout)
+    })
+
+    t.Run("Optional fields", func(t *testing.T) {
+        config := newValidConfig()
+        config.CustomQuorumIDs = []uint{2, 3, 4}
+        config.DisableTLS = true
+        config.DisablePointVerificationMode = true
+
+        err := config.CheckAndSetDefaults()
+        require.NoError(t, err)
+
+        assert.Equal(t, []uint{2, 3, 4}, config.CustomQuorumIDs)
+        assert.True(t, config.DisableTLS)
+        assert.True(t, config.DisablePointVerificationMode)
+    })
+}
diff --git a/api/clients/eigenda_client.go b/api/clients/eigenda_client.go
index 173af1a7cf..20b45d1da3 100644
--- a/api/clients/eigenda_client.go
+++ b/api/clients/eigenda_client.go
@@ -1,19 +1,26 @@
 package clients
 
 import (
+    "bytes"
     "context"
     "encoding/base64"
    "encoding/hex"
     "fmt"
+    "math/big"
     "net"
     "time"
 
+    "github.com/ethereum/go-ethereum/accounts/abi/bind"
+    "github.com/ethereum/go-ethereum/common"
+    "github.com/ethereum/go-ethereum/ethclient"
+    "github.com/ethereum/go-ethereum/log"
+
     "github.com/Layr-Labs/eigenda/api/clients/codecs"
     grpcdisperser "github.com/Layr-Labs/eigenda/api/grpc/disperser"
+    edasm "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager"
     "github.com/Layr-Labs/eigenda/core"
     "github.com/Layr-Labs/eigenda/core/auth"
     "github.com/Layr-Labs/eigenda/disperser"
-    "github.com/ethereum/go-ethereum/log"
 )
 
 // IEigenDAClient is a wrapper around the DisperserClient interface which
@@ -29,10 +36,12 @@
 type EigenDAClient struct {
     // TODO: all of these should be private, to prevent users from using them directly,
     // which breaks encapsulation and makes it hard for us to do refactors or changes
-    Config EigenDAClientConfig
-    Log log.Logger
-    Client DisperserClient
-    Codec codecs.BlobCodec
+    Config EigenDAClientConfig
+    Log log.Logger
+    Client DisperserClient
+    ethClient *ethclient.Client
+    edasmCaller *edasm.ContractEigenDAServiceManagerCaller
+    Codec codecs.BlobCodec
 }
 
 var _ IEigenDAClient = &EigenDAClient{}
@@ -70,6 +79,17 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
         return nil, err
     }
 
+    var ethClient *ethclient.Client
+    var edasmCaller *edasm.ContractEigenDAServiceManagerCaller
+    ethClient, err = ethclient.Dial(config.EthRpcUrl)
+    if err != nil {
+        return nil, fmt.Errorf("failed to dial ETH RPC node: %w", err)
+    }
+    edasmCaller, err = edasm.NewContractEigenDAServiceManagerCaller(common.HexToAddress(config.SvcManagerAddr), ethClient)
+    if err != nil {
+        return nil, fmt.Errorf("failed to create EigenDAServiceManagerCaller: %w", err)
+    }
+
     host, port, err := net.SplitHostPort(config.RPC)
     if err != nil {
         return nil, fmt.Errorf("failed to parse EigenDA RPC: %w", err)
@@ -101,15 +121,15 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
     }
 
     return &EigenDAClient{
-        Log: log,
-        Config: config,
-        Client: disperserClient,
-        Codec: codec,
+        Log: log,
+        Config: config,
+        Client: disperserClient,
+        ethClient: ethClient,
+        edasmCaller: edasmCaller,
+        Codec: codec,
     }, nil
 }
 
-// Deprecated: do not rely on this function. Do not use m.Codec directly either.
-// These will eventually be removed and not exposed.
 func (m *EigenDAClient) GetCodec() codecs.BlobCodec {
     return m.Codec
 }
@@ -143,6 +163,10 @@ func (m *EigenDAClient) GetBlob(ctx context.Context, batchHeaderHash []byte, blo
 // PutBlob encodes and writes a blob to EigenDA, waiting for a desired blob status
 // to be reached (guarded by WaitForFinalization config param) before returning.
 // This function is resilient to transient failures and timeouts.
+//
+// Upon return the blob is guaranteed to be:
+// - finalized onchain (if Config.WaitForFinalization is true), or
+// - confirmed at a certain depth (if Config.WaitForFinalization is false, in which case Config.WaitForConfirmationDepth specifies the depth).
 func (m *EigenDAClient) PutBlob(ctx context.Context, data []byte) (*grpcdisperser.BlobInfo, error) {
     resultChan, errorChan := m.PutBlobAsync(ctx, data)
     select { // no timeout here because we depend on the configured timeout in PutBlobAsync
@@ -204,7 +228,7 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
     defer cancel()
 
     alreadyWaitingForDispersal := false
-    alreadyWaitingForFinalization := false
+    alreadyWaitingForConfirmationOrFinality := false
     for {
         select {
         case <-ctx.Done():
@@ -235,16 +259,30 @@
         case grpcdisperser.BlobStatus_CONFIRMED:
             if m.Config.WaitForFinalization {
                 // to prevent log clutter, we only log at info level once
-                if alreadyWaitingForFinalization {
-                    m.Log.Debug("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID)
+                if alreadyWaitingForConfirmationOrFinality {
+                    m.Log.Debug("EigenDA blob included onchain, waiting for finalization", "requestID", base64RequestID)
                 } else {
-                    m.Log.Info("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID)
-                    alreadyWaitingForFinalization = true
+                    m.Log.Info("EigenDA blob included onchain, waiting for finalization", "requestID", base64RequestID)
+                    alreadyWaitingForConfirmationOrFinality = true
                 }
             } else {
-                m.Log.Info("EigenDA blob confirmed", "requestID", base64RequestID)
-                resultChan <- statusRes.Info
-                return
+                batchId := statusRes.Info.BlobVerificationProof.GetBatchId()
+                batchConfirmed, err := m.batchIdConfirmedAtDepth(ctx, batchId, m.Config.WaitForConfirmationDepth)
+                if err != nil {
+                    m.Log.Warn("Error checking if batch ID is confirmed at depth. Will retry...", "requestID", base64RequestID, "err", err)
+                }
+                if batchConfirmed {
+                    m.Log.Info("EigenDA blob confirmed", "requestID", base64RequestID, "confirmationDepth", m.Config.WaitForConfirmationDepth)
+                    resultChan <- statusRes.Info
+                    return
+                }
+                // to prevent log clutter, we only log at info level once
+                if alreadyWaitingForConfirmationOrFinality {
+                    m.Log.Debug("EigenDA blob included onchain, waiting for confirmation", "requestID", base64RequestID, "confirmationDepth", m.Config.WaitForConfirmationDepth)
+                } else {
+                    m.Log.Info("EigenDA blob included onchain, waiting for confirmation", "requestID", base64RequestID, "confirmationDepth", m.Config.WaitForConfirmationDepth)
+                    alreadyWaitingForConfirmationOrFinality = true
+                }
             }
         case grpcdisperser.BlobStatus_FINALIZED:
             batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash))
@@ -265,3 +303,36 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
 func (c *EigenDAClient) Close() error {
     return c.Client.Close()
 }
+
+// getConfDeepBlockNumber returns the block number that is `depth` blocks behind the current block number.
+func (m EigenDAClient) getConfDeepBlockNumber(ctx context.Context, depth uint64) (*big.Int, error) {
+    curBlockNumber, err := m.ethClient.BlockNumber(ctx)
+    if err != nil {
+        return nil, fmt.Errorf("failed to get latest block number: %w", err)
+    }
+    // If curBlockNumber < depth, this will return the genesis block number (0),
+    // which would cause us to accept as confirmed a block that isn't actually `depth` blocks deep.
+    // TODO: there's probably a better way to deal with this, like returning a special error
+    if curBlockNumber < depth {
+        return big.NewInt(0), nil
+    }
+    return new(big.Int).SetUint64(curBlockNumber - depth), nil
+}
+
+// batchIdConfirmedAtDepth checks if a batch ID has been confirmed at a certain depth.
+// It returns true if the batch ID has been confirmed at the given depth, and false otherwise,
+// or returns an error if any of the network calls fail.
+func (m EigenDAClient) batchIdConfirmedAtDepth(ctx context.Context, batchId uint32, depth uint64) (bool, error) {
+    confDeepBlockNumber, err := m.getConfDeepBlockNumber(ctx, depth)
+    if err != nil {
+        return false, fmt.Errorf("failed to get confirmation deep block number: %w", err)
+    }
+    onchainBatchMetadataHash, err := m.edasmCaller.BatchIdToBatchMetadataHash(&bind.CallOpts{BlockNumber: confDeepBlockNumber}, batchId)
+    if err != nil {
+        return false, fmt.Errorf("failed to get batch metadata hash: %w", err)
+    }
+    if bytes.Equal(onchainBatchMetadataHash[:], make([]byte, 32)) {
+        return false, nil
+    }
+    return true, nil
+}
diff --git a/api/clients/eigenda_client_e2e_test.go b/api/clients/eigenda_client_e2e_test.go
index f5ab2d04b5..7585b21807 100644
--- a/api/clients/eigenda_client_e2e_test.go
+++ b/api/clients/eigenda_client_e2e_test.go
@@ -1,13 +1,14 @@
-package clients_test
+package clients
 
 import (
     "context"
     "flag"
+    "math/big"
     "os"
     "testing"
     "time"
 
-    "github.com/Layr-Labs/eigenda/api/clients"
+    "github.com/ethereum/go-ethereum/accounts/abi/bind"
     "github.com/ethereum/go-ethereum/log"
     "github.com/stretchr/testify/assert"
 )
@@ -18,29 +19,78 @@ func init() {
     flag.BoolVar(&runTestnetIntegrationTests, "testnet-integration", false, "Run testnet-based integration tests")
 }
 
+// TestClientUsingTestnet tests the eigenda client against the holesky testnet disperser.
+// We don't test waiting for finality because that adds 12 minutes to the test, and is not necessary
+// because we already test for this in the unit tests using a mock disperser, which is much faster.
 func TestClientUsingTestnet(t *testing.T) {
     if !runTestnetIntegrationTests {
         t.Skip("Skipping testnet integration test")
     }
-    logger := log.NewLogger(log.NewTerminalHandler(os.Stderr, true))
-    client, err := clients.NewEigenDAClient(logger, clients.EigenDAClientConfig{
-        RPC: "disperser-holesky.eigenda.xyz:443",
-        StatusQueryTimeout: 25 * time.Minute,
-        StatusQueryRetryInterval: 5 * time.Second,
-        CustomQuorumIDs: []uint{},
-        SignerPrivateKeyHex: "2d23e142a9e86a9175b9dfa213f20ea01f6c1731e09fa6edf895f70fe279cbb1",
-        // Waiting for finality adds 12 minutes to the test, and is not necessary
-        // because we already test for this correct behavior in the unit tests using a mock disperser
-        // which is much faster.
-        WaitForFinalization: false,
+
+    t.Run("PutBlobWaitForConfirmationDepth0AndGetBlob", func(t *testing.T) {
+        t.Parallel()
+        logger := log.NewLogger(log.NewTerminalHandler(os.Stdout, true))
+        client, err := NewEigenDAClient(logger, EigenDAClientConfig{
+            RPC: "disperser-holesky.eigenda.xyz:443",
+            // Should need way less than 20 minutes, but we set it to 20 minutes to be safe.
+            // In the worst case we have a 10 min batching interval + some time for the tx to land onchain,
+            // plus waiting for the configured confirmation depth.
+            StatusQueryTimeout: 20 * time.Minute,
+            StatusQueryRetryInterval: 5 * time.Second,
+            CustomQuorumIDs: []uint{},
+            SignerPrivateKeyHex: "2d23e142a9e86a9175b9dfa213f20ea01f6c1731e09fa6edf895f70fe279cbb1",
+            WaitForFinalization: false,
+            WaitForConfirmationDepth: 0,
+            SvcManagerAddr: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b",
+            EthRpcUrl: "https://1rpc.io/holesky",
+        })
+        data := "hello world!"
+        assert.NoError(t, err)
+        blobInfo, err := client.PutBlob(context.Background(), []byte(data))
+        assert.NoError(t, err)
+        batchHeaderHash := blobInfo.BlobVerificationProof.BatchMetadata.BatchHeaderHash
+        blobIndex := blobInfo.BlobVerificationProof.BlobIndex
+        blob, err := client.GetBlob(context.Background(), batchHeaderHash, blobIndex)
+        assert.NoError(t, err)
+        assert.Equal(t, data, string(blob))
+    })
+
+    t.Run("PutBlobWaitForConfirmationDepth3AndGetBlob", func(t *testing.T) {
+        t.Parallel()
+        confDepth := uint64(3)
+        logger := log.NewLogger(log.NewTerminalHandler(os.Stdout, true))
+        client, err := NewEigenDAClient(logger, EigenDAClientConfig{
+            RPC: "disperser-holesky.eigenda.xyz:443",
+            // Should need way less than 20 minutes, but we set it to 20 minutes to be safe.
+            // In the worst case we have a 10 min batching interval + some time for the tx to land onchain,
+            // plus waiting for 3 blocks of confirmation.
+            StatusQueryTimeout: 20 * time.Minute,
+            StatusQueryRetryInterval: 5 * time.Second,
+            CustomQuorumIDs: []uint{},
+            SignerPrivateKeyHex: "2d23e142a9e86a9175b9dfa213f20ea01f6c1731e09fa6edf895f70fe279cbb1",
+            WaitForFinalization: false,
+            WaitForConfirmationDepth: confDepth,
+            SvcManagerAddr: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b",
+            EthRpcUrl: "https://1rpc.io/holesky",
+        })
+        data := "hello world!"
+        assert.NoError(t, err)
+        blobInfo, err := client.PutBlob(context.Background(), []byte(data))
+        assert.NoError(t, err)
+        batchHeaderHash := blobInfo.BlobVerificationProof.BatchMetadata.BatchHeaderHash
+        blobIndex := blobInfo.BlobVerificationProof.BlobIndex
+        blob, err := client.GetBlob(context.Background(), batchHeaderHash, blobIndex)
+        assert.NoError(t, err)
+        assert.Equal(t, data, string(blob))
+
+        // assert confirmation depth by making sure the batch metadata hash was registered onchain
+        // at least confDepth blocks ago
+        blockNumCur, err := client.ethClient.BlockNumber(context.Background())
+        assert.NoError(t, err)
+        blockNumAtDepth := new(big.Int).SetUint64(blockNumCur - confDepth)
+        batchId := blobInfo.BlobVerificationProof.GetBatchId()
+        onchainBatchMetadataHash, err := client.edasmCaller.BatchIdToBatchMetadataHash(&bind.CallOpts{BlockNumber: blockNumAtDepth}, batchId)
+        assert.NoError(t, err)
+        assert.NotEqual(t, onchainBatchMetadataHash[:], make([]byte, 32))
     })
-    data := "hello world!"
-    assert.NoError(t, err)
-    blobInfo, err := client.PutBlob(context.Background(), []byte(data))
-    assert.NoError(t, err)
-    batchHeaderHash := blobInfo.BlobVerificationProof.BatchMetadata.BatchHeaderHash
-    blobIndex := blobInfo.BlobVerificationProof.BlobIndex
-    blob, err := client.GetBlob(context.Background(), batchHeaderHash, blobIndex)
-    assert.NoError(t, err)
-    assert.Equal(t, data, string(blob))
 }
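
Reviewer note (not part of the diff): a minimal usage sketch of the confirmation-depth behavior introduced above, using only the config fields and client methods added in this PR. The disperser endpoint, Ethereum RPC URL, service manager address, signer key, and the depth of 6 below are illustrative placeholders, not values this change prescribes.

package main

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/Layr-Labs/eigenda/api/clients"
    "github.com/ethereum/go-ethereum/log"
)

func main() {
    logger := log.NewLogger(log.NewTerminalHandler(os.Stdout, true))

    // EthRpcUrl and SvcManagerAddr are now required: the client uses them to check,
    // at the requested confirmation depth, that the batch metadata hash is registered
    // in the EigenDAServiceManager contract before PutBlob returns.
    client, err := clients.NewEigenDAClient(logger, clients.EigenDAClientConfig{
        RPC:                      "disperser-holesky.eigenda.xyz:443",          // placeholder disperser endpoint
        EthRpcUrl:                "https://holesky-rpc.example.com",            // placeholder Ethereum RPC URL
        SvcManagerAddr:           "0x0000000000000000000000000000000000000000", // placeholder service manager address
        StatusQueryTimeout:       20 * time.Minute,
        StatusQueryRetryInterval: 5 * time.Second,
        CustomQuorumIDs:          []uint{},
        SignerPrivateKeyHex:      "0000000000000000000000000000000000000000000000000000000000000001", // placeholder key
        WaitForFinalization:      false,
        WaitForConfirmationDepth: 6, // wait until the batch is 6 blocks deep instead of waiting for finality
    })
    if err != nil {
        panic(err)
    }

    // PutBlob now blocks until the blob's batch has been onchain for at least
    // WaitForConfirmationDepth blocks (or until finality, if WaitForFinalization is true).
    blobInfo, err := client.PutBlob(context.Background(), []byte("hello world"))
    if err != nil {
        panic(err)
    }
    fmt.Printf("blob confirmed in batch %d\n", blobInfo.BlobVerificationProof.GetBatchId())
}

With WaitForConfirmationDepth set to 0 the client still verifies the batch metadata hash onchain, but against the latest block, which is the closest equivalent to the previous "return as soon as CONFIRMED" behavior.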