Skip to content

Commit

Permalink
feat: relax indexedChainState to ChainState for retrieval
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Dec 20, 2024
1 parent d3674bf commit 59b7ff7
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 125 deletions.
2 changes: 1 addition & 1 deletion api/clients/mock/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (c *MockNodeClient) GetBlobHeader(ctx context.Context, socket string, batch
func (c *MockNodeClient) GetChunks(
ctx context.Context,
opID core.OperatorID,
opInfo *core.IndexedOperatorInfo,
opInfo *core.OperatorInfo,
batchHeaderHash [32]byte,
blobIndex uint32,
quorumID core.QuorumID,
Expand Down
4 changes: 2 additions & 2 deletions api/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type RetrievedChunks struct {

type NodeClient interface {
GetBlobHeader(ctx context.Context, socket string, batchHeaderHash [32]byte, blobIndex uint32) (*core.BlobHeader, *merkletree.Proof, error)
GetChunks(ctx context.Context, opID core.OperatorID, opInfo *core.IndexedOperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, chunksChan chan RetrievedChunks)
GetChunks(ctx context.Context, opID core.OperatorID, opInfo *core.OperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, chunksChan chan RetrievedChunks)
}

type client struct {
Expand Down Expand Up @@ -79,7 +79,7 @@ func (c client) GetBlobHeader(
func (c client) GetChunks(
ctx context.Context,
opID core.OperatorID,
opInfo *core.IndexedOperatorInfo,
opInfo *core.OperatorInfo,
batchHeaderHash [32]byte,
blobIndex uint32,
quorumID core.QuorumID,
Expand Down
16 changes: 8 additions & 8 deletions api/clients/retrieval_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type BlobChunks struct {

type retrievalClient struct {
logger logging.Logger
indexedChainState core.IndexedChainState
chainState core.ChainState
assignmentCoordinator core.AssignmentCoordinator
nodeClient NodeClient
verifier encoding.Verifier
Expand All @@ -63,15 +63,15 @@ type retrievalClient struct {
// NewRetrievalClient creates a new retrieval client.
func NewRetrievalClient(
logger logging.Logger,
chainState core.IndexedChainState,
chainState core.ChainState,
assignmentCoordinator core.AssignmentCoordinator,
nodeClient NodeClient,
verifier encoding.Verifier,
numConnections int) (RetrievalClient, error) {

return &retrievalClient{
logger: logger.With("component", "RetrievalClient"),
indexedChainState: chainState,
chainState: chainState,
assignmentCoordinator: assignmentCoordinator,
nodeClient: nodeClient,
verifier: verifier,
Expand Down Expand Up @@ -104,11 +104,11 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
batchRoot [32]byte,
quorumID core.QuorumID) (*BlobChunks, error) {

indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
operatorState, err := r.chainState.GetOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
if err != nil {
return nil, err
}
operators, ok := indexedOperatorState.Operators[quorumID]
operators, ok := operatorState.Operators[quorumID]
if !ok {
return nil, fmt.Errorf("no quorum with ID: %d", quorumID)
}
Expand All @@ -118,7 +118,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
var proof *merkletree.Proof
var proofVerified bool
for opID := range operators {
opInfo := indexedOperatorState.IndexedOperators[opID]
opInfo := operators[opID]
blobHeader, proof, err = r.nodeClient.GetBlobHeader(ctx, opInfo.Socket, batchHeaderHash, blobIndex)
if err != nil {
// try another operator
Expand Down Expand Up @@ -172,7 +172,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
return nil, err
}

assignments, info, err := r.assignmentCoordinator.GetAssignments(indexedOperatorState.OperatorState, blobHeader.Length, quorumHeader)
assignments, info, err := r.assignmentCoordinator.GetAssignments(operatorState, blobHeader.Length, quorumHeader)
if err != nil {
return nil, errors.New("failed to get assignments")
}
Expand All @@ -182,7 +182,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
pool := workerpool.New(r.numConnections)
for opID := range operators {
opID := opID
opInfo := indexedOperatorState.IndexedOperators[opID]
opInfo := operators[opID]
pool.Submit(func() {
r.nodeClient.GetChunks(ctx, opID, opInfo, batchHeaderHash, blobIndex, quorumID, chunksChan)
})
Expand Down
7 changes: 1 addition & 6 deletions api/clients/retrieval_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,7 @@ func setup(t *testing.T) {
indexer = &indexermock.MockIndexer{}
indexer.On("Index").Return(nil).Once()

ics, err := coreindexer.NewIndexedChainState(chainState, indexer)
if err != nil {
panic("failed to create a new indexed chain state")
}

retrievalClient, err = clients.NewRetrievalClient(logger, ics, coordinator, nodeClient, v, 2)
retrievalClient, err = clients.NewRetrievalClient(logger, chainState, coordinator, nodeClient, v, 2)
if err != nil {
panic("failed to create a new retrieval client")
}
Expand Down
33 changes: 16 additions & 17 deletions api/clients/v2/retrieval_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,27 @@ type RetrievalClient interface {
}

type retrievalClient struct {
logger logging.Logger
ethClient core.Reader
indexedChainState core.IndexedChainState
verifier encoding.Verifier
numConnections int
logger logging.Logger
ethClient core.Reader
chainState core.ChainState
verifier encoding.Verifier
numConnections int
}

// NewRetrievalClient creates a new retrieval client.
func NewRetrievalClient(
logger logging.Logger,
ethClient core.Reader,
chainState core.IndexedChainState,
chainState core.ChainState,
verifier encoding.Verifier,
numConnections int,
) RetrievalClient {
return &retrievalClient{
logger: logger.With("component", "RetrievalClient"),
ethClient: ethClient,
indexedChainState: chainState,
verifier: verifier,
numConnections: numConnections,
logger: logger.With("component", "RetrievalClient"),
ethClient: ethClient,
chainState: chainState,
verifier: verifier,
numConnections: numConnections,
}
}

Expand All @@ -65,11 +65,11 @@ func (r *retrievalClient) GetBlob(ctx context.Context, blobHeader *corev2.BlobHe
return nil, err
}

indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, uint(referenceBlockNumber), []core.QuorumID{quorumID})
operatorState, err := r.chainState.GetOperatorState(ctx, uint(referenceBlockNumber), []core.QuorumID{quorumID})
if err != nil {
return nil, err
}
operators, ok := indexedOperatorState.Operators[quorumID]
operators, ok := operatorState.Operators[quorumID]
if !ok {
return nil, fmt.Errorf("no quorum with ID: %d", quorumID)
}
Expand All @@ -89,7 +89,7 @@ func (r *retrievalClient) GetBlob(ctx context.Context, blobHeader *corev2.BlobHe
return nil, err
}

assignments, err := corev2.GetAssignments(indexedOperatorState.OperatorState, blobParam, quorumID)
assignments, err := corev2.GetAssignments(operatorState, blobParam, quorumID)
if err != nil {
return nil, errors.New("failed to get assignments")
}
Expand All @@ -98,8 +98,7 @@ func (r *retrievalClient) GetBlob(ctx context.Context, blobHeader *corev2.BlobHe
chunksChan := make(chan clients.RetrievedChunks, len(operators))
pool := workerpool.New(r.numConnections)
for opID := range operators {
opID := opID
opInfo := indexedOperatorState.IndexedOperators[opID]
opInfo := operators[opID]
pool.Submit(func() {
r.getChunksFromOperator(ctx, opID, opInfo, blobKey, quorumID, chunksChan)
})
Expand Down Expand Up @@ -147,7 +146,7 @@ func (r *retrievalClient) GetBlob(ctx context.Context, blobHeader *corev2.BlobHe
func (r *retrievalClient) getChunksFromOperator(
ctx context.Context,
opID core.OperatorID,
opInfo *core.IndexedOperatorInfo,
opInfo *core.OperatorInfo,
blobKey corev2.BlobKey,
quorumID core.QuorumID,
chunksChan chan clients.RetrievedChunks,
Expand Down
44 changes: 39 additions & 5 deletions core/eth/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ func (cs *ChainState) GetOperatorStateByOperator(ctx context.Context, blockNumbe
return nil, err
}

return getOperatorState(operatorsByQuorum, uint32(blockNumber))
socketMap := make(map[core.OperatorID]string)
socket, err := cs.Tx.GetOperatorSocket(ctx, operator)
if err != nil {
return nil, err
}
socketMap[operator] = socket

return getOperatorState(operatorsByQuorum, uint32(blockNumber), socketMap)

}

Expand All @@ -38,7 +45,12 @@ func (cs *ChainState) GetOperatorState(ctx context.Context, blockNumber uint, qu
return nil, err
}

return getOperatorState(operatorsByQuorum, uint32(blockNumber))
socketMap, err := cs.buildSocketMap(ctx, operatorsByQuorum)
if err != nil {
return nil, err
}

return getOperatorState(operatorsByQuorum, uint32(blockNumber), socketMap)
}

func (cs *ChainState) GetCurrentBlockNumber() (uint, error) {
Expand All @@ -59,7 +71,26 @@ func (cs *ChainState) GetOperatorSocket(ctx context.Context, blockNumber uint, o
return socket, nil
}

func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32) (*core.OperatorState, error) {
// buildSocketMap returns a map from operatorID to socket address for the operators in the operatorsByQuorum
func (cs *ChainState) buildSocketMap(ctx context.Context, operatorsByQuorum core.OperatorStakes) (map[core.OperatorID]string, error) {
socketMap := make(map[core.OperatorID]string)
for _, quorum := range operatorsByQuorum {
for _, op := range quorum {
// if the socket is already in the map, skip
if _, ok := socketMap[op.OperatorID]; ok {
continue
}
socket, err := cs.Tx.GetOperatorSocket(ctx, op.OperatorID)
if err != nil {
return nil, err
}
socketMap[op.OperatorID] = socket
}
}
return socketMap, nil
}

func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32, socketMap map[core.OperatorID]string) (*core.OperatorState, error) {
operators := make(map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo)
totals := make(map[core.QuorumID]*core.OperatorInfo)

Expand All @@ -69,15 +100,18 @@ func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32)

for ind, op := range quorum {
operators[quorumID][op.OperatorID] = &core.OperatorInfo{
Stake: op.Stake,
Index: core.OperatorIndex(ind),
Stake: op.Stake,
Index: core.OperatorIndex(ind),
Socket: socketMap[op.OperatorID],
}
totalStake.Add(totalStake, op.Stake)
}

totals[quorumID] = &core.OperatorInfo{
Stake: totalStake,
Index: core.OperatorIndex(len(quorum)),
// no socket for the total
Socket: "",
}
}

Expand Down
2 changes: 2 additions & 0 deletions core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type OperatorInfo struct {
Stake StakeAmount
// Index is the index of the operator within the quorum
Index OperatorIndex
// Socket is the socket address of the operator, in the form "host:port"
Socket string
}

// OperatorState contains information about the current state of operators which is stored in the blockchain state
Expand Down
38 changes: 22 additions & 16 deletions core/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,39 @@ func TestOperatorStateHash(t *testing.T) {
Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
0: {
[32]byte{0}: &core.OperatorInfo{
Stake: big.NewInt(12),
Index: uint(2),
Stake: big.NewInt(12),
Index: uint(2),
Socket: "192.168.1.100:8080",
},
[32]byte{1}: &core.OperatorInfo{
Stake: big.NewInt(23),
Index: uint(3),
Stake: big.NewInt(23),
Index: uint(3),
Socket: "127.0.0.1:3000",
},
},
1: {
[32]byte{1}: &core.OperatorInfo{
Stake: big.NewInt(23),
Index: uint(3),
Stake: big.NewInt(23),
Index: uint(3),
Socket: "127.0.0.1:3000",
},
[32]byte{2}: &core.OperatorInfo{
Stake: big.NewInt(34),
Index: uint(4),
Stake: big.NewInt(34),
Index: uint(4),
Socket: "192.168.1.100:8080",
},
},
},
Totals: map[core.QuorumID]*core.OperatorInfo{
0: {
Stake: big.NewInt(35),
Index: uint(2),
Stake: big.NewInt(35),
Index: uint(2),
Socket: "",
},
1: {
Stake: big.NewInt(57),
Index: uint(2),
Stake: big.NewInt(57),
Index: uint(2),
Socket: "",
},
},
BlockNumber: uint(123),
Expand All @@ -50,8 +56,8 @@ func TestOperatorStateHash(t *testing.T) {
assert.NoError(t, err)
q0 := hash1[0]
q1 := hash1[1]
assert.Equal(t, "3805338f34f77ff1fa23bbc23b1e86c4", hex.EncodeToString(q0[:]))
assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:]))
assert.Equal(t, "6098562ea2e61a8f68743f9162b0adc0", hex.EncodeToString(q0[:]))
assert.Equal(t, "8ceea2ec543eb311e51ccfdc9e00ea4f", hex.EncodeToString(q1[:]))

s2 := core.OperatorState{
Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
Expand Down Expand Up @@ -93,6 +99,6 @@ func TestOperatorStateHash(t *testing.T) {
assert.NoError(t, err)
q0 = hash2[0]
q1 = hash2[1]
assert.Equal(t, "1836448b57ae79decdcb77157cf31698", hex.EncodeToString(q0[:]))
assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:]))
assert.Equal(t, "dc1bbb0b2b5d20238adfd4bd33661423", hex.EncodeToString(q0[:]))
assert.Equal(t, "8ceea2ec543eb311e51ccfdc9e00ea4f", hex.EncodeToString(q1[:]))
}
21 changes: 3 additions & 18 deletions inabox/tests/integration_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
rollupbindings "github.com/Layr-Labs/eigenda/contracts/bindings/MockRollup"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/eth"
"github.com/Layr-Labs/eigenda/core/thegraph"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding/kzg"
"github.com/Layr-Labs/eigenda/encoding/kzg/verifier"
Expand Down Expand Up @@ -191,31 +190,17 @@ func setupRetrievalClient(testConfig *deploy.Config) error {
return err
}

graphBackoff, err := time.ParseDuration(testConfig.Retriever.RETRIEVER_GRAPH_BACKOFF)
if err != nil {
return err
}
maxRetries, err := strconv.Atoi(testConfig.Retriever.RETRIEVER_GRAPH_MAX_RETRIES)
if err != nil {
return err
}
ics := thegraph.MakeIndexedChainState(thegraph.Config{
Endpoint: testConfig.Retriever.RETRIEVER_GRAPH_URL,
PullInterval: graphBackoff,
MaxRetries: maxRetries,
}, cs, logger)

retrievalClient, err = clients.NewRetrievalClient(logger, ics, agn, nodeClient, v, 10)
retrievalClient, err = clients.NewRetrievalClient(logger, cs, agn, nodeClient, v, 10)
if err != nil {
return err
}
chainReader, err := eth.NewReader(logger, ethClient, testConfig.Retriever.RETRIEVER_BLS_OPERATOR_STATE_RETRIVER, testConfig.Retriever.RETRIEVER_EIGENDA_SERVICE_MANAGER)
if err != nil {
return err
}
retrievalClientV2 = clientsv2.NewRetrievalClient(logger, chainReader, ics, v, 10)
retrievalClientV2 = clientsv2.NewRetrievalClient(logger, chainReader, cs, v, 10)

return ics.Start(context.Background())
return nil
}

var _ = AfterSuite(func() {
Expand Down
Loading

0 comments on commit 59b7ff7

Please sign in to comment.