Provide signer and nonsigner operator IDs
jianoaix committed Feb 12, 2025
1 parent c5851d6 commit 7e674f9
Showing 7 changed files with 350 additions and 17 deletions.
30 changes: 28 additions & 2 deletions disperser/dataapi/docs/v2/V2_docs.go
@@ -979,6 +979,32 @@ const docTemplateV2 = `{
}
}
},
"v2.AttestationInfo": {
"type": "object",
"properties": {
"attestation": {
"$ref": "#/definitions/github_com_Layr-Labs_eigenda_core_v2.Attestation"
},
"nonsigning_operator_ids": {
"type": "object",
"additionalProperties": {
"type": "array",
"items": {
"type": "string"
}
}
},
"signing_operator_ids": {
"type": "object",
"additionalProperties": {
"type": "array",
"items": {
"type": "string"
}
}
}
}
},
"v2.BatchFeedResponse": {
"type": "object",
"properties": {
@@ -1039,8 +1065,8 @@
"v2.BlobAttestationInfoResponse": {
"type": "object",
"properties": {
"attestation": {
"$ref": "#/definitions/github_com_Layr-Labs_eigenda_core_v2.Attestation"
"attestation_info": {
"$ref": "#/definitions/v2.AttestationInfo"
},
"batch_header_hash": {
"type": "string"
30 changes: 28 additions & 2 deletions disperser/dataapi/docs/v2/V2_swagger.json
@@ -976,6 +976,32 @@
}
}
},
"v2.AttestationInfo": {
"type": "object",
"properties": {
"attestation": {
"$ref": "#/definitions/github_com_Layr-Labs_eigenda_core_v2.Attestation"
},
"nonsigning_operator_ids": {
"type": "object",
"additionalProperties": {
"type": "array",
"items": {
"type": "string"
}
}
},
"signing_operator_ids": {
"type": "object",
"additionalProperties": {
"type": "array",
"items": {
"type": "string"
}
}
}
}
},
"v2.BatchFeedResponse": {
"type": "object",
"properties": {
@@ -1036,8 +1062,8 @@
"v2.BlobAttestationInfoResponse": {
"type": "object",
"properties": {
"attestation": {
"$ref": "#/definitions/github_com_Layr-Labs_eigenda_core_v2.Attestation"
"attestation_info": {
"$ref": "#/definitions/v2.AttestationInfo"
},
"batch_header_hash": {
"type": "string"
21 changes: 19 additions & 2 deletions disperser/dataapi/docs/v2/V2_swagger.yaml
@@ -227,6 +227,23 @@ definitions:
type: number
type: object
type: object
v2.AttestationInfo:
properties:
attestation:
$ref: '#/definitions/github_com_Layr-Labs_eigenda_core_v2.Attestation'
nonsigning_operator_ids:
additionalProperties:
items:
type: string
type: array
type: object
signing_operator_ids:
additionalProperties:
items:
type: string
type: array
type: object
type: object
v2.BatchFeedResponse:
properties:
batches:
@@ -266,8 +283,8 @@
type: object
v2.BlobAttestationInfoResponse:
properties:
attestation:
$ref: '#/definitions/github_com_Layr-Labs_eigenda_core_v2.Attestation'
attestation_info:
$ref: '#/definitions/v2.AttestationInfo'
batch_header_hash:
type: string
blob_inclusion_info:
53 changes: 52 additions & 1 deletion disperser/dataapi/v2/blobs.go
@@ -7,6 +7,7 @@ import (
"strconv"
"time"

"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/gin-gonic/gin"
@@ -225,6 +226,8 @@ func (s *ServerV2) FetchBlobCertificate(c *gin.Context) {
// @Router /blobs/{blob_key}/attestation-info [get]
func (s *ServerV2) FetchBlobAttestationInfo(c *gin.Context) {
handlerStart := time.Now()

ctx := c.Request.Context()
blobKey, err := corev2.HexToBlobKey(c.Param("blob_key"))
if err != nil {
s.metrics.IncrementInvalidArgRequestNum("FetchBlobAttestationInfo")
@@ -246,11 +249,59 @@ func (s *ServerV2) FetchBlobAttestationInfo(c *gin.Context) {
return
}

// Get quorums that this blob was dispersed to
metadata, err := s.blobMetadataStore.GetBlobMetadata(ctx, blobKey)
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchBlobAttestationInfo")
errorResponse(c, fmt.Errorf("failed to fetch blob metadata: %w", err))
return
}
blobQuorums := make(map[uint8]struct{}, 0)
for _, q := range metadata.BlobHeader.QuorumNumbers {
blobQuorums[q] = struct{}{}
}

// Get all nonsigners (some may not be in the blob's quorums)
nonsigners := make(map[core.OperatorID]struct{}, 0)
for i := 0; i < len(attestationInfo.Attestation.NonSignerPubKeys); i++ {
opId := attestationInfo.Attestation.NonSignerPubKeys[i].GetOperatorID()
nonsigners[opId] = struct{}{}
}

// Get all operators at the reference block number
rbn := attestationInfo.Attestation.ReferenceBlockNumber
operatorsByQuorum, err := s.chainReader.GetOperatorStakesForQuorums(ctx, attestationInfo.Attestation.QuorumNumbers, uint32(rbn))
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchBlobAttestationInfo")
errorResponse(c, fmt.Errorf("failed to fetch operators at reference block number: %w", err))
return
}

// Compute the signers and nonsigners for each of the blob's quorums
signerIds := make(map[uint8][]string, 0)
nonsignerIds := make(map[uint8][]string, 0)
for q, innerMap := range operatorsByQuorum {
if _, exist := blobQuorums[q]; !exist {
continue
}
for _, op := range innerMap {
if _, exist := nonsigners[op.OperatorID]; exist {
nonsignerIds[q] = append(nonsignerIds[q], op.OperatorID.Hex())
} else {
signerIds[q] = append(signerIds[q], op.OperatorID.Hex())
}
}
}

response := &BlobAttestationInfoResponse{
BlobKey: blobKey.Hex(),
BatchHeaderHash: hex.EncodeToString(batchHeaderHash[:]),
InclusionInfo: attestationInfo.InclusionInfo,
Attestation: attestationInfo.Attestation,
AttestationInfo: &AttestationInfo{
Attestation: attestationInfo.Attestation,
SigningOperatorIds: signerIds,
NonsigningOperatorIds: nonsignerIds,
},
}

s.metrics.IncrementSuccessfulRequestNum("FetchBlobAttestationInfo")
127 changes: 127 additions & 0 deletions disperser/dataapi/v2/feed_cache.go
@@ -0,0 +1,127 @@
package v2

import (
"sort"
"sync"
"time"
)

const (
bucketMinutes = 1
)

// FeedBucket represents a cached bucket of feed items
type FeedBucket[T any] struct {
StartTime time.Time
Items []T
}

// FeedCache implements a time-based caching system for feed items
type FeedCache[T any] struct {
cache map[int64]FeedBucket[T]
mutex sync.RWMutex
bucketSize time.Duration
maxBuckets int
getTimestamp func(T) time.Time
}

// NewFeedCache creates a new cache with specified bucket size and maximum number of buckets
func NewFeedCache[T any](
bucketSizeMinutes,
maxBuckets int,
getTimestamp func(T) time.Time,
) *FeedCache[T] {
return &FeedCache[T]{
cache: make(map[int64]FeedBucket[T]),
bucketSize: time.Duration(bucketSizeMinutes) * time.Minute,
maxBuckets: maxBuckets,
getTimestamp: getTimestamp,
}
}

// getBucketKey returns the cache key for a given timestamp
func (c *FeedCache[T]) getBucketKey(timestamp time.Time) int64 {
return timestamp.Truncate(c.bucketSize).Unix()
}

// GetItemsInRange retrieves items within the specified time range [startTime, endTime).
// The interval is inclusive of startTime and exclusive of endTime.
// fetchFromDB is a function that retrieves items from the database for a specific time bucket.
// The items returned from fetchFromDB must be sorted by timestamp in ascending order.
func (c *FeedCache[T]) GetItemsInRange(
startTime, endTime time.Time,
fetchFromDB func(start, end time.Time) ([]T, error),
) ([]T, error) {
var results []T

// Calculate all bucket keys needed
var bucketKeys []int64
current := startTime.Truncate(c.bucketSize)
for current.Before(endTime) {
bucketKeys = append(bucketKeys, c.getBucketKey(current))
current = current.Add(c.bucketSize)
}

c.mutex.Lock()
defer c.mutex.Unlock()

// Process each bucket
for _, bucketKey := range bucketKeys {
bucket, exists := c.cache[bucketKey]
if !exists {
// Convert key back to time
bucketStart := time.Unix(bucketKey, 0)
bucketEnd := bucketStart.Add(c.bucketSize)

// Fetch items from database
items, err := fetchFromDB(bucketStart, bucketEnd)
if err != nil {
return nil, err
}

// Cache the bucket
bucket = FeedBucket[T]{
StartTime: bucketStart,
Items: items,
}
c.cache[bucketKey] = bucket

// Cleanup old buckets if needed
c.cleanupOldBuckets()
}

// Filter items within the requested range
for _, item := range bucket.Items {
ts := c.getTimestamp(item)
if (ts.Equal(startTime) || ts.After(startTime)) &&
ts.Before(endTime) {
results = append(results, item)
}
}
}

// Results are already sorted by timestamp since each bucket is pre-sorted from fetchFromDB
// and we process buckets in chronological order
return results, nil
}

// cleanupOldBuckets removes the oldest buckets when cache size exceeds maxBuckets
func (c *FeedCache[T]) cleanupOldBuckets() {
if len(c.cache) <= c.maxBuckets {
return
}

// Convert keys to times and sort
var times []int64
for key := range c.cache {
times = append(times, key)
}
sort.Slice(times, func(i, j int) bool {
return times[i] < times[j]
})

// Remove oldest buckets
for i := 0; i < len(times)-c.maxBuckets; i++ {
delete(c.cache, times[i])
}
}
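
For readers of this change, here is a minimal usage sketch of the new FeedCache. It is illustrative only: the feedItem type and the inline fetch callback are hypothetical stand-ins for the real feed item types and metadata-store query, not part of this commit.

// Sketch only: feedItem and the fetch callback below are hypothetical.
package v2

import (
	"fmt"
	"time"
)

type feedItem struct {
	RequestedAt time.Time
	BlobKey     string
}

func exampleFeedCacheUsage() error {
	// One-minute buckets, keeping at most 60 buckets (~1 hour) cached.
	cache := NewFeedCache[feedItem](1, 60, func(it feedItem) time.Time {
		return it.RequestedAt
	})

	end := time.Now().UTC()
	start := end.Add(-10 * time.Minute)

	// The callback runs once per uncached one-minute bucket and must return
	// items sorted by timestamp in ascending order.
	items, err := cache.GetItemsInRange(start, end, func(s, e time.Time) ([]feedItem, error) {
		// A real caller would query the metadata store for [s, e) here.
		return nil, nil
	})
	if err != nil {
		return err
	}
	fmt.Printf("found %d feed items in range\n", len(items))
	return nil
}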
7 changes: 6 additions & 1 deletion disperser/dataapi/v2/server_v2.go
@@ -71,11 +71,16 @@ type (
Certificate *corev2.BlobCertificate `json:"blob_certificate"`
}

AttestationInfo struct {
Attestation *corev2.Attestation `json:"attestation"`
NonsigningOperatorIds map[uint8][]string `json:"nonsigning_operator_ids"`
SigningOperatorIds map[uint8][]string `json:"signing_operator_ids"`
}
BlobAttestationInfoResponse struct {
BlobKey string `json:"blob_key"`
BatchHeaderHash string `json:"batch_header_hash"`
InclusionInfo *corev2.BlobInclusionInfo `json:"blob_inclusion_info"`
Attestation *corev2.Attestation `json:"attestation"`
AttestationInfo *AttestationInfo `json:"attestation_info"`
}

BlobInfo struct {
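
To make the new response shape concrete, below is an illustrative sketch (not part of the commit) that fills an AttestationInfo with fabricated, truncated operator IDs and prints the JSON that would now appear under the attestation_info field of BlobAttestationInfoResponse; the attestation itself is left nil for brevity.

// Illustrative only: quorum numbers and operator IDs here are made up.
package v2

import (
	"encoding/json"
	"fmt"
)

func exampleAttestationInfoJSON() error {
	info := &AttestationInfo{
		// Attestation omitted (nil) in this sketch; both maps are keyed by quorum number.
		SigningOperatorIds: map[uint8][]string{
			0: {"0f1e2d...", "3c4b5a..."},
			1: {"0f1e2d..."},
		},
		NonsigningOperatorIds: map[uint8][]string{
			0: {"6a7b8c..."},
			1: {"3c4b5a...", "6a7b8c..."},
		},
	}
	out, err := json.MarshalIndent(info, "", "  ")
	if err != nil {
		return err
	}
	fmt.Println(string(out))
	return nil
}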