From aba75dea788f0db0a5053b6851101163da8858d4 Mon Sep 17 00:00:00 2001 From: hopeyen <60078528+hopeyen@users.noreply.github.com> Date: Thu, 19 Dec 2024 02:17:50 +0700 Subject: [PATCH] disperser client - init once accountant (#1015) --- api/clients/v2/accountant.go | 21 ++++++++----- api/clients/v2/disperser_client.go | 48 +++++++++++++++++++++++++----- disperser/apiserver/server_v2.go | 13 ++++---- inabox/deploy/config.go | 5 ++++ inabox/deploy/localstack.go | 19 ++++++++++++ 5 files changed, 85 insertions(+), 21 deletions(-) diff --git a/api/clients/v2/accountant.go b/api/clients/v2/accountant.go index f5e1c920fa..4d1f80500d 100644 --- a/api/clients/v2/accountant.go +++ b/api/clients/v2/accountant.go @@ -172,9 +172,13 @@ func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentState a.reservationWindow = uint32(paymentState.GetPaymentGlobalParams().GetReservationWindow()) if paymentState.GetOnchainCumulativePayment() == nil { - a.onDemand.CumulativePayment = big.NewInt(0) + a.onDemand = &core.OnDemandPayment{ + CumulativePayment: big.NewInt(0), + } } else { - a.onDemand.CumulativePayment = new(big.Int).SetBytes(paymentState.GetOnchainCumulativePayment()) + a.onDemand = &core.OnDemandPayment{ + CumulativePayment: new(big.Int).SetBytes(paymentState.GetOnchainCumulativePayment()), + } } if paymentState.GetCumulativePayment() == nil { @@ -192,20 +196,21 @@ func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentState QuorumSplits: []byte{}, } } else { - a.reservation.SymbolsPerSecond = uint64(paymentState.GetReservation().GetSymbolsPerSecond()) - a.reservation.StartTimestamp = uint64(paymentState.GetReservation().GetStartTimestamp()) - a.reservation.EndTimestamp = uint64(paymentState.GetReservation().GetEndTimestamp()) quorumNumbers := make([]uint8, len(paymentState.GetReservation().GetQuorumNumbers())) for i, quorum := range paymentState.GetReservation().GetQuorumNumbers() { quorumNumbers[i] = uint8(quorum) } - a.reservation.QuorumNumbers = quorumNumbers - quorumSplits := make([]uint8, len(paymentState.GetReservation().GetQuorumSplits())) for i, quorum := range paymentState.GetReservation().GetQuorumSplits() { quorumSplits[i] = uint8(quorum) } - a.reservation.QuorumSplits = quorumSplits + a.reservation = &core.ReservedPayment{ + SymbolsPerSecond: uint64(paymentState.GetReservation().GetSymbolsPerSecond()), + StartTimestamp: uint64(paymentState.GetReservation().GetStartTimestamp()), + EndTimestamp: uint64(paymentState.GetReservation().GetEndTimestamp()), + QuorumNumbers: quorumNumbers, + QuorumSplits: quorumSplits, + } } binRecords := make([]BinRecord, len(paymentState.GetBinRecords())) diff --git a/api/clients/v2/disperser_client.go b/api/clients/v2/disperser_client.go index ff945227fb..22ab443d7d 100644 --- a/api/clients/v2/disperser_client.go +++ b/api/clients/v2/disperser_client.go @@ -30,13 +30,14 @@ type DisperserClient interface { } type disperserClient struct { - config *DisperserClientConfig - signer corev2.BlobRequestSigner - initOnce sync.Once - conn *grpc.ClientConn - client disperser_rpc.DisperserClient - prover encoding.Prover - accountant *Accountant + config *DisperserClientConfig + signer corev2.BlobRequestSigner + initOnceGrpc sync.Once + initOnceAccountant sync.Once + conn *grpc.ClientConn + client disperser_rpc.DisperserClient + prover encoding.Prover + accountant *Accountant } var _ DisperserClient = &disperserClient{} @@ -86,6 +87,14 @@ func NewDisperserClient(config *DisperserClientConfig, signer corev2.BlobRequest // PopulateAccountant populates the accountant with the payment state from the disperser. func (c *disperserClient) PopulateAccountant(ctx context.Context) error { + if c.accountant == nil { + accountId, err := c.signer.GetAccountID() + if err != nil { + return fmt.Errorf("error getting account ID: %w", err) + } + c.accountant = NewAccountant(accountId, nil, nil, 0, 0, 0, 0) + } + paymentState, err := c.GetPaymentState(ctx) if err != nil { return fmt.Errorf("error getting payment state for initializing accountant: %w", err) @@ -121,6 +130,10 @@ func (c *disperserClient) DisperseBlob( if err != nil { return nil, [32]byte{}, api.NewErrorFailover(err) } + err = c.initOncePopulateAccountant(ctx) + if err != nil { + return nil, [32]byte{}, api.NewErrorFailover(err) + } if c.signer == nil { return nil, [32]byte{}, api.NewErrorInternal("uninitialized signer for authenticated dispersal") @@ -273,7 +286,7 @@ func (c *disperserClient) GetBlobCommitment(ctx context.Context, data []byte) (* // If initialization fails, it caches the error and will return it on every subsequent call. func (c *disperserClient) initOnceGrpcConnection() error { var initErr error - c.initOnce.Do(func() { + c.initOnceGrpc.Do(func() { addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port) dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag) conn, err := grpc.NewClient(addr, dialOptions...) @@ -289,3 +302,22 @@ func (c *disperserClient) initOnceGrpcConnection() error { } return nil } + +// initOncePopulateAccountant initializes the accountant if it is not already initialized. +// If initialization fails, it caches the error and will return it on every subsequent call. +func (c *disperserClient) initOncePopulateAccountant(ctx context.Context) error { + var initErr error + c.initOnceAccountant.Do(func() { + if c.accountant == nil { + err := c.PopulateAccountant(ctx) + if err != nil { + initErr = err + return + } + } + }) + if initErr != nil { + return fmt.Errorf("populating accountant: %w", initErr) + } + return nil +} diff --git a/disperser/apiserver/server_v2.go b/disperser/apiserver/server_v2.go index 9802d8c195..32cbb6155f 100644 --- a/disperser/apiserver/server_v2.go +++ b/disperser/apiserver/server_v2.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "math/big" "net" "sync/atomic" "time" @@ -285,9 +284,13 @@ func (s *DispersalServerV2) GetPaymentState(ctx context.Context, req *pb.GetPaym if err != nil { s.logger.Debug("failed to get reservation records, use placeholders", "err", err, "accountID", accountID) } + var largestCumulativePaymentBytes []byte largestCumulativePayment, err := s.meterer.OffchainStore.GetLargestCumulativePayment(ctx, req.AccountId) if err != nil { s.logger.Debug("failed to get largest cumulative payment, use zero value", "err", err, "accountID", accountID) + + } else { + largestCumulativePaymentBytes = largestCumulativePayment.Bytes() } // on-Chain account state var pbReservation *pb.Reservation @@ -313,12 +316,12 @@ func (s *DispersalServerV2) GetPaymentState(ctx context.Context, req *pb.GetPaym } } - var onchainCumulativePayment *big.Int + var onchainCumulativePaymentBytes []byte onDemandPayment, err := s.meterer.ChainPaymentState.GetOnDemandPaymentByAccount(ctx, accountID) if err != nil { s.logger.Debug("failed to get ondemand payment, use zero value", "err", err, "accountID", accountID) } else { - onchainCumulativePayment = onDemandPayment.CumulativePayment + onchainCumulativePaymentBytes = onDemandPayment.CumulativePayment.Bytes() } paymentGlobalParams := pb.PaymentGlobalParams{ @@ -333,8 +336,8 @@ func (s *DispersalServerV2) GetPaymentState(ctx context.Context, req *pb.GetPaym PaymentGlobalParams: &paymentGlobalParams, BinRecords: binRecords[:], Reservation: pbReservation, - CumulativePayment: largestCumulativePayment.Bytes(), - OnchainCumulativePayment: onchainCumulativePayment.Bytes(), + CumulativePayment: largestCumulativePaymentBytes, + OnchainCumulativePayment: onchainCumulativePaymentBytes, } return reply, nil } diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index 53e61a863d..d4f7fe88e5 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -225,6 +225,11 @@ func (env *Config) generateDisperserV2Vars(ind int, logPath, dbPath, grpcPort st DISPERSER_SERVER_BLS_OPERATOR_STATE_RETRIVER: env.EigenDA.OperatorStateRetreiver, DISPERSER_SERVER_EIGENDA_SERVICE_MANAGER: env.EigenDA.ServiceManager, DISPERSER_SERVER_DISPERSER_VERSION: "2", + + DISPERSER_SERVER_ENABLE_PAYMENT_METERER: "true", + DISPERSER_SERVER_RESERVATIONS_TABLE_NAME: "e2e_v2_reservation", + DISPERSER_SERVER_ON_DEMAND_TABLE_NAME: "e2e_v2_ondemand", + DISPERSER_SERVER_GLOBAL_RATE_TABLE_NAME: "e2e_v2_global_reservation", } env.applyDefaults(&v, "DISPERSER_SERVER", "dis", ind) diff --git a/inabox/deploy/localstack.go b/inabox/deploy/localstack.go index b72e69d376..32cd320177 100644 --- a/inabox/deploy/localstack.go +++ b/inabox/deploy/localstack.go @@ -12,6 +12,7 @@ import ( "github.com/Layr-Labs/eigenda/common/aws" "github.com/Layr-Labs/eigenda/common/store" + "github.com/Layr-Labs/eigenda/core/meterer" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" blobstorev2 "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/ory/dockertest/v3" @@ -142,6 +143,24 @@ func DeployResources( } } + v2PaymentName := "e2e_v2_" + // create payment related tables + err = meterer.CreateReservationTable(cfg, v2PaymentName+"reservation") + if err != nil { + fmt.Println("err", err) + return err + } + err = meterer.CreateOnDemandTable(cfg, v2PaymentName+"ondemand") + if err != nil { + fmt.Println("err", err) + return err + } + err = meterer.CreateGlobalReservationTable(cfg, v2PaymentName+"global_reservation") + if err != nil { + fmt.Println("err", err) + return err + } + return err }