diff --git a/api/client/cert.go b/api/client/cert.go new file mode 100644 index 0000000000..172e9cd897 --- /dev/null +++ b/api/client/cert.go @@ -0,0 +1,8 @@ +package client + +type Cert struct { + BatchHeaderHash []byte + BlobIndex uint32 + ReferenceBlockNumber uint32 + QuorumIDs []uint32 +} diff --git a/api/client/client.go b/api/client/client.go new file mode 100644 index 0000000000..d9f63ba236 --- /dev/null +++ b/api/client/client.go @@ -0,0 +1,244 @@ +package client + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/base64" + "encoding/binary" + "encoding/hex" + "fmt" + "time" + + "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/auth" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/ethereum/go-ethereum/log" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +type BlobEncodingVersion byte + +var NoIFFT BlobEncodingVersion = 0x00 + +type IEigenDAClient interface { + GetBlob(ctx context.Context, BatchHeaderHash []byte, BlobIndex uint32) ([]byte, error) + PutBlob(ctx context.Context, txData []byte) (*Cert, error) +} + +type EigenDAClient struct { + Config + + Log log.Logger + + client disperser.DisperserClient + + signer *auth.LocalBlobRequestSigner +} + +var _ IEigenDAClient = EigenDAClient{} + +func NewEigenDAClient(ctx context.Context, log log.Logger, config Config) (*EigenDAClient, error) { + err := config.Check() + if err != nil { + return nil, err + } + client, err := NewDisperserClient(ctx, config.RPC) + if err != nil { + return nil, fmt.Errorf("failed to instatiated EigenDA GRPC client: %w", err) + } + return &EigenDAClient{ + Log: log, + Config: config, + client: client, + signer: auth.NewLocalBlobRequestSigner(config.SignerPrivateKeyHex), + }, nil +} + +func NewDisperserClient(ctx context.Context, rpc string) (disperser.DisperserClient, error) { + config := &tls.Config{} + credential := credentials.NewTLS(config) + dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(credential)} + conn, err := grpc.Dial(rpc, dialOptions...) + if err != nil { + return nil, err + } + client := disperser.NewDisperserClient(conn) + return client, nil +} + +func (m EigenDAClient) GetBlob(ctx context.Context, BatchHeaderHash []byte, BlobIndex uint32) ([]byte, error) { + reply, err := m.client.RetrieveBlob(ctx, &disperser.RetrieveBlobRequest{ + BatchHeaderHash: BatchHeaderHash, + BlobIndex: BlobIndex, + }) + if err != nil { + return nil, err + } + + // decode modulo bn254 + decodedData := codec.RemoveEmptyByteFromPaddedBytes(reply.Data) + + // Return exact data with buffer removed + reader := bytes.NewReader(decodedData) + + // read version byte, we will not use it for now since there is only one version + _, err = reader.ReadByte() + if err != nil { + return nil, fmt.Errorf("EigenDA client failed to read version byte") + } + + // read length uvarint + length, err := binary.ReadUvarint(reader) + if err != nil { + return nil, fmt.Errorf("EigenDA client failed to decode length uvarint prefix") + } + data := make([]byte, length) + n, err := reader.Read(data) + if err != nil { + return nil, fmt.Errorf("EigenDA client failed to copy unpadded data into final buffer") + } + if uint64(n) != length { + return nil, fmt.Errorf("EigenDA client failed, data length does not match length prefix") + } + + return data, nil +} + +func (m EigenDAClient) PutBlob(ctx context.Context, data []byte) (*Cert, error) { + m.Log.Info("Attempting to disperse blob to EigenDA") + + // encode current blob encoding version byte + data = append([]byte{byte(NoIFFT)}, data...) + + // encode data length + data = append(ConvertIntToVarUInt(len(data)), data...) + + // encode modulo bn254 + data = codec.ConvertByPaddingEmptyByte(data) + + // do auth handshake + disperseBlobAuthClient, err := m.client.DisperseBlobAuthenticated(ctx) + if err != nil { + return nil, fmt.Errorf("error initializing DisperseBlobAuthenticated() client: %w", err) + } + + err = disperseBlobAuthClient.Send(&disperser.AuthenticatedRequest{ + Payload: &disperser.AuthenticatedRequest_DisperseRequest{ + DisperseRequest: &disperser.DisperseBlobRequest{ + Data: data, + CustomQuorumNumbers: m.Config.CustomQuorumIDs, + AccountId: m.signer.GetAccountID(), + }, + }, + }) + if err != nil { + return nil, fmt.Errorf("failed sending initial disperse blob authenticated request: %w", err) + } + + reply, err := disperseBlobAuthClient.Recv() + if err != nil { + return nil, fmt.Errorf("failed receiving challenge parameter for disperse blob authenticated request: %w", err) + } + + authHeaderReply, ok := reply.Payload.(*disperser.AuthenticatedReply_BlobAuthHeader) + if !ok { + return nil, fmt.Errorf("expected blob auth header message in response to initial disperse blob authenticated request: %w", err) + } + + authHeader := core.BlobAuthHeader{ + BlobCommitments: encoding.BlobCommitments{}, + AccountID: "", + Nonce: authHeaderReply.BlobAuthHeader.ChallengeParameter, + } + + authData, err := m.signer.SignBlobRequest(authHeader) + if err != nil { + return nil, fmt.Errorf("error signing challenge parameter while performing disperse blob authenticated request: %w", err) + } + + // Process challenge and send back challenge_reply + err = disperseBlobAuthClient.Send(&disperser.AuthenticatedRequest{Payload: &disperser.AuthenticatedRequest_AuthenticationData{ + AuthenticationData: &disperser.AuthenticationData{ + AuthenticationData: authData, + }, + }}) + if err != nil { + return nil, fmt.Errorf("error writing signed challenge paramter in disperse blob authenticated request: %w", err) + } + + reply, err = disperseBlobAuthClient.Recv() + if err != nil { + return nil, fmt.Errorf("error receiving signed challenge paramter in disperse blob authenticated request: %w", err) + } + + disperseResWrapper, ok := reply.Payload.(*disperser.AuthenticatedReply_DisperseReply) + if !ok { + return nil, fmt.Errorf("expected disperser reply message in response to signed challenge parameter submission in disperse blob authenticated request: %w", err) + } + + disperseRes := disperseResWrapper.DisperseReply + + // process response + if disperseRes.Result == disperser.BlobStatus_UNKNOWN || + disperseRes.Result == disperser.BlobStatus_FAILED { + m.Log.Error("Unable to disperse blob to EigenDA, aborting", "err", err) + return nil, fmt.Errorf("reply status is %d", disperseRes.Result) + } + + base64RequestID := base64.StdEncoding.EncodeToString(disperseRes.RequestId) + + m.Log.Info("Blob disepersed to EigenDA, now waiting for confirmation", "requestID", base64RequestID) + + timeoutTime := time.Now().Add(m.StatusQueryTimeout) + ticker := time.NewTicker(m.StatusQueryRetryInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + if time.Now().After(timeoutTime) { + return nil, fmt.Errorf("timed out waiting for EigenDA blob to confirm blob with request id: %s", base64RequestID) + } + statusRes, err := m.client.GetBlobStatus(ctx, &disperser.BlobStatusRequest{ + RequestId: disperseRes.RequestId, + }) + if err != nil { + m.Log.Warn("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err) + continue + } + + switch statusRes.Status { + case disperser.BlobStatus_PROCESSING: + m.Log.Info("Waiting for confirmation from EigenDA", "requestID", base64RequestID) + case disperser.BlobStatus_FAILED: + m.Log.Error("EigenDA blob dispersal failed in processing", "requestID", base64RequestID, "err", err) + case disperser.BlobStatus_INSUFFICIENT_SIGNATURES: + m.Log.Error("EigenDA blob dispersal failed in processing with insufficient signatures", "requestID", base64RequestID, "err", err) + case disperser.BlobStatus_CONFIRMED: + m.Log.Info("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID) + case disperser.BlobStatus_FINALIZED: + batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash)) + m.Log.Info("Successfully dispersed blob to EigenDA", "requestID", base64RequestID, "batchHeaderHash", batchHeaderHashHex) + blobInfo := statusRes.Info + quorumIDs := make([]uint32, len(blobInfo.BlobHeader.BlobQuorumParams)) + for i := range quorumIDs { + quorumIDs[i] = blobInfo.BlobHeader.BlobQuorumParams[i].QuorumNumber + } + cert := &Cert{ + BatchHeaderHash: blobInfo.BlobVerificationProof.BatchMetadata.BatchHeaderHash, + BlobIndex: blobInfo.BlobVerificationProof.BlobIndex, + ReferenceBlockNumber: blobInfo.BlobVerificationProof.BatchMetadata.BatchHeader.ReferenceBlockNumber, + QuorumIDs: quorumIDs, + } + return cert, nil + default: + return nil, fmt.Errorf("EigenDA blob dispersal failed in processing with reply status %d", statusRes.Status) + } + } + } +} diff --git a/api/client/client_testnet_test.go b/api/client/client_testnet_test.go new file mode 100644 index 0000000000..a3dd77a020 --- /dev/null +++ b/api/client/client_testnet_test.go @@ -0,0 +1,40 @@ +package client_test + +import ( + "context" + "flag" + "os" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/api/client" + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/assert" +) + +var runTestnetIntegrationTests bool + +func init() { + flag.BoolVar(&runTestnetIntegrationTests, "testnet-integration", false, "Run testnet-based integration tests") +} + +func TestClient(t *testing.T) { + if !runTestnetIntegrationTests { + t.Skip("Skipping testnet integration test") + } + logger := log.NewLogger(log.NewTerminalHandler(os.Stderr, true)) + client, err := client.NewEigenDAClient(context.Background(), logger, client.Config{ + RPC: "disperser-holesky.eigenda.xyz:443", + StatusQueryTimeout: 25 * time.Minute, + StatusQueryRetryInterval: 5 * time.Second, + CustomQuorumIDs: []uint32{}, + SignerPrivateKeyHex: "2d23e142a9e86a9175b9dfa213f20ea01f6c1731e09fa6edf895f70fe279cbb1", + }) + data := "hello world!" + assert.NoError(t, err) + cert, err := client.PutBlob(context.Background(), []byte(data)) + assert.NoError(t, err) + blob, err := client.GetBlob(context.Background(), cert.BatchHeaderHash, cert.BlobIndex) + assert.NoError(t, err) + assert.Equal(t, data, string(blob)) +} diff --git a/api/client/config.go b/api/client/config.go new file mode 100644 index 0000000000..8057f0e762 --- /dev/null +++ b/api/client/config.go @@ -0,0 +1,40 @@ +package client + +import ( + "fmt" + "time" +) + +type Config struct { + // RPC is the HTTP provider URL for the Data Availability node. + RPC string + + // The total amount of time that the batcher will spend waiting for EigenDA to confirm a blob + StatusQueryTimeout time.Duration + + // The amount of time to wait between status queries of a newly dispersed blob + StatusQueryRetryInterval time.Duration + + // The quorum IDs to write blobs to using this client. Should not include quorums 0 or 1. + CustomQuorumIDs []uint32 + + // Signer private key in hex encoded format. This key should not be associated with an Ethereum address holding any funds. + SignerPrivateKeyHex string +} + +var DefaultQuorums = map[uint32]bool{0: true, 1: true} + +func (c *Config) Check() error { + for _, e := range c.CustomQuorumIDs { + if DefaultQuorums[e] { + return fmt.Errorf("EigenDA client config failed validation because CustomQuorumIDs includes a default quorum ID %d. Because it is included by default this quorum ID can be removed from the client configuration", e) + } + } + if c.StatusQueryRetryInterval == 0 { + c.StatusQueryRetryInterval = 5 * time.Second + } + if c.StatusQueryTimeout == 0 { + c.StatusQueryTimeout = 25 * time.Minute + } + return nil +} diff --git a/api/client/utils.go b/api/client/utils.go new file mode 100644 index 0000000000..af27fdafc3 --- /dev/null +++ b/api/client/utils.go @@ -0,0 +1,9 @@ +package client + +import "encoding/binary" + +func ConvertIntToVarUInt(v int) []byte { + buf := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(buf, uint64(v)) + return buf[:n] +} diff --git a/api/client/utils_test.go b/api/client/utils_test.go new file mode 100644 index 0000000000..e169c0b61a --- /dev/null +++ b/api/client/utils_test.go @@ -0,0 +1 @@ +package client_test diff --git a/api/go.mod b/api/go.mod deleted file mode 100644 index dd91aee454..0000000000 --- a/api/go.mod +++ /dev/null @@ -1,18 +0,0 @@ -module github.com/Layr-Labs/eigenda/api - -go 1.21 - -toolchain go1.21.1 - -require ( - google.golang.org/grpc v1.58.3 - google.golang.org/protobuf v1.31.0 -) - -require ( - github.com/golang/protobuf v1.5.3 // indirect - golang.org/x/net v0.12.0 // indirect - golang.org/x/sys v0.10.0 // indirect - golang.org/x/text v0.11.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect -) diff --git a/api/go.sum b/api/go.sum deleted file mode 100644 index 10acb4d7f8..0000000000 --- a/api/go.sum +++ /dev/null @@ -1,21 +0,0 @@ -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= -golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= -golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= -golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= -golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= -google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= -google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= diff --git a/core/auth/auth_test.go b/core/auth/auth_test.go index 947979f50c..80f318f2cf 100644 --- a/core/auth/auth_test.go +++ b/core/auth/auth_test.go @@ -17,7 +17,7 @@ func TestAuthentication(t *testing.T) { // Make the signer privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" - signer := auth.NewSigner(privateKeyHex) + signer := auth.NewLocalBlobRequestSigner(privateKeyHex) testHeader := core.BlobAuthHeader{ BlobCommitments: encoding.BlobCommitments{}, @@ -44,7 +44,7 @@ func TestAuthenticationFail(t *testing.T) { // Make the signer privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" - signer := auth.NewSigner(privateKeyHex) + signer := auth.NewLocalBlobRequestSigner(privateKeyHex) testHeader := core.BlobAuthHeader{ BlobCommitments: encoding.BlobCommitments{}, @@ -54,7 +54,7 @@ func TestAuthenticationFail(t *testing.T) { } privateKeyHex = "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded" - signer = auth.NewSigner(privateKeyHex) + signer = auth.NewLocalBlobRequestSigner(privateKeyHex) // Sign the header signature, err := signer.SignBlobRequest(testHeader) diff --git a/core/auth/signer.go b/core/auth/signer.go index 69baec49cf..86632f3798 100644 --- a/core/auth/signer.go +++ b/core/auth/signer.go @@ -12,11 +12,13 @@ import ( "github.com/ethereum/go-ethereum/crypto" ) -type signer struct { +type LocalBlobRequestSigner struct { PrivateKey *ecdsa.PrivateKey } -func NewSigner(privateKeyHex string) core.BlobRequestSigner { +var _ core.BlobRequestSigner = &LocalBlobRequestSigner{} + +func NewLocalBlobRequestSigner(privateKeyHex string) *LocalBlobRequestSigner { privateKeyBytes := common.FromHex(privateKeyHex) privateKey, err := crypto.ToECDSA(privateKeyBytes) @@ -24,12 +26,12 @@ func NewSigner(privateKeyHex string) core.BlobRequestSigner { log.Fatalf("Failed to parse private key: %v", err) } - return &signer{ + return &LocalBlobRequestSigner{ PrivateKey: privateKey, } } -func (s *signer) SignBlobRequest(header core.BlobAuthHeader) ([]byte, error) { +func (s *LocalBlobRequestSigner) SignBlobRequest(header core.BlobAuthHeader) ([]byte, error) { // Message you want to sign buf := make([]byte, 4) @@ -45,7 +47,7 @@ func (s *signer) SignBlobRequest(header core.BlobAuthHeader) ([]byte, error) { return sig, nil } -func (s *signer) GetAccountID() string { +func (s *LocalBlobRequestSigner) GetAccountID() string { publicKeyBytes := crypto.FromECDSAPub(&s.PrivateKey.PublicKey) return hexutil.Encode(publicKeyBytes) diff --git a/disperser/apiserver/ratelimit_test.go b/disperser/apiserver/ratelimit_test.go index 8a7941d663..69b3f99093 100644 --- a/disperser/apiserver/ratelimit_test.go +++ b/disperser/apiserver/ratelimit_test.go @@ -105,7 +105,7 @@ func TestAuthRatelimit(t *testing.T) { // Use an unauthenticated signer privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdeb" - signer := auth.NewSigner(privateKeyHex) + signer := auth.NewLocalBlobRequestSigner(privateKeyHex) errorChan := make(chan error, 10) @@ -130,7 +130,7 @@ func TestAuthRatelimit(t *testing.T) { // Use an authenticated signer privateKeyHex = "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded" - signer = auth.NewSigner(privateKeyHex) + signer = auth.NewLocalBlobRequestSigner(privateKeyHex) // This should succeed because the account throughput limit is 100 KiB/s for quorum 0 simulateClient(t, signer, "4.4.4.4", data50KiB, []uint32{0}, 0, errorChan, false) diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 921a02967e..7e93505820 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -82,7 +82,7 @@ func TestDisperseBlobAuth(t *testing.T) { // Use an unauthenticated signer privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdeb" - signer := auth.NewSigner(privateKeyHex) + signer := auth.NewLocalBlobRequestSigner(privateKeyHex) errorChan := make(chan error, 10) @@ -104,7 +104,7 @@ func TestDisperseBlobAuthTimeout(t *testing.T) { // Use an unauthenticated signer privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdeb" - signer := auth.NewSigner(privateKeyHex) + signer := auth.NewLocalBlobRequestSigner(privateKeyHex) errorChan := make(chan error, 10) diff --git a/go.mod b/go.mod index ffb6525e27..cf557f9c81 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.21 toolchain go1.21.1 require ( - github.com/Layr-Labs/eigenda/api v0.0.0 github.com/Layr-Labs/eigensdk-go v0.1.6-0.20240414172936-84d5bc10f72f github.com/aws/aws-sdk-go-v2 v1.26.0 github.com/aws/aws-sdk-go-v2/credentials v1.17.9 diff --git a/inabox/tests/integration_test.go b/inabox/tests/integration_test.go index 7f6064a255..f6cd3e27cf 100644 --- a/inabox/tests/integration_test.go +++ b/inabox/tests/integration_test.go @@ -34,7 +34,7 @@ var _ = Describe("Inabox Integration", func() { Expect(err).To(BeNil()) privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded" - signer := auth.NewSigner(privateKeyHex) + signer := auth.NewLocalBlobRequestSigner(privateKeyHex) disp := clients.NewDisperserClient(&clients.Config{ Hostname: "localhost",