Skip to content

Commit

Permalink
rebase master
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Dec 11, 2024
1 parent 6469113 commit 2ba0051
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 43 deletions.
40 changes: 26 additions & 14 deletions api/clients/disperser_client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clients
import (
"context"
"fmt"
"math/big"
"sync"

"github.com/Layr-Labs/eigenda/api"
Expand Down Expand Up @@ -124,15 +125,17 @@ func (c *disperserClientV2) DisperseBlob(
if c.signer == nil {
return nil, [32]byte{}, api.NewErrorInternal("uninitialized signer for authenticated dispersal")
}
if c.accountant == nil {
return nil, [32]byte{}, api.NewErrorInternal("uninitialized accountant for paid dispersal; make sure to call PopulateAccountant after creating the client")
}

symbolLength := encoding.GetBlobLengthPowerOf2(uint(len(data)))
payment, err := c.accountant.AccountBlob(ctx, uint64(symbolLength), quorums, salt)
if err != nil {
return nil, [32]byte{}, fmt.Errorf("error accounting blob: %w", err)
}
// TODO(hopeyen): uncomment this after the accountant is implemented
// if c.accountant == nil {
// return nil, [32]byte{}, api.NewErrorInternal("uninitialized accountant for paid dispersal; make sure to call PopulateAccountant after creating the client")
// }

// symbolLength := encoding.GetBlobLengthPowerOf2(uint(len(data)))
// payment, err := c.accountant.AccountBlob(ctx, uint64(symbolLength), quorums, salt)
// if err != nil {
// return nil, [32]byte{}, fmt.Errorf("error accounting blob: %w", err)
// }

if len(quorums) == 0 {
return nil, [32]byte{}, api.NewErrorInvalidArg("quorum numbers must be provided")
Expand Down Expand Up @@ -171,17 +174,26 @@ func (c *disperserClientV2) DisperseBlob(
}
}

var payment core.PaymentMetadata
accountId, err := c.signer.GetAccountID()
if err != nil {
return nil, [32]byte{}, api.NewErrorInvalidArg(fmt.Sprintf("please configure signer key if you want to use authenticated endpoint %v", err))
}
payment.AccountID = accountId
payment.BinIndex = 0
payment.CumulativePayment = big.NewInt(0)
blobHeader := &corev2.BlobHeader{
BlobVersion: blobVersion,
BlobCommitments: blobCommitments,
QuorumNumbers: quorums,
PaymentMetadata: *payment,
}
sig, err := c.signer.SignBlobRequest(blobHeader)
if err != nil {
return nil, [32]byte{}, fmt.Errorf("error signing blob request: %w", err)
PaymentMetadata: payment,
}
blobHeader.Signature = sig
// TODO(hopeyen): uncomment this and replace the payment metadata for authentication
// sig, err := c.signer.SignBlobRequest(blobHeader)
// if err != nil {
// return nil, [32]byte{}, fmt.Errorf("error signing blob request: %w", err)
// }
// blobHeader.Signature = sig
blobHeaderProto, err := blobHeader.ToProtobuf()
if err != nil {
return nil, [32]byte{}, fmt.Errorf("error converting blob header to protobuf: %w", err)
Expand Down
10 changes: 10 additions & 0 deletions core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,9 @@ type PaymentMetadata struct {

// Hash returns the Keccak256 hash of the PaymentMetadata
func (pm *PaymentMetadata) Hash() ([32]byte, error) {
if pm == nil {
return [32]byte{}, errors.New("payment metadata is nil")
}
blobHeaderType, err := abi.NewType("tuple", "", []abi.ArgumentMarshaling{
{
Name: "accountID",
Expand Down Expand Up @@ -543,6 +546,10 @@ func (pm *PaymentMetadata) Hash() ([32]byte, error) {
}

func (pm *PaymentMetadata) MarshalDynamoDBAttributeValue() (types.AttributeValue, error) {
if pm == nil {
return nil, errors.New("payment metadata is nil")
}

return &types.AttributeValueMemberM{
Value: map[string]types.AttributeValue{
"AccountID": &types.AttributeValueMemberS{Value: pm.AccountID},
Expand Down Expand Up @@ -576,6 +583,9 @@ func (pm *PaymentMetadata) UnmarshalDynamoDBAttributeValue(av types.AttributeVal
}

func (pm *PaymentMetadata) ToProtobuf() *commonpb.PaymentHeader {
if pm == nil {
return nil
}
return &commonpb.PaymentHeader{
AccountId: pm.AccountID,
ReservationPeriod: pm.ReservationPeriod,
Expand Down
48 changes: 25 additions & 23 deletions disperser/apiserver/disperse_blob_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package apiserver
import (
"context"
"fmt"
"math/big"
"time"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
"github.com/Layr-Labs/eigenda/encoding"
Expand Down Expand Up @@ -123,34 +121,38 @@ func (s *DispersalServerV2) validateDispersalRequest(ctx context.Context, req *p
if err != nil {
return api.NewErrorInvalidArg(fmt.Sprintf("invalid blob header: %s", err.Error()))
}
if err = s.authenticator.AuthenticateBlobRequest(blobHeader); err != nil {
return api.NewErrorInvalidArg(fmt.Sprintf("authentication failed: %s", err.Error()))
}
// TODO(ian-shim): enable this check for authentication
// if blobHeader.PaymentMetadata == nil {
// return api.NewErrorInvalidArg("payment metadata is required")
// }
// if err = s.authenticator.AuthenticateBlobRequest(blobHeader); err != nil {
// return api.NewErrorInvalidArg(fmt.Sprintf("authentication failed: %s", err.Error()))
// }

// TODO(ian-shim): enable this check when we have payment metadata + authentication in disperser client
// if len(blobHeader.PaymentMetadata.AccountID) == 0 || blobHeader.PaymentMetadata.ReservationPeriod == 0 || blobHeader.PaymentMetadata.CumulativePayment == nil {
// return api.NewErrorInvalidArg("invalid payment metadata")
// }

// handle payments and check rate limits
if blobHeaderProto.GetPaymentHeader() != nil {
reservationPeriod := blobHeaderProto.GetPaymentHeader().GetReservationPeriod()
cumulativePayment := new(big.Int).SetBytes(blobHeaderProto.GetPaymentHeader().GetCumulativePayment())
accountID := blobHeaderProto.GetPaymentHeader().GetAccountId()

paymentHeader := core.PaymentMetadata{
AccountID: accountID,
ReservationPeriod: reservationPeriod,
CumulativePayment: cumulativePayment,
}

err := s.meterer.MeterRequest(ctx, paymentHeader, blobLength, blobHeader.QuorumNumbers)
if err != nil {
return api.NewErrorResourceExhausted(err.Error())
}
} else {
return api.NewErrorInvalidArg("payment header is required")
}
// if blobHeaderProto.GetPaymentHeader() != nil {
// reservationPeriod := blobHeaderProto.GetPaymentHeader().GetReservationPeriod()
// cumulativePayment := new(big.Int).SetBytes(blobHeaderProto.GetPaymentHeader().GetCumulativePayment())
// accountID := blobHeaderProto.GetPaymentHeader().GetAccountId()

// paymentHeader := core.PaymentMetadata{
// AccountID: accountID,
// ReservationPeriod: reservationPeriod,
// CumulativePayment: cumulativePayment,
// }

// err := s.meterer.MeterRequest(ctx, paymentHeader, blobLength, blobHeader.QuorumNumbers)
// if err != nil {
// return api.NewErrorResourceExhausted(err.Error())
// }
// } else {
// return api.NewErrorInvalidArg("payment header is required")
// }

commitments, err := s.prover.GetCommitmentsForPaddedLength(data)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ require (
github.com/urfave/cli/v2 v2.27.4
github.com/wealdtech/go-merkletree/v2 v2.6.0
go.uber.org/automaxprocs v1.5.2
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.4.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
golang.org/x/sync v0.8.0
Expand Down
1 change: 1 addition & 0 deletions inabox/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ func (env *Config) generateRetrieverVars(ind int, key string, graphUrl, logPath,
RETRIEVER_NUM_WORKERS: fmt.Sprint(runtime.GOMAXPROCS(0)),
RETRIEVER_VERBOSE: "true",
RETRIEVER_CACHE_ENCODED_BLOBS: "false",
RETRIEVER_GRAPH_URL: graphUrl,

RETRIEVER_INDEXER_PULL_INTERVAL: "1s",
}
Expand Down
11 changes: 7 additions & 4 deletions inabox/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (env *Config) DeployExperiment() {
fmt.Println("Test environment has successfully deployed!")
}

func (env *Config) RegisterBlobVersionAndRelays(ethClient common.EthClient) map[uint16]string {
func (env *Config) RegisterBlobVersionAndRelays(ethClient common.EthClient) map[uint32]string {
dasmAddr := gcommon.HexToAddress(env.EigenDA.ServiceManager)
contractEigenDAServiceManager, err := eigendasrvmg.NewContractEigenDAServiceManager(dasmAddr, ethClient)
if err != nil {
Expand Down Expand Up @@ -202,18 +202,21 @@ func (env *Config) RegisterBlobVersionAndRelays(ethClient common.EthClient) map[
if err != nil {
log.Panicf("Error: %s", err)
}
relays := map[uint16]string{}
relays := map[uint32]string{}
for i, relayVars := range env.Relays {
url := fmt.Sprintf("0.0.0.0:%s", relayVars.RELAY_GRPC_PORT)
txn, err := contractRelayRegistry.AddRelayURL(opts, gcommon.Address{0}, url)
txn, err := contractRelayRegistry.AddRelayInfo(opts, relayreg.RelayInfo{
RelayAddress: gcommon.Address{0},
RelayURL: url,
})
if err != nil {
log.Panicf("Error: %s", err)
}
err = ethClient.SendTransaction(context.Background(), txn)
if err != nil {
log.Panicf("Error: %s", err)
}
relays[uint16(i)] = url
relays[uint32(i)] = url
}

return relays
Expand Down
2 changes: 1 addition & 1 deletion inabox/tests/integration_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var _ = Describe("Inabox v2 Integration", func() {
disp, err := clients.NewDisperserClientV2(&clients.DisperserClientV2Config{
Hostname: "localhost",
Port: "32005",
}, signer, nil)
}, signer, nil, nil)
Expect(err).To(BeNil())
Expect(disp).To(Not(BeNil()))

Expand Down

0 comments on commit 2ba0051

Please sign in to comment.