Skip to content

Commit

Permalink
Separate out client v2 package (#986)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Dec 16, 2024
1 parent 80231c3 commit b7fe161
Show file tree
Hide file tree
Showing 19 changed files with 96 additions and 74 deletions.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ import (
"google.golang.org/grpc"
)

type DisperserClientV2Config struct {
type DisperserClientConfig struct {
Hostname string
Port string
UseSecureGrpcFlag bool
}

type DisperserClientV2 interface {
type DisperserClient interface {
Close() error
DisperseBlob(ctx context.Context, data []byte, blobVersion corev2.BlobVersion, quorums []core.QuorumID, salt uint32) (*dispv2.BlobStatus, corev2.BlobKey, error)
GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error)
GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error)
}

type disperserClientV2 struct {
config *DisperserClientV2Config
type disperserClient struct {
config *DisperserClientConfig
signer corev2.BlobRequestSigner
initOnce sync.Once
conn *grpc.ClientConn
Expand All @@ -39,18 +39,18 @@ type disperserClientV2 struct {
accountant *Accountant
}

var _ DisperserClientV2 = &disperserClientV2{}
var _ DisperserClient = &disperserClient{}

// DisperserClientV2 maintains a single underlying grpc connection to the disperser server,
// DisperserClient maintains a single underlying grpc connection to the disperser server,
// through which it sends requests to disperse blobs and get blob status.
// The connection is established lazily on the first method call. Don't forget to call Close(),
// which is safe to call even if the connection was never established.
//
// DisperserClientV2 is safe to be used concurrently by multiple goroutines.
// DisperserClient is safe to be used concurrently by multiple goroutines.
//
// Example usage:
//
// client := NewDisperserClientV2(config, signer)
// client := NewDisperserClient(config, signer)
// defer client.Close()
//
// // The connection will be established on the first call
Expand All @@ -61,7 +61,7 @@ var _ DisperserClientV2 = &disperserClientV2{}
//
// // Subsequent calls will use the existing connection
// status2, blobKey2, err := client.DisperseBlob(ctx, data, blobHeader)
func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobRequestSigner, prover encoding.Prover, accountant *Accountant) (*disperserClientV2, error) {
func NewDisperserClient(config *DisperserClientConfig, signer corev2.BlobRequestSigner, prover encoding.Prover, accountant *Accountant) (*disperserClient, error) {
if config == nil {
return nil, api.NewErrorInvalidArg("config must be provided")
}
Expand All @@ -75,7 +75,7 @@ func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobReq
return nil, api.NewErrorInvalidArg("signer must be provided")
}

return &disperserClientV2{
return &disperserClient{
config: config,
signer: signer,
prover: prover,
Expand All @@ -85,7 +85,7 @@ func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobReq
}

// PopulateAccountant populates the accountant with the payment state from the disperser.
func (c *disperserClientV2) PopulateAccountant(ctx context.Context) error {
func (c *disperserClient) PopulateAccountant(ctx context.Context) error {
paymentState, err := c.GetPaymentState(ctx)
if err != nil {
return fmt.Errorf("error getting payment state for initializing accountant: %w", err)
Expand All @@ -100,7 +100,7 @@ func (c *disperserClientV2) PopulateAccountant(ctx context.Context) error {

// Close closes the grpc connection to the disperser server.
// It is thread safe and can be called multiple times.
func (c *disperserClientV2) Close() error {
func (c *disperserClient) Close() error {
if c.conn != nil {
err := c.conn.Close()
c.conn = nil
Expand All @@ -110,7 +110,7 @@ func (c *disperserClientV2) Close() error {
return nil
}

func (c *disperserClientV2) DisperseBlob(
func (c *disperserClient) DisperseBlob(
ctx context.Context,
data []byte,
blobVersion corev2.BlobVersion,
Expand Down Expand Up @@ -217,7 +217,7 @@ func (c *disperserClientV2) DisperseBlob(
}

// GetBlobStatus returns the status of a blob with the given blob key.
func (c *disperserClientV2) GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error) {
func (c *disperserClient) GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, api.NewErrorInternal(err.Error())
Expand All @@ -230,7 +230,7 @@ func (c *disperserClientV2) GetBlobStatus(ctx context.Context, blobKey corev2.Bl
}

// GetPaymentState returns the payment state of the disperser client
func (c *disperserClientV2) GetPaymentState(ctx context.Context) (*disperser_rpc.GetPaymentStateReply, error) {
func (c *disperserClient) GetPaymentState(ctx context.Context) (*disperser_rpc.GetPaymentStateReply, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, api.NewErrorInternal(err.Error())
Expand All @@ -257,7 +257,7 @@ func (c *disperserClientV2) GetPaymentState(ctx context.Context) (*disperser_rpc
// While the blob commitment can be calculated by anyone, it requires SRS points to
// be loaded. For service that does not have access to SRS points, this method can be
// used to calculate the blob commitment in blob header, which is required for dispersal.
func (c *disperserClientV2) GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error) {
func (c *disperserClient) GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, api.NewErrorInternal(err.Error())
Expand All @@ -271,7 +271,7 @@ func (c *disperserClientV2) GetBlobCommitment(ctx context.Context, data []byte)

// initOnceGrpcConnection initializes the grpc connection and client if they are not already initialized.
// If initialization fails, it caches the error and will return it on every subsequent call.
func (c *disperserClientV2) initOnceGrpcConnection() error {
func (c *disperserClient) initOnceGrpcConnection() error {
var initErr error
c.initOnce.Do(func() {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@ package mock
import (
"context"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/clients/v2"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/stretchr/testify/mock"
)

type MockNodeClientV2 struct {
type MockNodeClient struct {
mock.Mock
}

var _ clients.NodeClientV2 = (*MockNodeClientV2)(nil)
var _ clients.NodeClient = (*MockNodeClient)(nil)

func NewNodeClientV2() *MockNodeClientV2 {
return &MockNodeClientV2{}
func NewNodeClient() *MockNodeClient {
return &MockNodeClient{}
}

func (c *MockNodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch) (*core.Signature, error) {
func (c *MockNodeClient) StoreChunks(ctx context.Context, batch *corev2.Batch) (*core.Signature, error) {
args := c.Called()
var signature *core.Signature
if args.Get(0) != nil {
Expand All @@ -28,7 +28,7 @@ func (c *MockNodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch)
return signature, args.Error(1)
}

func (c *MockNodeClientV2) Close() error {
func (c *MockNodeClient) Close() error {
args := c.Called()
return args.Error(0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package mock
import (
"context"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/clients/v2"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/stretchr/testify/mock"
)
Expand Down
20 changes: 10 additions & 10 deletions api/clients/node_client_v2.go → api/clients/v2/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,37 @@ import (
"google.golang.org/grpc"
)

type NodeClientV2Config struct {
type NodeClientConfig struct {
Hostname string
Port string
UseSecureGrpcFlag bool
}

type NodeClientV2 interface {
type NodeClient interface {
StoreChunks(ctx context.Context, certs *corev2.Batch) (*core.Signature, error)
Close() error
}

type nodeClientV2 struct {
config *NodeClientV2Config
type nodeClient struct {
config *NodeClientConfig
initOnce sync.Once
conn *grpc.ClientConn

dispersalClient nodegrpc.DispersalClient
}

var _ NodeClientV2 = (*nodeClientV2)(nil)
var _ NodeClient = (*nodeClient)(nil)

func NewNodeClientV2(config *NodeClientV2Config) (*nodeClientV2, error) {
func NewNodeClient(config *NodeClientConfig) (*nodeClient, error) {
if config == nil || config.Hostname == "" || config.Port == "" {
return nil, fmt.Errorf("invalid config: %v", config)
}
return &nodeClientV2{
return &nodeClient{
config: config,
}, nil
}

func (c *nodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch) (*core.Signature, error) {
func (c *nodeClient) StoreChunks(ctx context.Context, batch *corev2.Batch) (*core.Signature, error) {
if len(batch.BlobCertificates) == 0 {
return nil, fmt.Errorf("no blob certificates in the batch")
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func (c *nodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch) (*c

// Close closes the grpc connection to the disperser server.
// It is thread safe and can be called multiple times.
func (c *nodeClientV2) Close() error {
func (c *nodeClient) Close() error {
if c.conn != nil {
err := c.conn.Close()
c.conn = nil
Expand All @@ -99,7 +99,7 @@ func (c *nodeClientV2) Close() error {
return nil
}

func (c *nodeClientV2) initOnceGrpcConnection() error {
func (c *nodeClient) initOnceGrpcConnection() error {
var initErr error
c.initOnce.Do(func() {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"

"github.com/Layr-Labs/eigenda/api/clients"
grpcnode "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
Expand All @@ -16,30 +17,30 @@ import (
"github.com/gammazero/workerpool"
)

// RetrievalClientV2 is an object that can retrieve blobs from the DA nodes.
// RetrievalClient is an object that can retrieve blobs from the DA nodes.
// To retrieve a blob from the relay, use RelayClient instead.
type RetrievalClientV2 interface {
type RetrievalClient interface {
// GetBlob downloads chunks of a blob from operator network and reconstructs the blob.
GetBlob(ctx context.Context, blobHeader corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error)
}

type retrievalClientV2 struct {
type retrievalClient struct {
logger logging.Logger
ethClient core.Reader
indexedChainState core.IndexedChainState
verifier encoding.Verifier
numConnections int
}

// NewRetrievalClientV2 creates a new retrieval client.
func NewRetrievalClientV2(
// NewRetrievalClient creates a new retrieval client.
func NewRetrievalClient(
logger logging.Logger,
ethClient core.Reader,
chainState core.IndexedChainState,
verifier encoding.Verifier,
numConnections int,
) RetrievalClientV2 {
return &retrievalClientV2{
) RetrievalClient {
return &retrievalClient{
logger: logger.With("component", "RetrievalClient"),
ethClient: ethClient,
indexedChainState: chainState,
Expand All @@ -48,7 +49,7 @@ func NewRetrievalClientV2(
}
}

func (r *retrievalClientV2) GetBlob(ctx context.Context, blobHeader corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error) {
func (r *retrievalClient) GetBlob(ctx context.Context, blobHeader corev2.BlobHeader, referenceBlockNumber uint64, quorumID core.QuorumID) ([]byte, error) {
blobKey, err := blobHeader.BlobKey()
if err != nil {
return nil, err
Expand Down Expand Up @@ -90,7 +91,7 @@ func (r *retrievalClientV2) GetBlob(ctx context.Context, blobHeader corev2.BlobH
}

// Fetch chunks from all operators
chunksChan := make(chan RetrievedChunks, len(operators))
chunksChan := make(chan clients.RetrievedChunks, len(operators))
pool := workerpool.New(r.numConnections)
for opID := range operators {
opID := opID
Expand Down Expand Up @@ -139,13 +140,13 @@ func (r *retrievalClientV2) GetBlob(ctx context.Context, blobHeader corev2.BlobH
)
}

func (r *retrievalClientV2) getChunksFromOperator(
func (r *retrievalClient) getChunksFromOperator(
ctx context.Context,
opID core.OperatorID,
opInfo *core.IndexedOperatorInfo,
blobKey corev2.BlobKey,
quorumID core.QuorumID,
chunksChan chan RetrievedChunks,
chunksChan chan clients.RetrievedChunks,
) {
conn, err := grpc.NewClient(
core.OperatorSocket(opInfo.Socket).GetRetrievalSocket(),
Expand All @@ -158,7 +159,7 @@ func (r *retrievalClientV2) getChunksFromOperator(
}
}()
if err != nil {
chunksChan <- RetrievedChunks{
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
Expand All @@ -174,7 +175,7 @@ func (r *retrievalClientV2) getChunksFromOperator(

reply, err := n.GetChunks(ctx, request)
if err != nil {
chunksChan <- RetrievedChunks{
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
Expand All @@ -187,7 +188,7 @@ func (r *retrievalClientV2) getChunksFromOperator(
var chunk *encoding.Frame
chunk, err = new(encoding.Frame).DeserializeGnark(data)
if err != nil {
chunksChan <- RetrievedChunks{
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
Expand All @@ -197,7 +198,7 @@ func (r *retrievalClientV2) getChunksFromOperator(

chunks[i] = chunk
}
chunksChan <- RetrievedChunks{
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: nil,
Chunks: chunks,
Expand Down
19 changes: 19 additions & 0 deletions api/clients/v2/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package clients

import (
"crypto/tls"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

func getGrpcDialOptions(useSecureGrpcFlag bool) []grpc.DialOption {
options := []grpc.DialOption{}
if useSecureGrpcFlag {
options = append(options, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
} else {
options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
return options
}
Loading

0 comments on commit b7fe161

Please sign in to comment.