From 5e174b5ff7d99fca606c2bd738672215a04ad3e7 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 15 Jan 2025 16:08:18 -0600 Subject: [PATCH 01/11] refactor: update protos to be correctly namespaced --- go.mod | 2 +- go.sum | 2 + pkg/rpcServer/handlers.go | 2 +- pkg/rpcServer/healthHandlers.go | 12 ++-- pkg/rpcServer/proofsHandlers.go | 16 +++--- pkg/rpcServer/protocolHandlers.go | 31 ++++++++++ pkg/rpcServer/rewardsHandlers.go | 44 +++++++------- pkg/rpcServer/server.go | 24 +++++--- pkg/service/protocolDataService/protocol.go | 63 +++++++++++++++++++++ pkg/service/types/pagination.go | 6 ++ 10 files changed, 156 insertions(+), 46 deletions(-) create mode 100644 pkg/rpcServer/protocolHandlers.go create mode 100644 pkg/service/protocolDataService/protocol.go create mode 100644 pkg/service/types/pagination.go diff --git a/go.mod b/go.mod index d5a648ff..ea1940a9 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DataDog/datadog-go/v5 v5.5.0 github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1 github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 - github.com/Layr-Labs/protocol-apis v1.1.1-0.20250114181701-acb87ef4eeb5 + github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115220323-135176acb92b github.com/ethereum/go-ethereum v1.14.9 github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index f072e3ec..46bbb474 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,8 @@ github.com/Layr-Labs/protocol-apis v1.1.1-0.20250114180833-6f2487a7e08c h1:kcTwH github.com/Layr-Labs/protocol-apis v1.1.1-0.20250114180833-6f2487a7e08c/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250114181701-acb87ef4eeb5 h1:0PLxb8fpwdpWpfk24yhdZzETFCxVMN2yJjRDyBBf6wM= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250114181701-acb87ef4eeb5/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115220323-135176acb92b h1:eJmPAq3s+AwOrQUjSFXILCzUstDZobwYEraOZ2NFC1M= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115220323-135176acb92b/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= diff --git a/pkg/rpcServer/handlers.go b/pkg/rpcServer/handlers.go index e11c7e0b..d475aa56 100644 --- a/pkg/rpcServer/handlers.go +++ b/pkg/rpcServer/handlers.go @@ -3,7 +3,7 @@ package rpcServer import ( "context" - sidecarV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1" + sidecarV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/sidecar" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) diff --git a/pkg/rpcServer/healthHandlers.go b/pkg/rpcServer/healthHandlers.go index d256fcb5..6f56f003 100644 --- a/pkg/rpcServer/healthHandlers.go +++ b/pkg/rpcServer/healthHandlers.go @@ -2,17 +2,17 @@ package rpcServer import ( "context" - sidecarV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1" + healthV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/health" ) -func (rpc *RpcServer) HealthCheck(ctx context.Context, req *sidecarV1.HealthCheckRequest) (*sidecarV1.HealthCheckResponse, error) { - return &sidecarV1.HealthCheckResponse{ - Status: sidecarV1.HealthCheckResponse_SERVING, +func (rpc *RpcServer) HealthCheck(ctx context.Context, req *healthV1.HealthCheckRequest) (*healthV1.HealthCheckResponse, error) { + return &healthV1.HealthCheckResponse{ + Status: healthV1.HealthCheckResponse_SERVING, }, nil } -func (rpc *RpcServer) ReadyCheck(ctx context.Context, req *sidecarV1.ReadyRequest) (*sidecarV1.ReadyResponse, error) { - return &sidecarV1.ReadyResponse{ +func (rpc *RpcServer) ReadyCheck(ctx context.Context, req *healthV1.ReadyRequest) (*healthV1.ReadyResponse, error) { + return &healthV1.ReadyResponse{ Ready: true, }, nil } diff --git a/pkg/rpcServer/proofsHandlers.go b/pkg/rpcServer/proofsHandlers.go index be4ca7d3..6bb4ce5b 100644 --- a/pkg/rpcServer/proofsHandlers.go +++ b/pkg/rpcServer/proofsHandlers.go @@ -3,27 +3,27 @@ package rpcServer import ( "context" "github.com/Layr-Labs/eigenlayer-rewards-proofs/pkg/claimgen" - sidecarV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1" + rewardsV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/rewards" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -func convertClaimProofToRPCResponse(solidityProof *claimgen.IRewardsCoordinatorRewardsMerkleClaimStrings) *sidecarV1.Proof { - tokenLeaves := make([]*sidecarV1.TokenLeaf, 0) +func convertClaimProofToRPCResponse(solidityProof *claimgen.IRewardsCoordinatorRewardsMerkleClaimStrings) *rewardsV1.Proof { + tokenLeaves := make([]*rewardsV1.TokenLeaf, 0) for _, l := range solidityProof.TokenLeaves { - tokenLeaves = append(tokenLeaves, &sidecarV1.TokenLeaf{ + tokenLeaves = append(tokenLeaves, &rewardsV1.TokenLeaf{ Token: l.Token.String(), CumulativeEarnings: l.CumulativeEarnings, }) } - return &sidecarV1.Proof{ + return &rewardsV1.Proof{ Root: solidityProof.Root, RootIndex: solidityProof.RootIndex, EarnerIndex: solidityProof.EarnerIndex, EarnerTreeProof: solidityProof.EarnerTreeProof, - EarnerLeaf: &sidecarV1.EarnerLeaf{ + EarnerLeaf: &rewardsV1.EarnerLeaf{ Earner: solidityProof.EarnerLeaf.Earner.String(), EarnerTokenRoot: solidityProof.EarnerLeaf.EarnerTokenRoot, }, @@ -33,7 +33,7 @@ func convertClaimProofToRPCResponse(solidityProof *claimgen.IRewardsCoordinatorR } } -func (rpc *RpcServer) GenerateClaimProof(ctx context.Context, req *sidecarV1.GenerateClaimProofRequest) (*sidecarV1.GenerateClaimProofResponse, error) { +func (rpc *RpcServer) GenerateClaimProof(ctx context.Context, req *rewardsV1.GenerateClaimProofRequest) (*rewardsV1.GenerateClaimProofResponse, error) { earner := req.GetEarnerAddress() tokens := req.GetTokens() rootIndex := req.GetRootIndex() @@ -52,7 +52,7 @@ func (rpc *RpcServer) GenerateClaimProof(ctx context.Context, req *sidecarV1.Gen solidityClaim := claimgen.FormatProofForSolidity(root, claim) - return &sidecarV1.GenerateClaimProofResponse{ + return &rewardsV1.GenerateClaimProofResponse{ Proof: convertClaimProofToRPCResponse(solidityClaim), }, nil } diff --git a/pkg/rpcServer/protocolHandlers.go b/pkg/rpcServer/protocolHandlers.go new file mode 100644 index 00000000..68aaf179 --- /dev/null +++ b/pkg/rpcServer/protocolHandlers.go @@ -0,0 +1,31 @@ +package rpcServer + +import ( + "context" + protocolV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/protocol" +) + +func (rpc *RpcServer) GetRegisteredAvsForOperator(ctx context.Context, request *protocolV1.GetRegisteredAvsForOperatorRequest) (*protocolV1.GetRegisteredAvsForOperatorResponse, error) { + //TODO implement me + panic("implement me") +} + +func (rpc *RpcServer) GetDelegatedStrategiesForOperator(ctx context.Context, request *protocolV1.GetDelegatedStrategiesForOperatorRequest) (*protocolV1.GetDelegatedStrategiesForOperatorResponse, error) { + //TODO implement me + panic("implement me") +} + +func (rpc *RpcServer) GetOperatorDelegatedStakeForStrategy(ctx context.Context, request *protocolV1.GetOperatorDelegatedStakeForStrategyRequest) (*protocolV1.GetOperatorDelegatedStakeForStrategyResponse, error) { + //TODO implement me + panic("implement me") +} + +func (rpc *RpcServer) GetDelegatedStakersForOperator(ctx context.Context, request *protocolV1.GetDelegatedStakersForOperatorRequest) (*protocolV1.GetDelegatedStakersForOperatorResponse, error) { + //TODO implement me + panic("implement me") +} + +func (rpc *RpcServer) GetStakerShares(ctx context.Context, request *protocolV1.GetStakerSharesRequest) (*protocolV1.GetStakerSharesResponse, error) { + //TODO implement me + panic("implement me") +} diff --git a/pkg/rpcServer/rewardsHandlers.go b/pkg/rpcServer/rewardsHandlers.go index 916a6766..7d55797f 100644 --- a/pkg/rpcServer/rewardsHandlers.go +++ b/pkg/rpcServer/rewardsHandlers.go @@ -3,7 +3,7 @@ package rpcServer import ( "context" "errors" - sidecarV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1" + rewardsV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/rewards" "github.com/Layr-Labs/sidecar/pkg/rewards" "github.com/Layr-Labs/sidecar/pkg/rewardsCalculatorQueue" "github.com/Layr-Labs/sidecar/pkg/utils" @@ -13,11 +13,11 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -func (rpc *RpcServer) GetRewardsRoot(ctx context.Context, req *sidecarV1.GetRewardsRootRequest) (*sidecarV1.GetRewardsRootResponse, error) { +func (rpc *RpcServer) GetRewardsRoot(ctx context.Context, req *rewardsV1.GetRewardsRootRequest) (*rewardsV1.GetRewardsRootResponse, error) { return nil, status.Error(codes.Unimplemented, "method GetRewardsRoot not implemented") } -func (rpc *RpcServer) GenerateRewards(ctx context.Context, req *sidecarV1.GenerateRewardsRequest) (*sidecarV1.GenerateRewardsResponse, error) { +func (rpc *RpcServer) GenerateRewards(ctx context.Context, req *rewardsV1.GenerateRewardsRequest) (*rewardsV1.GenerateRewardsResponse, error) { cutoffDate := req.GetCutoffDate() waitForComplete := req.GetWaitForComplete() @@ -46,13 +46,13 @@ func (rpc *RpcServer) GenerateRewards(ctx context.Context, req *sidecarV1.Genera } return nil, status.Error(codes.Internal, err.Error()) } - return &sidecarV1.GenerateRewardsResponse{ + return &rewardsV1.GenerateRewardsResponse{ CutoffDate: cutoffDate, Queued: queued, }, nil } -func (rpc *RpcServer) GenerateRewardsRoot(ctx context.Context, req *sidecarV1.GenerateRewardsRootRequest) (*sidecarV1.GenerateRewardsRootResponse, error) { +func (rpc *RpcServer) GenerateRewardsRoot(ctx context.Context, req *rewardsV1.GenerateRewardsRootRequest) (*rewardsV1.GenerateRewardsRootResponse, error) { cutoffDate := req.GetCutoffDate() if cutoffDate == "" { return nil, status.Error(codes.InvalidArgument, "snapshot date is required") @@ -101,13 +101,13 @@ func (rpc *RpcServer) GenerateRewardsRoot(ctx context.Context, req *sidecarV1.Ge zap.String("cutoffDate", cutoffDate), ) - return &sidecarV1.GenerateRewardsRootResponse{ + return &rewardsV1.GenerateRewardsRootResponse{ RewardsRoot: rootString, RewardsCalcEndDate: rewardsCalcEndDate, }, nil } -func (rpc *RpcServer) GenerateStakerOperators(ctx context.Context, req *sidecarV1.GenerateStakerOperatorsRequest) (*sidecarV1.GenerateStakerOperatorsResponse, error) { +func (rpc *RpcServer) GenerateStakerOperators(ctx context.Context, req *rewardsV1.GenerateStakerOperatorsRequest) (*rewardsV1.GenerateStakerOperatorsResponse, error) { cutoffDate := req.GetCutoffDate() if cutoffDate == "" { @@ -136,12 +136,12 @@ func (rpc *RpcServer) GenerateStakerOperators(ctx context.Context, req *sidecarV } return nil, status.Error(codes.Internal, err.Error()) } - return &sidecarV1.GenerateStakerOperatorsResponse{ + return &rewardsV1.GenerateStakerOperatorsResponse{ Queued: queued, }, nil } -func (rpc *RpcServer) BackfillStakerOperators(ctx context.Context, req *sidecarV1.BackfillStakerOperatorsRequest) (*sidecarV1.BackfillStakerOperatorsResponse, error) { +func (rpc *RpcServer) BackfillStakerOperators(ctx context.Context, req *rewardsV1.BackfillStakerOperatorsRequest) (*rewardsV1.BackfillStakerOperatorsResponse, error) { var err error queued := false @@ -164,52 +164,52 @@ func (rpc *RpcServer) BackfillStakerOperators(ctx context.Context, req *sidecarV } return nil, status.Error(codes.Internal, err.Error()) } - return &sidecarV1.BackfillStakerOperatorsResponse{ + return &rewardsV1.BackfillStakerOperatorsResponse{ Queued: queued, }, nil } -func (rpc *RpcServer) GetRewardsForSnapshot(ctx context.Context, req *sidecarV1.GetRewardsForSnapshotRequest) (*sidecarV1.GetRewardsForSnapshotResponse, error) { +func (rpc *RpcServer) GetRewardsForSnapshot(ctx context.Context, req *rewardsV1.GetRewardsForSnapshotRequest) (*rewardsV1.GetRewardsForSnapshotResponse, error) { return nil, status.Error(codes.Unimplemented, "method GetRewardsForSnapshot not implemented") } -func (rpc *RpcServer) GetAttributableRewardsForSnapshot(ctx context.Context, req *sidecarV1.GetAttributableRewardsForSnapshotRequest) (*sidecarV1.GetAttributableRewardsForSnapshotResponse, error) { +func (rpc *RpcServer) GetAttributableRewardsForSnapshot(ctx context.Context, req *rewardsV1.GetAttributableRewardsForSnapshotRequest) (*rewardsV1.GetAttributableRewardsForSnapshotResponse, error) { return nil, status.Error(codes.Unimplemented, "method GetAttributableRewardsForSnapshot not implemented") } -func (rpc *RpcServer) GetAttributableRewardsForDistributionRoot(ctx context.Context, req *sidecarV1.GetAttributableRewardsForDistributionRootRequest) (*sidecarV1.GetAttributableRewardsForDistributionRootResponse, error) { +func (rpc *RpcServer) GetAttributableRewardsForDistributionRoot(ctx context.Context, req *rewardsV1.GetAttributableRewardsForDistributionRootRequest) (*rewardsV1.GetAttributableRewardsForDistributionRootResponse, error) { return nil, status.Error(codes.Unimplemented, "method GetAttributableRewardsForDistributionRoot not implemented") } -func (rpc *RpcServer) GetAvailableRewards(ctx context.Context, req *sidecarV1.GetAvailableRewardsRequest) (*sidecarV1.GetAvailableRewardsResponse, error) { +func (rpc *RpcServer) GetAvailableRewards(ctx context.Context, req *rewardsV1.GetAvailableRewardsRequest) (*rewardsV1.GetAvailableRewardsResponse, error) { return nil, status.Error(codes.Unimplemented, "method GetAvailableRewards not implemented") } -func (rpc *RpcServer) GetTotalClaimedRewards(ctx context.Context, req *sidecarV1.GetTotalClaimedRewardsRequest) (*sidecarV1.GetTotalClaimedRewardsResponse, error) { +func (rpc *RpcServer) GetTotalClaimedRewards(ctx context.Context, req *rewardsV1.GetTotalClaimedRewardsRequest) (*rewardsV1.GetTotalClaimedRewardsResponse, error) { return nil, status.Error(codes.Unimplemented, "method GetTotalClaimedRewards not implemented") } -func (rpc *RpcServer) GetAvailableRewardsTokens(ctx context.Context, req *sidecarV1.GetAvailableRewardsTokensRequest) (*sidecarV1.GetAvailableRewardsTokensResponse, error) { +func (rpc *RpcServer) GetAvailableRewardsTokens(ctx context.Context, req *rewardsV1.GetAvailableRewardsTokensRequest) (*rewardsV1.GetAvailableRewardsTokensResponse, error) { return nil, status.Error(codes.Unimplemented, "method GetAvailableRewardsTokens not implemented") } -func (rpc *RpcServer) GetSummarizedRewardsForEarner(ctx context.Context, req *sidecarV1.GetSummarizedRewardsForEarnerRequest) (*sidecarV1.GetSummarizedRewardsForEarnerResponse, error) { +func (rpc *RpcServer) GetSummarizedRewardsForEarner(ctx context.Context, req *rewardsV1.GetSummarizedRewardsForEarnerRequest) (*rewardsV1.GetSummarizedRewardsForEarnerResponse, error) { return nil, status.Error(codes.Unimplemented, "method GetSummarizedRewardsForEarner not implemented") } -func (rpc *RpcServer) GetClaimedRewardsByBlock(ctx context.Context, req *sidecarV1.GetClaimedRewardsByBlockRequest) (*sidecarV1.GetClaimedRewardsByBlockResponse, error) { +func (rpc *RpcServer) GetClaimedRewardsByBlock(ctx context.Context, req *rewardsV1.GetClaimedRewardsByBlockRequest) (*rewardsV1.GetClaimedRewardsByBlockResponse, error) { return nil, status.Error(codes.Unimplemented, "method GetClaimedRewardsByBlock not implemented") } -func (rpc *RpcServer) ListDistributionRoots(ctx context.Context, req *sidecarV1.ListDistributionRootsRequest) (*sidecarV1.ListDistributionRootsResponse, error) { +func (rpc *RpcServer) ListDistributionRoots(ctx context.Context, req *rewardsV1.ListDistributionRootsRequest) (*rewardsV1.ListDistributionRootsResponse, error) { roots, err := rpc.rewardsCalculator.ListDistributionRoots() if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - responseRoots := make([]*sidecarV1.DistributionRoot, 0, len(roots)) + responseRoots := make([]*rewardsV1.DistributionRoot, 0, len(roots)) for _, root := range roots { - responseRoots = append(responseRoots, &sidecarV1.DistributionRoot{ + responseRoots = append(responseRoots, &rewardsV1.DistributionRoot{ Root: root.Root, RootIndex: root.RootIndex, RewardsCalculationEnd: timestamppb.New(root.RewardsCalculationEnd), @@ -224,7 +224,7 @@ func (rpc *RpcServer) ListDistributionRoots(ctx context.Context, req *sidecarV1. }) } - return &sidecarV1.ListDistributionRootsResponse{ + return &rewardsV1.ListDistributionRootsResponse{ DistributionRoots: responseRoots, }, nil } diff --git a/pkg/rpcServer/server.go b/pkg/rpcServer/server.go index d4c186f8..6212d04e 100644 --- a/pkg/rpcServer/server.go +++ b/pkg/rpcServer/server.go @@ -4,8 +4,11 @@ import ( "context" "errors" "fmt" - v1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1" eventsV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/events" + healthV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/health" + protocolV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/protocol" + rewardsV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/rewards" + sidecarV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/sidecar" "github.com/Layr-Labs/sidecar/internal/logger" "github.com/Layr-Labs/sidecar/pkg/eigenState/stateManager" "github.com/Layr-Labs/sidecar/pkg/eventBus/eventBusTypes" @@ -31,7 +34,6 @@ type RpcServerConfig struct { } type RpcServer struct { - v1.UnimplementedRpcServer Logger *zap.Logger rpcConfig *RpcServerConfig blockStore storage.BlockStore @@ -67,20 +69,26 @@ func NewRpcServer( } func (s *RpcServer) registerHandlers(ctx context.Context, grpcServer *grpc.Server, mux *runtime.ServeMux) error { - v1.RegisterHealthServer(grpcServer, s) - if err := v1.RegisterHealthHandlerServer(ctx, mux, s); err != nil { + healthV1.RegisterHealthServer(grpcServer, s) + if err := healthV1.RegisterHealthHandlerServer(ctx, mux, s); err != nil { s.Logger.Sugar().Errorw("Failed to register Health server", zap.Error(err)) return err } - v1.RegisterRpcServer(grpcServer, s) - if err := v1.RegisterRpcHandlerServer(ctx, mux, s); err != nil { + sidecarV1.RegisterRpcServer(grpcServer, s) + if err := sidecarV1.RegisterRpcHandlerServer(ctx, mux, s); err != nil { s.Logger.Sugar().Errorw("Failed to register SidecarRpc server", zap.Error(err)) return err } - v1.RegisterRewardsServer(grpcServer, s) - if err := v1.RegisterRewardsHandlerServer(ctx, mux, s); err != nil { + rewardsV1.RegisterRewardsServer(grpcServer, s) + if err := rewardsV1.RegisterRewardsHandlerServer(ctx, mux, s); err != nil { + s.Logger.Sugar().Errorw("Failed to register Rewards server", zap.Error(err)) + return err + } + + protocolV1.RegisterProtocolServer(grpcServer, s) + if err := protocolV1.RegisterProtocolHandlerServer(ctx, mux, s); err != nil { s.Logger.Sugar().Errorw("Failed to register Rewards server", zap.Error(err)) return err } diff --git a/pkg/service/protocolDataService/protocol.go b/pkg/service/protocolDataService/protocol.go new file mode 100644 index 00000000..533752c9 --- /dev/null +++ b/pkg/service/protocolDataService/protocol.go @@ -0,0 +1,63 @@ +package protocolDataService + +import ( + "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/pkg/eigenState/stakerShares" + "github.com/Layr-Labs/sidecar/pkg/service/types" + "go.uber.org/zap" + "gorm.io/gorm" +) + +type ProtocolDataService struct { + db *gorm.DB + logger *zap.Logger + globalConfig *config.Config +} + +func NewProtocolDataService( + db *gorm.DB, + logger *zap.Logger, + globalConfig *config.Config, +) *ProtocolDataService { + return &ProtocolDataService{ + db: db, + logger: logger, + globalConfig: globalConfig, + } +} + +func (pds *ProtocolDataService) ListRegisteredAVSsForOperator(operator string, blockHeight uint64) (interface{}, error) { + return nil, nil +} + +func (pds *ProtocolDataService) ListDelegatedStrategiesForOperator(operator string, blockHeight uint64) (interface{}, error) { + return nil, nil +} + +func (pds *ProtocolDataService) GetOperatorDelegatedStake(operator string, strategy string, blockHeight uint64) (interface{}, error) { + return nil, nil +} + +func (pds *ProtocolDataService) ListDelegatedStakersForOperator(operator string, blockHeight uint64, pagination types.Pagination) (interface{}, error) { + return nil, nil +} + +func (pds *ProtocolDataService) ListStakerShares(staker string, blockHeight uint64) ([]*stakerShares.StakerShareDeltas, error) { + shares := make([]*stakerShares.StakerShareDeltas, 0) + + whereParams := []interface{}{staker} + where := "staker = ?" + if blockHeight > 0 { + where += " AND block_height <= ?" + whereParams = append(whereParams, blockHeight) + } + + res := pds.db.Model(&stakerShares.StakerShareDeltas{}). + Where(where, whereParams...). + Find(&shares) + + if res.Error != nil { + return nil, res.Error + } + return shares, nil +} diff --git a/pkg/service/types/pagination.go b/pkg/service/types/pagination.go new file mode 100644 index 00000000..ab19fad6 --- /dev/null +++ b/pkg/service/types/pagination.go @@ -0,0 +1,6 @@ +package types + +type Pagination struct { + Page int `json:"page"` + PageSize int `json:"page_size"` +} From 13cdf2d28f5630085546fb2680b1ec5b6b744fe1 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 16 Jan 2025 08:34:34 -0600 Subject: [PATCH 02/11] feat: ListStakerShares rpc --- cmd/debugger/main.go | 4 +- cmd/run.go | 5 +- examples/rewardsProofs/main.go | 8 +- go.mod | 2 +- go.sum | 2 + pkg/rpcServer/protocolHandlers.go | 20 ++++- pkg/rpcServer/server.go | 36 +++++---- pkg/service/protocolDataService/protocol.go | 86 ++++++++++++++++++--- 8 files changed, 126 insertions(+), 37 deletions(-) diff --git a/cmd/debugger/main.go b/cmd/debugger/main.go index ea9255fa..8e4ec0ec 100644 --- a/cmd/debugger/main.go +++ b/cmd/debugger/main.go @@ -19,6 +19,7 @@ import ( "github.com/Layr-Labs/sidecar/pkg/rewards/stakerOperators" "github.com/Layr-Labs/sidecar/pkg/rewardsCalculatorQueue" "github.com/Layr-Labs/sidecar/pkg/rpcServer" + "github.com/Layr-Labs/sidecar/pkg/service/protocolDataService" "github.com/Layr-Labs/sidecar/pkg/sidecar" pgStorage "github.com/Layr-Labs/sidecar/pkg/storage/postgres" "log" @@ -104,6 +105,7 @@ func main() { p := pipeline.NewPipeline(fetchr, idxr, mds, sm, msm, rc, rcq, cfg, sdc, eb, l) rps := proofs.NewRewardsProofsStore(rc, l) + pds := protocolDataService.NewProtocolDataService(grm, l, cfg) // Create new sidecar instance // Create new sidecar instance @@ -114,7 +116,7 @@ func main() { rpc := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{ GrpcPort: cfg.RpcConfig.GrpcPort, HttpPort: cfg.RpcConfig.HttpPort, - }, mds, sm, rc, rcq, eb, rps, l) + }, mds, sm, rc, rcq, eb, rps, pds, l) // RPC channel to notify the RPC server to shutdown gracefully rpcChannel := make(chan bool) diff --git a/cmd/run.go b/cmd/run.go index dbb246db..5b458b26 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -21,6 +21,7 @@ import ( "github.com/Layr-Labs/sidecar/pkg/rewards/stakerOperators" "github.com/Layr-Labs/sidecar/pkg/rewardsCalculatorQueue" "github.com/Layr-Labs/sidecar/pkg/rpcServer" + "github.com/Layr-Labs/sidecar/pkg/service/protocolDataService" "github.com/Layr-Labs/sidecar/pkg/shutdown" "github.com/Layr-Labs/sidecar/pkg/sidecar" pgStorage "github.com/Layr-Labs/sidecar/pkg/storage/postgres" @@ -121,6 +122,8 @@ var runCmd = &cobra.Command{ rps := proofs.NewRewardsProofsStore(rc, l) + pds := protocolDataService.NewProtocolDataService(grm, l, cfg) + go rcq.Process() p := pipeline.NewPipeline(fetchr, idxr, mds, sm, msm, rc, rcq, cfg, sdc, eb, l) @@ -133,7 +136,7 @@ var runCmd = &cobra.Command{ rpc := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{ GrpcPort: cfg.RpcConfig.GrpcPort, HttpPort: cfg.RpcConfig.HttpPort, - }, mds, sm, rc, rcq, eb, rps, l) + }, mds, sm, rc, rcq, eb, rps, pds, l) // RPC channel to notify the RPC server to shutdown gracefully rpcChannel := make(chan bool) diff --git a/examples/rewardsProofs/main.go b/examples/rewardsProofs/main.go index 39229289..ca77b1c1 100644 --- a/examples/rewardsProofs/main.go +++ b/examples/rewardsProofs/main.go @@ -4,7 +4,7 @@ import ( "context" "crypto/tls" "fmt" - v1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1" + rewardsV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/rewards" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -12,7 +12,7 @@ import ( "strings" ) -func NewSidecarClient(url string, insecureConn bool) (v1.RewardsClient, error) { +func NewSidecarClient(url string, insecureConn bool) (rewardsV1.RewardsClient, error) { var creds grpc.DialOption if strings.Contains(url, "localhost:") || strings.Contains(url, "127.0.0.1:") || insecureConn { creds = grpc.WithTransportCredentials(insecure.NewCredentials()) @@ -25,7 +25,7 @@ func NewSidecarClient(url string, insecureConn bool) (v1.RewardsClient, error) { return nil, err } - return v1.NewRewardsClient(grpcClient), nil + return rewardsV1.NewRewardsClient(grpcClient), nil } func main() { @@ -37,7 +37,7 @@ func main() { log.Fatal(err) } - res, err := client.GenerateClaimProof(context.Background(), &v1.GenerateClaimProofRequest{ + res, err := client.GenerateClaimProof(context.Background(), &rewardsV1.GenerateClaimProofRequest{ EarnerAddress: earnerAddress, Tokens: tokens, }) diff --git a/go.mod b/go.mod index ea1940a9..3cba7042 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DataDog/datadog-go/v5 v5.5.0 github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1 github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 - github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115220323-135176acb92b + github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115230325-93c4ebccbeb7 github.com/ethereum/go-ethereum v1.14.9 github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index 46bbb474..3c546f54 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,8 @@ github.com/Layr-Labs/protocol-apis v1.1.1-0.20250114181701-acb87ef4eeb5 h1:0PLxb github.com/Layr-Labs/protocol-apis v1.1.1-0.20250114181701-acb87ef4eeb5/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115220323-135176acb92b h1:eJmPAq3s+AwOrQUjSFXILCzUstDZobwYEraOZ2NFC1M= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115220323-135176acb92b/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115230325-93c4ebccbeb7 h1:zTOIFjJcCzOZ1PBk9jtoW/bsKSyRzvQyTG2Beutpiuk= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115230325-93c4ebccbeb7/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= diff --git a/pkg/rpcServer/protocolHandlers.go b/pkg/rpcServer/protocolHandlers.go index 68aaf179..9a829545 100644 --- a/pkg/rpcServer/protocolHandlers.go +++ b/pkg/rpcServer/protocolHandlers.go @@ -26,6 +26,22 @@ func (rpc *RpcServer) GetDelegatedStakersForOperator(ctx context.Context, reques } func (rpc *RpcServer) GetStakerShares(ctx context.Context, request *protocolV1.GetStakerSharesRequest) (*protocolV1.GetStakerSharesResponse, error) { - //TODO implement me - panic("implement me") + shares, err := rpc.protocolDataService.ListStakerShares(request.GetStakerAddress(), request.GetBlockHeight()) + if err != nil { + return nil, err + } + + stakerShares := make([]*protocolV1.StakerShare, 0, len(shares)) + for _, share := range shares { + stakerShares = append(stakerShares, &protocolV1.StakerShare{ + Strategy: share.Strategy, + Shares: share.Shares, + OperatorAddress: share.Operator, + AvsAddresses: share.AvsAddresses, + }) + } + + return &protocolV1.GetStakerSharesResponse{ + Shares: stakerShares, + }, nil } diff --git a/pkg/rpcServer/server.go b/pkg/rpcServer/server.go index 6212d04e..97a70967 100644 --- a/pkg/rpcServer/server.go +++ b/pkg/rpcServer/server.go @@ -15,6 +15,7 @@ import ( "github.com/Layr-Labs/sidecar/pkg/proofs" "github.com/Layr-Labs/sidecar/pkg/rewards" "github.com/Layr-Labs/sidecar/pkg/rewardsCalculatorQueue" + "github.com/Layr-Labs/sidecar/pkg/service/protocolDataService" "github.com/Layr-Labs/sidecar/pkg/storage" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" @@ -34,14 +35,15 @@ type RpcServerConfig struct { } type RpcServer struct { - Logger *zap.Logger - rpcConfig *RpcServerConfig - blockStore storage.BlockStore - stateManager *stateManager.EigenStateManager - rewardsCalculator *rewards.RewardsCalculator - rewardsQueue *rewardsCalculatorQueue.RewardsCalculatorQueue - eventBus eventBusTypes.IEventBus - rewardsProofs *proofs.RewardsProofsStore + Logger *zap.Logger + rpcConfig *RpcServerConfig + blockStore storage.BlockStore + stateManager *stateManager.EigenStateManager + rewardsCalculator *rewards.RewardsCalculator + rewardsQueue *rewardsCalculatorQueue.RewardsCalculatorQueue + eventBus eventBusTypes.IEventBus + rewardsProofs *proofs.RewardsProofsStore + protocolDataService *protocolDataService.ProtocolDataService } func NewRpcServer( @@ -52,17 +54,19 @@ func NewRpcServer( rcq *rewardsCalculatorQueue.RewardsCalculatorQueue, eb eventBusTypes.IEventBus, rp *proofs.RewardsProofsStore, + pds *protocolDataService.ProtocolDataService, l *zap.Logger, ) *RpcServer { server := &RpcServer{ - rpcConfig: config, - blockStore: bs, - stateManager: sm, - rewardsCalculator: rc, - rewardsQueue: rcq, - eventBus: eb, - rewardsProofs: rp, - Logger: l, + rpcConfig: config, + blockStore: bs, + stateManager: sm, + rewardsCalculator: rc, + rewardsQueue: rcq, + eventBus: eb, + rewardsProofs: rp, + protocolDataService: pds, + Logger: l, } return server diff --git a/pkg/service/protocolDataService/protocol.go b/pkg/service/protocolDataService/protocol.go index 533752c9..0f86838b 100644 --- a/pkg/service/protocolDataService/protocol.go +++ b/pkg/service/protocolDataService/protocol.go @@ -1,9 +1,10 @@ package protocolDataService import ( + "database/sql" "github.com/Layr-Labs/sidecar/internal/config" - "github.com/Layr-Labs/sidecar/pkg/eigenState/stakerShares" "github.com/Layr-Labs/sidecar/pkg/service/types" + "github.com/Layr-Labs/sidecar/pkg/storage" "go.uber.org/zap" "gorm.io/gorm" ) @@ -42,20 +43,81 @@ func (pds *ProtocolDataService) ListDelegatedStakersForOperator(operator string, return nil, nil } -func (pds *ProtocolDataService) ListStakerShares(staker string, blockHeight uint64) ([]*stakerShares.StakerShareDeltas, error) { - shares := make([]*stakerShares.StakerShareDeltas, 0) +type StakerShares struct { + Staker string + Strategy string + Shares string + BlockHeight uint64 + Operator *string + Delegated *bool + AvsAddresses []string +} - whereParams := []interface{}{staker} - where := "staker = ?" - if blockHeight > 0 { - where += " AND block_height <= ?" - whereParams = append(whereParams, blockHeight) - } +// ListStakerShares returns the shares of a staker at a given block height, including the operator they were delegated to +// and the addresses of the AVSs the operator was registered to. +// +// If not blockHeight is provided, the most recently indexed block will be used. +func (pds *ProtocolDataService) ListStakerShares(staker string, blockHeight uint64) ([]*StakerShares, error) { + shares := make([]*StakerShares, 0) - res := pds.db.Model(&stakerShares.StakerShareDeltas{}). - Where(where, whereParams...). - Find(&shares) + if blockHeight == 0 { + var currentBlock *storage.Block + res := pds.db.Model(&storage.Block{}).Order("number desc").First(¤tBlock) + if res.Error != nil { + return nil, res.Error + } + blockHeight = currentBlock.Number + } + query := ` + with distinct_staker_strategies as ( + select + ssd.staker, + ssd.strategy, + ssd.shares, + ssd.block_number, + row_number() over (partition by ssd.staker, ssd.strategy order by ssd.block_number desc) as rn + from sidecar_mainnet_ethereum.staker_shares as ssd + where + ssd.staker = @staker + and block_number <= @blockHeight + order by block_number desc + ) + select + dss.*, + dsc.operator, + dsc.delegated, + aosc.avs_list as avs_addresses + from distinct_staker_strategies as dss + left join lateral ( + select + sdc.staker, + sdc.operator, + sdc.delegated, + row_number() over (partition by sdc.staker order by sdc.block_number desc, sdc.log_index) as rn + from sidecar_mainnet_ethereum.staker_delegation_changes as sdc + where + sdc.staker = dss.staker + and sdc.block_number <= dss.block_number + order by block_number desc + ) as dsc on (dsc.rn = 1) + left join lateral ( + select + jsonb_agg(distinct aosc.avs) as avs_list + from sidecar_mainnet_ethereum.avs_operator_state_changes aosc + where + aosc.operator = dsc.operator + and aosc.block_number <= dss.block_number + and aosc.registered = true + ) as aosc on true + where + dss.rn = 1 + order by block_number desc; + ` + res := pds.db.Raw(query, + sql.Named("staker", staker), + sql.Named("blockHeight", blockHeight), + ).Scan(&shares) if res.Error != nil { return nil, res.Error } From accde565083157717aec17a0118a205b4b1069e2 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 16 Jan 2025 15:24:19 -0600 Subject: [PATCH 03/11] feat: GetDelegatedStakersForOperator rpc --- go.mod | 2 +- go.sum | 6 ++ pkg/rpcServer/protocolHandlers.go | 32 +++++++++- pkg/service/protocolDataService/protocol.go | 70 ++++++++++++++++++--- pkg/service/types/pagination.go | 21 ++++++- 5 files changed, 116 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 3cba7042..d1121c58 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DataDog/datadog-go/v5 v5.5.0 github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1 github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 - github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115230325-93c4ebccbeb7 + github.com/Layr-Labs/protocol-apis v1.1.1-0.20250121193118-8112817d1079 github.com/ethereum/go-ethereum v1.14.9 github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index 3c546f54..3e9c0f1a 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,12 @@ github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115220323-135176acb92b h1:eJmPA github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115220323-135176acb92b/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115230325-93c4ebccbeb7 h1:zTOIFjJcCzOZ1PBk9jtoW/bsKSyRzvQyTG2Beutpiuk= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115230325-93c4ebccbeb7/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250116155113-c22028af9e00 h1:7N3ta4X5YRygobnCuAAcKr4T76eUboXVcj+IGW0P2eA= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250116155113-c22028af9e00/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250116155715-919a0a9a27e5 h1:m/I5hGK4JpOE7OL7wYlKt9HwlCAgQ0SxeT3dNHfjzFY= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250116155715-919a0a9a27e5/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250121193118-8112817d1079 h1:B1b9ghilo70y4MJ7ZkF5w8BuPtslAUP2oE6vfzG6DBA= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250121193118-8112817d1079/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= diff --git a/pkg/rpcServer/protocolHandlers.go b/pkg/rpcServer/protocolHandlers.go index 9a829545..e6962b48 100644 --- a/pkg/rpcServer/protocolHandlers.go +++ b/pkg/rpcServer/protocolHandlers.go @@ -2,7 +2,10 @@ package rpcServer import ( "context" + "errors" + "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/common" protocolV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/protocol" + "github.com/Layr-Labs/sidecar/pkg/service/types" ) func (rpc *RpcServer) GetRegisteredAvsForOperator(ctx context.Context, request *protocolV1.GetRegisteredAvsForOperatorRequest) (*protocolV1.GetRegisteredAvsForOperatorResponse, error) { @@ -21,8 +24,33 @@ func (rpc *RpcServer) GetOperatorDelegatedStakeForStrategy(ctx context.Context, } func (rpc *RpcServer) GetDelegatedStakersForOperator(ctx context.Context, request *protocolV1.GetDelegatedStakersForOperatorRequest) (*protocolV1.GetDelegatedStakersForOperatorResponse, error) { - //TODO implement me - panic("implement me") + operator := request.GetOperatorAddress() + if operator == "" { + return nil, errors.New("operator address is required") + } + pagination := types.NewDefaultPagination() + + if p := request.GetPagination(); p != nil { + pagination.Load(p.GetPageNumber(), p.GetPageSize()) + } + + delegatedStakers, err := rpc.protocolDataService.ListDelegatedStakersForOperator(operator, request.GetBlockHeight(), pagination) + if err != nil { + return nil, err + } + + var nextPage *common.Pagination + if uint32(len(delegatedStakers)) == pagination.PageSize { + nextPage = &common.Pagination{ + PageNumber: pagination.Page + 1, + PageSize: pagination.PageSize, + } + } + + return &protocolV1.GetDelegatedStakersForOperatorResponse{ + StakerAddresses: delegatedStakers, + NextPage: nextPage, + }, nil } func (rpc *RpcServer) GetStakerShares(ctx context.Context, request *protocolV1.GetStakerSharesRequest) (*protocolV1.GetStakerSharesResponse, error) { diff --git a/pkg/service/protocolDataService/protocol.go b/pkg/service/protocolDataService/protocol.go index 0f86838b..3a3a5810 100644 --- a/pkg/service/protocolDataService/protocol.go +++ b/pkg/service/protocolDataService/protocol.go @@ -27,6 +27,18 @@ func NewProtocolDataService( } } +func (pds *ProtocolDataService) getCurrentBlockHeightIfNotPresent(blockHeight uint64) (uint64, error) { + if blockHeight == 0 { + var currentBlock *storage.Block + res := pds.db.Model(&storage.Block{}).Order("number desc").First(¤tBlock) + if res.Error != nil { + return 0, res.Error + } + blockHeight = currentBlock.Number + } + return blockHeight, nil +} + func (pds *ProtocolDataService) ListRegisteredAVSsForOperator(operator string, blockHeight uint64) (interface{}, error) { return nil, nil } @@ -39,8 +51,50 @@ func (pds *ProtocolDataService) GetOperatorDelegatedStake(operator string, strat return nil, nil } -func (pds *ProtocolDataService) ListDelegatedStakersForOperator(operator string, blockHeight uint64, pagination types.Pagination) (interface{}, error) { - return nil, nil +func (pds *ProtocolDataService) ListDelegatedStakersForOperator(operator string, blockHeight uint64, pagination *types.Pagination) ([]string, error) { + bh, err := pds.getCurrentBlockHeightIfNotPresent(blockHeight) + if err != nil { + return nil, err + } + + query := ` + with staker_operator_delegations as ( + SELECT DISTINCT ON (staker) + staker, + operator, + delegated + FROM sidecar_mainnet_ethereum.staker_delegation_changes + WHERE operator = @operator + AND block_number <= @blockHeight + ORDER BY staker, block_number desc, log_index asc + ) + SELECT + sod.staker + from staker_operator_delegations as sod + where sod.delegated = true + ` + + queryParams := []interface{}{ + sql.Named("operator", operator), + sql.Named("blockHeight", bh), + } + + if pagination != nil { + query += ` LIMIT @limit` + queryParams = append(queryParams, sql.Named("limit", pagination.PageSize)) + + if pagination.Page > 0 { + query += ` OFFSET @offset` + queryParams = append(queryParams, sql.Named("offset", pagination.Page*pagination.PageSize)) + } + } + + var stakers []string + res := pds.db.Raw(query, queryParams...).Scan(&stakers) + if res.Error != nil { + return nil, res.Error + } + return stakers, nil } type StakerShares struct { @@ -60,13 +114,9 @@ type StakerShares struct { func (pds *ProtocolDataService) ListStakerShares(staker string, blockHeight uint64) ([]*StakerShares, error) { shares := make([]*StakerShares, 0) - if blockHeight == 0 { - var currentBlock *storage.Block - res := pds.db.Model(&storage.Block{}).Order("number desc").First(¤tBlock) - if res.Error != nil { - return nil, res.Error - } - blockHeight = currentBlock.Number + bh, err := pds.getCurrentBlockHeightIfNotPresent(blockHeight) + if err != nil { + return nil, err } query := ` @@ -116,7 +166,7 @@ func (pds *ProtocolDataService) ListStakerShares(staker string, blockHeight uint ` res := pds.db.Raw(query, sql.Named("staker", staker), - sql.Named("blockHeight", blockHeight), + sql.Named("blockHeight", bh), ).Scan(&shares) if res.Error != nil { return nil, res.Error diff --git a/pkg/service/types/pagination.go b/pkg/service/types/pagination.go index ab19fad6..3f6a047f 100644 --- a/pkg/service/types/pagination.go +++ b/pkg/service/types/pagination.go @@ -1,6 +1,23 @@ package types type Pagination struct { - Page int `json:"page"` - PageSize int `json:"page_size"` + Page uint32 `json:"page"` + PageSize uint32 `json:"page_size"` +} + +const DefaultPageSize = 100 +const DefaultPage = 0 + +func NewDefaultPagination() *Pagination { + return &Pagination{ + Page: DefaultPage, + PageSize: DefaultPageSize, + } +} + +func (p *Pagination) Load(pageNumber uint32, pageSize uint32) { + p.Page = pageNumber + if pageSize > 0 { + p.PageSize = pageSize + } } From 775ceb8ffa7eda611a5db7ae98f299b9b2ece917 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 22 Jan 2025 09:36:01 -0600 Subject: [PATCH 04/11] feat: add GetOperatorDelegatedStakeForStrategy and GetRegisteredAvsForOperator --- pkg/rpcServer/protocolHandlers.go | 40 +++- pkg/service/protocolDataService/protocol.go | 201 +++++++++++++++++++- 2 files changed, 229 insertions(+), 12 deletions(-) diff --git a/pkg/rpcServer/protocolHandlers.go b/pkg/rpcServer/protocolHandlers.go index e6962b48..d212fa32 100644 --- a/pkg/rpcServer/protocolHandlers.go +++ b/pkg/rpcServer/protocolHandlers.go @@ -9,8 +9,20 @@ import ( ) func (rpc *RpcServer) GetRegisteredAvsForOperator(ctx context.Context, request *protocolV1.GetRegisteredAvsForOperatorRequest) (*protocolV1.GetRegisteredAvsForOperatorResponse, error) { - //TODO implement me - panic("implement me") + operator := request.GetOperatorAddress() + if operator == "" { + return nil, errors.New("operator address is required") + } + + blockHeight := request.GetBlockHeight() + registeredAvs, err := rpc.protocolDataService.ListRegisteredAVSsForOperator(operator, blockHeight) + if err != nil { + return nil, err + } + + return &protocolV1.GetRegisteredAvsForOperatorResponse{ + AvsAddresses: registeredAvs, + }, nil } func (rpc *RpcServer) GetDelegatedStrategiesForOperator(ctx context.Context, request *protocolV1.GetDelegatedStrategiesForOperatorRequest) (*protocolV1.GetDelegatedStrategiesForOperatorResponse, error) { @@ -19,8 +31,28 @@ func (rpc *RpcServer) GetDelegatedStrategiesForOperator(ctx context.Context, req } func (rpc *RpcServer) GetOperatorDelegatedStakeForStrategy(ctx context.Context, request *protocolV1.GetOperatorDelegatedStakeForStrategyRequest) (*protocolV1.GetOperatorDelegatedStakeForStrategyResponse, error) { - //TODO implement me - panic("implement me") + operator := request.GetOperatorAddress() + strategy := request.GetStrategyAddress() + blockHeight := request.GetBlockHeight() + + if operator == "" { + return nil, errors.New("operator address is required") + } + + if strategy == "" { + return nil, errors.New("strategy address is required") + } + + delegatedStake, err := rpc.protocolDataService.GetOperatorDelegatedStake(operator, strategy, blockHeight) + + if err != nil { + return nil, err + } + + return &protocolV1.GetOperatorDelegatedStakeForStrategyResponse{ + Shares: delegatedStake.Shares, + AvsAddresses: delegatedStake.AvsAddresses, + }, nil } func (rpc *RpcServer) GetDelegatedStakersForOperator(ctx context.Context, request *protocolV1.GetDelegatedStakersForOperatorRequest) (*protocolV1.GetDelegatedStakersForOperatorResponse, error) { diff --git a/pkg/service/protocolDataService/protocol.go b/pkg/service/protocolDataService/protocol.go index 3a3a5810..47a1a344 100644 --- a/pkg/service/protocolDataService/protocol.go +++ b/pkg/service/protocolDataService/protocol.go @@ -7,6 +7,8 @@ import ( "github.com/Layr-Labs/sidecar/pkg/storage" "go.uber.org/zap" "gorm.io/gorm" + "strings" + "sync" ) type ProtocolDataService struct { @@ -39,16 +41,199 @@ func (pds *ProtocolDataService) getCurrentBlockHeightIfNotPresent(blockHeight ui return blockHeight, nil } -func (pds *ProtocolDataService) ListRegisteredAVSsForOperator(operator string, blockHeight uint64) (interface{}, error) { - return nil, nil +func (pds *ProtocolDataService) ListRegisteredAVSsForOperator(operator string, blockHeight uint64) ([]string, error) { + operator = strings.ToLower(operator) + + blockHeight, err := pds.getCurrentBlockHeightIfNotPresent(blockHeight) + if err != nil { + return nil, err + } + + query := ` + with ranked_operators as ( + select + aosc.operator, + aosc.avs, + aosc.registered, + row_number() over (partition by aosc.operator order by aosc.block_number desc, aosc.log_index asc) as rn + from avs_operator_state_changes as aosc + where + operator = @operator' + and block_number <= @blockHeight + ) + select + distinct ro.avs as avs + from ranked_operators as ro + where + ro.rn = 1 + and ro.registered = true + ` + var avsAddresses []string + res := pds.db.Raw(query, + sql.Named("operator", operator), + sql.Named("blockHeight", blockHeight), + ).Scan(&avsAddresses) + + if res.Error != nil { + return nil, res.Error + } + return avsAddresses, nil } func (pds *ProtocolDataService) ListDelegatedStrategiesForOperator(operator string, blockHeight uint64) (interface{}, error) { return nil, nil } -func (pds *ProtocolDataService) GetOperatorDelegatedStake(operator string, strategy string, blockHeight uint64) (interface{}, error) { - return nil, nil +// getTotalDelegatedOperatorSharesForStrategy returns the total shares delegated to an operator for a given strategy at a given block height. +func (pds *ProtocolDataService) getTotalDelegatedOperatorSharesForStrategy(operator string, strategy string, blockHeight uint64) (string, error) { + query := ` + with operator_stakers as ( + select + staker, + operator, + delegated, + block_number, + log_index, + row_number() over (partition by staker order by block_number desc, log_index asc) as rn + from staker_delegation_changes + where + operator = @operator + and block_number <= @blockNumber + order by block_number desc, log_index desc + ), + distinct_delegated_stakers as ( + select + distinct staker, + operator, + block_number, + log_index + from operator_stakers as os + where + os.rn = 1 + and os.delegated = true + ), + stakers_with_shares as ( + select + dds.staker, + dds.operator, + dds.block_number, + ss.strategy, + dds.log_index, + ss.shares + from distinct_delegated_stakers as dds + join lateral ( + select + ssd.strategy, + sum(ssd.shares) as shares + -- TODO: this should reference the staker_shares table once it is persistent and not deleted and recreated on each rewards run + from staker_share_deltas as ssd + where + ssd.staker = dds.staker + and ssd.block_number <= dds.block_number + group by 1 + ) as ss on(ss.strategy = @strategy) + ) + select + sws.operator, + sum(sws.shares) as shares + from stakers_with_shares as sws + group by 1 + ` + + var results struct { + Operator string + Shares string + } + + res := pds.db.Raw(query, + sql.Named("operator", strings.ToLower(operator)), + sql.Named("strategy", strings.ToLower(strategy)), + sql.Named("blockNumber", blockHeight), + ).Scan(&results) + + if res.Error != nil { + return "", res.Error + } + return results.Shares, nil +} + +type OperatorDelegatedStake struct { + Shares string + AvsAddresses []string +} + +type ResultCollector[T any] struct { + Result T + Error error +} + +func (pds *ProtocolDataService) GetOperatorDelegatedStake(operator string, strategy string, blockHeight uint64) (*OperatorDelegatedStake, error) { + blockHeight, err := pds.getCurrentBlockHeightIfNotPresent(blockHeight) + if err != nil { + return nil, err + } + + var wg sync.WaitGroup + sharesChan := make(chan *ResultCollector[string]) + avsChan := make(chan *ResultCollector[[]string]) + + wg.Add(2) + + go func() { + defer wg.Done() + result := &ResultCollector[string]{} + + shares, err := pds.getTotalDelegatedOperatorSharesForStrategy(operator, strategy, blockHeight) + if err != nil { + result.Error = err + } else { + result.Result = shares + } + sharesChan <- result + }() + + go func() { + defer wg.Done() + result := &ResultCollector[[]string]{} + + avsAddresses, err := pds.ListRegisteredAVSsForOperator(operator, blockHeight) + if err != nil { + result.Error = err + } else { + result.Result = avsAddresses + } + avsChan <- result + }() + wg.Wait() + close(sharesChan) + close(avsChan) + + shares := <-sharesChan + if shares.Error != nil { + pds.logger.Sugar().Errorw("Failed to get operator delegated stake", + zap.String("operator", operator), + zap.String("strategy", strategy), + zap.Uint64("blockHeight", blockHeight), + zap.Error(shares.Error), + ) + return nil, shares.Error + } + + registeredAvss := <-avsChan + if registeredAvss.Error != nil { + pds.logger.Sugar().Errorw("Failed to get registered AVSs for operator", + zap.String("operator", operator), + zap.String("strategy", strategy), + zap.Uint64("blockHeight", blockHeight), + zap.Error(registeredAvss.Error), + ) + return nil, registeredAvss.Error + } + + return &OperatorDelegatedStake{ + Shares: shares.Result, + AvsAddresses: registeredAvss.Result, + }, nil } func (pds *ProtocolDataService) ListDelegatedStakersForOperator(operator string, blockHeight uint64, pagination *types.Pagination) ([]string, error) { @@ -63,7 +248,7 @@ func (pds *ProtocolDataService) ListDelegatedStakersForOperator(operator string, staker, operator, delegated - FROM sidecar_mainnet_ethereum.staker_delegation_changes + FROM staker_delegation_changes WHERE operator = @operator AND block_number <= @blockHeight ORDER BY staker, block_number desc, log_index asc @@ -127,7 +312,7 @@ func (pds *ProtocolDataService) ListStakerShares(staker string, blockHeight uint ssd.shares, ssd.block_number, row_number() over (partition by ssd.staker, ssd.strategy order by ssd.block_number desc) as rn - from sidecar_mainnet_ethereum.staker_shares as ssd + from staker_shares as ssd where ssd.staker = @staker and block_number <= @blockHeight @@ -145,7 +330,7 @@ func (pds *ProtocolDataService) ListStakerShares(staker string, blockHeight uint sdc.operator, sdc.delegated, row_number() over (partition by sdc.staker order by sdc.block_number desc, sdc.log_index) as rn - from sidecar_mainnet_ethereum.staker_delegation_changes as sdc + from staker_delegation_changes as sdc where sdc.staker = dss.staker and sdc.block_number <= dss.block_number @@ -154,7 +339,7 @@ func (pds *ProtocolDataService) ListStakerShares(staker string, blockHeight uint left join lateral ( select jsonb_agg(distinct aosc.avs) as avs_list - from sidecar_mainnet_ethereum.avs_operator_state_changes aosc + from avs_operator_state_changes aosc where aosc.operator = dsc.operator and aosc.block_number <= dss.block_number From 96d371794a7f98cc10c8ce3e31eb779b7134dfba Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 22 Jan 2025 11:00:46 -0600 Subject: [PATCH 05/11] feat: GetRegisteredAvsForOperator rpc --- pkg/rpcServer/protocolHandlers.go | 17 +++++- pkg/service/protocolDataService/protocol.go | 64 +++++++++++++++++++-- 2 files changed, 75 insertions(+), 6 deletions(-) diff --git a/pkg/rpcServer/protocolHandlers.go b/pkg/rpcServer/protocolHandlers.go index d212fa32..add56d60 100644 --- a/pkg/rpcServer/protocolHandlers.go +++ b/pkg/rpcServer/protocolHandlers.go @@ -26,8 +26,21 @@ func (rpc *RpcServer) GetRegisteredAvsForOperator(ctx context.Context, request * } func (rpc *RpcServer) GetDelegatedStrategiesForOperator(ctx context.Context, request *protocolV1.GetDelegatedStrategiesForOperatorRequest) (*protocolV1.GetDelegatedStrategiesForOperatorResponse, error) { - //TODO implement me - panic("implement me") + operator := request.GetOperatorAddress() + blockHeight := request.GetBlockHeight() + + if operator == "" { + return nil, errors.New("operator address is required") + } + + delegatedStrategies, err := rpc.protocolDataService.ListDelegatedStrategiesForOperator(operator, blockHeight) + if err != nil { + return nil, err + } + + return &protocolV1.GetDelegatedStrategiesForOperatorResponse{ + StrategyAddresses: delegatedStrategies, + }, nil } func (rpc *RpcServer) GetOperatorDelegatedStakeForStrategy(ctx context.Context, request *protocolV1.GetOperatorDelegatedStakeForStrategyRequest) (*protocolV1.GetOperatorDelegatedStakeForStrategyResponse, error) { diff --git a/pkg/service/protocolDataService/protocol.go b/pkg/service/protocolDataService/protocol.go index 47a1a344..a4db544b 100644 --- a/pkg/service/protocolDataService/protocol.go +++ b/pkg/service/protocolDataService/protocol.go @@ -80,8 +80,64 @@ func (pds *ProtocolDataService) ListRegisteredAVSsForOperator(operator string, b return avsAddresses, nil } -func (pds *ProtocolDataService) ListDelegatedStrategiesForOperator(operator string, blockHeight uint64) (interface{}, error) { - return nil, nil +func (pds *ProtocolDataService) ListDelegatedStrategiesForOperator(operator string, blockHeight uint64) ([]string, error) { + operator = strings.ToLower(operator) + blockHeight, err := pds.getCurrentBlockHeightIfNotPresent(blockHeight) + if err != nil { + return nil, err + } + + query := ` + with operator_stakers as ( + select distinct on (staker) + staker, + block_number, + delegated + from staker_delegation_changes + where + operator = @operator + and block_number <= @blockHeight + order by staker, block_number desc, log_index asc + ), + delegated_stakers as ( + select + staker, + block_number + from operator_stakers + where delegated = true + ), + staker_strategies as ( + select + s.strategy, + s.shares + from delegated_stakers as ds + left join staker_share_deltas as s + on s.staker = ds.staker + and s.block_number <= ds.block_number + ), + strategy_shares as ( + select + ss.strategy, + sum(ss.shares) as shares + from staker_strategies as ss + group by 1 + ) + select + strategy + from strategy_shares + where shares > 0; + ` + + var strategies []string + res := pds.db.Raw(query, + sql.Named("operator", operator), + sql.Named("blockHeight", blockHeight), + ).Scan(&strategies) + + if res.Error != nil { + return nil, res.Error + } + return strategies, nil } // getTotalDelegatedOperatorSharesForStrategy returns the total shares delegated to an operator for a given strategy at a given block height. @@ -98,7 +154,7 @@ func (pds *ProtocolDataService) getTotalDelegatedOperatorSharesForStrategy(opera from staker_delegation_changes where operator = @operator - and block_number <= @blockNumber + and block_number <= @blockHeight order by block_number desc, log_index desc ), distinct_delegated_stakers as ( @@ -148,7 +204,7 @@ func (pds *ProtocolDataService) getTotalDelegatedOperatorSharesForStrategy(opera res := pds.db.Raw(query, sql.Named("operator", strings.ToLower(operator)), sql.Named("strategy", strings.ToLower(strategy)), - sql.Named("blockNumber", blockHeight), + sql.Named("blockHeight", blockHeight), ).Scan(&results) if res.Error != nil { From 349415f7afdd125db605fc0ac2fbec0cf641b181 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 22 Jan 2025 12:43:34 -0600 Subject: [PATCH 06/11] feat: GetRewardsForSnapshot rpc --- cmd/debugger/main.go | 4 +- cmd/run.go | 4 +- pkg/rewards/rewards.go | 12 ++---- pkg/rewards/rewardsTypes/types.go | 8 ++++ pkg/rpcServer/protocolHandlers.go | 10 ++--- pkg/rpcServer/rewardsHandlers.go | 24 ++++++++++- pkg/rpcServer/server.go | 4 ++ pkg/service/baseDataService/base.go | 22 ++++++++++ pkg/service/protocolDataService/protocol.go | 45 +++++++++------------ pkg/service/rewardsDataService/rewards.go | 40 ++++++++++++++++++ 10 files changed, 130 insertions(+), 43 deletions(-) create mode 100644 pkg/rewards/rewardsTypes/types.go create mode 100644 pkg/service/baseDataService/base.go create mode 100644 pkg/service/rewardsDataService/rewards.go diff --git a/cmd/debugger/main.go b/cmd/debugger/main.go index 8e4ec0ec..079f3d2f 100644 --- a/cmd/debugger/main.go +++ b/cmd/debugger/main.go @@ -20,6 +20,7 @@ import ( "github.com/Layr-Labs/sidecar/pkg/rewardsCalculatorQueue" "github.com/Layr-Labs/sidecar/pkg/rpcServer" "github.com/Layr-Labs/sidecar/pkg/service/protocolDataService" + "github.com/Layr-Labs/sidecar/pkg/service/rewardsDataService" "github.com/Layr-Labs/sidecar/pkg/sidecar" pgStorage "github.com/Layr-Labs/sidecar/pkg/storage/postgres" "log" @@ -106,6 +107,7 @@ func main() { p := pipeline.NewPipeline(fetchr, idxr, mds, sm, msm, rc, rcq, cfg, sdc, eb, l) rps := proofs.NewRewardsProofsStore(rc, l) pds := protocolDataService.NewProtocolDataService(grm, l, cfg) + rds := rewardsDataService.NewRewardsDataService(grm, l, cfg, rc) // Create new sidecar instance // Create new sidecar instance @@ -116,7 +118,7 @@ func main() { rpc := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{ GrpcPort: cfg.RpcConfig.GrpcPort, HttpPort: cfg.RpcConfig.HttpPort, - }, mds, sm, rc, rcq, eb, rps, pds, l) + }, mds, sm, rc, rcq, eb, rps, pds, rds, l) // RPC channel to notify the RPC server to shutdown gracefully rpcChannel := make(chan bool) diff --git a/cmd/run.go b/cmd/run.go index 5b458b26..8061e81f 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -22,6 +22,7 @@ import ( "github.com/Layr-Labs/sidecar/pkg/rewardsCalculatorQueue" "github.com/Layr-Labs/sidecar/pkg/rpcServer" "github.com/Layr-Labs/sidecar/pkg/service/protocolDataService" + "github.com/Layr-Labs/sidecar/pkg/service/rewardsDataService" "github.com/Layr-Labs/sidecar/pkg/shutdown" "github.com/Layr-Labs/sidecar/pkg/sidecar" pgStorage "github.com/Layr-Labs/sidecar/pkg/storage/postgres" @@ -123,6 +124,7 @@ var runCmd = &cobra.Command{ rps := proofs.NewRewardsProofsStore(rc, l) pds := protocolDataService.NewProtocolDataService(grm, l, cfg) + rds := rewardsDataService.NewRewardsDataService(grm, l, cfg, rc) go rcq.Process() @@ -136,7 +138,7 @@ var runCmd = &cobra.Command{ rpc := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{ GrpcPort: cfg.RpcConfig.GrpcPort, HttpPort: cfg.RpcConfig.HttpPort, - }, mds, sm, rc, rcq, eb, rps, pds, l) + }, mds, sm, rc, rcq, eb, rps, pds, rds, l) // RPC channel to notify the RPC server to shutdown gracefully rpcChannel := make(chan bool) diff --git a/pkg/rewards/rewards.go b/pkg/rewards/rewards.go index 9caf183b..4fdb58b2 100644 --- a/pkg/rewards/rewards.go +++ b/pkg/rewards/rewards.go @@ -4,6 +4,7 @@ import ( "database/sql" "errors" "fmt" + "github.com/Layr-Labs/sidecar/pkg/rewards/rewardsTypes" "time" "sync/atomic" @@ -482,15 +483,8 @@ func (rc *RewardsCalculator) DeleteCorruptedRewardsFromBlockHeight(blockHeight u return nil } -type Reward struct { - Earner string - Token string - Snapshot string - CumulativeAmount string -} - -func (rc *RewardsCalculator) FetchRewardsForSnapshot(snapshotDate string) ([]*Reward, error) { - var goldRows []*Reward +func (rc *RewardsCalculator) FetchRewardsForSnapshot(snapshotDate string) ([]*rewardsTypes.Reward, error) { + var goldRows []*rewardsTypes.Reward query, err := rewardsUtils.RenderQueryTemplate(` select earner, diff --git a/pkg/rewards/rewardsTypes/types.go b/pkg/rewards/rewardsTypes/types.go new file mode 100644 index 00000000..83c3ac47 --- /dev/null +++ b/pkg/rewards/rewardsTypes/types.go @@ -0,0 +1,8 @@ +package rewardsTypes + +type Reward struct { + Earner string + Token string + Snapshot string + CumulativeAmount string +} diff --git a/pkg/rpcServer/protocolHandlers.go b/pkg/rpcServer/protocolHandlers.go index add56d60..6b6197ff 100644 --- a/pkg/rpcServer/protocolHandlers.go +++ b/pkg/rpcServer/protocolHandlers.go @@ -15,7 +15,7 @@ func (rpc *RpcServer) GetRegisteredAvsForOperator(ctx context.Context, request * } blockHeight := request.GetBlockHeight() - registeredAvs, err := rpc.protocolDataService.ListRegisteredAVSsForOperator(operator, blockHeight) + registeredAvs, err := rpc.protocolDataService.ListRegisteredAVSsForOperator(ctx, operator, blockHeight) if err != nil { return nil, err } @@ -33,7 +33,7 @@ func (rpc *RpcServer) GetDelegatedStrategiesForOperator(ctx context.Context, req return nil, errors.New("operator address is required") } - delegatedStrategies, err := rpc.protocolDataService.ListDelegatedStrategiesForOperator(operator, blockHeight) + delegatedStrategies, err := rpc.protocolDataService.ListDelegatedStrategiesForOperator(ctx, operator, blockHeight) if err != nil { return nil, err } @@ -56,7 +56,7 @@ func (rpc *RpcServer) GetOperatorDelegatedStakeForStrategy(ctx context.Context, return nil, errors.New("strategy address is required") } - delegatedStake, err := rpc.protocolDataService.GetOperatorDelegatedStake(operator, strategy, blockHeight) + delegatedStake, err := rpc.protocolDataService.GetOperatorDelegatedStake(ctx, operator, strategy, blockHeight) if err != nil { return nil, err @@ -79,7 +79,7 @@ func (rpc *RpcServer) GetDelegatedStakersForOperator(ctx context.Context, reques pagination.Load(p.GetPageNumber(), p.GetPageSize()) } - delegatedStakers, err := rpc.protocolDataService.ListDelegatedStakersForOperator(operator, request.GetBlockHeight(), pagination) + delegatedStakers, err := rpc.protocolDataService.ListDelegatedStakersForOperator(ctx, operator, request.GetBlockHeight(), pagination) if err != nil { return nil, err } @@ -99,7 +99,7 @@ func (rpc *RpcServer) GetDelegatedStakersForOperator(ctx context.Context, reques } func (rpc *RpcServer) GetStakerShares(ctx context.Context, request *protocolV1.GetStakerSharesRequest) (*protocolV1.GetStakerSharesResponse, error) { - shares, err := rpc.protocolDataService.ListStakerShares(request.GetStakerAddress(), request.GetBlockHeight()) + shares, err := rpc.protocolDataService.ListStakerShares(ctx, request.GetStakerAddress(), request.GetBlockHeight()) if err != nil { return nil, err } diff --git a/pkg/rpcServer/rewardsHandlers.go b/pkg/rpcServer/rewardsHandlers.go index 7d55797f..d04cfae1 100644 --- a/pkg/rpcServer/rewardsHandlers.go +++ b/pkg/rpcServer/rewardsHandlers.go @@ -170,7 +170,29 @@ func (rpc *RpcServer) BackfillStakerOperators(ctx context.Context, req *rewardsV } func (rpc *RpcServer) GetRewardsForSnapshot(ctx context.Context, req *rewardsV1.GetRewardsForSnapshotRequest) (*rewardsV1.GetRewardsForSnapshotResponse, error) { - return nil, status.Error(codes.Unimplemented, "method GetRewardsForSnapshot not implemented") + snapshot := req.GetSnapshot() + if snapshot == "" { + return nil, status.Error(codes.InvalidArgument, "snapshot is required") + } + + snapshotRewards, err := rpc.rewardsDataService.GetRewardsForSnapshot(ctx, snapshot) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + rewardsRes := make([]*rewardsV1.Reward, 0, len(snapshotRewards)) + + for _, reward := range snapshotRewards { + rewardsRes = append(rewardsRes, &rewardsV1.Reward{ + Earner: reward.Earner, + Amount: reward.CumulativeAmount, + Snapshot: reward.Snapshot, + Token: reward.Token, + }) + } + + return &rewardsV1.GetRewardsForSnapshotResponse{ + Rewards: rewardsRes, + }, nil } func (rpc *RpcServer) GetAttributableRewardsForSnapshot(ctx context.Context, req *rewardsV1.GetAttributableRewardsForSnapshotRequest) (*rewardsV1.GetAttributableRewardsForSnapshotResponse, error) { diff --git a/pkg/rpcServer/server.go b/pkg/rpcServer/server.go index 97a70967..dd1dc811 100644 --- a/pkg/rpcServer/server.go +++ b/pkg/rpcServer/server.go @@ -16,6 +16,7 @@ import ( "github.com/Layr-Labs/sidecar/pkg/rewards" "github.com/Layr-Labs/sidecar/pkg/rewardsCalculatorQueue" "github.com/Layr-Labs/sidecar/pkg/service/protocolDataService" + "github.com/Layr-Labs/sidecar/pkg/service/rewardsDataService" "github.com/Layr-Labs/sidecar/pkg/storage" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" @@ -44,6 +45,7 @@ type RpcServer struct { eventBus eventBusTypes.IEventBus rewardsProofs *proofs.RewardsProofsStore protocolDataService *protocolDataService.ProtocolDataService + rewardsDataService *rewardsDataService.RewardsDataService } func NewRpcServer( @@ -55,6 +57,7 @@ func NewRpcServer( eb eventBusTypes.IEventBus, rp *proofs.RewardsProofsStore, pds *protocolDataService.ProtocolDataService, + rds *rewardsDataService.RewardsDataService, l *zap.Logger, ) *RpcServer { server := &RpcServer{ @@ -66,6 +69,7 @@ func NewRpcServer( eventBus: eb, rewardsProofs: rp, protocolDataService: pds, + rewardsDataService: rds, Logger: l, } diff --git a/pkg/service/baseDataService/base.go b/pkg/service/baseDataService/base.go new file mode 100644 index 00000000..90e843e7 --- /dev/null +++ b/pkg/service/baseDataService/base.go @@ -0,0 +1,22 @@ +package baseDataService + +import ( + "github.com/Layr-Labs/sidecar/pkg/storage" + "gorm.io/gorm" +) + +type BaseDataService struct { + DB *gorm.DB +} + +func (b *BaseDataService) GetCurrentBlockHeightIfNotPresent(blockHeight uint64) (uint64, error) { + if blockHeight == 0 { + var currentBlock *storage.Block + res := b.DB.Model(&storage.Block{}).Order("number desc").First(¤tBlock) + if res.Error != nil { + return 0, res.Error + } + blockHeight = currentBlock.Number + } + return blockHeight, nil +} diff --git a/pkg/service/protocolDataService/protocol.go b/pkg/service/protocolDataService/protocol.go index a4db544b..2f5132b7 100644 --- a/pkg/service/protocolDataService/protocol.go +++ b/pkg/service/protocolDataService/protocol.go @@ -1,10 +1,11 @@ package protocolDataService import ( + "context" "database/sql" "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/pkg/service/baseDataService" "github.com/Layr-Labs/sidecar/pkg/service/types" - "github.com/Layr-Labs/sidecar/pkg/storage" "go.uber.org/zap" "gorm.io/gorm" "strings" @@ -12,6 +13,7 @@ import ( ) type ProtocolDataService struct { + baseDataService.BaseDataService db *gorm.DB logger *zap.Logger globalConfig *config.Config @@ -23,28 +25,19 @@ func NewProtocolDataService( globalConfig *config.Config, ) *ProtocolDataService { return &ProtocolDataService{ + BaseDataService: baseDataService.BaseDataService{ + DB: db, + }, db: db, logger: logger, globalConfig: globalConfig, } } -func (pds *ProtocolDataService) getCurrentBlockHeightIfNotPresent(blockHeight uint64) (uint64, error) { - if blockHeight == 0 { - var currentBlock *storage.Block - res := pds.db.Model(&storage.Block{}).Order("number desc").First(¤tBlock) - if res.Error != nil { - return 0, res.Error - } - blockHeight = currentBlock.Number - } - return blockHeight, nil -} - -func (pds *ProtocolDataService) ListRegisteredAVSsForOperator(operator string, blockHeight uint64) ([]string, error) { +func (pds *ProtocolDataService) ListRegisteredAVSsForOperator(ctx context.Context, operator string, blockHeight uint64) ([]string, error) { operator = strings.ToLower(operator) - blockHeight, err := pds.getCurrentBlockHeightIfNotPresent(blockHeight) + blockHeight, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(blockHeight) if err != nil { return nil, err } @@ -80,9 +73,9 @@ func (pds *ProtocolDataService) ListRegisteredAVSsForOperator(operator string, b return avsAddresses, nil } -func (pds *ProtocolDataService) ListDelegatedStrategiesForOperator(operator string, blockHeight uint64) ([]string, error) { +func (pds *ProtocolDataService) ListDelegatedStrategiesForOperator(ctx context.Context, operator string, blockHeight uint64) ([]string, error) { operator = strings.ToLower(operator) - blockHeight, err := pds.getCurrentBlockHeightIfNotPresent(blockHeight) + blockHeight, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(blockHeight) if err != nil { return nil, err } @@ -141,7 +134,7 @@ func (pds *ProtocolDataService) ListDelegatedStrategiesForOperator(operator stri } // getTotalDelegatedOperatorSharesForStrategy returns the total shares delegated to an operator for a given strategy at a given block height. -func (pds *ProtocolDataService) getTotalDelegatedOperatorSharesForStrategy(operator string, strategy string, blockHeight uint64) (string, error) { +func (pds *ProtocolDataService) getTotalDelegatedOperatorSharesForStrategy(ctx context.Context, operator string, strategy string, blockHeight uint64) (string, error) { query := ` with operator_stakers as ( select @@ -223,8 +216,8 @@ type ResultCollector[T any] struct { Error error } -func (pds *ProtocolDataService) GetOperatorDelegatedStake(operator string, strategy string, blockHeight uint64) (*OperatorDelegatedStake, error) { - blockHeight, err := pds.getCurrentBlockHeightIfNotPresent(blockHeight) +func (pds *ProtocolDataService) GetOperatorDelegatedStake(ctx context.Context, operator string, strategy string, blockHeight uint64) (*OperatorDelegatedStake, error) { + blockHeight, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(blockHeight) if err != nil { return nil, err } @@ -239,7 +232,7 @@ func (pds *ProtocolDataService) GetOperatorDelegatedStake(operator string, strat defer wg.Done() result := &ResultCollector[string]{} - shares, err := pds.getTotalDelegatedOperatorSharesForStrategy(operator, strategy, blockHeight) + shares, err := pds.getTotalDelegatedOperatorSharesForStrategy(ctx, operator, strategy, blockHeight) if err != nil { result.Error = err } else { @@ -252,7 +245,7 @@ func (pds *ProtocolDataService) GetOperatorDelegatedStake(operator string, strat defer wg.Done() result := &ResultCollector[[]string]{} - avsAddresses, err := pds.ListRegisteredAVSsForOperator(operator, blockHeight) + avsAddresses, err := pds.ListRegisteredAVSsForOperator(ctx, operator, blockHeight) if err != nil { result.Error = err } else { @@ -292,8 +285,8 @@ func (pds *ProtocolDataService) GetOperatorDelegatedStake(operator string, strat }, nil } -func (pds *ProtocolDataService) ListDelegatedStakersForOperator(operator string, blockHeight uint64, pagination *types.Pagination) ([]string, error) { - bh, err := pds.getCurrentBlockHeightIfNotPresent(blockHeight) +func (pds *ProtocolDataService) ListDelegatedStakersForOperator(ctx context.Context, operator string, blockHeight uint64, pagination *types.Pagination) ([]string, error) { + bh, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(blockHeight) if err != nil { return nil, err } @@ -352,10 +345,10 @@ type StakerShares struct { // and the addresses of the AVSs the operator was registered to. // // If not blockHeight is provided, the most recently indexed block will be used. -func (pds *ProtocolDataService) ListStakerShares(staker string, blockHeight uint64) ([]*StakerShares, error) { +func (pds *ProtocolDataService) ListStakerShares(ctx context.Context, staker string, blockHeight uint64) ([]*StakerShares, error) { shares := make([]*StakerShares, 0) - bh, err := pds.getCurrentBlockHeightIfNotPresent(blockHeight) + bh, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(blockHeight) if err != nil { return nil, err } diff --git a/pkg/service/rewardsDataService/rewards.go b/pkg/service/rewardsDataService/rewards.go new file mode 100644 index 00000000..c0bd1971 --- /dev/null +++ b/pkg/service/rewardsDataService/rewards.go @@ -0,0 +1,40 @@ +package rewardsDataService + +import ( + "context" + "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/pkg/rewards" + "github.com/Layr-Labs/sidecar/pkg/rewards/rewardsTypes" + "github.com/Layr-Labs/sidecar/pkg/service/baseDataService" + "go.uber.org/zap" + "gorm.io/gorm" +) + +type RewardsDataService struct { + baseDataService.BaseDataService + db *gorm.DB + logger *zap.Logger + globalConfig *config.Config + rewardsCalculator *rewards.RewardsCalculator +} + +func NewRewardsDataService( + db *gorm.DB, + logger *zap.Logger, + globalConfig *config.Config, + rc *rewards.RewardsCalculator, +) *RewardsDataService { + return &RewardsDataService{ + BaseDataService: baseDataService.BaseDataService{ + DB: db, + }, + db: db, + logger: logger, + globalConfig: globalConfig, + rewardsCalculator: rc, + } +} + +func (rds *RewardsDataService) GetRewardsForSnapshot(ctx context.Context, snapshot string) ([]*rewardsTypes.Reward, error) { + return rds.rewardsCalculator.FetchRewardsForSnapshot(snapshot) +} From 1652f6c1df048e9c26e65703314e5e7634df100f Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 22 Jan 2025 12:56:24 -0600 Subject: [PATCH 07/11] feat: GetTotalClaimedRewards rpc --- go.mod | 2 +- go.sum | 2 ++ pkg/rpcServer/rewardsHandlers.go | 23 ++++++++++++- pkg/service/baseDataService/base.go | 3 +- pkg/service/protocolDataService/protocol.go | 10 +++--- pkg/service/rewardsDataService/rewards.go | 37 +++++++++++++++++++++ 6 files changed, 69 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index d1121c58..7b447d82 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DataDog/datadog-go/v5 v5.5.0 github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1 github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 - github.com/Layr-Labs/protocol-apis v1.1.1-0.20250121193118-8112817d1079 + github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122185055-8ce2cc7afa86 github.com/ethereum/go-ethereum v1.14.9 github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index 3e9c0f1a..ffa7483c 100644 --- a/go.sum +++ b/go.sum @@ -61,6 +61,8 @@ github.com/Layr-Labs/protocol-apis v1.1.1-0.20250116155715-919a0a9a27e5 h1:m/I5h github.com/Layr-Labs/protocol-apis v1.1.1-0.20250116155715-919a0a9a27e5/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250121193118-8112817d1079 h1:B1b9ghilo70y4MJ7ZkF5w8BuPtslAUP2oE6vfzG6DBA= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250121193118-8112817d1079/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122185055-8ce2cc7afa86 h1:XfVh8deohE3LYXAjYOAjzD4wRPNgbIP3OjV6Syy0FIw= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122185055-8ce2cc7afa86/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= diff --git a/pkg/rpcServer/rewardsHandlers.go b/pkg/rpcServer/rewardsHandlers.go index d04cfae1..cee4fb3b 100644 --- a/pkg/rpcServer/rewardsHandlers.go +++ b/pkg/rpcServer/rewardsHandlers.go @@ -6,6 +6,7 @@ import ( rewardsV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/rewards" "github.com/Layr-Labs/sidecar/pkg/rewards" "github.com/Layr-Labs/sidecar/pkg/rewardsCalculatorQueue" + "github.com/Layr-Labs/sidecar/pkg/service/rewardsDataService" "github.com/Layr-Labs/sidecar/pkg/utils" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -207,8 +208,28 @@ func (rpc *RpcServer) GetAvailableRewards(ctx context.Context, req *rewardsV1.Ge return nil, status.Error(codes.Unimplemented, "method GetAvailableRewards not implemented") } +// GetTotalClaimedRewards returns the total claimed rewards for an earner up to, and including, the provided block height. func (rpc *RpcServer) GetTotalClaimedRewards(ctx context.Context, req *rewardsV1.GetTotalClaimedRewardsRequest) (*rewardsV1.GetTotalClaimedRewardsResponse, error) { - return nil, status.Error(codes.Unimplemented, "method GetTotalClaimedRewards not implemented") + earner := req.GetEarnerAddress() + blockHeight := req.GetBlockHeight() + + if earner == "" { + return nil, status.Error(codes.InvalidArgument, "earner address is required") + } + + totalClaimed, err := rpc.rewardsDataService.GetTotalClaimedRewards(ctx, earner, blockHeight) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &rewardsV1.GetTotalClaimedRewardsResponse{ + Rewards: utils.Map(totalClaimed, func(r *rewardsDataService.TotalClaimedReward, i uint64) *rewardsV1.TotalClaimedReward { + return &rewardsV1.TotalClaimedReward{ + Earner: r.Earner, + Token: r.Token, + Amount: r.Amount, + } + }), + }, nil } func (rpc *RpcServer) GetAvailableRewardsTokens(ctx context.Context, req *rewardsV1.GetAvailableRewardsTokensRequest) (*rewardsV1.GetAvailableRewardsTokensResponse, error) { diff --git a/pkg/service/baseDataService/base.go b/pkg/service/baseDataService/base.go index 90e843e7..df3c4e5c 100644 --- a/pkg/service/baseDataService/base.go +++ b/pkg/service/baseDataService/base.go @@ -1,6 +1,7 @@ package baseDataService import ( + "context" "github.com/Layr-Labs/sidecar/pkg/storage" "gorm.io/gorm" ) @@ -9,7 +10,7 @@ type BaseDataService struct { DB *gorm.DB } -func (b *BaseDataService) GetCurrentBlockHeightIfNotPresent(blockHeight uint64) (uint64, error) { +func (b *BaseDataService) GetCurrentBlockHeightIfNotPresent(ctx context.Context, blockHeight uint64) (uint64, error) { if blockHeight == 0 { var currentBlock *storage.Block res := b.DB.Model(&storage.Block{}).Order("number desc").First(¤tBlock) diff --git a/pkg/service/protocolDataService/protocol.go b/pkg/service/protocolDataService/protocol.go index 2f5132b7..81735d01 100644 --- a/pkg/service/protocolDataService/protocol.go +++ b/pkg/service/protocolDataService/protocol.go @@ -37,7 +37,7 @@ func NewProtocolDataService( func (pds *ProtocolDataService) ListRegisteredAVSsForOperator(ctx context.Context, operator string, blockHeight uint64) ([]string, error) { operator = strings.ToLower(operator) - blockHeight, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(blockHeight) + blockHeight, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(ctx, blockHeight) if err != nil { return nil, err } @@ -75,7 +75,7 @@ func (pds *ProtocolDataService) ListRegisteredAVSsForOperator(ctx context.Contex func (pds *ProtocolDataService) ListDelegatedStrategiesForOperator(ctx context.Context, operator string, blockHeight uint64) ([]string, error) { operator = strings.ToLower(operator) - blockHeight, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(blockHeight) + blockHeight, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(ctx, blockHeight) if err != nil { return nil, err } @@ -217,7 +217,7 @@ type ResultCollector[T any] struct { } func (pds *ProtocolDataService) GetOperatorDelegatedStake(ctx context.Context, operator string, strategy string, blockHeight uint64) (*OperatorDelegatedStake, error) { - blockHeight, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(blockHeight) + blockHeight, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(ctx, blockHeight) if err != nil { return nil, err } @@ -286,7 +286,7 @@ func (pds *ProtocolDataService) GetOperatorDelegatedStake(ctx context.Context, o } func (pds *ProtocolDataService) ListDelegatedStakersForOperator(ctx context.Context, operator string, blockHeight uint64, pagination *types.Pagination) ([]string, error) { - bh, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(blockHeight) + bh, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(ctx, blockHeight) if err != nil { return nil, err } @@ -348,7 +348,7 @@ type StakerShares struct { func (pds *ProtocolDataService) ListStakerShares(ctx context.Context, staker string, blockHeight uint64) ([]*StakerShares, error) { shares := make([]*StakerShares, 0) - bh, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(blockHeight) + bh, err := pds.BaseDataService.GetCurrentBlockHeightIfNotPresent(ctx, blockHeight) if err != nil { return nil, err } diff --git a/pkg/service/rewardsDataService/rewards.go b/pkg/service/rewardsDataService/rewards.go index c0bd1971..18e2ce49 100644 --- a/pkg/service/rewardsDataService/rewards.go +++ b/pkg/service/rewardsDataService/rewards.go @@ -2,6 +2,7 @@ package rewardsDataService import ( "context" + "database/sql" "github.com/Layr-Labs/sidecar/internal/config" "github.com/Layr-Labs/sidecar/pkg/rewards" "github.com/Layr-Labs/sidecar/pkg/rewards/rewardsTypes" @@ -38,3 +39,39 @@ func NewRewardsDataService( func (rds *RewardsDataService) GetRewardsForSnapshot(ctx context.Context, snapshot string) ([]*rewardsTypes.Reward, error) { return rds.rewardsCalculator.FetchRewardsForSnapshot(snapshot) } + +type TotalClaimedReward struct { + Earner string + Token string + Amount string +} + +func (rds *RewardsDataService) GetTotalClaimedRewards(ctx context.Context, earner string, blockHeight uint64) ([]*TotalClaimedReward, error) { + blockHeight, err := rds.BaseDataService.GetCurrentBlockHeightIfNotPresent(ctx, blockHeight) + if err != nil { + return nil, err + } + + query := ` + select + earner, + token, + sum(claimed_amount) as amount + from erwards_claimed as rc + where + earner = @earner + and block_number <= @blockHeight + group by 1, 2 + ` + + claimedAmounts := make([]*TotalClaimedReward, 0) + res := rds.db.Raw(query, + sql.Named("earner", earner), + sql.Named("block_height", blockHeight), + ).Scan(&claimedAmounts) + + if res.Error != nil { + return nil, res.Error + } + return claimedAmounts, nil +} From a19c591a6e0c76f8cf214db7c0c20598de53c088 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 22 Jan 2025 13:15:26 -0600 Subject: [PATCH 08/11] feat: ListClaimedRewardsByBlockRange and GetClaimedRewardsByBlock rpcs --- go.mod | 2 +- go.sum | 2 + pkg/rpcServer/rewardsHandlers.go | 47 +++++++++++++++++++- pkg/service/rewardsDataService/rewards.go | 53 +++++++++++++++++++++++ 4 files changed, 102 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 7b447d82..631d9f10 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DataDog/datadog-go/v5 v5.5.0 github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1 github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 - github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122185055-8ce2cc7afa86 + github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122190013-01d27f288892 github.com/ethereum/go-ethereum v1.14.9 github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index ffa7483c..efdbb302 100644 --- a/go.sum +++ b/go.sum @@ -63,6 +63,8 @@ github.com/Layr-Labs/protocol-apis v1.1.1-0.20250121193118-8112817d1079 h1:B1b9g github.com/Layr-Labs/protocol-apis v1.1.1-0.20250121193118-8112817d1079/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122185055-8ce2cc7afa86 h1:XfVh8deohE3LYXAjYOAjzD4wRPNgbIP3OjV6Syy0FIw= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122185055-8ce2cc7afa86/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122190013-01d27f288892 h1:OJs2VEsSVhdrPcS+CcX6sWr2UOIYhFY473y0+QDaq/4= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122190013-01d27f288892/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= diff --git a/pkg/rpcServer/rewardsHandlers.go b/pkg/rpcServer/rewardsHandlers.go index cee4fb3b..309f3071 100644 --- a/pkg/rpcServer/rewardsHandlers.go +++ b/pkg/rpcServer/rewardsHandlers.go @@ -4,6 +4,7 @@ import ( "context" "errors" rewardsV1 "github.com/Layr-Labs/protocol-apis/gen/protos/eigenlayer/sidecar/v1/rewards" + "github.com/Layr-Labs/sidecar/pkg/metaState/types" "github.com/Layr-Labs/sidecar/pkg/rewards" "github.com/Layr-Labs/sidecar/pkg/rewardsCalculatorQueue" "github.com/Layr-Labs/sidecar/pkg/service/rewardsDataService" @@ -240,8 +241,52 @@ func (rpc *RpcServer) GetSummarizedRewardsForEarner(ctx context.Context, req *re return nil, status.Error(codes.Unimplemented, "method GetSummarizedRewardsForEarner not implemented") } +// GetClaimedRewardsByBlock returns the claimed rewards for an earner for a specific block. func (rpc *RpcServer) GetClaimedRewardsByBlock(ctx context.Context, req *rewardsV1.GetClaimedRewardsByBlockRequest) (*rewardsV1.GetClaimedRewardsByBlockResponse, error) { - return nil, status.Error(codes.Unimplemented, "method GetClaimedRewardsByBlock not implemented") + blockHeight := req.GetBlockHeight() + + claims, err := rpc.rewardsDataService.ListClaimedRewardsByBlockRange(ctx, "", blockHeight, blockHeight) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return &rewardsV1.GetClaimedRewardsByBlockResponse{ + Rewards: utils.Map(claims, func(c *types.RewardsClaimed, i uint64) *rewardsV1.ClaimedReward { + return &rewardsV1.ClaimedReward{ + Earner: c.Earner, + Claimer: c.Claimer, + Token: c.Token, + BlockNumber: c.BlockNumber, + } + }), + }, nil +} + +// ListClaimedRewardsByBlockRange returns the claimed rewards for each block in the given range (inclusive of start and end block heights). +func (rpc *RpcServer) ListClaimedRewardsByBlockRange(ctx context.Context, req *rewardsV1.ListClaimedRewardsByBlockRangeRequest) (*rewardsV1.ListClaimedRewardsByBlockRangeResponse, error) { + earner := req.GetEarnerAddress() + startBlockHeight := req.GetStartBlockHeight() + endBlockHeight := req.GetEndBlockHeight() + + if earner == "" { + return nil, status.Error(codes.InvalidArgument, "earner address is required") + } + + claims, err := rpc.rewardsDataService.ListClaimedRewardsByBlockRange(ctx, earner, startBlockHeight, endBlockHeight) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return &rewardsV1.ListClaimedRewardsByBlockRangeResponse{ + Rewards: utils.Map(claims, func(c *types.RewardsClaimed, i uint64) *rewardsV1.ClaimedReward { + return &rewardsV1.ClaimedReward{ + Earner: c.Earner, + Claimer: c.Claimer, + Token: c.Token, + BlockNumber: c.BlockNumber, + } + }), + }, nil } func (rpc *RpcServer) ListDistributionRoots(ctx context.Context, req *rewardsV1.ListDistributionRootsRequest) (*rewardsV1.ListDistributionRootsResponse, error) { diff --git a/pkg/service/rewardsDataService/rewards.go b/pkg/service/rewardsDataService/rewards.go index 18e2ce49..93f4a951 100644 --- a/pkg/service/rewardsDataService/rewards.go +++ b/pkg/service/rewardsDataService/rewards.go @@ -3,7 +3,9 @@ package rewardsDataService import ( "context" "database/sql" + "fmt" "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/pkg/metaState/types" "github.com/Layr-Labs/sidecar/pkg/rewards" "github.com/Layr-Labs/sidecar/pkg/rewards/rewardsTypes" "github.com/Layr-Labs/sidecar/pkg/service/baseDataService" @@ -75,3 +77,54 @@ func (rds *RewardsDataService) GetTotalClaimedRewards(ctx context.Context, earne } return claimedAmounts, nil } + +// ListClaimedRewardsByBlockRange returns a list of claimed rewards for a given earner within a block range. +// +// If earner is an empty string, all claimed rewards within the block range are returned. +func (rds *RewardsDataService) ListClaimedRewardsByBlockRange( + ctx context.Context, + earner string, + startBlockHeight uint64, + endBlockHeight uint64, +) ([]*types.RewardsClaimed, error) { + if endBlockHeight == 0 { + return nil, fmt.Errorf("endBlockHeight must be greater than 0") + } + if endBlockHeight < startBlockHeight { + return nil, fmt.Errorf("endBlockHeight must be greater than or equal to startBlockHeight") + } + + query := ` + select + rc.root, + rc.earner, + rc.claimer, + rc.recipient, + rc.token, + rc.claimed_amount, + rc.transaction_hash, + rc.block_number, + rc.log_index + from rewards_claimed as rc + where + block_number >= @startBlockHeight + and block_number <= @endBlockHeight + ` + args := []interface{}{ + sql.Named("startBlockHeight", startBlockHeight), + sql.Named("endBlockHeight", endBlockHeight), + } + if earner != "" { + query += " and earner = @earner" + args = append(args, sql.Named("earner", earner)) + } + query += " order by block_number, log_index" + + claimedRewards := make([]*types.RewardsClaimed, 0) + res := rds.db.Raw(query, args...).Scan(&claimedRewards) + + if res.Error != nil { + return nil, res.Error + } + return claimedRewards, nil +} From 84e5dc5b89891771247958c899fb30ee5cdd05dc Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 22 Jan 2025 14:21:18 -0600 Subject: [PATCH 09/11] feat: add blockHeight filter to ListDistributionRoots --- go.mod | 2 +- go.sum | 2 ++ pkg/rewards/rewards.go | 12 ++++++++++-- pkg/rpcServer/rewardsHandlers.go | 3 ++- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 631d9f10..b19e7d27 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DataDog/datadog-go/v5 v5.5.0 github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1 github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 - github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122190013-01d27f288892 + github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122201559-ae5f5b65ce44 github.com/ethereum/go-ethereum v1.14.9 github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index efdbb302..31106f70 100644 --- a/go.sum +++ b/go.sum @@ -65,6 +65,8 @@ github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122185055-8ce2cc7afa86 h1:XfVh8 github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122185055-8ce2cc7afa86/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122190013-01d27f288892 h1:OJs2VEsSVhdrPcS+CcX6sWr2UOIYhFY473y0+QDaq/4= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122190013-01d27f288892/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122201559-ae5f5b65ce44 h1:jaWO3t0OvIKFBDP9zK6/mmwSeEA4P1FQKWqB5qdKjuU= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122201559-ae5f5b65ce44/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= diff --git a/pkg/rewards/rewards.go b/pkg/rewards/rewards.go index 4fdb58b2..31eecd4e 100644 --- a/pkg/rewards/rewards.go +++ b/pkg/rewards/rewards.go @@ -790,15 +790,23 @@ type DistributionRoot struct { Disabled bool } -func (rc *RewardsCalculator) ListDistributionRoots() ([]*DistributionRoot, error) { +// ListDistributionRoots returns a list of submitted distribution roots. If a non-zero blockHeight is provided, +// DistributionRoots for only that blockHeight will be returned +func (rc *RewardsCalculator) ListDistributionRoots(blockHeight uint64) ([]*DistributionRoot, error) { query := ` select sdr.*, case when ddr.root_index is not null then true else false end as disabled from submitted_distribution_roots as sdr left join disabled_distribution_roots as ddr on (sdr.root_index = ddr.root_index) - order by root_index desc ` + if blockHeight > 0 { + query += ` + where sdr.block_number = {{.blockHeight}} + ` + } + query += ` order by root_index desc` + var submittedDistributionRoots []*DistributionRoot res := rc.grm.Raw(query).Scan(&submittedDistributionRoots) if res.Error != nil { diff --git a/pkg/rpcServer/rewardsHandlers.go b/pkg/rpcServer/rewardsHandlers.go index 309f3071..2f334246 100644 --- a/pkg/rpcServer/rewardsHandlers.go +++ b/pkg/rpcServer/rewardsHandlers.go @@ -290,7 +290,8 @@ func (rpc *RpcServer) ListClaimedRewardsByBlockRange(ctx context.Context, req *r } func (rpc *RpcServer) ListDistributionRoots(ctx context.Context, req *rewardsV1.ListDistributionRootsRequest) (*rewardsV1.ListDistributionRootsResponse, error) { - roots, err := rpc.rewardsCalculator.ListDistributionRoots() + blockHeight := req.GetBlockHeight() + roots, err := rpc.rewardsCalculator.ListDistributionRoots(blockHeight) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } From e7560f64caeb02ae01b9db68c9f95abaa65aac97 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 23 Jan 2025 16:15:05 -0600 Subject: [PATCH 10/11] feat: GetSummarizedRewardsForEarner rpc --- go.mod | 4 +- go.sum | 28 +- pkg/rpcServer/rewardsHandlers.go | 30 +- pkg/service/rewardsDataService/rewards.go | 338 +++++++++++++++++++++- 4 files changed, 361 insertions(+), 39 deletions(-) diff --git a/go.mod b/go.mod index b19e7d27..bc50c19c 100644 --- a/go.mod +++ b/go.mod @@ -6,12 +6,13 @@ require ( github.com/DataDog/datadog-go/v5 v5.5.0 github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1 github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 - github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122201559-ae5f5b65ce44 + github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122221613-65afff84af27 github.com/ethereum/go-ethereum v1.14.9 github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1 github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 + github.com/habx/pg-commands v0.6.1 github.com/lib/pq v1.10.9 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.0 @@ -53,7 +54,6 @@ require ( github.com/go-ole/go-ole v1.3.0 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/gorilla/websocket v1.4.2 // indirect - github.com/habx/pg-commands v0.6.1 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/holiman/uint256 v1.3.1 // indirect github.com/iden3/go-iden3-crypto v0.0.16 // indirect diff --git a/go.sum b/go.sum index 31106f70..14edafe4 100644 --- a/go.sum +++ b/go.sum @@ -41,32 +41,8 @@ github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-0 github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1/go.mod h1:Ie8YE3EQkTHqG6/tnUS0He7/UPMkXPo/3OFXwSy0iRo= github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 h1:Blb4AE+jC/vddV71w4/MQAPooM+8EVqv9w2bL4OytgY= github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13/go.mod h1:PD/HoyzZjxDw1tAcZw3yD0yGddo+yhmwQAi+lk298r4= -github.com/Layr-Labs/protocol-apis v1.1.0 h1:PO6x+Y9ORiac2dkaWJayRFqhyzcvMbvRQkDIpLTNtVc= -github.com/Layr-Labs/protocol-apis v1.1.0/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250110201222-e8670ac00c32 h1:nRHAH0dn5qkQXUjdrPlGThtKLt154UKAHfzCEyMqfr0= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250110201222-e8670ac00c32/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250110201843-c2f2cf37e910 h1:X3t1mr1kAOGDJ3paPS/lzHJchK4y+oWSJ+D/7+MgOmY= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250110201843-c2f2cf37e910/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250114180833-6f2487a7e08c h1:kcTwHJVRDQAGqVacRJ4h6r6LKIZP6nBkxaBEJvZ9A3Q= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250114180833-6f2487a7e08c/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250114181701-acb87ef4eeb5 h1:0PLxb8fpwdpWpfk24yhdZzETFCxVMN2yJjRDyBBf6wM= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250114181701-acb87ef4eeb5/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115220323-135176acb92b h1:eJmPAq3s+AwOrQUjSFXILCzUstDZobwYEraOZ2NFC1M= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115220323-135176acb92b/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115230325-93c4ebccbeb7 h1:zTOIFjJcCzOZ1PBk9jtoW/bsKSyRzvQyTG2Beutpiuk= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250115230325-93c4ebccbeb7/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250116155113-c22028af9e00 h1:7N3ta4X5YRygobnCuAAcKr4T76eUboXVcj+IGW0P2eA= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250116155113-c22028af9e00/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250116155715-919a0a9a27e5 h1:m/I5hGK4JpOE7OL7wYlKt9HwlCAgQ0SxeT3dNHfjzFY= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250116155715-919a0a9a27e5/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250121193118-8112817d1079 h1:B1b9ghilo70y4MJ7ZkF5w8BuPtslAUP2oE6vfzG6DBA= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250121193118-8112817d1079/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122185055-8ce2cc7afa86 h1:XfVh8deohE3LYXAjYOAjzD4wRPNgbIP3OjV6Syy0FIw= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122185055-8ce2cc7afa86/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122190013-01d27f288892 h1:OJs2VEsSVhdrPcS+CcX6sWr2UOIYhFY473y0+QDaq/4= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122190013-01d27f288892/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122201559-ae5f5b65ce44 h1:jaWO3t0OvIKFBDP9zK6/mmwSeEA4P1FQKWqB5qdKjuU= -github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122201559-ae5f5b65ce44/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122221613-65afff84af27 h1:3jWR07FKc5qTmJR8VKlQFfHv26ZdnbwFi34CvtWuRRY= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122221613-65afff84af27/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= diff --git a/pkg/rpcServer/rewardsHandlers.go b/pkg/rpcServer/rewardsHandlers.go index 2f334246..60a4653a 100644 --- a/pkg/rpcServer/rewardsHandlers.go +++ b/pkg/rpcServer/rewardsHandlers.go @@ -218,7 +218,7 @@ func (rpc *RpcServer) GetTotalClaimedRewards(ctx context.Context, req *rewardsV1 return nil, status.Error(codes.InvalidArgument, "earner address is required") } - totalClaimed, err := rpc.rewardsDataService.GetTotalClaimedRewards(ctx, earner, blockHeight) + totalClaimed, err := rpc.rewardsDataService.GetTotalClaimedRewards(ctx, earner, nil, blockHeight) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -238,14 +238,36 @@ func (rpc *RpcServer) GetAvailableRewardsTokens(ctx context.Context, req *reward } func (rpc *RpcServer) GetSummarizedRewardsForEarner(ctx context.Context, req *rewardsV1.GetSummarizedRewardsForEarnerRequest) (*rewardsV1.GetSummarizedRewardsForEarnerResponse, error) { - return nil, status.Error(codes.Unimplemented, "method GetSummarizedRewardsForEarner not implemented") + earner := req.GetEarnerAddress() + blockHeight := req.GetBlockHeight() + + if earner == "" { + return nil, status.Error(codes.InvalidArgument, "earner address is required") + } + + summarizedRewards, err := rpc.rewardsDataService.GetSummarizedRewards(ctx, earner, nil, blockHeight) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return &rewardsV1.GetSummarizedRewardsForEarnerResponse{ + Rewards: utils.Map(summarizedRewards, func(r *rewardsDataService.SummarizedReward, i uint64) *rewardsV1.SummarizedEarnerReward { + return &rewardsV1.SummarizedEarnerReward{ + Token: r.Token, + Earned: r.Earned, + Active: r.Active, + Claimed: r.Claimed, + Claimable: r.Claimable, + } + }), + }, nil } // GetClaimedRewardsByBlock returns the claimed rewards for an earner for a specific block. func (rpc *RpcServer) GetClaimedRewardsByBlock(ctx context.Context, req *rewardsV1.GetClaimedRewardsByBlockRequest) (*rewardsV1.GetClaimedRewardsByBlockResponse, error) { blockHeight := req.GetBlockHeight() - claims, err := rpc.rewardsDataService.ListClaimedRewardsByBlockRange(ctx, "", blockHeight, blockHeight) + claims, err := rpc.rewardsDataService.ListClaimedRewardsByBlockRange(ctx, "", blockHeight, blockHeight, nil) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -272,7 +294,7 @@ func (rpc *RpcServer) ListClaimedRewardsByBlockRange(ctx context.Context, req *r return nil, status.Error(codes.InvalidArgument, "earner address is required") } - claims, err := rpc.rewardsDataService.ListClaimedRewardsByBlockRange(ctx, earner, startBlockHeight, endBlockHeight) + claims, err := rpc.rewardsDataService.ListClaimedRewardsByBlockRange(ctx, earner, startBlockHeight, endBlockHeight, nil) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/pkg/service/rewardsDataService/rewards.go b/pkg/service/rewardsDataService/rewards.go index 93f4a951..daeeb589 100644 --- a/pkg/service/rewardsDataService/rewards.go +++ b/pkg/service/rewardsDataService/rewards.go @@ -3,14 +3,21 @@ package rewardsDataService import ( "context" "database/sql" + "errors" "fmt" "github.com/Layr-Labs/sidecar/internal/config" + eigenStateTypes "github.com/Layr-Labs/sidecar/pkg/eigenState/types" "github.com/Layr-Labs/sidecar/pkg/metaState/types" "github.com/Layr-Labs/sidecar/pkg/rewards" "github.com/Layr-Labs/sidecar/pkg/rewards/rewardsTypes" + "github.com/Layr-Labs/sidecar/pkg/rewardsUtils" "github.com/Layr-Labs/sidecar/pkg/service/baseDataService" + "github.com/Layr-Labs/sidecar/pkg/utils" "go.uber.org/zap" "gorm.io/gorm" + "reflect" + "strings" + "sync" ) type RewardsDataService struct { @@ -48,7 +55,7 @@ type TotalClaimedReward struct { Amount string } -func (rds *RewardsDataService) GetTotalClaimedRewards(ctx context.Context, earner string, blockHeight uint64) ([]*TotalClaimedReward, error) { +func (rds *RewardsDataService) GetTotalClaimedRewards(ctx context.Context, earner string, tokens []string, blockHeight uint64) ([]*TotalClaimedReward, error) { blockHeight, err := rds.BaseDataService.GetCurrentBlockHeightIfNotPresent(ctx, blockHeight) if err != nil { return nil, err @@ -59,18 +66,27 @@ func (rds *RewardsDataService) GetTotalClaimedRewards(ctx context.Context, earne earner, token, sum(claimed_amount) as amount - from erwards_claimed as rc + from rewards_claimed as rc where earner = @earner and block_number <= @blockHeight - group by 1, 2 ` + args := []interface{}{ + sql.Named("earner", earner), + sql.Named("blockHeight", blockHeight), + } + if len(tokens) > 0 { + query += " and token in (?)" + formattedTokens := utils.Map(tokens, func(token string, i uint64) string { + return strings.ToLower(token) + }) + args = append(args, sql.Named("tokens", formattedTokens)) + } + + query += " group by earner, token" claimedAmounts := make([]*TotalClaimedReward, 0) - res := rds.db.Raw(query, - sql.Named("earner", earner), - sql.Named("block_height", blockHeight), - ).Scan(&claimedAmounts) + res := rds.db.Raw(query, args...).Scan(&claimedAmounts) if res.Error != nil { return nil, res.Error @@ -86,6 +102,7 @@ func (rds *RewardsDataService) ListClaimedRewardsByBlockRange( earner string, startBlockHeight uint64, endBlockHeight uint64, + tokens []string, ) ([]*types.RewardsClaimed, error) { if endBlockHeight == 0 { return nil, fmt.Errorf("endBlockHeight must be greater than 0") @@ -118,6 +135,13 @@ func (rds *RewardsDataService) ListClaimedRewardsByBlockRange( query += " and earner = @earner" args = append(args, sql.Named("earner", earner)) } + if len(tokens) > 0 { + query += " and token in (?)" + formattedTokens := utils.Map(tokens, func(token string, i uint64) string { + return strings.ToLower(token) + }) + args = append(args, sql.Named("tokens", formattedTokens)) + } query += " order by block_number, log_index" claimedRewards := make([]*types.RewardsClaimed, 0) @@ -128,3 +152,303 @@ func (rds *RewardsDataService) ListClaimedRewardsByBlockRange( } return claimedRewards, nil } + +type RewardAmount struct { + Token string + Amount string +} + +// GetTotalRewardsForEarner returns the total earned rewards for a given earner at a given block height. +func (rds *RewardsDataService) GetTotalRewardsForEarner(earner string, tokens []string, blockHeight uint64, claimable bool) ([]*RewardAmount, error) { + if earner == "" { + return nil, fmt.Errorf("earner is required") + } + + snapshot, err := rds.findDistributionRootClosestToBlockHeight(blockHeight, claimable) + if err != nil { + return nil, err + } + + query := ` + select + token, + sum(amount) as amount + from sidecar_mainnet_ethereum.gold_table as gt + where + earner = @earner + and snapshot <= @snapshot + ` + args := []interface{}{ + sql.Named("earner", earner), + sql.Named("snapshot", snapshot.GetSnapshotDate()), + } + if len(tokens) > 0 { + query += " and token in (?)" + formattedTokens := utils.Map(tokens, func(token string, i uint64) string { + return strings.ToLower(token) + }) + args = append(args, sql.Named("tokens", formattedTokens)) + } + query += ` order by snapshot desc` + + rewardAmounts := make([]*RewardAmount, 0) + res := rds.db.Raw(query, args...).Scan(&tokens) + + if res.Error != nil { + return nil, res.Error + } + + return rewardAmounts, nil +} + +// GetClaimableRewardsForEarner returns the rewards that are claimable for a given earner at a given block height (totalActiveRewards - claimed) +func (rds *RewardsDataService) GetClaimableRewardsForEarner(earner string, tokens []string, blockHeight uint64) ([]*RewardAmount, error) { + if earner == "" { + return nil, fmt.Errorf("earner is required") + } + snapshot, err := rds.findDistributionRootClosestToBlockHeight(blockHeight, true) + if err != nil { + return nil, err + } + query := ` + with claimed_tokens as ( + select + earner, + token, + sum(claimed_amount) as amount + from rewards_claimed as rc + where + earner = @earner + and block_number <= @blockNumber + group by 1, 2 + ), + earner_tokens as ( + select + earner, + token, + sum(amount) as amount + from gold_table as gt + where + earner = @earner + and snapshot <= @snapshot + group by earner, token + ) + select + et.earner, + et.token, + et.amount::numeric as earned_amount, + coalesce(ct.amount, 0)::numeric as claimed_amount, + (coalesce(et.amount, 0) - coalesce(ct.amount, 0))::numeric as claimable + from earner_tokens as et + left join claimed_tokens as ct on ( + ct.token = et.token + and ct.earner = et.earner + ) + ` + args := []interface{}{ + sql.Named("earner", earner), + sql.Named("blockHeight", blockHeight), + sql.Named("snapshot", snapshot.GetSnapshotDate()), + } + if len(tokens) > 0 { + query += " and token in (?)" + formattedTokens := utils.Map(tokens, func(token string, i uint64) string { + return strings.ToLower(token) + }) + args = append(args, sql.Named("tokens", formattedTokens)) + } + + claimableRewards := make([]*RewardAmount, 0) + res := rds.db.Raw(query, args...).Scan(&claimableRewards) + if res.Error != nil { + return nil, res.Error + } + return claimableRewards, nil +} + +// findDistributionRootClosestToBlockHeight returns the distribution root that is closest to the provided block height +// that is also not disabled. +func (rds *RewardsDataService) findDistributionRootClosestToBlockHeight(blockHeight uint64, claimable bool) (*eigenStateTypes.SubmittedDistributionRoot, error) { + query := ` + select + * + from submitted_distribution_roots as sdr + left join disabled_distribution_roots as ddr on (sdr.root_index = ddr.root_index) + where + ddr.root_index is null + and sdr.block_number <= @blockHeight + {{ if eq .claimable "true" }} + and sdr.activated_at <= now() + {{ end }} + order by sdr.block_number desc + limit 1 + ` + + claimableStr := "false" + if claimable { + claimableStr = "true" + } + + // only render claimable since it's safe; blockHeight should be sanitized + renderedQuery, err := rewardsUtils.RenderQueryTemplate(query, map[string]interface{}{ + "claimable": claimableStr, + }) + if err != nil { + rds.logger.Sugar().Errorw("failed to render query template", + zap.Uint64("blockHeight", blockHeight), + zap.Bool("claimable", claimable), + zap.Error(err), + ) + return nil, err + } + + var root *eigenStateTypes.SubmittedDistributionRoot + res := rds.db.Raw(renderedQuery, sql.Named("blockHeight", blockHeight)).Scan(&root) + if res.Error != nil && !errors.Is(res.Error, gorm.ErrRecordNotFound) { + return nil, res.Error + } + if errors.Is(res.Error, gorm.ErrRecordNotFound) { + return nil, fmt.Errorf("no distribution root found for blockHeight '%d'", blockHeight) + } + return root, nil +} + +type SummarizedReward struct { + Token string + Earned string + Active string + Claimed string + Claimable string +} + +func setTokenValueInMap(tokenMap map[string]*SummarizedReward, values []*RewardAmount, fieldName string) { + for _, value := range values { + v, ok := tokenMap[value.Token] + if !ok { + v = &SummarizedReward{ + Token: value.Token, + } + tokenMap[value.Token] = v + } + f := reflect.ValueOf(v).Elem().FieldByName(fieldName) + if f.IsValid() && f.CanSet() { + f.SetString(value.Amount) + } + } +} + +// GetSummarizedRewards returns the summarized rewards for a given earner at a given block height. +// The blockHeight will be used to find the root that is <= the provided blockHeight +func (rds *RewardsDataService) GetSummarizedRewards(ctx context.Context, earner string, tokens []string, blockHeight uint64) ([]*SummarizedReward, error) { + if earner == "" { + return nil, fmt.Errorf("earner is required") + } + + blockHeight, err := rds.BaseDataService.GetCurrentBlockHeightIfNotPresent(context.Background(), blockHeight) + if err != nil { + return nil, err + } + + tokenMap := make(map[string]*SummarizedReward) + + type ChanResult[T any] struct { + Data T + Error error + } + + // channels to aggregate results together in a thread safe way + earnedRewardsChan := make(chan *ChanResult[[]*RewardAmount]) + activeRewardsChan := make(chan *ChanResult[[]*RewardAmount]) + claimableRewardsChan := make(chan *ChanResult[[]*RewardAmount]) + claimedRewardsChan := make(chan *ChanResult[[]*RewardAmount]) + var wg sync.WaitGroup + wg.Add(4) + + go func() { + defer wg.Done() + res := &ChanResult[[]*RewardAmount]{} + earnedRewards, err := rds.GetTotalRewardsForEarner(earner, tokens, blockHeight, false) + if err != nil { + res.Error = err + } else { + res.Data = earnedRewards + } + earnedRewardsChan <- res + }() + + go func() { + defer wg.Done() + res := &ChanResult[[]*RewardAmount]{} + activeRewards, err := rds.GetTotalRewardsForEarner(earner, tokens, blockHeight, true) + if err != nil { + res.Error = err + } else { + res.Data = activeRewards + } + activeRewardsChan <- res + }() + + go func() { + defer wg.Done() + res := &ChanResult[[]*RewardAmount]{} + claimableRewards, err := rds.GetClaimableRewardsForEarner(earner, tokens, blockHeight) + if err != nil { + res.Error = err + } else { + res.Data = claimableRewards + } + claimableRewardsChan <- res + }() + + go func() { + defer wg.Done() + res := &ChanResult[[]*RewardAmount]{} + claimedRewards, err := rds.GetTotalClaimedRewards(context.Background(), earner, tokens, blockHeight) + if err != nil { + res.Error = err + } else { + res.Data = utils.Map(claimedRewards, func(cr *TotalClaimedReward, i uint64) *RewardAmount { + return &RewardAmount{ + Token: cr.Token, + Amount: cr.Amount, + } + }) + } + claimedRewardsChan <- res + }() + wg.Wait() + close(earnedRewardsChan) + close(activeRewardsChan) + close(claimableRewardsChan) + close(claimedRewardsChan) + + earnedRewards := <-earnedRewardsChan + if earnedRewards.Error != nil { + return nil, earnedRewards.Error + } + setTokenValueInMap(tokenMap, earnedRewards.Data, "Earned") + + activeRewards := <-activeRewardsChan + if activeRewards.Error != nil { + return nil, activeRewards.Error + } + setTokenValueInMap(tokenMap, activeRewards.Data, "Active") + + claimableRewards := <-claimableRewardsChan + if claimableRewards.Error != nil { + return nil, claimableRewards.Error + } + setTokenValueInMap(tokenMap, claimableRewards.Data, "Claimable") + + claimedRewards := <-claimedRewardsChan + if claimedRewards.Error != nil { + return nil, claimedRewards.Error + } + setTokenValueInMap(tokenMap, claimedRewards.Data, "Claimed") + + tokenList := make([]*SummarizedReward, 0) + for _, v := range tokenMap { + tokenList = append(tokenList, v) + } + return tokenList, nil +} From 99b6b8ce80c7cf7ce6d7b040e7b1b379988ac5fb Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 23 Jan 2025 16:30:16 -0600 Subject: [PATCH 11/11] feat: GetClaimableRewards rpc --- go.mod | 2 +- go.sum | 4 ++++ pkg/rpcServer/rewardsHandlers.go | 25 +++++++++++++++++-- pkg/service/rewardsDataService/rewards.go | 29 +++++++++++++++-------- 4 files changed, 47 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index bc50c19c..9c2b14ce 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DataDog/datadog-go/v5 v5.5.0 github.com/Layr-Labs/eigenlayer-contracts v0.4.1-holesky-pepe.0.20240813143901-00fc4b95e9c1 github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 - github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122221613-65afff84af27 + github.com/Layr-Labs/protocol-apis v1.1.1-0.20250123222616-41f0274e56d9 github.com/ethereum/go-ethereum v1.14.9 github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index 14edafe4..0b43fc19 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,10 @@ github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13 h1:Blb4AE+jC/vddV71w4/MQA github.com/Layr-Labs/eigenlayer-rewards-proofs v0.2.13/go.mod h1:PD/HoyzZjxDw1tAcZw3yD0yGddo+yhmwQAi+lk298r4= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122221613-65afff84af27 h1:3jWR07FKc5qTmJR8VKlQFfHv26ZdnbwFi34CvtWuRRY= github.com/Layr-Labs/protocol-apis v1.1.1-0.20250122221613-65afff84af27/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250123222217-15d8d3b2f108 h1:gspRqV5XKOpdG21mFCQs5Dojg2sMA+oSI8S6D+z4fpc= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250123222217-15d8d3b2f108/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250123222616-41f0274e56d9 h1:InnITBvNMqkNVnrbihyzMizagobMKtA2/RglLr2RpXg= +github.com/Layr-Labs/protocol-apis v1.1.1-0.20250123222616-41f0274e56d9/go.mod h1:prNA2/mLO5vpMZ2q78Nsn0m97wm28uiRnwO+/yOxigk= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= diff --git a/pkg/rpcServer/rewardsHandlers.go b/pkg/rpcServer/rewardsHandlers.go index 60a4653a..57a02185 100644 --- a/pkg/rpcServer/rewardsHandlers.go +++ b/pkg/rpcServer/rewardsHandlers.go @@ -205,8 +205,29 @@ func (rpc *RpcServer) GetAttributableRewardsForDistributionRoot(ctx context.Cont return nil, status.Error(codes.Unimplemented, "method GetAttributableRewardsForDistributionRoot not implemented") } -func (rpc *RpcServer) GetAvailableRewards(ctx context.Context, req *rewardsV1.GetAvailableRewardsRequest) (*rewardsV1.GetAvailableRewardsResponse, error) { - return nil, status.Error(codes.Unimplemented, "method GetAvailableRewards not implemented") +func (rpc *RpcServer) GetClaimableRewards(ctx context.Context, req *rewardsV1.GetClaimableRewardsRequest) (*rewardsV1.GetClaimableRewardsResponse, error) { + earner := req.GetEarnerAddress() + blockHeight := req.GetBlockHeight() + + if earner == "" { + return nil, status.Error(codes.InvalidArgument, "earner address is required") + } + + claimableRewards, snapshot, err := rpc.rewardsDataService.GetClaimableRewardsForEarner(ctx, earner, nil, blockHeight) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return &rewardsV1.GetClaimableRewardsResponse{ + Rewards: utils.Map(claimableRewards, func(r *rewardsDataService.RewardAmount, i uint64) *rewardsV1.Reward { + return &rewardsV1.Reward{ + Earner: earner, + Token: r.Token, + Amount: r.Amount, + Snapshot: snapshot.GetSnapshotDate(), + } + }), + }, nil } // GetTotalClaimedRewards returns the total claimed rewards for an earner up to, and including, the provided block height. diff --git a/pkg/service/rewardsDataService/rewards.go b/pkg/service/rewardsDataService/rewards.go index daeeb589..5c2c027f 100644 --- a/pkg/service/rewardsDataService/rewards.go +++ b/pkg/service/rewardsDataService/rewards.go @@ -159,7 +159,7 @@ type RewardAmount struct { } // GetTotalRewardsForEarner returns the total earned rewards for a given earner at a given block height. -func (rds *RewardsDataService) GetTotalRewardsForEarner(earner string, tokens []string, blockHeight uint64, claimable bool) ([]*RewardAmount, error) { +func (rds *RewardsDataService) GetTotalRewardsForEarner(ctx context.Context, earner string, tokens []string, blockHeight uint64, claimable bool) ([]*RewardAmount, error) { if earner == "" { return nil, fmt.Errorf("earner is required") } @@ -202,13 +202,22 @@ func (rds *RewardsDataService) GetTotalRewardsForEarner(earner string, tokens [] } // GetClaimableRewardsForEarner returns the rewards that are claimable for a given earner at a given block height (totalActiveRewards - claimed) -func (rds *RewardsDataService) GetClaimableRewardsForEarner(earner string, tokens []string, blockHeight uint64) ([]*RewardAmount, error) { +func (rds *RewardsDataService) GetClaimableRewardsForEarner( + ctx context.Context, + earner string, + tokens []string, + blockHeight uint64, +) ( + []*RewardAmount, + *eigenStateTypes.SubmittedDistributionRoot, + error, +) { if earner == "" { - return nil, fmt.Errorf("earner is required") + return nil, nil, fmt.Errorf("earner is required") } snapshot, err := rds.findDistributionRootClosestToBlockHeight(blockHeight, true) if err != nil { - return nil, err + return nil, nil, err } query := ` with claimed_tokens as ( @@ -261,9 +270,9 @@ func (rds *RewardsDataService) GetClaimableRewardsForEarner(earner string, token claimableRewards := make([]*RewardAmount, 0) res := rds.db.Raw(query, args...).Scan(&claimableRewards) if res.Error != nil { - return nil, res.Error + return nil, nil, res.Error } - return claimableRewards, nil + return claimableRewards, snapshot, nil } // findDistributionRootClosestToBlockHeight returns the distribution root that is closest to the provided block height @@ -367,7 +376,7 @@ func (rds *RewardsDataService) GetSummarizedRewards(ctx context.Context, earner go func() { defer wg.Done() res := &ChanResult[[]*RewardAmount]{} - earnedRewards, err := rds.GetTotalRewardsForEarner(earner, tokens, blockHeight, false) + earnedRewards, err := rds.GetTotalRewardsForEarner(ctx, earner, tokens, blockHeight, false) if err != nil { res.Error = err } else { @@ -379,7 +388,7 @@ func (rds *RewardsDataService) GetSummarizedRewards(ctx context.Context, earner go func() { defer wg.Done() res := &ChanResult[[]*RewardAmount]{} - activeRewards, err := rds.GetTotalRewardsForEarner(earner, tokens, blockHeight, true) + activeRewards, err := rds.GetTotalRewardsForEarner(ctx, earner, tokens, blockHeight, true) if err != nil { res.Error = err } else { @@ -391,7 +400,7 @@ func (rds *RewardsDataService) GetSummarizedRewards(ctx context.Context, earner go func() { defer wg.Done() res := &ChanResult[[]*RewardAmount]{} - claimableRewards, err := rds.GetClaimableRewardsForEarner(earner, tokens, blockHeight) + claimableRewards, _, err := rds.GetClaimableRewardsForEarner(ctx, earner, tokens, blockHeight) if err != nil { res.Error = err } else { @@ -403,7 +412,7 @@ func (rds *RewardsDataService) GetSummarizedRewards(ctx context.Context, earner go func() { defer wg.Done() res := &ChanResult[[]*RewardAmount]{} - claimedRewards, err := rds.GetTotalClaimedRewards(context.Background(), earner, tokens, blockHeight) + claimedRewards, err := rds.GetTotalClaimedRewards(ctx, earner, tokens, blockHeight) if err != nil { res.Error = err } else {