Commit

Merge branch 'master' into quaq/blob-verification
0x0aa0 committed Nov 20, 2024
2 parents 8136dee + 5ce2098 commit 31017e2
Showing 45 changed files with 1,574 additions and 276 deletions.
3 changes: 2 additions & 1 deletion api/clients/disperser_client_v2.go
@@ -149,7 +149,8 @@ func (c *disperserClientV2) DisperseBlob(
         blobCommitments = *deserialized
     } else {
         // if prover is configured, get commitments from prover
-        blobCommitments, err = c.prover.GetCommitments(data)
+
+        blobCommitments, err = c.prover.GetCommitmentsForPaddedLength(data)
         if err != nil {
             return nil, [32]byte{}, fmt.Errorf("error getting blob commitments: %w", err)
         }
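The functional change in this file is the switch from GetCommitments to GetCommitmentsForPaddedLength, mirrored in the server and test files below: the commitment is presumably computed over the blob padded to a canonical length so that client-side and disperser-side commitments agree. The sketch below is a rough, self-contained illustration of that idea only; it assumes padding to the next power-of-two number of 32-byte symbols, which is an assumption here, not the actual encoding.Prover rule.

// Illustrative sketch only: derives a padded symbol count for a blob before
// committing. The next-power-of-two rule is an assumption, not eigenda's code.
package main

import "fmt"

const bytesPerSymbol = 32 // one BN254 field element per 32-byte symbol

// nextPowerOfTwo returns the smallest power of two that is >= n.
func nextPowerOfTwo(n uint64) uint64 {
    if n == 0 {
        return 1
    }
    p := uint64(1)
    for p < n {
        p <<= 1
    }
    return p
}

// paddedSymbolCount converts a blob size in bytes to the symbol count a
// commitment would cover if lengths are padded to a power of two.
func paddedSymbolCount(blobSizeBytes uint64) uint64 {
    symbols := (blobSizeBytes + bytesPerSymbol - 1) / bytesPerSymbol
    return nextPowerOfTwo(symbols)
}

func main() {
    fmt.Println(paddedSymbolCount(1000)) // prints 32: 1000 bytes -> 32 symbols -> already a power of two
}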
92 changes: 53 additions & 39 deletions api/clients/relay_client.go
@@ -45,49 +45,44 @@ type RelayClient interface {
 type relayClient struct {
     config *RelayClientConfig

-    initOnce map[corev2.RelayKey]*sync.Once
-    conns    map[corev2.RelayKey]*grpc.ClientConn
-    logger   logging.Logger
-
-    grpcClients map[corev2.RelayKey]relaygrpc.RelayClient
+    // initOnce is used to ensure that the connection to each relay is initialized only once.
+    // It maps relay key to a sync.Once instance: `map[corev2.RelayKey]*sync.Once`
+    initOnce *sync.Map
+    // conns maps relay key to the gRPC connection: `map[corev2.RelayKey]*grpc.ClientConn`
+    conns  sync.Map
+    logger logging.Logger
+
+    // grpcClients maps relay key to the gRPC client: `map[corev2.RelayKey]relaygrpc.RelayClient`
+    grpcClients sync.Map
 }

 var _ RelayClient = (*relayClient)(nil)

 // NewRelayClient creates a new RelayClient that connects to the relays specified in the config.
 // It keeps a connection to each relay and reuses it for subsequent requests, and the connection is lazily instantiated.
 func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (*relayClient, error) {
-    if config == nil || len(config.Sockets) > 0 {
+    if config == nil || len(config.Sockets) <= 0 {
         return nil, fmt.Errorf("invalid config: %v", config)
     }

-    initOnce := make(map[corev2.RelayKey]*sync.Once)
-    conns := make(map[corev2.RelayKey]*grpc.ClientConn)
-    grpcClients := make(map[corev2.RelayKey]relaygrpc.RelayClient)
+    initOnce := sync.Map{}
     for key := range config.Sockets {
-        initOnce[key] = &sync.Once{}
+        initOnce.Store(key, &sync.Once{})
     }
     return &relayClient{
         config: config,

-        initOnce: initOnce,
-        conns:    conns,
+        initOnce: &initOnce,
         logger:   logger,
-
-        grpcClients: grpcClients,
     }, nil
 }

 func (c *relayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) {
-    if err := c.initOnceGrpcConnection(relayKey); err != nil {
+    client, err := c.getClient(relayKey)
+    if err != nil {
         return nil, err
     }

-    client, ok := c.grpcClients[relayKey]
-    if !ok {
-        return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
-    }
-
     res, err := client.GetBlob(ctx, &relaygrpc.GetBlobRequest{
         BlobKey: blobKey[:],
     })
@@ -102,15 +97,11 @@ func (c *relayClient) GetChunksByRange(ctx context.Context, relayKey corev2.Rela
     if len(requests) == 0 {
         return nil, fmt.Errorf("no requests")
     }
-    if err := c.initOnceGrpcConnection(relayKey); err != nil {
+    client, err := c.getClient(relayKey)
+    if err != nil {
         return nil, err
     }

-    client, ok := c.grpcClients[relayKey]
-    if !ok {
-        return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
-    }
-
     grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests))
     for i, req := range requests {
         grpcRequests[i] = &relaygrpc.ChunkRequest{
@@ -138,13 +129,10 @@ func (c *relayClient) GetChunksByIndex(ctx context.Context, relayKey corev2.Rela
     if len(requests) == 0 {
         return nil, fmt.Errorf("no requests")
     }
-    if err := c.initOnceGrpcConnection(relayKey); err != nil {
-        return nil, err
-    }

-    client, ok := c.grpcClients[relayKey]
-    if !ok {
-        return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
+    client, err := c.getClient(relayKey)
+    if err != nil {
+        return nil, err
     }

     grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests))
@@ -169,9 +157,28 @@ func (c *relayClient) GetChunksByIndex(ctx context.Context, relayKey corev2.Rela
     return res.GetData(), nil
 }

+func (c *relayClient) getClient(key corev2.RelayKey) (relaygrpc.RelayClient, error) {
+    if err := c.initOnceGrpcConnection(key); err != nil {
+        return nil, err
+    }
+    maybeClient, ok := c.grpcClients.Load(key)
+    if !ok {
+        return nil, fmt.Errorf("no grpc client for relay key: %v", key)
+    }
+    client, ok := maybeClient.(relaygrpc.RelayClient)
+    if !ok {
+        return nil, fmt.Errorf("invalid grpc client for relay key: %v", key)
+    }
+    return client, nil
+}
+
 func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error {
     var initErr error
-    c.initOnce[key].Do(func() {
+    once, ok := c.initOnce.Load(key)
+    if !ok {
+        return fmt.Errorf("unknown relay key: %v", key)
+    }
+    once.(*sync.Once).Do(func() {
         socket, ok := c.config.Sockets[key]
         if !ok {
             initErr = fmt.Errorf("unknown relay key: %v", key)
@@ -183,24 +190,31 @@ func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error {
             initErr = err
             return
         }
-        c.conns[key] = conn
-        c.grpcClients[key] = relaygrpc.NewRelayClient(conn)
+        c.conns.Store(key, conn)
+        c.grpcClients.Store(key, relaygrpc.NewRelayClient(conn))
     })
     return initErr
 }

 func (c *relayClient) Close() error {
     var errList *multierror.Error
-    for k, conn := range c.conns {
+    c.conns.Range(func(k, v interface{}) bool {
+        conn, ok := v.(*grpc.ClientConn)
+        if !ok {
+            errList = multierror.Append(errList, fmt.Errorf("invalid connection for relay key: %v", k))
+            return true
+        }
+
         if conn != nil {
             err := conn.Close()
-            conn = nil
-            c.grpcClients[k] = nil
+            c.conns.Delete(k)
+            c.grpcClients.Delete(k)
             if err != nil {
                 c.logger.Error("failed to close connection", "err", err)
                 errList = multierror.Append(errList, err)
             }
         }
-    }
+        return true
+    })
     return errList.ErrorOrNil()
 }
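The rewritten relay client guards each connection with a per-relay sync.Once stored alongside sync.Map instances, so a relay is dialed lazily, at most once, and concurrent callers share the connection. The following sketch is a minimal stand-alone version of that pattern using only the standard library; the type and field names are placeholders rather than the eigenda API, and, as in NewRelayClient, keys must be registered up front before get is called.

// Minimal sketch of once-per-key lazy initialization guarded by sync.Once
// values held in a sync.Map. Names here are illustrative placeholders.
package main

import (
    "fmt"
    "sync"
)

type lazyClients struct {
    initOnce sync.Map // key -> *sync.Once, seeded up front
    clients  sync.Map // key -> string, standing in for a gRPC client
}

// get initializes the "client" for key exactly once, even under concurrent calls.
func (l *lazyClients) get(key string) (string, error) {
    once, ok := l.initOnce.Load(key)
    if !ok {
        return "", fmt.Errorf("unknown key: %v", key)
    }
    once.(*sync.Once).Do(func() {
        // Expensive setup (e.g. dialing a gRPC connection) runs at most once per key.
        l.clients.Store(key, "client-for-"+key)
    })
    c, ok := l.clients.Load(key)
    if !ok {
        return "", fmt.Errorf("no client for key: %v", key)
    }
    return c.(string), nil
}

func main() {
    l := &lazyClients{}
    l.initOnce.Store("relay-0", &sync.Once{}) // register keys up front, as NewRelayClient does

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c, err := l.get("relay-0")
            fmt.Println(c, err)
        }()
    }
    wg.Wait()
}

Seeding the sync.Once values from the config keeps the hot path lock-free while still rejecting unknown keys before any dialing happens.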
2 changes: 1 addition & 1 deletion contracts/script/GenerateUnitTestHashes.s.sol
@@ -59,4 +59,4 @@ contract GenerateHashes is Script {


     }
-}
\ No newline at end of file
+}
2 changes: 1 addition & 1 deletion contracts/src/core/EigenDAServiceManagerStorage.sol
@@ -59,4 +59,4 @@ abstract contract EigenDAServiceManagerStorage is IEigenDAServiceManager {
     // storage gap for upgradeability
     // slither-disable-next-line shadowing-state
     uint256[47] private __GAP;
-}
\ No newline at end of file
+}
2 changes: 1 addition & 1 deletion contracts/src/interfaces/IEigenDARelayRegistry.sol
@@ -12,4 +12,4 @@ interface IEigenDARelayRegistry {
     function getRelayId(address relay) external view returns (uint32);

     function getRelayAddress(uint32 id) external view returns (address);
-}
\ No newline at end of file
+}
2 changes: 1 addition & 1 deletion core/v2/core_test.go
@@ -94,7 +94,7 @@ func makeTestBlob(t *testing.T, p encoding.Prover, version corev2.BlobVersion, l

     data = codec.ConvertByPaddingEmptyByte(data)

-    commitments, err := p.GetCommitments(data)
+    commitments, err := p.GetCommitmentsForPaddedLength(data)
     if err != nil {
         t.Fatal(err)
     }
14 changes: 7 additions & 7 deletions core/v2/serialization_test.go
@@ -34,7 +34,7 @@ func TestPaymentHash(t *testing.T) {

 func TestBlobKeyFromHeader(t *testing.T) {
     data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES)
-    commitments, err := p.GetCommitments(data)
+    commitments, err := p.GetCommitmentsForPaddedLength(data)
     if err != nil {
         t.Fatal(err)
     }
@@ -52,8 +52,8 @@ func TestBlobKeyFromHeader(t *testing.T) {
     }
     blobKey, err := bh.BlobKey()
     assert.NoError(t, err)
-    // 0xb19d368345990c79744fe571fe99f427f35787b9383c55089fb5bd6a5c171bbc verified in solidity
-    assert.Equal(t, "b19d368345990c79744fe571fe99f427f35787b9383c55089fb5bd6a5c171bbc", blobKey.Hex())
+    // 0x40efb7273649f39590b27550ea06eeb81efd6ae4d719385a302fbd93173a395d verified in solidity
+    assert.Equal(t, "40efb7273649f39590b27550ea06eeb81efd6ae4d719385a302fbd93173a395d", blobKey.Hex())
 }

 func TestBatchHeaderHash(t *testing.T) {
@@ -87,7 +87,7 @@ func TestBatchHeaderSerialization(t *testing.T) {

 func TestBlobCertHash(t *testing.T) {
     data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES)
-    commitments, err := p.GetCommitments(data)
+    commitments, err := p.GetCommitmentsForPaddedLength(data)
     if err != nil {
         t.Fatal(err)
     }
@@ -109,13 +109,13 @@ func TestBlobCertHash(t *testing.T) {

     hash, err := blobCert.Hash()
     assert.NoError(t, err)
-    // 0xc4512b8702f69cb837fff50a93d3d28aada535b1f151b64db45859c3f5bb096a verified in solidity
-    assert.Equal(t, "c4512b8702f69cb837fff50a93d3d28aada535b1f151b64db45859c3f5bb096a", hex.EncodeToString(hash[:]))
+    // 0x3719a91e2a294feafdd624c1c88a6f1db1a5c79ee0863b352255bc9162f02751 verified in solidity
+    assert.Equal(t, "3719a91e2a294feafdd624c1c88a6f1db1a5c79ee0863b352255bc9162f02751", hex.EncodeToString(hash[:]))
 }

 func TestBlobCertSerialization(t *testing.T) {
     data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES)
-    commitments, err := p.GetCommitments(data)
+    commitments, err := p.GetCommitmentsForPaddedLength(data)
     if err != nil {
         t.Fatal(err)
     }
6 changes: 3 additions & 3 deletions core/v2/types_test.go
@@ -12,7 +12,7 @@ import (

 func TestConvertBatchToFromProtobuf(t *testing.T) {
     data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES)
-    commitments, err := p.GetCommitments(data)
+    commitments, err := p.GetCommitmentsForPaddedLength(data)
     if err != nil {
         t.Fatal(err)
     }
@@ -68,7 +68,7 @@ func TestConvertBatchToFromProtobuf(t *testing.T) {

 func TestConvertBlobHeaderToFromProtobuf(t *testing.T) {
     data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES)
-    commitments, err := p.GetCommitments(data)
+    commitments, err := p.GetCommitmentsForPaddedLength(data)
     if err != nil {
         t.Fatal(err)
     }
@@ -96,7 +96,7 @@ func TestConvertBlobHeaderToFromProtobuf(t *testing.T) {

 func TestConvertBlobCertToFromProtobuf(t *testing.T) {
     data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES)
-    commitments, err := p.GetCommitments(data)
+    commitments, err := p.GetCommitmentsForPaddedLength(data)
     if err != nil {
         t.Fatal(err)
     }
8 changes: 1 addition & 7 deletions disperser/apiserver/disperse_blob_v2.go
@@ -7,7 +7,6 @@ import (

     "github.com/Layr-Labs/eigenda/api"
     pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
-    "github.com/Layr-Labs/eigenda/common"
     corev2 "github.com/Layr-Labs/eigenda/core/v2"
     dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
     "github.com/Layr-Labs/eigenda/encoding"
@@ -19,17 +18,12 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
         return nil, err
     }

-    origin, err := common.GetClientAddress(ctx, s.rateConfig.ClientIPHeader, 2, true)
-    if err != nil {
-        return nil, api.NewErrorInvalidArg(err.Error())
-    }
-
     data := req.GetData()
     blobHeader, err := corev2.BlobHeaderFromProtobuf(req.GetBlobHeader())
     if err != nil {
         return nil, api.NewErrorInternal(err.Error())
     }
-    s.logger.Debug("received a new blob dispersal request", "origin", origin, "blobSizeBytes", len(data), "quorums", req.GetBlobHeader().GetQuorumNumbers())
+    s.logger.Debug("received a new blob dispersal request", "blobSizeBytes", len(data), "quorums", req.GetBlobHeader().GetQuorumNumbers())

     // TODO(ian-shim): handle payments and check rate limits

41 changes: 1 addition & 40 deletions disperser/apiserver/server_v2.go
@@ -33,12 +33,10 @@ type DispersalServerV2 struct {
     pb.UnimplementedDisperserServer

     serverConfig      disperser.ServerConfig
-    rateConfig        RateConfig
     blobStore         *blobstore.BlobStore
     blobMetadataStore *blobstore.BlobMetadataStore

     chainReader   core.Reader
-    ratelimiter   common.RateLimiter
     authenticator corev2.BlobRequestAuthenticator
     prover        encoding.Prover
     logger        logging.Logger
@@ -67,12 +65,10 @@ func NewDispersalServerV2(

     return &DispersalServerV2{
         serverConfig:      serverConfig,
-        rateConfig:        rateConfig,
         blobStore:         blobStore,
         blobMetadataStore: blobMetadataStore,

         chainReader:   chainReader,
-        ratelimiter:   ratelimiter,
         authenticator: authenticator,
         prover:        prover,
         logger:        logger,
@@ -105,10 +101,6 @@ func (s *DispersalServerV2) Start(ctx context.Context) error {
         return fmt.Errorf("failed to refresh onchain quorum state: %w", err)
     }

-    if err := s.RefreshAllowlist(); err != nil {
-        return fmt.Errorf("failed to refresh allowlist: %w", err)
-    }
-
     go func() {
         ticker := time.NewTicker(s.onchainStateRefreshInterval)
         defer ticker.Stop()
@@ -125,21 +117,6 @@ func (s *DispersalServerV2) Start(ctx context.Context) error {
         }
     }()

-    go func() {
-        t := time.NewTicker(s.rateConfig.AllowlistRefreshInterval)
-        defer t.Stop()
-        for {
-            select {
-            case <-t.C:
-                if err := s.RefreshAllowlist(); err != nil {
-                    s.logger.Error("failed to refresh allowlist", "err", err)
-                }
-            case <-ctx.Done():
-                return
-            }
-        }
-    }()
-
     s.logger.Info("GRPC Listening", "port", s.serverConfig.GrpcPort, "address", listener.Addr().String())

     if err := gs.Serve(listener); err != nil {
@@ -160,7 +137,7 @@ func (s *DispersalServerV2) GetBlobCommitment(ctx context.Context, req *pb.BlobC
     if uint64(blobSize) > s.maxNumSymbolsPerBlob*encoding.BYTES_PER_SYMBOL {
         return nil, api.NewErrorInvalidArg(fmt.Sprintf("blob size cannot exceed %v bytes", s.maxNumSymbolsPerBlob*encoding.BYTES_PER_SYMBOL))
     }
-    c, err := s.prover.GetCommitments(req.GetData())
+    c, err := s.prover.GetCommitmentsForPaddedLength(req.GetData())
     if err != nil {
         return nil, api.NewErrorInternal("failed to get commitments")
     }
@@ -186,22 +163,6 @@ func (s *DispersalServerV2) GetBlobCommitment(ctx context.Context, req *pb.BlobC
     }}, nil
 }

-func (s *DispersalServerV2) RefreshAllowlist() error {
-    s.logger.Debug("Refreshing onchain quorum state")
-    al, err := ReadAllowlistFromFile(s.rateConfig.AllowlistFile)
-    if err != nil {
-        return fmt.Errorf("failed to load allowlist: %w", err)
-    }
-    s.rateConfig.Allowlist = al
-    for account, rateInfoByQuorum := range al {
-        for quorumID, rateInfo := range rateInfoByQuorum {
-            s.logger.Info("[Allowlist]", "account", account, "name", rateInfo.Name, "quorumID", quorumID, "throughput", rateInfo.Throughput, "blobRate", rateInfo.BlobRate)
-        }
-    }
-
-    return nil
-}
-
 // refreshOnchainState refreshes the onchain quorum state.
 // It should be called periodically to keep the state up to date.
 // **Note** that there is no lock. If the state is being updated concurrently, it may lead to inconsistent state.
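With the allowlist refresher deleted, Start is left with a single background loop that periodically refreshes the on-chain quorum state and exits when the server context is cancelled; the comment above notes that this refresh runs without a lock. The sketch below is an illustrative stand-alone version of that ticker-plus-context pattern, where refreshOnchainState and the interval are stand-ins rather than the server's actual fields.

// Illustrative sketch of a periodic refresh loop tied to a context.
// refreshOnchainState is a placeholder for the server's actual refresh method.
package main

import (
    "context"
    "log"
    "time"
)

func refreshOnchainState(ctx context.Context) error {
    // Placeholder: read quorum state from the chain here.
    return nil
}

// startRefreshLoop refreshes on every tick until ctx is cancelled.
func startRefreshLoop(ctx context.Context, interval time.Duration) {
    go func() {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                if err := refreshOnchainState(ctx); err != nil {
                    log.Printf("failed to refresh onchain state: %v", err)
                }
            case <-ctx.Done():
                return
            }
        }
    }()
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    startRefreshLoop(ctx, time.Second)
    time.Sleep(3 * time.Second) // let a few refreshes happen
    cancel()
}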