Skip to content

Commit

Permalink
add minibatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Jul 23, 2024
1 parent 3285f41 commit 508e17b
Show file tree
Hide file tree
Showing 6 changed files with 860 additions and 40 deletions.
123 changes: 106 additions & 17 deletions disperser/batcher/inmem/minibatch_store.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package inmem

import (
"context"
"fmt"
"sync"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/google/uuid"
Expand All @@ -14,10 +17,11 @@ type minibatchStore struct {
// MinibatchRecords maps batch IDs to a map from minibatch indices to minibatch records
MinibatchRecords map[uuid.UUID]map[uint]*batcher.MinibatchRecord
// DispersalRequests maps batch IDs to a map from minibatch indices to dispersal requests
DispersalRequests map[uuid.UUID]map[uint]*batcher.DispersalRequest
DispersalRequests map[uuid.UUID]map[uint][]*batcher.DispersalRequest
// DispersalResponses maps batch IDs to a map from minibatch indices to dispersal responses
DispersalResponses map[uuid.UUID]map[uint]*batcher.DispersalResponse
DispersalResponses map[uuid.UUID]map[uint][]*batcher.DispersalResponse

mu sync.RWMutex
logger logging.Logger
}

Expand All @@ -27,28 +31,37 @@ func NewMinibatchStore(logger logging.Logger) batcher.MinibatchStore {
return &minibatchStore{
BatchRecords: make(map[uuid.UUID]*batcher.BatchRecord),
MinibatchRecords: make(map[uuid.UUID]map[uint]*batcher.MinibatchRecord),
DispersalRequests: make(map[uuid.UUID]map[uint]*batcher.DispersalRequest),
DispersalResponses: make(map[uuid.UUID]map[uint]*batcher.DispersalResponse),
DispersalRequests: make(map[uuid.UUID]map[uint][]*batcher.DispersalRequest),
DispersalResponses: make(map[uuid.UUID]map[uint][]*batcher.DispersalResponse),

logger: logger,
}
}

func (m *minibatchStore) PutBatch(batch *batcher.BatchRecord) error {
func (m *minibatchStore) PutBatch(ctx context.Context, batch *batcher.BatchRecord) error {
m.mu.Lock()
defer m.mu.Unlock()

m.BatchRecords[batch.ID] = batch

return nil
}

func (m *minibatchStore) GetBatch(batchID uuid.UUID) (*batcher.BatchRecord, error) {
func (m *minibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batcher.BatchRecord, error) {
m.mu.RLock()
defer m.mu.RUnlock()

b, ok := m.BatchRecords[batchID]
if !ok {
return nil, fmt.Errorf("batch not found")
}
return b, nil
}

func (m *minibatchStore) PutMiniBatch(minibatch *batcher.MinibatchRecord) error {
func (m *minibatchStore) PutMiniBatch(ctx context.Context, minibatch *batcher.MinibatchRecord) error {
m.mu.Lock()
defer m.mu.Unlock()

if _, ok := m.MinibatchRecords[minibatch.BatchID]; !ok {
m.MinibatchRecords[minibatch.BatchID] = make(map[uint]*batcher.MinibatchRecord)
}
Expand All @@ -57,47 +70,123 @@ func (m *minibatchStore) PutMiniBatch(minibatch *batcher.MinibatchRecord) error
return nil
}

func (m *minibatchStore) GetMiniBatch(batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) {
func (m *minibatchStore) GetMiniBatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) {
m.mu.RLock()
defer m.mu.RUnlock()

if _, ok := m.MinibatchRecords[batchID]; !ok {
return nil, nil
}
return m.MinibatchRecords[batchID][minibatchIndex], nil
}

func (m *minibatchStore) PutDispersalRequest(request *batcher.DispersalRequest) error {
func (m *minibatchStore) PutDispersalRequest(ctx context.Context, request *batcher.DispersalRequest) error {
m.mu.Lock()
defer m.mu.Unlock()

if _, ok := m.DispersalRequests[request.BatchID]; !ok {
m.DispersalRequests[request.BatchID] = make(map[uint]*batcher.DispersalRequest)
m.DispersalRequests[request.BatchID] = make(map[uint][]*batcher.DispersalRequest)
}

if _, ok := m.DispersalRequests[request.BatchID][request.MinibatchIndex]; !ok {
m.DispersalRequests[request.BatchID][request.MinibatchIndex] = make([]*batcher.DispersalRequest, 0)
}

for _, r := range m.DispersalRequests[request.BatchID][request.MinibatchIndex] {
if r.OperatorID == request.OperatorID {
// replace existing record
*r = *request
return nil
}
}
m.DispersalRequests[request.BatchID][request.MinibatchIndex] = request

m.DispersalRequests[request.BatchID][request.MinibatchIndex] = append(m.DispersalRequests[request.BatchID][request.MinibatchIndex], request)

return nil
}

func (m *minibatchStore) GetDispersalRequest(batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalRequest, error) {
func (m *minibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalRequest, error) {
m.mu.RLock()
defer m.mu.RUnlock()

requests, err := m.GetDispersalRequests(ctx, batchID, minibatchIndex)
if err != nil {
return nil, err
}
for _, r := range requests {
if r.OperatorID == opID {
return r, nil
}
}
return nil, nil
}

func (m *minibatchStore) GetDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalRequest, error) {
m.mu.RLock()
defer m.mu.RUnlock()

if _, ok := m.DispersalRequests[batchID]; !ok {
return nil, nil
}

return m.DispersalRequests[batchID][minibatchIndex], nil
}

func (m *minibatchStore) PutDispersalResponse(response *batcher.DispersalResponse) error {
func (m *minibatchStore) PutDispersalResponse(ctx context.Context, response *batcher.DispersalResponse) error {
m.mu.Lock()
defer m.mu.Unlock()

if _, ok := m.DispersalResponses[response.BatchID]; !ok {
m.DispersalResponses[response.BatchID] = make(map[uint]*batcher.DispersalResponse)
m.DispersalResponses[response.BatchID] = make(map[uint][]*batcher.DispersalResponse)
}

if _, ok := m.DispersalResponses[response.BatchID][response.MinibatchIndex]; !ok {
m.DispersalResponses[response.BatchID][response.MinibatchIndex] = make([]*batcher.DispersalResponse, 0)
}
m.DispersalResponses[response.BatchID][response.MinibatchIndex] = response

for _, r := range m.DispersalResponses[response.BatchID][response.MinibatchIndex] {
if r.OperatorID == response.OperatorID {
// replace existing record
*r = *response
return nil
}
}

m.DispersalResponses[response.BatchID][response.MinibatchIndex] = append(m.DispersalResponses[response.BatchID][response.MinibatchIndex], response)

return nil
}

func (m *minibatchStore) GetDispersalResponse(batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalResponse, error) {
func (m *minibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalResponse, error) {
m.mu.RLock()
defer m.mu.RUnlock()

responses, err := m.GetDispersalResponses(ctx, batchID, minibatchIndex)
if err != nil {
return nil, err
}
for _, r := range responses {
if r.OperatorID == opID {
return r, nil
}
}
return nil, nil
}

func (m *minibatchStore) GetDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalResponse, error) {
m.mu.RLock()
defer m.mu.RUnlock()

if _, ok := m.DispersalResponses[batchID]; !ok {
return nil, nil
}

return m.DispersalResponses[batchID][minibatchIndex], nil
}

func (m *minibatchStore) GetPendingBatch() (*batcher.BatchRecord, error) {
func (m *minibatchStore) GetPendingBatch(ctx context.Context) (*batcher.BatchRecord, error) {
m.mu.RLock()
defer m.mu.RUnlock()

return nil, nil
}
80 changes: 66 additions & 14 deletions disperser/batcher/inmem/minibatch_store_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package inmem_test

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -29,9 +30,10 @@ func TestPutBatch(t *testing.T) {
AggregatePubKey: nil,
AggregateSignature: nil,
}
err = s.PutBatch(batch)
ctx := context.Background()
err = s.PutBatch(ctx, batch)
assert.NoError(t, err)
b, err := s.GetBatch(batch.ID)
b, err := s.GetBatch(ctx, batch.ID)
assert.NoError(t, err)
assert.Equal(t, batch, b)
}
Expand All @@ -47,9 +49,10 @@ func TestPutMiniBatch(t *testing.T) {
BatchSize: 1,
ReferenceBlockNumber: 1,
}
err = s.PutMiniBatch(minibatch)
ctx := context.Background()
err = s.PutMiniBatch(ctx, minibatch)
assert.NoError(t, err)
m, err := s.GetMiniBatch(minibatch.BatchID, minibatch.MinibatchIndex)
m, err := s.GetMiniBatch(ctx, minibatch.BatchID, minibatch.MinibatchIndex)
assert.NoError(t, err)
assert.Equal(t, minibatch, m)
}
Expand All @@ -58,29 +61,54 @@ func TestPutDispersalRequest(t *testing.T) {
s := newMinibatchStore()
id, err := uuid.NewV7()
assert.NoError(t, err)
request := &batcher.DispersalRequest{
minibatchIndex := uint(0)
ctx := context.Background()
req1 := &batcher.DispersalRequest{
BatchID: id,
MinibatchIndex: 0,
MinibatchIndex: minibatchIndex,
OperatorID: core.OperatorID([32]byte{1}),
OperatorAddress: gcommon.HexToAddress("0x0"),
NumBlobs: 1,
RequestedAt: time.Now().UTC(),
}
err = s.PutDispersalRequest(request)
err = s.PutDispersalRequest(ctx, req1)
assert.NoError(t, err)
r, err := s.GetDispersalRequest(request.BatchID, request.MinibatchIndex)
req2 := &batcher.DispersalRequest{
BatchID: id,
MinibatchIndex: minibatchIndex,
OperatorID: core.OperatorID([32]byte{2}),
OperatorAddress: gcommon.HexToAddress("0x0"),
NumBlobs: 1,
RequestedAt: time.Now().UTC(),
}
err = s.PutDispersalRequest(ctx, req2)
assert.NoError(t, err)

r, err := s.GetDispersalRequests(ctx, id, minibatchIndex)
assert.NoError(t, err)
assert.Equal(t, request, r)
assert.Len(t, r, 2)
assert.Equal(t, req1, r[0])
assert.Equal(t, req2, r[1])

req, err := s.GetDispersalRequest(ctx, id, minibatchIndex, req1.OperatorID)
assert.NoError(t, err)
assert.Equal(t, req1, req)

req, err = s.GetDispersalRequest(ctx, id, minibatchIndex, req2.OperatorID)
assert.NoError(t, err)
assert.Equal(t, req2, req)
}

func TestPutDispersalResponse(t *testing.T) {
s := newMinibatchStore()
id, err := uuid.NewV7()
assert.NoError(t, err)
response := &batcher.DispersalResponse{
ctx := context.Background()
minibatchIndex := uint(0)
resp1 := &batcher.DispersalResponse{
DispersalRequest: batcher.DispersalRequest{
BatchID: id,
MinibatchIndex: 0,
MinibatchIndex: minibatchIndex,
OperatorID: core.OperatorID([32]byte{1}),
OperatorAddress: gcommon.HexToAddress("0x0"),
NumBlobs: 1,
Expand All @@ -90,9 +118,33 @@ func TestPutDispersalResponse(t *testing.T) {
RespondedAt: time.Now().UTC(),
Error: nil,
}
err = s.PutDispersalResponse(response)
resp2 := &batcher.DispersalResponse{
DispersalRequest: batcher.DispersalRequest{
BatchID: id,
MinibatchIndex: minibatchIndex,
OperatorID: core.OperatorID([32]byte{2}),
OperatorAddress: gcommon.HexToAddress("0x0"),
NumBlobs: 1,
RequestedAt: time.Now().UTC(),
},
Signatures: nil,
RespondedAt: time.Now().UTC(),
Error: nil,
}
err = s.PutDispersalResponse(ctx, resp1)
assert.NoError(t, err)
r, err := s.GetDispersalResponse(response.BatchID, response.MinibatchIndex)
err = s.PutDispersalResponse(ctx, resp2)
assert.NoError(t, err)

r, err := s.GetDispersalResponses(ctx, id, minibatchIndex)
assert.NoError(t, err)
assert.Len(t, r, 2)

resp, err := s.GetDispersalResponse(ctx, id, minibatchIndex, resp1.OperatorID)
assert.NoError(t, err)
assert.Equal(t, resp1, resp)

resp, err = s.GetDispersalResponse(ctx, id, minibatchIndex, resp2.OperatorID)
assert.NoError(t, err)
assert.Equal(t, response, r)
assert.Equal(t, resp2, resp)
}
22 changes: 13 additions & 9 deletions disperser/batcher/minibatch_store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package batcher

import (
"context"
"time"

"github.com/Layr-Labs/eigenda/core"
Expand Down Expand Up @@ -30,6 +31,7 @@ type DispersalRequest struct {
MinibatchIndex uint
core.OperatorID
OperatorAddress gcommon.Address
Socket string
NumBlobs uint
RequestedAt time.Time
}
Expand All @@ -42,13 +44,15 @@ type DispersalResponse struct {
}

type MinibatchStore interface {
PutBatch(batch *BatchRecord) error
GetBatch(batchID uuid.UUID) (*BatchRecord, error)
PutMiniBatch(minibatch *MinibatchRecord) error
GetMiniBatch(batchID uuid.UUID, minibatchIndex uint) (*MinibatchRecord, error)
PutDispersalRequest(request *DispersalRequest) error
GetDispersalRequest(batchID uuid.UUID, minibatchIndex uint) (*DispersalRequest, error)
PutDispersalResponse(response *DispersalResponse) error
GetDispersalResponse(batchID uuid.UUID, minibatchIndex uint) (*DispersalResponse, error)
GetPendingBatch() (*BatchRecord, error)
PutBatch(ctx context.Context, batch *BatchRecord) error
GetBatch(ctx context.Context, batchID uuid.UUID) (*BatchRecord, error)
PutMiniBatch(ctx context.Context, minibatch *MinibatchRecord) error
GetMiniBatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*MinibatchRecord, error)
PutDispersalRequest(ctx context.Context, request *DispersalRequest) error
GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*DispersalRequest, error)
GetDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalRequest, error)
PutDispersalResponse(ctx context.Context, response *DispersalResponse) error
GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*DispersalResponse, error)
GetDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalResponse, error)
GetPendingBatch(ctx context.Context) (*BatchRecord, error)
}
Loading

0 comments on commit 508e17b

Please sign in to comment.