feat: relax indexedChainState to ChainState for retrieval #943

Open
wants to merge 7 commits into base: master
Changes from 1 commit
api/clients/mock/node_client.go (2 changes: 1 addition & 1 deletion)
@@ -46,7 +46,7 @@ func (c *MockNodeClient) GetBlobHeader(ctx context.Context, socket string, batch
func (c *MockNodeClient) GetChunks(
ctx context.Context,
opID core.OperatorID,
- opInfo *core.IndexedOperatorInfo,
+ opInfo *core.OperatorInfo,
batchHeaderHash [32]byte,
blobIndex uint32,
quorumID core.QuorumID,
api/clients/node_client.go (4 changes: 2 additions & 2 deletions)
@@ -21,7 +21,7 @@ type RetrievedChunks struct {

type NodeClient interface {
GetBlobHeader(ctx context.Context, socket string, batchHeaderHash [32]byte, blobIndex uint32) (*core.BlobHeader, *merkletree.Proof, error)
- GetChunks(ctx context.Context, opID core.OperatorID, opInfo *core.IndexedOperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, chunksChan chan RetrievedChunks)
+ GetChunks(ctx context.Context, opID core.OperatorID, opInfo *core.OperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, chunksChan chan RetrievedChunks)
}

type client struct {
@@ -79,7 +79,7 @@ func (c client) GetBlobHeader(
func (c client) GetChunks(
ctx context.Context,
opID core.OperatorID,
- opInfo *core.IndexedOperatorInfo,
+ opInfo *core.OperatorInfo,
batchHeaderHash [32]byte,
blobIndex uint32,
quorumID core.QuorumID,
api/clients/retrieval_client.go (16 changes: 8 additions & 8 deletions)
@@ -53,7 +53,7 @@ type BlobChunks struct {

type retrievalClient struct {
logger logging.Logger
- indexedChainState core.IndexedChainState
+ chainState core.ChainState
assignmentCoordinator core.AssignmentCoordinator
nodeClient NodeClient
verifier encoding.Verifier
@@ -63,15 +63,15 @@ type retrievalClient struct {
// NewRetrievalClient creates a new retrieval client.
func NewRetrievalClient(
logger logging.Logger,
- chainState core.IndexedChainState,
+ chainState core.ChainState,
assignmentCoordinator core.AssignmentCoordinator,
nodeClient NodeClient,
verifier encoding.Verifier,
numConnections int) (RetrievalClient, error) {

return &retrievalClient{
logger: logger.With("component", "RetrievalClient"),
- indexedChainState: chainState,
+ chainState: chainState,
assignmentCoordinator: assignmentCoordinator,
nodeClient: nodeClient,
verifier: verifier,
@@ -104,11 +104,11 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
batchRoot [32]byte,
quorumID core.QuorumID) (*BlobChunks, error) {

- indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
+ operatorState, err := r.chainState.GetOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
if err != nil {
return nil, err
}
- operators, ok := indexedOperatorState.Operators[quorumID]
+ operators, ok := operatorState.Operators[quorumID]
if !ok {
return nil, fmt.Errorf("no quorum with ID: %d", quorumID)
}
@@ -118,7 +118,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
var proof *merkletree.Proof
var proofVerified bool
for opID := range operators {
- opInfo := indexedOperatorState.IndexedOperators[opID]
+ opInfo := operators[opID]
blobHeader, proof, err = r.nodeClient.GetBlobHeader(ctx, opInfo.Socket, batchHeaderHash, blobIndex)
if err != nil {
// try another operator
@@ -172,7 +172,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
return nil, err
}

- assignments, info, err := r.assignmentCoordinator.GetAssignments(indexedOperatorState.OperatorState, blobHeader.Length, quorumHeader)
+ assignments, info, err := r.assignmentCoordinator.GetAssignments(operatorState, blobHeader.Length, quorumHeader)
if err != nil {
return nil, errors.New("failed to get assignments")
}
@@ -182,7 +182,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
pool := workerpool.New(r.numConnections)
for opID := range operators {
opID := opID
- opInfo := indexedOperatorState.IndexedOperators[opID]
+ opInfo := operators[opID]
pool.Submit(func() {
r.nodeClient.GetChunks(ctx, opID, opInfo, batchHeaderHash, blobIndex, quorumID, chunksChan)
})
api/clients/retrieval_client_test.go (7 changes: 1 addition & 6 deletions)
@@ -100,12 +100,7 @@ func setup(t *testing.T) {
indexer = &indexermock.MockIndexer{}
indexer.On("Index").Return(nil).Once()

- ics, err := coreindexer.NewIndexedChainState(chainState, indexer)
- if err != nil {
- panic("failed to create a new indexed chain state")
- }
-
- retrievalClient, err = clients.NewRetrievalClient(logger, ics, coordinator, nodeClient, v, 2)
+ retrievalClient, err = clients.NewRetrievalClient(logger, chainState, coordinator, nodeClient, v, 2)
if err != nil {
panic("failed to create a new retrieval client")
}
api/clients/v2/retrieval_client.go (33 changes: 16 additions & 17 deletions)
@@ -25,27 +25,27 @@ type RetrievalClient interface {
}

type retrievalClient struct {
- logger logging.Logger
- ethClient core.Reader
- indexedChainState core.IndexedChainState
- verifier encoding.Verifier
- numConnections int
+ logger logging.Logger
+ ethClient core.Reader
+ chainState core.ChainState
+ verifier encoding.Verifier
+ numConnections int
}

// NewRetrievalClient creates a new retrieval client.
func NewRetrievalClient(
logger logging.Logger,
ethClient core.Reader,
- chainState core.IndexedChainState,
+ chainState core.ChainState,
verifier encoding.Verifier,
numConnections int,
) RetrievalClient {
return &retrievalClient{
logger: logger.With("component", "RetrievalClient"),
ethClient: ethClient,
indexedChainState: chainState,
verifier: verifier,
numConnections: numConnections,
logger: logger.With("component", "RetrievalClient"),
ethClient: ethClient,
chainState: chainState,
verifier: verifier,
numConnections: numConnections,
}
}

@@ -65,11 +65,11 @@ func (r *retrievalClient) GetBlob(ctx context.Context, blobHeader *corev2.BlobHe
return nil, err
}

- indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, uint(referenceBlockNumber), []core.QuorumID{quorumID})
+ operatorState, err := r.chainState.GetOperatorState(ctx, uint(referenceBlockNumber), []core.QuorumID{quorumID})
if err != nil {
return nil, err
}
- operators, ok := indexedOperatorState.Operators[quorumID]
+ operators, ok := operatorState.Operators[quorumID]
if !ok {
return nil, fmt.Errorf("no quorum with ID: %d", quorumID)
}
@@ -89,7 +89,7 @@ func (r *retrievalClient) GetBlob(ctx context.Context, blobHeader *corev2.BlobHe
return nil, err
}

- assignments, err := corev2.GetAssignments(indexedOperatorState.OperatorState, blobParam, quorumID)
+ assignments, err := corev2.GetAssignments(operatorState, blobParam, quorumID)
if err != nil {
return nil, errors.New("failed to get assignments")
}
@@ -98,8 +98,7 @@ func (r *retrievalClient) GetBlob(ctx context.Context, blobHeader *corev2.BlobHe
chunksChan := make(chan clients.RetrievedChunks, len(operators))
pool := workerpool.New(r.numConnections)
for opID := range operators {
- opID := opID
- opInfo := indexedOperatorState.IndexedOperators[opID]
+ opInfo := operators[opID]
pool.Submit(func() {
r.getChunksFromOperator(ctx, opID, opInfo, blobKey, quorumID, chunksChan)
})
@@ -147,7 +146,7 @@ func (r *retrievalClient) GetBlob(ctx context.Context, blobHeader *corev2.BlobHe
func (r *retrievalClient) getChunksFromOperator(
ctx context.Context,
opID core.OperatorID,
- opInfo *core.IndexedOperatorInfo,
+ opInfo *core.OperatorInfo,
blobKey corev2.BlobKey,
quorumID core.QuorumID,
chunksChan chan clients.RetrievedChunks,
core/eth/state.go (44 changes: 39 additions & 5 deletions)
@@ -28,7 +28,14 @@ func (cs *ChainState) GetOperatorStateByOperator(ctx context.Context, blockNumbe
return nil, err

Review comment (Contributor): can we move these methods to eth reader and remove this struct? Not sure why we need a separate struct for this.

Review comment (Contributor): If we're caching the operator sockets, maybe we should keep this struct separate.

}

- return getOperatorState(operatorsByQuorum, uint32(blockNumber))
+ socketMap := make(map[core.OperatorID]string)
+ socket, err := cs.Tx.GetOperatorSocket(ctx, operator)
+ if err != nil {
+ return nil, err
+ }
+ socketMap[operator] = socket
+
+ return getOperatorState(operatorsByQuorum, uint32(blockNumber), socketMap)

}

@@ -38,7 +45,12 @@ func (cs *ChainState) GetOperatorState(ctx context.Context, blockNumber uint, qu
return nil, err
}

- return getOperatorState(operatorsByQuorum, uint32(blockNumber))
+ socketMap, err := cs.buildSocketMap(ctx, operatorsByQuorum)

Review comment (Contributor): does this mean whenever we call this method, it will query the chain once per operator? This seems very inefficient; should we cache this?

Review comment (Contributor Author, @hopeyen, Dec 20, 2024): yes, I expect this too. I can add a socket map to the ChainState cache and only query if the operator is missing from the socket map. Although, that means we might need to add a refresh or a failover somewhere so that we don't use the old socket addresses after operators make an update. Lmk what you think.

+ if err != nil {
+ return nil, err
+ }
+
+ return getOperatorState(operatorsByQuorum, uint32(blockNumber), socketMap)
}

func (cs *ChainState) GetCurrentBlockNumber() (uint, error) {
@@ -59,7 +71,26 @@ func (cs *ChainState) GetOperatorSocket(ctx context.Context, blockNumber uint, o
return socket, nil
}

- func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32) (*core.OperatorState, error) {
+ // buildSocketMap returns a map from operatorID to socket address for the operators in the operatorsByQuorum
+ func (cs *ChainState) buildSocketMap(ctx context.Context, operatorsByQuorum core.OperatorStakes) (map[core.OperatorID]string, error) {
+ socketMap := make(map[core.OperatorID]string)
+ for _, quorum := range operatorsByQuorum {

Review comment (Contributor): isn't the key the quorum and the value map[OperatorIndex]OperatorStake here?

Review comment (Contributor Author): yes, exactly. This loop is to get all the operator IDs and their sockets from all quorums.

Review comment (Contributor, @ian-shim, Dec 20, 2024): one way to address this would be to preload the cache initially by querying every single operator in the current block. Then for the following requests, query the current set of operators, query the logs by filtering on the SocketUpdate event, find any socket updates for existing operators, and refresh sockets for all operators. Then the subsequent requests will only cost 2 eth calls. (A minimal cache sketch along these lines appears after this file's diff.)

+ for _, op := range quorum {
+ // if the socket is already in the map, skip
+ if _, ok := socketMap[op.OperatorID]; ok {
+ continue
+ }
+ socket, err := cs.Tx.GetOperatorSocket(ctx, op.OperatorID)
+ if err != nil {
+ return nil, err
+ }
+ socketMap[op.OperatorID] = socket
+ }
+ }
+ return socketMap, nil
+ }
+
+ func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32, socketMap map[core.OperatorID]string) (*core.OperatorState, error) {
operators := make(map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo)
totals := make(map[core.QuorumID]*core.OperatorInfo)

@@ -69,15 +100,18 @@ func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32)

for ind, op := range quorum {
operators[quorumID][op.OperatorID] = &core.OperatorInfo{
- Stake: op.Stake,
- Index: core.OperatorIndex(ind),
+ Stake: op.Stake,
+ Index: core.OperatorIndex(ind),
+ Socket: socketMap[op.OperatorID],
}
totalStake.Add(totalStake, op.Stake)
}

totals[quorumID] = &core.OperatorInfo{
Stake: totalStake,
Index: core.OperatorIndex(len(quorum)),
+ // no socket for the total
+ Socket: "",
}
}

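The review threads above converge on caching operator sockets so that GetOperatorState does not issue one eth call per operator. The PR itself does not add a cache; the snippet below is only a minimal sketch of the shape being discussed (serve from memory, fall back to a single chain query on a miss, refresh after socket updates). The OperatorSocketReader interface, the SocketCache type, and every name in it are hypothetical; a real version would live behind ChainState and reuse its existing Tx.GetOperatorSocket call.

```go
package socketcache

import (
	"context"
	"sync"
)

// OperatorID mirrors core.OperatorID ([32]byte) so the sketch stays self-contained.
type OperatorID [32]byte

// OperatorSocketReader is a hypothetical stand-in for the single on-chain call the
// cache falls back to (ChainState already has this via cs.Tx.GetOperatorSocket).
type OperatorSocketReader interface {
	GetOperatorSocket(ctx context.Context, operator OperatorID) (string, error)
}

// SocketCache memoizes operator sockets so repeated GetOperatorState calls do not
// re-query the chain once per operator.
type SocketCache struct {
	mu      sync.RWMutex
	sockets map[OperatorID]string
	reader  OperatorSocketReader
}

func NewSocketCache(reader OperatorSocketReader) *SocketCache {
	return &SocketCache{sockets: make(map[OperatorID]string), reader: reader}
}

// Get returns the cached socket, querying the chain only on a cache miss.
func (c *SocketCache) Get(ctx context.Context, op OperatorID) (string, error) {
	c.mu.RLock()
	socket, ok := c.sockets[op]
	c.mu.RUnlock()
	if ok {
		return socket, nil
	}
	socket, err := c.reader.GetOperatorSocket(ctx, op)
	if err != nil {
		return "", err
	}
	c.mu.Lock()
	c.sockets[op] = socket
	c.mu.Unlock()
	return socket, nil
}

// Refresh overwrites cached entries, e.g. after observing SocketUpdate events,
// so stale addresses are not served once an operator changes its socket.
func (c *SocketCache) Refresh(updates map[OperatorID]string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for op, socket := range updates {
		c.sockets[op] = socket
	}
}
```

Wired in, buildSocketMap would call Get instead of hitting cs.Tx.GetOperatorSocket directly, and a periodic query of SocketUpdate logs (or an event subscription) would feed Refresh, which is roughly the two-eth-call steady state described in the thread above.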
core/state.go (2 changes: 2 additions & 0 deletions)
@@ -50,6 +50,8 @@ type OperatorInfo struct {
Stake StakeAmount
// Index is the index of the operator within the quorum
Index OperatorIndex
+ // Socket is the socket address of the operator, in the form "host:port"
+ Socket string
}

// OperatorState contains information about the current state of operators which is stored in the blockchain state
core/state_test.go (38 changes: 22 additions & 16 deletions)
@@ -14,33 +14,39 @@ func TestOperatorStateHash(t *testing.T) {
Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
0: {
[32]byte{0}: &core.OperatorInfo{
- Stake: big.NewInt(12),
- Index: uint(2),
+ Stake: big.NewInt(12),
+ Index: uint(2),
+ Socket: "192.168.1.100:8080",
},
[32]byte{1}: &core.OperatorInfo{
- Stake: big.NewInt(23),
- Index: uint(3),
+ Stake: big.NewInt(23),
+ Index: uint(3),
+ Socket: "127.0.0.1:3000",
},
},
1: {
[32]byte{1}: &core.OperatorInfo{
- Stake: big.NewInt(23),
- Index: uint(3),
+ Stake: big.NewInt(23),
+ Index: uint(3),
+ Socket: "127.0.0.1:3000",
},
[32]byte{2}: &core.OperatorInfo{
- Stake: big.NewInt(34),
- Index: uint(4),
+ Stake: big.NewInt(34),
+ Index: uint(4),
+ Socket: "192.168.1.100:8080",
},
},
},
Totals: map[core.QuorumID]*core.OperatorInfo{
0: {
- Stake: big.NewInt(35),
- Index: uint(2),
+ Stake: big.NewInt(35),
+ Index: uint(2),
+ Socket: "",
},
1: {
- Stake: big.NewInt(57),
- Index: uint(2),
+ Stake: big.NewInt(57),
+ Index: uint(2),
+ Socket: "",
},
},
BlockNumber: uint(123),
@@ -50,8 +56,8 @@ func TestOperatorStateHash(t *testing.T) {
assert.NoError(t, err)
q0 := hash1[0]
q1 := hash1[1]
assert.Equal(t, "3805338f34f77ff1fa23bbc23b1e86c4", hex.EncodeToString(q0[:]))
assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:]))
assert.Equal(t, "6098562ea2e61a8f68743f9162b0adc0", hex.EncodeToString(q0[:]))
assert.Equal(t, "8ceea2ec543eb311e51ccfdc9e00ea4f", hex.EncodeToString(q1[:]))

s2 := core.OperatorState{
Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{
@@ -93,6 +99,6 @@ func TestOperatorStateHash(t *testing.T) {
assert.NoError(t, err)
q0 = hash2[0]
q1 = hash2[1]
assert.Equal(t, "1836448b57ae79decdcb77157cf31698", hex.EncodeToString(q0[:]))
assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:]))
assert.Equal(t, "dc1bbb0b2b5d20238adfd4bd33661423", hex.EncodeToString(q0[:]))
assert.Equal(t, "8ceea2ec543eb311e51ccfdc9e00ea4f", hex.EncodeToString(q1[:]))
}
inabox/tests/integration_suite_test.go (21 changes: 3 additions & 18 deletions)
@@ -17,7 +17,6 @@ import (
rollupbindings "github.com/Layr-Labs/eigenda/contracts/bindings/MockRollup"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/eth"
"github.com/Layr-Labs/eigenda/core/thegraph"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding/kzg"
"github.com/Layr-Labs/eigenda/encoding/kzg/verifier"
@@ -191,31 +190,17 @@ func setupRetrievalClient(testConfig *deploy.Config) error {
return err
}

- graphBackoff, err := time.ParseDuration(testConfig.Retriever.RETRIEVER_GRAPH_BACKOFF)
- if err != nil {
- return err
- }
- maxRetries, err := strconv.Atoi(testConfig.Retriever.RETRIEVER_GRAPH_MAX_RETRIES)
- if err != nil {
- return err
- }
- ics := thegraph.MakeIndexedChainState(thegraph.Config{
- Endpoint: testConfig.Retriever.RETRIEVER_GRAPH_URL,
- PullInterval: graphBackoff,
- MaxRetries: maxRetries,
- }, cs, logger)
-
- retrievalClient, err = clients.NewRetrievalClient(logger, ics, agn, nodeClient, v, 10)
+ retrievalClient, err = clients.NewRetrievalClient(logger, cs, agn, nodeClient, v, 10)
if err != nil {
return err
}
chainReader, err := eth.NewReader(logger, ethClient, testConfig.Retriever.RETRIEVER_BLS_OPERATOR_STATE_RETRIVER, testConfig.Retriever.RETRIEVER_EIGENDA_SERVICE_MANAGER)
if err != nil {
return err
}
- retrievalClientV2 = clientsv2.NewRetrievalClient(logger, chainReader, ics, v, 10)
+ retrievalClientV2 = clientsv2.NewRetrievalClient(logger, chainReader, cs, v, 10)

- return ics.Start(context.Background())
+ return nil
}

var _ = AfterSuite(func() {
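Taken together, the caller-facing effect of this PR is that both retrieval clients can be constructed from a plain core.ChainState, with no graph indexer or IndexedChainState in the path. Below is a minimal wiring sketch; it only uses constructors and interfaces that appear in this diff, but the helper function, its parameter names, and the logging/encoding import paths are assumptions rather than code from the PR.

```go
package example

import (
	"github.com/Layr-Labs/eigenda/api/clients"
	clientsv2 "github.com/Layr-Labs/eigenda/api/clients/v2"
	"github.com/Layr-Labs/eigenda/core"
	"github.com/Layr-Labs/eigenda/encoding"
	"github.com/Layr-Labs/eigensdk-go/logging"
)

// buildRetrievalClients wires both retrieval clients from a plain core.ChainState.
// Before this PR, the v1 client required a core.IndexedChainState backed by the graph indexer.
func buildRetrievalClients(
	logger logging.Logger,
	cs core.ChainState, // an eth-backed chain state is now sufficient
	chainReader core.Reader,
	agn core.AssignmentCoordinator,
	nodeClient clients.NodeClient,
	v encoding.Verifier,
) (clients.RetrievalClient, clientsv2.RetrievalClient, error) {
	// v1 client: same constructor as before, but it now accepts core.ChainState.
	v1Client, err := clients.NewRetrievalClient(logger, cs, agn, nodeClient, v, 10)
	if err != nil {
		return nil, nil, err
	}
	// v2 client: also relaxed from IndexedChainState to ChainState.
	v2Client := clientsv2.NewRetrievalClient(logger, chainReader, cs, v, 10)
	return v1Client, v2Client, nil
}
```

Compare this with setupRetrievalClient in the inabox suite above, where the thegraph.MakeIndexedChainState call and its Start lifecycle are removed for exactly this reason.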