Skip to content

Commit

Permalink
disperser client - init once accountant (#1015)
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen authored Dec 18, 2024
1 parent e5281b4 commit aba75de
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 21 deletions.
21 changes: 13 additions & 8 deletions api/clients/v2/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,13 @@ func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentState
a.reservationWindow = uint32(paymentState.GetPaymentGlobalParams().GetReservationWindow())

if paymentState.GetOnchainCumulativePayment() == nil {
a.onDemand.CumulativePayment = big.NewInt(0)
a.onDemand = &core.OnDemandPayment{
CumulativePayment: big.NewInt(0),
}
} else {
a.onDemand.CumulativePayment = new(big.Int).SetBytes(paymentState.GetOnchainCumulativePayment())
a.onDemand = &core.OnDemandPayment{
CumulativePayment: new(big.Int).SetBytes(paymentState.GetOnchainCumulativePayment()),
}
}

if paymentState.GetCumulativePayment() == nil {
Expand All @@ -192,20 +196,21 @@ func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentState
QuorumSplits: []byte{},
}
} else {
a.reservation.SymbolsPerSecond = uint64(paymentState.GetReservation().GetSymbolsPerSecond())
a.reservation.StartTimestamp = uint64(paymentState.GetReservation().GetStartTimestamp())
a.reservation.EndTimestamp = uint64(paymentState.GetReservation().GetEndTimestamp())
quorumNumbers := make([]uint8, len(paymentState.GetReservation().GetQuorumNumbers()))
for i, quorum := range paymentState.GetReservation().GetQuorumNumbers() {
quorumNumbers[i] = uint8(quorum)
}
a.reservation.QuorumNumbers = quorumNumbers

quorumSplits := make([]uint8, len(paymentState.GetReservation().GetQuorumSplits()))
for i, quorum := range paymentState.GetReservation().GetQuorumSplits() {
quorumSplits[i] = uint8(quorum)
}
a.reservation.QuorumSplits = quorumSplits
a.reservation = &core.ReservedPayment{
SymbolsPerSecond: uint64(paymentState.GetReservation().GetSymbolsPerSecond()),
StartTimestamp: uint64(paymentState.GetReservation().GetStartTimestamp()),
EndTimestamp: uint64(paymentState.GetReservation().GetEndTimestamp()),
QuorumNumbers: quorumNumbers,
QuorumSplits: quorumSplits,
}
}

binRecords := make([]BinRecord, len(paymentState.GetBinRecords()))
Expand Down
48 changes: 40 additions & 8 deletions api/clients/v2/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ type DisperserClient interface {
}

type disperserClient struct {
config *DisperserClientConfig
signer corev2.BlobRequestSigner
initOnce sync.Once
conn *grpc.ClientConn
client disperser_rpc.DisperserClient
prover encoding.Prover
accountant *Accountant
config *DisperserClientConfig
signer corev2.BlobRequestSigner
initOnceGrpc sync.Once
initOnceAccountant sync.Once
conn *grpc.ClientConn
client disperser_rpc.DisperserClient
prover encoding.Prover
accountant *Accountant
}

var _ DisperserClient = &disperserClient{}
Expand Down Expand Up @@ -86,6 +87,14 @@ func NewDisperserClient(config *DisperserClientConfig, signer corev2.BlobRequest

// PopulateAccountant populates the accountant with the payment state from the disperser.
func (c *disperserClient) PopulateAccountant(ctx context.Context) error {
if c.accountant == nil {
accountId, err := c.signer.GetAccountID()
if err != nil {
return fmt.Errorf("error getting account ID: %w", err)
}
c.accountant = NewAccountant(accountId, nil, nil, 0, 0, 0, 0)
}

paymentState, err := c.GetPaymentState(ctx)
if err != nil {
return fmt.Errorf("error getting payment state for initializing accountant: %w", err)
Expand Down Expand Up @@ -121,6 +130,10 @@ func (c *disperserClient) DisperseBlob(
if err != nil {
return nil, [32]byte{}, api.NewErrorFailover(err)
}
err = c.initOncePopulateAccountant(ctx)
if err != nil {
return nil, [32]byte{}, api.NewErrorFailover(err)
}

if c.signer == nil {
return nil, [32]byte{}, api.NewErrorInternal("uninitialized signer for authenticated dispersal")
Expand Down Expand Up @@ -273,7 +286,7 @@ func (c *disperserClient) GetBlobCommitment(ctx context.Context, data []byte) (*
// If initialization fails, it caches the error and will return it on every subsequent call.
func (c *disperserClient) initOnceGrpcConnection() error {
var initErr error
c.initOnce.Do(func() {
c.initOnceGrpc.Do(func() {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag)
conn, err := grpc.NewClient(addr, dialOptions...)
Expand All @@ -289,3 +302,22 @@ func (c *disperserClient) initOnceGrpcConnection() error {
}
return nil
}

// initOncePopulateAccountant initializes the accountant if it is not already initialized.
// If initialization fails, it caches the error and will return it on every subsequent call.
func (c *disperserClient) initOncePopulateAccountant(ctx context.Context) error {
var initErr error
c.initOnceAccountant.Do(func() {
if c.accountant == nil {
err := c.PopulateAccountant(ctx)
if err != nil {
initErr = err
return
}
}
})
if initErr != nil {
return fmt.Errorf("populating accountant: %w", initErr)
}
return nil
}
13 changes: 8 additions & 5 deletions disperser/apiserver/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"math/big"
"net"
"sync/atomic"
"time"
Expand Down Expand Up @@ -285,9 +284,13 @@ func (s *DispersalServerV2) GetPaymentState(ctx context.Context, req *pb.GetPaym
if err != nil {
s.logger.Debug("failed to get reservation records, use placeholders", "err", err, "accountID", accountID)
}
var largestCumulativePaymentBytes []byte
largestCumulativePayment, err := s.meterer.OffchainStore.GetLargestCumulativePayment(ctx, req.AccountId)
if err != nil {
s.logger.Debug("failed to get largest cumulative payment, use zero value", "err", err, "accountID", accountID)

} else {
largestCumulativePaymentBytes = largestCumulativePayment.Bytes()
}
// on-Chain account state
var pbReservation *pb.Reservation
Expand All @@ -313,12 +316,12 @@ func (s *DispersalServerV2) GetPaymentState(ctx context.Context, req *pb.GetPaym
}
}

var onchainCumulativePayment *big.Int
var onchainCumulativePaymentBytes []byte
onDemandPayment, err := s.meterer.ChainPaymentState.GetOnDemandPaymentByAccount(ctx, accountID)
if err != nil {
s.logger.Debug("failed to get ondemand payment, use zero value", "err", err, "accountID", accountID)
} else {
onchainCumulativePayment = onDemandPayment.CumulativePayment
onchainCumulativePaymentBytes = onDemandPayment.CumulativePayment.Bytes()
}

paymentGlobalParams := pb.PaymentGlobalParams{
Expand All @@ -333,8 +336,8 @@ func (s *DispersalServerV2) GetPaymentState(ctx context.Context, req *pb.GetPaym
PaymentGlobalParams: &paymentGlobalParams,
BinRecords: binRecords[:],
Reservation: pbReservation,
CumulativePayment: largestCumulativePayment.Bytes(),
OnchainCumulativePayment: onchainCumulativePayment.Bytes(),
CumulativePayment: largestCumulativePaymentBytes,
OnchainCumulativePayment: onchainCumulativePaymentBytes,
}
return reply, nil
}
5 changes: 5 additions & 0 deletions inabox/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ func (env *Config) generateDisperserV2Vars(ind int, logPath, dbPath, grpcPort st
DISPERSER_SERVER_BLS_OPERATOR_STATE_RETRIVER: env.EigenDA.OperatorStateRetreiver,
DISPERSER_SERVER_EIGENDA_SERVICE_MANAGER: env.EigenDA.ServiceManager,
DISPERSER_SERVER_DISPERSER_VERSION: "2",

DISPERSER_SERVER_ENABLE_PAYMENT_METERER: "true",
DISPERSER_SERVER_RESERVATIONS_TABLE_NAME: "e2e_v2_reservation",
DISPERSER_SERVER_ON_DEMAND_TABLE_NAME: "e2e_v2_ondemand",
DISPERSER_SERVER_GLOBAL_RATE_TABLE_NAME: "e2e_v2_global_reservation",
}

env.applyDefaults(&v, "DISPERSER_SERVER", "dis", ind)
Expand Down
19 changes: 19 additions & 0 deletions inabox/deploy/localstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/store"
"github.com/Layr-Labs/eigenda/core/meterer"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
blobstorev2 "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/ory/dockertest/v3"
Expand Down Expand Up @@ -142,6 +143,24 @@ func DeployResources(
}
}

v2PaymentName := "e2e_v2_"
// create payment related tables
err = meterer.CreateReservationTable(cfg, v2PaymentName+"reservation")
if err != nil {
fmt.Println("err", err)
return err
}
err = meterer.CreateOnDemandTable(cfg, v2PaymentName+"ondemand")
if err != nil {
fmt.Println("err", err)
return err
}
err = meterer.CreateGlobalReservationTable(cfg, v2PaymentName+"global_reservation")
if err != nil {
fmt.Println("err", err)
return err
}

return err

}
Expand Down

0 comments on commit aba75de

Please sign in to comment.