Skip to content

Commit

Permalink
refactor: comments and minor updates
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Oct 30, 2024
1 parent 16b05f2 commit 4753d35
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 132 deletions.
8 changes: 5 additions & 3 deletions api/clients/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type Accountant struct {
binRecords []BinRecord
usageLock sync.Mutex
cumulativePayment *big.Int
stopRotation chan struct{}

paymentSigner core.PaymentSigner
}
Expand All @@ -52,14 +51,17 @@ func NewAccountant(reservation core.ActiveReservation, onDemand core.OnDemandPay
minNumSymbols: minNumSymbols,
binRecords: []BinRecord{{Index: 0, Usage: 0}, {Index: 1, Usage: 0}, {Index: 2, Usage: 0}},
cumulativePayment: big.NewInt(0),
stopRotation: make(chan struct{}),
paymentSigner: paymentSigner,
}
// TODO: add a routine to refresh the on-chain state occasionally?
return &a
}

// accountant calculates and records payment information
// BlobPaymentInfo calculates and records payment information. The accountant
// will attempt to use the active reservation first and check for quorum settings,
// then on-demand if the reservation is not available. The returned values are
// bin index for reservation payments and cumulative payment for on-demand payments,
// and both fields are used to create the payment header and signature
func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint64, quorumNumbers []uint8) (uint32, *big.Int, error) {
now := time.Now().Unix()
currentBinIndex := meterer.GetBinIndex(uint64(now), a.reservationWindow)
Expand Down
7 changes: 7 additions & 0 deletions api/clients/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type EigenDAClientConfig struct {
// that can retrieve blobs but cannot disperse blobs.
SignerPrivateKeyHex string

// Payment signer private key in hex encoded format. This key connect to the wallet with payment registered on-chain
// if set to "", will result in a non-paying client and cannot disperse paid blobs.
PaymentSignerPrivateKeyHex string

// Whether to disable TLS for an insecure connection when connecting to a local EigenDA disperser instance.
DisableTLS bool

Expand Down Expand Up @@ -93,6 +97,9 @@ func (c *EigenDAClientConfig) CheckAndSetDefaults() error {
if len(c.SignerPrivateKeyHex) > 0 && len(c.SignerPrivateKeyHex) != 64 {
return fmt.Errorf("a valid length SignerPrivateKeyHex needs to have 64 bytes")
}
if len(c.PaymentSignerPrivateKeyHex) > 0 && len(c.PaymentSignerPrivateKeyHex) != 64 {
return fmt.Errorf("a valid length PaymentSignerPrivateKeyHex needs to have 64 bytes")
}

if len(c.RPC) == 0 {
return fmt.Errorf("EigenDAClientConfig.RPC not set")
Expand Down
33 changes: 31 additions & 2 deletions api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/Layr-Labs/eigenda/api"
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser"

"github.com/Layr-Labs/eigenda/core"
Expand Down Expand Up @@ -70,10 +71,31 @@ type disperserClient struct {

var _ DisperserClient = &disperserClient{}

// DisperserClient maintains a single underlying grpc connection to the disperser server,
// through which it sends requests to disperse blobs, get blob status, and retrieve blobs.
// The connection is established lazily on the first method call. Don't forget to call Close(),
// which is safe to call even if the connection was never established.
//
// DisperserClient is safe to be used concurrently by multiple goroutines.
//
// Example usage:
//
// client := NewDisperserClient(config, signer)
// defer client.Close()
//
// // The connection will be established on the first call
// status, requestId, err := client.DisperseBlob(ctx, someData, someQuorums)
// if err != nil {
// // Handle error
// }
//
// // Subsequent calls will use the existing connection
// status2, requestId2, err := client.DisperseBlob(ctx, otherData, otherQuorums)
func NewDisperserClient(config *Config, signer core.BlobRequestSigner, accountant *Accountant) DisperserClient {
return &disperserClient{
config: config,
signer: signer,
config: config,
signer: signer,
// conn and client are initialized lazily
accountant: accountant,
}
}
Expand Down Expand Up @@ -129,6 +151,10 @@ func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums

// DispersePaidBlob disperses a blob with a payment header and signature. Similar to DisperseBlob but with signed payment header.
func (c *disperserClient) DispersePaidBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) {
if c.accountant == nil {
return nil, nil, api.NewErrorInternal("not implemented")
}

err := c.initOnceGrpcConnection()
if err != nil {
return nil, nil, fmt.Errorf("error initializing connection: %w", err)
Expand All @@ -149,6 +175,9 @@ func (c *disperserClient) DispersePaidBlob(ctx context.Context, data []byte, quo
}

header, signature, err := c.accountant.AccountBlob(ctx, uint64(encoding.GetBlobLength(uint(len(data)))), quorums)
if header == nil {
return nil, nil, errors.New("accountant returned nil pointer to header")
}
if err != nil {
return nil, nil, err
}
Expand Down
140 changes: 18 additions & 122 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type EigenDAClient struct {
ethClient *ethclient.Client
edasmCaller *edasm.ContractEigenDAServiceManagerCaller
Codec codecs.BlobCodec
accountant Accountant
}

var _ IEigenDAClient = &EigenDAClient{}
Expand Down Expand Up @@ -108,7 +107,15 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien

disperserConfig := NewConfig(host, port, config.ResponseTimeout, !config.DisableTLS)

paymentSigner := auth.NewPaymentSigner(hex.EncodeToString([]byte(config.SignerPrivateKeyHex)))
var paymentSigner core.PaymentSigner
if len(config.PaymentSignerPrivateKeyHex) == 64 {
paymentSigner = auth.NewPaymentSigner(config.PaymentSignerPrivateKeyHex)
} else if len(config.PaymentSignerPrivateKeyHex) == 0 {
paymentSigner = auth.NewNoopPaymentSigner()
} else {
return nil, fmt.Errorf("invalid length for signer private key")
}

// a subsequent PR contains updates to fill in payment state
accountant := NewAccountant(core.ActiveReservation{}, core.OnDemandPayment{}, 0, 0, 0, paymentSigner)

Expand Down Expand Up @@ -211,7 +218,15 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
}
// disperse blob
// TODO: would be nice to add a trace-id key to the context, to be able to follow requests from batcher->proxy->eigenda
blobStatus, requestID, err := m.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers)
var blobStatus *disperser.BlobStatus
var requestID []byte
// clients with a payment signer setting can disperse paid blobs
if len(m.Config.PaymentSignerPrivateKeyHex) > 0 {
blobStatus, requestID, err = m.Client.DispersePaidBlob(ctx, data, customQuorumNumbers)
} else {
blobStatus, requestID, err = m.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers)

}
if err != nil {
errChan <- fmt.Errorf("error initializing DisperseBlobAuthenticated() client: %w", err)
return
Expand Down Expand Up @@ -304,125 +319,6 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
}
}

// PaidPutBlob behaves like PutBlob but with authenticated payment.
func (m EigenDAClient) PaidPutBlob(ctx context.Context, data []byte) (*grpcdisperser.BlobInfo, error) {
resultChan, errorChan := m.PaidPutBlobAsync(ctx, data)
select { // no timeout here because we depend on the configured timeout in PutBlobAsync
case result := <-resultChan:
return result, nil
case err := <-errorChan:
return nil, err
}
}

func (m EigenDAClient) PaidPutBlobAsync(ctx context.Context, data []byte) (resultChan chan *grpcdisperser.BlobInfo, errChan chan error) {
resultChan = make(chan *grpcdisperser.BlobInfo, 1)
errChan = make(chan error, 1)
go m.paidPutBlob(ctx, data, resultChan, errChan)
return
}

func (m EigenDAClient) paidPutBlob(ctx context.Context, rawData []byte, resultChan chan *grpcdisperser.BlobInfo, errChan chan error) {
m.Log.Info("Attempting to disperse blob to EigenDA with payment")

// encode blob
if m.Codec == nil {
errChan <- fmt.Errorf("Codec cannot be nil")
return
}

data, err := m.Codec.EncodeBlob(rawData)
if err != nil {
errChan <- fmt.Errorf("error encoding blob: %w", err)
return
}

customQuorumNumbers := make([]uint8, len(m.Config.CustomQuorumIDs))
for i, e := range m.Config.CustomQuorumIDs {
customQuorumNumbers[i] = uint8(e)
}
// disperse blob
blobStatus, requestID, err := m.Client.DispersePaidBlob(ctx, data, customQuorumNumbers)
if err != nil {
errChan <- fmt.Errorf("error initializing DispersePaidBlob() client: %w", err)
return
}

// process response
if *blobStatus == disperser.Failed {
m.Log.Error("Unable to disperse blob to EigenDA, aborting", "err", err)
errChan <- fmt.Errorf("reply status is %d", blobStatus)
return
}

base64RequestID := base64.StdEncoding.EncodeToString(requestID)
m.Log.Info("Blob dispersed to EigenDA, now waiting for confirmation", "requestID", base64RequestID)

ticker := time.NewTicker(m.Config.StatusQueryRetryInterval)
defer ticker.Stop()

var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, m.Config.StatusQueryTimeout)
defer cancel()

alreadyWaitingForDispersal := false
alreadyWaitingForFinalization := false
for {
select {
case <-ctx.Done():
errChan <- fmt.Errorf("timed out waiting for EigenDA blob to confirm blob with request id=%s: %w", base64RequestID, ctx.Err())
return
case <-ticker.C:
statusRes, err := m.Client.GetBlobStatus(ctx, requestID)
if err != nil {
m.Log.Error("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err)
continue
}

switch statusRes.Status {
case grpcdisperser.BlobStatus_PROCESSING, grpcdisperser.BlobStatus_DISPERSING:
// to prevent log clutter, we only log at info level once
if alreadyWaitingForDispersal {
m.Log.Debug("Blob submitted, waiting for dispersal from EigenDA", "requestID", base64RequestID)
} else {
m.Log.Info("Blob submitted, waiting for dispersal from EigenDA", "requestID", base64RequestID)
alreadyWaitingForDispersal = true
}
case grpcdisperser.BlobStatus_FAILED:
m.Log.Error("EigenDA blob dispersal failed in processing", "requestID", base64RequestID, "err", err)
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing, requestID=%s: %w", base64RequestID, err)
return
case grpcdisperser.BlobStatus_INSUFFICIENT_SIGNATURES:
m.Log.Error("EigenDA blob dispersal failed in processing with insufficient signatures", "requestID", base64RequestID, "err", err)
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with insufficient signatures, requestID=%s: %w", base64RequestID, err)
return
case grpcdisperser.BlobStatus_CONFIRMED:
if m.Config.WaitForFinalization {
// to prevent log clutter, we only log at info level once
if alreadyWaitingForFinalization {
m.Log.Debug("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID)
} else {
m.Log.Info("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID)
alreadyWaitingForFinalization = true
}
} else {
m.Log.Info("EigenDA blob confirmed", "requestID", base64RequestID)
resultChan <- statusRes.Info
return
}
case grpcdisperser.BlobStatus_FINALIZED:
batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash))
m.Log.Info("Successfully dispersed blob to EigenDA", "requestID", base64RequestID, "batchHeaderHash", batchHeaderHashHex)
resultChan <- statusRes.Info
return
default:
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with reply status %d", statusRes.Status)
return
}
}
}
}

// Close simply calls Close() on the wrapped disperserClient, to close the grpc connection to the disperser server.
// It is thread safe and can be called multiple times.
func (c *EigenDAClient) Close() error {
Expand Down
2 changes: 1 addition & 1 deletion api/clients/eigenda_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func TestPutBlobTotalTimeout(t *testing.T) {

func TestPutBlobNoopSigner(t *testing.T) {
config := clients.NewConfig("nohost", "noport", time.Second, false)
disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner(), &clients.Accountant{})
disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner(), nil)

test := []byte("test")
test[0] = 0x00 // make sure the first byte of the requst is always 0
Expand Down
2 changes: 1 addition & 1 deletion inabox/tests/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func testRatelimit(t *testing.T, testConfig *deploy.Config, c ratelimitTestCase)
Hostname: "localhost",
Port: testConfig.Dispersers[0].DISPERSER_SERVER_GRPC_PORT,
Timeout: 10 * time.Second,
}, nil, &clients.Accountant{})
}, nil, nil)
assert.NotNil(t, disp)

data := make([]byte, c.blobSize)
Expand Down
2 changes: 1 addition & 1 deletion tools/traffic/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewTrafficGenerator(config *Config, signer core.BlobRequestSigner) (*Traffi

return &TrafficGenerator{
Logger: logger,
DisperserClient: clients.NewDisperserClient(&config.Config, signer),
DisperserClient: clients.NewDisperserClient(&config.Config, signer, nil),
Config: config,
}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions tools/traffic/generator_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) {

unconfirmedKeyChannel := make(chan *workers.UnconfirmedKey, 100)

disperserClient := clients.NewDisperserClient(config.DisperserClientConfig, signer)
disperserClient := clients.NewDisperserClient(config.DisperserClientConfig, signer, nil)
statusVerifier := workers.NewBlobStatusTracker(
&ctx,
&waitGroup,
Expand Down Expand Up @@ -134,7 +134,7 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) {
waitGroup: &waitGroup,
generatorMetrics: generatorMetrics,
logger: &logger,
disperserClient: clients.NewDisperserClient(config.DisperserClientConfig, signer),
disperserClient: clients.NewDisperserClient(config.DisperserClientConfig, signer, nil),
eigenDAClient: client,
config: config,
writers: writers,
Expand Down

0 comments on commit 4753d35

Please sign in to comment.