Skip to content

Commit

Permalink
Separate apiserver validation (#279)
Browse files Browse the repository at this point in the history
  • Loading branch information
mooselumph authored Feb 26, 2024
1 parent 25dd873 commit 206721f
Showing 1 changed file with 33 additions and 18 deletions.
51 changes: 33 additions & 18 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,50 +178,69 @@ func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlob
return reply, err
}

func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, authenticatedAddress string) (*pb.DisperseBlobReply, error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("DisperseBlob", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()
func (s *DispersalServer) validateBlobRequest(ctx context.Context, blob *core.Blob) error {

securityParams := blob.RequestHeader.SecurityParams
if len(securityParams) == 0 {
return nil, fmt.Errorf("invalid request: security_params must not be empty")
return fmt.Errorf("invalid request: security_params must not be empty")
}
if len(securityParams) > 256 {
return nil, fmt.Errorf("invalid request: security_params must not exceed 256")
return fmt.Errorf("invalid request: security_params must not exceed 256")
}

seenQuorums := make(map[uint8]struct{})
// The quorum ID must be in range [0, 254]. It'll actually be converted
// to uint8, so it cannot be greater than 254.
for _, param := range securityParams {
if _, ok := seenQuorums[param.QuorumID]; ok {
return nil, fmt.Errorf("invalid request: security_params must not contain duplicate quorum_id")
return fmt.Errorf("invalid request: security_params must not contain duplicate quorum_id")
}
seenQuorums[param.QuorumID] = struct{}{}

if param.QuorumID >= s.quorumCount {
err := s.updateQuorumCount(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get onchain quorum count: %w", err)
return fmt.Errorf("failed to get onchain quorum count: %w", err)
}

if param.QuorumID >= s.quorumCount {
return nil, fmt.Errorf("invalid request: the quorum_id must be in range [0, %d], but found %d", s.quorumCount-1, param.QuorumID)
return fmt.Errorf("invalid request: the quorum_id must be in range [0, %d], but found %d", s.quorumCount-1, param.QuorumID)
}
}
}

blobSize := len(blob.Data)
// The blob size in bytes must be in range [1, maxBlobSize].
if blobSize > maxBlobSize {
return nil, fmt.Errorf("blob size cannot exceed 2 MiB")
return fmt.Errorf("blob size cannot exceed 2 MiB")
}
if blobSize == 0 {
return nil, fmt.Errorf("blob size must be greater than 0")
return fmt.Errorf("blob size must be greater than 0")
}

if err := blob.RequestHeader.Validate(); err != nil {
s.logger.Warn("invalid header", "err", err)
return err
}

return nil

}

func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, authenticatedAddress string) (*pb.DisperseBlobReply, error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("DisperseBlob", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()

securityParams := blob.RequestHeader.SecurityParams
securityParamsStrings := make([]string, len(securityParams))
for i, sp := range securityParams {
securityParamsStrings[i] = sp.String()
}

blobSize := len(blob.Data)

origin, err := common.GetClientAddress(ctx, s.rateConfig.ClientIPHeader, 2, true)
if err != nil {
for _, param := range securityParams {
Expand All @@ -231,14 +250,10 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
return nil, err
}

securityParamsStrings := make([]string, len(securityParams))
for i, sp := range securityParams {
securityParamsStrings[i] = sp.String()
}
s.logger.Debug("received a new blob request", "origin", origin, "securityParams", strings.Join(securityParamsStrings, ", "))

if err := blob.RequestHeader.Validate(); err != nil {
s.logger.Warn("invalid header", "err", err)
err = s.validateBlobRequest(ctx, blob)
if err != nil {
for _, param := range securityParams {
quorumId := string(param.QuorumID)
s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob")
Expand Down

0 comments on commit 206721f

Please sign in to comment.