Skip to content

Commit

Permalink
Merge branch 'master' into client-v2-get
Browse files Browse the repository at this point in the history
  • Loading branch information
litt3 committed Dec 10, 2024
2 parents 4625e96 + 3552a36 commit f07e820
Show file tree
Hide file tree
Showing 62 changed files with 2,131 additions and 337 deletions.
12 changes: 12 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ COPY contracts /app/contracts
COPY indexer /app/indexer
COPY encoding /app/encoding
COPY relay /app/relay
COPY inabox /app/inabox

# Churner build stage
FROM common-builder AS churner-builder
Expand Down Expand Up @@ -98,6 +99,13 @@ RUN --mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
go build -o ./bin/controller ./cmd/controller

# Relay build stage
FROM common-builder AS relay-builder
WORKDIR /app/relay
RUN --mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
go build -o ./bin/relay ./cmd

# Final stages for each component
FROM alpine:3.18 AS churner
COPY --from=churner-builder /app/operators/bin/churner /usr/local/bin
Expand Down Expand Up @@ -134,3 +142,7 @@ ENTRYPOINT ["nodeplugin"]
FROM alpine:3.18 AS controller
COPY --from=controller-builder /app/disperser/bin/controller /usr/local/bin
ENTRYPOINT ["controller"]

FROM alpine:3.18 AS relay
COPY --from=relay-builder /app/relay/bin/relay /usr/local/bin
ENTRYPOINT ["relay"]
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,5 @@ docker-release-build:
BUILD_TAG=${SEMVER} SEMVER=${SEMVER} GITDATE=${GITDATE} GIT_SHA=${GITSHA} GIT_SHORT_SHA=${GITCOMMIT} \
docker buildx bake node-group-release ${PUSH_FLAG}


semver:
echo "${SEMVER}"
80 changes: 63 additions & 17 deletions api/clients/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,14 @@ import (
"sync"
"time"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/meterer"
)

var requiredQuorums = []uint8{0, 1}

type Accountant interface {
AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*commonpb.PaymentHeader, error)
}

var _ Accountant = &accountant{}

type accountant struct {
type Accountant struct {
// on-chain states
accountID string
reservation *core.ActiveReservation
Expand All @@ -45,15 +39,15 @@ type BinRecord struct {
Usage uint64
}

func NewAccountant(accountID string, reservation *core.ActiveReservation, onDemand *core.OnDemandPayment, reservationWindow uint32, pricePerSymbol uint32, minNumSymbols uint32, numBins uint32) *accountant {
func NewAccountant(accountID string, reservation *core.ActiveReservation, onDemand *core.OnDemandPayment, reservationWindow uint32, pricePerSymbol uint32, minNumSymbols uint32, numBins uint32) *Accountant {
//TODO: client storage; currently every instance starts fresh but on-chain or a small store makes more sense
// Also client is currently responsible for supplying network params, we need to add RPC in order to be automatic
// There's a subsequent PR that handles populating the accountant with on-chain state from the disperser
binRecords := make([]BinRecord, numBins)
for i := range binRecords {
binRecords[i] = BinRecord{Index: uint32(i), Usage: 0}
}
a := accountant{
a := Accountant{
accountID: accountID,
reservation: reservation,
onDemand: onDemand,
Expand All @@ -73,7 +67,7 @@ func NewAccountant(accountID string, reservation *core.ActiveReservation, onDema
// then on-demand if the reservation is not available. The returned values are
// bin index for reservation payments and cumulative payment for on-demand payments,
// and both fields are used to create the payment header and signature
func (a *accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint64, quorumNumbers []uint8) (uint32, *big.Int, error) {
func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint64, quorumNumbers []uint8) (uint32, *big.Int, error) {
now := time.Now().Unix()
currentBinIndex := meterer.GetBinIndex(uint64(now), a.reservationWindow)

Expand Down Expand Up @@ -116,7 +110,7 @@ func (a *accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint64, quo
}

// AccountBlob accountant provides and records payment information
func (a *accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*commonpb.PaymentHeader, error) {
func (a *Accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*core.PaymentMetadata, error) {
binIndex, cumulativePayment, err := a.BlobPaymentInfo(ctx, numSymbols, quorums)
if err != nil {
return nil, err
Expand All @@ -127,28 +121,27 @@ func (a *accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums
BinIndex: binIndex,
CumulativePayment: cumulativePayment,
}
protoPaymentHeader := pm.ConvertToProtoPaymentHeader()

return protoPaymentHeader, nil
return pm, nil
}

// TODO: PaymentCharged and SymbolsCharged copied from meterer, should be refactored
// PaymentCharged returns the chargeable price for a given data length
func (a *accountant) PaymentCharged(numSymbols uint) uint64 {
func (a *Accountant) PaymentCharged(numSymbols uint) uint64 {
return uint64(a.SymbolsCharged(numSymbols)) * uint64(a.pricePerSymbol)
}

// SymbolsCharged returns the number of symbols charged for a given data length
// being at least MinNumSymbols or the nearest rounded-up multiple of MinNumSymbols.
func (a *accountant) SymbolsCharged(numSymbols uint) uint32 {
func (a *Accountant) SymbolsCharged(numSymbols uint) uint32 {
if numSymbols <= uint(a.minNumSymbols) {
return a.minNumSymbols
}
// Round up to the nearest multiple of MinNumSymbols
return uint32(core.RoundUpDivide(uint(numSymbols), uint(a.minNumSymbols))) * a.minNumSymbols
}

func (a *accountant) GetRelativeBinRecord(index uint32) *BinRecord {
func (a *Accountant) GetRelativeBinRecord(index uint32) *BinRecord {
relativeIndex := index % a.numBins
if a.binRecords[relativeIndex].Index != uint32(index) {
a.binRecords[relativeIndex] = BinRecord{
Expand All @@ -160,6 +153,59 @@ func (a *accountant) GetRelativeBinRecord(index uint32) *BinRecord {
return &a.binRecords[relativeIndex]
}

func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentStateReply) error {
if paymentState == nil {
return fmt.Errorf("payment state cannot be nil")
} else if paymentState.GetPaymentGlobalParams() == nil {
return fmt.Errorf("payment global params cannot be nil")
} else if paymentState.GetOnchainCumulativePayment() == nil {
return fmt.Errorf("onchain cumulative payment cannot be nil")
} else if paymentState.GetCumulativePayment() == nil {
return fmt.Errorf("cumulative payment cannot be nil")
} else if paymentState.GetReservation() == nil {
return fmt.Errorf("reservation cannot be nil")
} else if paymentState.GetReservation().GetQuorumNumbers() == nil {
return fmt.Errorf("reservation quorum numbers cannot be nil")
} else if paymentState.GetReservation().GetQuorumSplit() == nil {
return fmt.Errorf("reservation quorum split cannot be nil")
} else if paymentState.GetBinRecords() == nil {
return fmt.Errorf("bin records cannot be nil")
}

a.minNumSymbols = uint32(paymentState.PaymentGlobalParams.MinNumSymbols)
a.onDemand.CumulativePayment = new(big.Int).SetBytes(paymentState.OnchainCumulativePayment)
a.cumulativePayment = new(big.Int).SetBytes(paymentState.CumulativePayment)
a.pricePerSymbol = uint32(paymentState.PaymentGlobalParams.PricePerSymbol)

a.reservation.SymbolsPerSec = uint64(paymentState.PaymentGlobalParams.GlobalSymbolsPerSecond)
a.reservation.StartTimestamp = uint64(paymentState.Reservation.StartTimestamp)
a.reservation.EndTimestamp = uint64(paymentState.Reservation.EndTimestamp)
a.reservationWindow = uint32(paymentState.PaymentGlobalParams.ReservationWindow)

quorumNumbers := make([]uint8, len(paymentState.Reservation.QuorumNumbers))
for i, quorum := range paymentState.Reservation.QuorumNumbers {
quorumNumbers[i] = uint8(quorum)
}
a.reservation.QuorumNumbers = quorumNumbers

quorumSplit := make([]uint8, len(paymentState.Reservation.QuorumSplit))
for i, quorum := range paymentState.Reservation.QuorumSplit {
quorumSplit[i] = uint8(quorum)
}
a.reservation.QuorumSplit = quorumSplit

binRecords := make([]BinRecord, len(paymentState.BinRecords))
for i, record := range paymentState.BinRecords {
binRecords[i] = BinRecord{
Index: record.Index,
Usage: record.Usage,
}
}
a.binRecords = binRecords

return nil
}

// QuorumCheck eagerly returns error if the check finds a quorum number not an element of the allowed quorum numbers
func QuorumCheck(quorumNumbers []uint8, allowedNumbers []uint8) error {
if len(quorumNumbers) == 0 {
Expand Down
33 changes: 11 additions & 22 deletions api/clients/accountant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,27 @@ func TestAccountBlob_Reservation(t *testing.T) {
quorums := []uint8{0, 1}

header, err := accountant.AccountBlob(ctx, symbolLength, quorums)
metadata := core.ConvertPaymentHeader(header)

assert.NoError(t, err)
assert.Equal(t, meterer.GetBinIndex(uint64(time.Now().Unix()), reservationWindow), header.BinIndex)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{500, 0, 0}, mapRecordUsage(accountant.binRecords)), true)

symbolLength = uint64(700)

header, err = accountant.AccountBlob(ctx, symbolLength, quorums)
metadata = core.ConvertPaymentHeader(header)

assert.NoError(t, err)
assert.NotEqual(t, 0, header.BinIndex)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1200, 0, 200}, mapRecordUsage(accountant.binRecords)), true)

// Second call should use on-demand payment
header, err = accountant.AccountBlob(ctx, 300, quorums)
metadata = core.ConvertPaymentHeader(header)

assert.NoError(t, err)
assert.Equal(t, uint32(0), header.BinIndex)
assert.Equal(t, big.NewInt(300), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(300), header.CumulativePayment)
}

func TestAccountBlob_OnDemand(t *testing.T) {
Expand Down Expand Up @@ -124,10 +121,9 @@ func TestAccountBlob_OnDemand(t *testing.T) {
header, err := accountant.AccountBlob(ctx, numSymbols, quorums)
assert.NoError(t, err)

metadata := core.ConvertPaymentHeader(header)
expectedPayment := big.NewInt(int64(numSymbols * uint64(pricePerSymbol)))
assert.Equal(t, uint32(0), header.BinIndex)
assert.Equal(t, expectedPayment, metadata.CumulativePayment)
assert.Equal(t, expectedPayment, header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{0, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, expectedPayment, accountant.cumulativePayment)
}
Expand Down Expand Up @@ -180,24 +176,21 @@ func TestAccountBlobCallSeries(t *testing.T) {

// First call: Use reservation
header, err := accountant.AccountBlob(ctx, 800, quorums)
metadata := core.ConvertPaymentHeader(header)
assert.NoError(t, err)
assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)

// Second call: Use remaining reservation + overflow
header, err = accountant.AccountBlob(ctx, 300, quorums)
metadata = core.ConvertPaymentHeader(header)
assert.NoError(t, err)
assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)

// Third call: Use on-demand
header, err = accountant.AccountBlob(ctx, 500, quorums)
metadata = core.ConvertPaymentHeader(header)
assert.NoError(t, err)
assert.Equal(t, uint32(0), header.BinIndex)
assert.Equal(t, big.NewInt(500), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(500), header.CumulativePayment)

// Fourth call: Insufficient on-demand
_, err = accountant.AccountBlob(ctx, 600, quorums)
Expand Down Expand Up @@ -321,23 +314,20 @@ func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) {
header, err := accountant.AccountBlob(ctx, 800, quorums)
assert.NoError(t, err)
assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex)
metadata := core.ConvertPaymentHeader(header)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{800, 0, 0}, mapRecordUsage(accountant.binRecords)), true)

// Second call: Allow one overflow
header, err = accountant.AccountBlob(ctx, 500, quorums)
assert.NoError(t, err)
metadata = core.ConvertPaymentHeader(header)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.binRecords)), true)

// Third call: Should use on-demand payment
header, err = accountant.AccountBlob(ctx, 200, quorums)
assert.NoError(t, err)
assert.Equal(t, uint32(0), header.BinIndex)
metadata = core.ConvertPaymentHeader(header)
assert.Equal(t, big.NewInt(200), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(200), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.binRecords)), true)
}

Expand Down Expand Up @@ -373,8 +363,7 @@ func TestAccountBlob_ReservationOverflowReset(t *testing.T) {
header, err := accountant.AccountBlob(ctx, 500, quorums)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{1000, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
metadata := core.ConvertPaymentHeader(header)
assert.Equal(t, big.NewInt(500), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(500), header.CumulativePayment)

// Wait for next reservation duration
time.Sleep(time.Duration(reservationWindow) * time.Second)
Expand Down
Loading

0 comments on commit f07e820

Please sign in to comment.