Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup disperser apiserver #278

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 276 additions & 0 deletions disperser/apiserver/dispersal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
package apiserver

import (
"context"
"errors"
"fmt"
"math/rand"
"slices"
"strings"
"time"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/prometheus/client_golang/prometheus"
)

func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_DisperseBlobAuthenticatedServer) error {

// Process disperse_request
in, err := stream.Recv()
if err != nil {
return fmt.Errorf("error receiving next message: %v", err)
}

request, ok := in.Payload.(*pb.AuthenticatedRequest_DisperseRequest)
if !ok {
return errors.New("expected DisperseBlobRequest")
}

blob := getBlobFromRequest(request.DisperseRequest)

// Get the ethereum address associated with the public key. This is just for convenience so we can put addresses instead of public keys in the allowlist.
// Decode public key
publicKeyBytes, err := hexutil.Decode(blob.RequestHeader.AccountID)
if err != nil {
return fmt.Errorf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err)
}

pubKey, err := crypto.UnmarshalPubkey(publicKeyBytes)
if err != nil {
return fmt.Errorf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err)
}

authenticatedAddress := crypto.PubkeyToAddress(*pubKey).String()

// Send back challenge to client
challenge := rand.Uint32()
err = stream.Send(&pb.AuthenticatedReply{Payload: &pb.AuthenticatedReply_BlobAuthHeader{
BlobAuthHeader: &pb.BlobAuthHeader{
ChallengeParameter: challenge,
},
}})
if err != nil {
return err
}

// Recieve challenge_reply
in, err = stream.Recv()
if err != nil {
return fmt.Errorf("error receiving next message: %v", err)
}

challengeReply, ok := in.Payload.(*pb.AuthenticatedRequest_AuthenticationData)
if !ok {
return errors.New("expected AuthenticationData")
}

blob.RequestHeader.Nonce = challenge
blob.RequestHeader.AuthenticationData = challengeReply.AuthenticationData.AuthenticationData

err = s.authenticator.AuthenticateBlobRequest(blob.RequestHeader.BlobAuthHeader)
if err != nil {
return fmt.Errorf("failed to authenticate blob request: %v", err)
}

// Disperse the blob
reply, err := s.disperseBlob(stream.Context(), blob, authenticatedAddress)
if err != nil {
s.logger.Info("failed to disperse blob", "err", err)
return err
}

// Send back disperse_reply
err = stream.Send(&pb.AuthenticatedReply{Payload: &pb.AuthenticatedReply_DisperseReply{
DisperseReply: reply,
}})
if err != nil {
s.logger.Error("failed to stream back DisperseReply", "err", err)
return err
}

return nil

}

func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlobRequest) (*pb.DisperseBlobReply, error) {

blob := getBlobFromRequest(req)

reply, err := s.disperseBlob(ctx, blob, "")
if err != nil {
s.logger.Info("failed to disperse blob", "err", err)
}
return reply, err
}

func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, authenticatedAddress string) (*pb.DisperseBlobReply, error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("DisperseBlob", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()

securityParams := blob.RequestHeader.SecurityParams
securityParamsStrings := make([]string, len(securityParams))
for i, sp := range securityParams {
securityParamsStrings[i] = sp.String()
}

blobSize := len(blob.Data)

origin, err := common.GetClientAddress(ctx, s.rateConfig.ClientIPHeader, 2, true)
if err != nil {
for _, param := range securityParams {
quorumId := string(param.QuorumID)
s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob")
}
return nil, err
}

s.logger.Debug("received a new blob request", "origin", origin, "securityParams", strings.Join(securityParamsStrings, ", "))

err = s.validateBlobRequest(ctx, blob)
if err != nil {
for _, param := range securityParams {
quorumId := string(param.QuorumID)
s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob")
}
return nil, err
}

if s.ratelimiter != nil {
err := s.checkRateLimitsAndAddRates(ctx, blob, origin, authenticatedAddress)
if err != nil {
for _, param := range securityParams {
quorumId := string(param.QuorumID)
if errors.Is(err, errSystemBlobRateLimit) || errors.Is(err, errSystemThroughputRateLimit) {
s.metrics.HandleSystemRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
} else if errors.Is(err, errAccountBlobRateLimit) || errors.Is(err, errAccountThroughputRateLimit) {
s.metrics.HandleAccountRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
} else {
s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob")
}
}
return nil, err
}
}

requestedAt := uint64(time.Now().UnixNano())
metadataKey, err := s.blobStore.StoreBlob(ctx, blob, requestedAt)
if err != nil {
for _, param := range securityParams {
quorumId := string(param.QuorumID)
s.metrics.HandleBlobStoreFailedRequest(quorumId, blobSize, "DisperseBlob")
}
s.logger.Error("failed to store blob", "err", err)
return nil, fmt.Errorf("failed to store blob, please try again later")
}

for _, param := range securityParams {
quorumId := string(param.QuorumID)
s.metrics.HandleSuccessfulRequest(quorumId, blobSize, "DisperseBlob")
}

s.logger.Info("successfully received a new blob: ", "key", metadataKey.String())
return &pb.DisperseBlobReply{
Result: pb.BlobStatus_PROCESSING,
RequestId: []byte(metadataKey.String()),
}, nil
}

func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusRequest) (*pb.BlobStatusReply, error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("GetBlobStatus", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()

requestID := req.GetRequestId()
if len(requestID) == 0 {
return nil, fmt.Errorf("invalid request: request_id must not be empty")
}

s.logger.Info("received a new blob status request", "requestID", string(requestID))
metadataKey, err := disperser.ParseBlobKey(string(requestID))
if err != nil {
return nil, err
}

s.logger.Debug("metadataKey", "metadataKey", metadataKey.String())
metadata, err := s.blobStore.GetBlobMetadata(ctx, metadataKey)
if err != nil {
return nil, err
}

isConfirmed, err := metadata.IsConfirmed()
if err != nil {
return nil, err
}

s.logger.Debug("isConfirmed", "metadata", metadata, "isConfirmed", isConfirmed)
if isConfirmed {
confirmationInfo := metadata.ConfirmationInfo
dataLength := uint32(confirmationInfo.BlobCommitment.Length)
quorumInfos := confirmationInfo.BlobQuorumInfos
slices.SortStableFunc[[]*core.BlobQuorumInfo](quorumInfos, func(a, b *core.BlobQuorumInfo) int {
return int(a.QuorumID) - int(b.QuorumID)
})
blobQuorumParams := make([]*pb.BlobQuorumParam, len(quorumInfos))
quorumNumbers := make([]byte, len(quorumInfos))
quorumPercentSigned := make([]byte, len(quorumInfos))
quorumIndexes := make([]byte, len(quorumInfos))
for i, quorumInfo := range quorumInfos {
blobQuorumParams[i] = &pb.BlobQuorumParam{
QuorumNumber: uint32(quorumInfo.QuorumID),
AdversaryThresholdPercentage: uint32(quorumInfo.AdversaryThreshold),
QuorumThresholdPercentage: uint32(quorumInfo.QuorumThreshold),
ChunkLength: uint32(quorumInfo.ChunkLength),
}
quorumNumbers[i] = quorumInfo.QuorumID
quorumPercentSigned[i] = confirmationInfo.QuorumResults[quorumInfo.QuorumID].PercentSigned
quorumIndexes[i] = byte(i)
}

return &pb.BlobStatusReply{
Status: getResponseStatus(metadata.BlobStatus),
Info: &pb.BlobInfo{
BlobHeader: &pb.BlobHeader{
Commitment: &commonpb.G1Commitment{
X: confirmationInfo.BlobCommitment.Commitment.X.Marshal(),
Y: confirmationInfo.BlobCommitment.Commitment.Y.Marshal(),
},
DataLength: dataLength,
BlobQuorumParams: blobQuorumParams,
},
BlobVerificationProof: &pb.BlobVerificationProof{
BatchId: confirmationInfo.BatchID,
BlobIndex: confirmationInfo.BlobIndex,
BatchMetadata: &pb.BatchMetadata{
BatchHeader: &pb.BatchHeader{
BatchRoot: confirmationInfo.BatchRoot,
QuorumNumbers: quorumNumbers,
QuorumSignedPercentages: quorumPercentSigned,
ReferenceBlockNumber: confirmationInfo.ReferenceBlockNumber,
},
SignatoryRecordHash: confirmationInfo.SignatoryRecordHash[:],
Fee: confirmationInfo.Fee,
ConfirmationBlockNumber: confirmationInfo.ConfirmationBlockNumber,
BatchHeaderHash: confirmationInfo.BatchHeaderHash[:],
},
InclusionProof: confirmationInfo.BlobInclusionProof,
// ref: api/proto/disperser/disperser.proto:BlobVerificationProof.quorum_indexes
QuorumIndexes: quorumIndexes,
},
},
}, nil
}

return &pb.BlobStatusReply{
Status: getResponseStatus(metadata.BlobStatus),
Info: &pb.BlobInfo{},
}, nil
}
11 changes: 5 additions & 6 deletions disperser/apiserver/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"
"testing"

"github.com/Layr-Labs/eigenda/api/grpc/disperser"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/api/grpc/mock"
"github.com/Layr-Labs/eigenda/core"
Expand Down Expand Up @@ -219,7 +218,7 @@ func simulateClient(t *testing.T, signer core.BlobRequestSigner, origin string,
reply, err := stream.RecvToClient()
assert.NoError(t, err)

authHeaderReply, ok := reply.Payload.(*disperser.AuthenticatedReply_BlobAuthHeader)
authHeaderReply, ok := reply.Payload.(*pb.AuthenticatedReply_BlobAuthHeader)
assert.True(t, ok)

authHeader := core.BlobAuthHeader{
Expand All @@ -232,8 +231,8 @@ func simulateClient(t *testing.T, signer core.BlobRequestSigner, origin string,
assert.NoError(t, err)

// Process challenge and send back challenge_reply
err = stream.SendFromClient(&disperser.AuthenticatedRequest{Payload: &disperser.AuthenticatedRequest_AuthenticationData{
AuthenticationData: &disperser.AuthenticationData{
err = stream.SendFromClient(&pb.AuthenticatedRequest{Payload: &pb.AuthenticatedRequest_AuthenticationData{
AuthenticationData: &pb.AuthenticationData{
AuthenticationData: authData,
},
}})
Expand All @@ -244,10 +243,10 @@ func simulateClient(t *testing.T, signer core.BlobRequestSigner, origin string,
reply, err = stream.RecvToClient()
assert.NoError(t, err)

disperseReply, ok := reply.Payload.(*disperser.AuthenticatedReply_DisperseReply)
disperseReply, ok := reply.Payload.(*pb.AuthenticatedReply_DisperseReply)
assert.True(t, ok)

assert.Equal(t, disperseReply.DisperseReply.Result, disperser.BlobStatus_PROCESSING)
assert.Equal(t, disperseReply.DisperseReply.Result, pb.BlobStatus_PROCESSING)

}

Expand Down
46 changes: 46 additions & 0 deletions disperser/apiserver/retrieval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package apiserver

import (
"context"

pb "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/prometheus/client_golang/prometheus"
)

func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlobRequest) (*pb.RetrieveBlobReply, error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("RetrieveBlob", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()

s.logger.Info("received a new blob retrieval request", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex)

batchHeaderHash := req.GetBatchHeaderHash()
// Convert to [32]byte
var batchHeaderHash32 [32]byte
copy(batchHeaderHash32[:], batchHeaderHash)

blobIndex := req.GetBlobIndex()

blobMetadata, err := s.blobStore.GetMetadataInBatch(ctx, batchHeaderHash32, blobIndex)
if err != nil {
s.logger.Error("Failed to retrieve blob metadata", "err", err)
s.metrics.IncrementFailedBlobRequestNum("", "RetrieveBlob")

return nil, err
}

data, err := s.blobStore.GetBlobContent(ctx, blobMetadata.BlobHash)
if err != nil {
s.logger.Error("Failed to retrieve blob", "err", err)
s.metrics.HandleFailedRequest("", len(data), "RetrieveBlob")

return nil, err
}

s.metrics.HandleSuccessfulRequest("", len(data), "RetrieveBlob")

return &pb.RetrieveBlobReply{
Data: data,
}, nil
}
Loading
Loading