From 00b9279bd04f6187dd73552dd10e4c88f70f457f Mon Sep 17 00:00:00 2001 From: pschork <354473+pschork@users.noreply.github.com> Date: Tue, 19 Nov 2024 23:00:52 -0800 Subject: [PATCH] wip adds v2 metadatastore flags to dataapi --- disperser/cmd/dataapi/config.go | 16 +- disperser/cmd/dataapi/flags/flags.go | 7 + disperser/cmd/dataapi/main.go | 21 +- .../common/blobstore/blob_metadata_store.go | 10 + disperser/dataapi/server.go | 5 + disperser/dataapi/server_test.go | 292 ++---------------- 6 files changed, 63 insertions(+), 288 deletions(-) diff --git a/disperser/cmd/dataapi/config.go b/disperser/cmd/dataapi/config.go index 11ed9c8c33..a2a037dd84 100644 --- a/disperser/cmd/dataapi/config.go +++ b/disperser/cmd/dataapi/config.go @@ -31,9 +31,10 @@ type Config struct { BLSOperatorStateRetrieverAddr string EigenDAServiceManagerAddr string - DisperserHostname string - ChurnerHostname string - BatcherHealthEndpt string + DisperserHostname string + ChurnerHostname string + BatcherHealthEndpt string + BlobMetadataStoreV2 string } func NewConfig(ctx *cli.Context) (Config, error) { @@ -68,10 +69,11 @@ func NewConfig(ctx *cli.Context) (Config, error) { HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name), EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name), }, - DisperserHostname: ctx.GlobalString(flags.DisperserHostnameFlag.Name), - ChurnerHostname: ctx.GlobalString(flags.ChurnerHostnameFlag.Name), - BatcherHealthEndpt: ctx.GlobalString(flags.BatcherHealthEndptFlag.Name), - ChainStateConfig: thegraph.ReadCLIConfig(ctx), + DisperserHostname: ctx.GlobalString(flags.DisperserHostnameFlag.Name), + ChurnerHostname: ctx.GlobalString(flags.ChurnerHostnameFlag.Name), + BatcherHealthEndpt: ctx.GlobalString(flags.BatcherHealthEndptFlag.Name), + ChainStateConfig: thegraph.ReadCLIConfig(ctx), + BlobMetadataStoreV2: ctx.GlobalString(flags.DynamoMetadataStoreV2Flag.Name), } return config, nil } diff --git a/disperser/cmd/dataapi/flags/flags.go b/disperser/cmd/dataapi/flags/flags.go index 8a77bcfb17..bf6da3815c 100644 --- a/disperser/cmd/dataapi/flags/flags.go +++ b/disperser/cmd/dataapi/flags/flags.go @@ -20,6 +20,12 @@ var ( Required: true, EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMO_TABLE_NAME"), } + DynamoMetadataStoreV2Flag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "dynamo-metadata-store-v2"), + Usage: "Name of the dynamo table to store v2 blob metadata", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMO_METADATA_STORE_V2"), + } S3BucketNameFlag = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "s3-bucket-name"), Usage: "Name of the bucket to store blobs", @@ -136,6 +142,7 @@ var ( var requiredFlags = []cli.Flag{ DynamoTableNameFlag, + DynamoMetadataStoreV2Flag, SocketAddrFlag, S3BucketNameFlag, SubgraphApiBatchMetadataAddrFlag, diff --git a/disperser/cmd/dataapi/main.go b/disperser/cmd/dataapi/main.go index 2f46b05aa7..b0b2d73ddc 100644 --- a/disperser/cmd/dataapi/main.go +++ b/disperser/cmd/dataapi/main.go @@ -16,6 +16,7 @@ import ( "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/disperser/cmd/dataapi/flags" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" + v2blobstore "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/disperser/dataapi" "github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus" "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph" @@ -89,15 +90,16 @@ func RunDataApi(ctx *cli.Context) error { } var ( - promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster) - blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0) - sharedStorage = blobstore.NewSharedStorage(config.BlobstoreConfig.BucketName, s3Client, blobMetadataStore, logger) - subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr) - subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger) - chainState = coreeth.NewChainState(tx, client) - indexedChainState = thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger) - metrics = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger) - server = dataapi.NewServer( + promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster) + blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0) + sharedStorage = blobstore.NewSharedStorage(config.BlobstoreConfig.BucketName, s3Client, blobMetadataStore, logger) + blobMetadataStoreV2 = v2blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobMetadataStoreV2) + subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr) + subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger) + chainState = coreeth.NewChainState(tx, client) + indexedChainState = thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger) + metrics = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger) + server = dataapi.NewServer( dataapi.Config{ ServerMode: config.ServerMode, SocketAddr: config.SocketAddr, @@ -107,6 +109,7 @@ func RunDataApi(ctx *cli.Context) error { BatcherHealthEndpt: config.BatcherHealthEndpt, }, sharedStorage, + blobMetadataStoreV2, promClient, subgraphClient, tx, diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index 43f3a9fc48..f61247e105 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -46,6 +46,16 @@ func NewBlobMetadataStore(dynamoDBClient commondynamodb.Client, logger logging.L } } +func NewBlobMetadataStoreV2(dynamoDBClient commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore { + logger.Debugf("creating blob metadata store v2 with table %s with TTL: %s", tableName, ttl) + return &BlobMetadataStore{ + dynamoDBClient: dynamoDBClient, + logger: logger.With("component", "BlobMetadataStoreV2"), + tableName: tableName, + ttl: ttl, + } +} + func (s *BlobMetadataStore) QueueNewBlobMetadata(ctx context.Context, blobMetadata *disperser.BlobMetadata) error { item, err := MarshalBlobMetadata(blobMetadata) if err != nil { diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index abed52e39b..5e8fb441f4 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -24,7 +24,9 @@ import ( "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/disperser/common/semver" + v2blobstore "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/disperser/dataapi/docs" + "github.com/gin-contrib/cors" "github.com/gin-contrib/logger" "github.com/gin-gonic/gin" @@ -196,6 +198,7 @@ type ( allowOrigins []string logger logging.Logger blobstore disperser.BlobStore + blobstoreV2 *v2blobstore.BlobMetadataStore promClient PrometheusClient subgraphClient SubgraphClient transactor core.Reader @@ -214,6 +217,7 @@ type ( func NewServer( config Config, blobstore disperser.BlobStore, + blobstoreV2 *v2blobstore.BlobMetadataStore, promClient PrometheusClient, subgraphClient SubgraphClient, transactor core.Reader, @@ -245,6 +249,7 @@ func NewServer( socketAddr: config.SocketAddr, allowOrigins: config.AllowOrigins, blobstore: blobstore, + blobstoreV2: blobstoreV2, promClient: promClient, subgraphClient: subgraphClient, transactor: transactor, diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index d0ffe58397..8977fe5d49 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -15,10 +15,12 @@ import ( "testing" "time" + awsmock "github.com/Layr-Labs/eigenda/common/aws/mock" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/disperser/common/inmem" + v2blobstore "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/disperser/dataapi" prommock "github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus/mock" "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph" @@ -50,6 +52,8 @@ var ( prometheusClient = dataapi.NewPrometheusClient(mockPrometheusApi, "test-cluster") mockSubgraphApi = &subgraphmock.MockSubgraphApi{} subgraphClient = dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger) + mockDynamoClient = &awsmock.MockDynamoDBClient{} + blobMetadataStore = v2blobstore.NewBlobMetadataStore(mockDynamoClient, mockLogger, "test-cluster") config = dataapi.Config{ServerMode: "test", SocketAddr: ":8080", AllowOrigins: []string{"*"}, DisperserHostname: "localhost:32007", ChurnerHostname: "localhost:32009"} @@ -72,7 +76,7 @@ var ( 1: 10, 2: 10, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockIndexedChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, subgraphClient, mockTx, mockChainState, mockIndexedChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) expectedRequestedAt = uint64(5567830000000000000) expectedDataLength = 32 expectedBatchId = uint32(99) @@ -552,7 +556,7 @@ func TestPortCheck(t *testing.T) { func TestCheckBatcherHealthExpectServing(t *testing.T) { r := setUpRouter() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}) r.GET("/v1/metrics/batcher-service-availability", testDataApiServer.FetchBatcherAvailability) @@ -585,7 +589,7 @@ func TestCheckBatcherHealthExpectServing(t *testing.T) { func TestCheckBatcherHealthExpectNotServing(t *testing.T) { r := setUpRouter() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: false}) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: false}) r.GET("/v1/metrics/batcher-service-availability", testDataApiServer.FetchBatcherAvailability) @@ -623,7 +627,7 @@ func TestFetchDisperserServiceAvailabilityHandler(t *testing.T) { Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil) r.GET("/v1/metrics/disperser-service-availability", testDataApiServer.FetchDisperserServiceAvailability) @@ -661,7 +665,7 @@ func TestChurnerServiceAvailabilityHandler(t *testing.T) { Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil) r.GET("/v1/metrics/churner-service-availability", testDataApiServer.FetchChurnerServiceAvailability) @@ -707,7 +711,7 @@ func TestFetchDeregisteredOperatorNoSocketInfoOneOperatorHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfoNoSocketInfo, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -755,7 +759,7 @@ func TestFetchDeregisteredMultipleOperatorsOneWithNoSocketInfoHandler(t *testing // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfoNoSocketInfo, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -822,7 +826,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -866,7 +870,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampTwoOperatorsHandler(t *tes // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -921,7 +925,7 @@ func TestFetchMetricsDeregisteredOperatorHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -985,7 +989,7 @@ func TestFetchDeregisteredOperatorOffline(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) @@ -1037,7 +1041,7 @@ func TestFetchDeregisteredOperatorsWithoutDaysQueryParam(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1094,7 +1098,7 @@ func TestFetchDeregisteredOperatorInvalidDaysQueryParam(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1135,7 +1139,7 @@ func TestFetchDeregisteredOperatorQueryDaysGreaterThan30(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) @@ -1180,7 +1184,7 @@ func TestFetchDeregisteredOperatorsMultipleOffline(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1232,267 +1236,11 @@ func TestFetchDeregisteredOperatorOnline(t *testing.T) { r := setUpRouter() - indexedOperatorState := make(map[core.OperatorID]*subgraph.OperatorInfo) - indexedOperatorState[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo - - mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) - mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - - mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) - - // Start test server for Operator - closeServer, err := startTestGRPCServer("localhost:32007") // Let the OS assign a free port - if err != nil { - t.Fatalf("Failed to start test server: %v", err) - } - defer closeServer() // Ensure the server is closed after the test - - r.GET("/v1/operators-info/deregistered-operators", testDataApiServer.FetchDeregisteredOperators) - - w := httptest.NewRecorder() - req := httptest.NewRequest(http.MethodGet, "/v1/operators-info/deregistered-operators?days=14", nil) - r.ServeHTTP(w, req) - - res := w.Result() - defer res.Body.Close() - - data, err := io.ReadAll(res.Body) - assert.NoError(t, err) - - var response dataapi.QueriedStateOperatorsResponse - err = json.Unmarshal(data, &response) - assert.NoError(t, err) - assert.NotNil(t, response) - - assert.Equal(t, http.StatusOK, res.StatusCode) - assert.Equal(t, 1, response.Meta.Size) - assert.Equal(t, 1, len(response.Data)) - assert.Equal(t, true, response.Data[0].IsOnline) - - // Reset the mock - mockSubgraphApi.ExpectedCalls = nil - mockSubgraphApi.Calls = nil -} - -func TestFetchDeregisteredOperatorsMultipleOfflineOnline(t *testing.T) { - // Skipping this test as repported being flaky but could not reproduce it locally - t.Skip("Skipping testing in CI environment") - - defer goleak.VerifyNone(t) - - r := setUpRouter() - - indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) - indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo - indexedOperatorStates[core.OperatorID{1}] = subgraphDeregisteredOperatorInfo2 - - mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphTwoOperatorsDeregistered, nil) - - // Set up the mock calls for the two operators - mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() - mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - - mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) - - // Start the test server for Operator 2 - closeServer, err := startTestGRPCServer("localhost:32009") - if err != nil { - t.Fatalf("Failed to start test server: %v", err) - } - defer closeServer() - - r.GET("/v1/operators-info/deregistered-operators", testDataApiServer.FetchDeregisteredOperators) - - w := httptest.NewRecorder() - req := httptest.NewRequest(http.MethodGet, "/v1/operators-info/deregistered-operators?days=14", nil) - r.ServeHTTP(w, req) - - res := w.Result() - defer res.Body.Close() - - data, err := io.ReadAll(res.Body) - assert.NoError(t, err) - - var response dataapi.QueriedStateOperatorsResponse - err = json.Unmarshal(data, &response) - assert.NoError(t, err) - assert.NotNil(t, response) - - assert.Equal(t, http.StatusOK, res.StatusCode) - assert.Equal(t, 2, response.Meta.Size) - assert.Equal(t, 2, len(response.Data)) - - operator1Data := response.Data[0] - operator2Data := response.Data[1] - - responseJson := string(data) - fmt.Printf("Response: %v\n", responseJson) - - assert.Equal(t, "0xe22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311", operator1Data.OperatorId) - assert.Equal(t, uint(22), operator1Data.BlockNumber) - assert.Equal(t, "localhost:32007", operator1Data.Socket) - assert.Equal(t, false, operator1Data.IsOnline) - - assert.Equal(t, "0xe23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312", operator2Data.OperatorId) - assert.Equal(t, uint(24), operator2Data.BlockNumber) - assert.Equal(t, "localhost:32009", operator2Data.Socket) - assert.Equal(t, true, operator2Data.IsOnline) - - // Reset the mock - mockSubgraphApi.ExpectedCalls = nil - mockSubgraphApi.Calls = nil -} - -func TestFetchDeregisteredOperatorsMultipleOnline(t *testing.T) { - - defer goleak.VerifyNone(t) - - r := setUpRouter() - - indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) - indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo - indexedOperatorStates[core.OperatorID{1}] = subgraphDeregisteredOperatorInfo2 - - mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphTwoOperatorsDeregistered, nil) - mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() - mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - - mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) - - // Start test server for Operator 1 - closeServer1, err := startTestGRPCServer("localhost:32007") // Let the OS assign a free port - if err != nil { - t.Fatalf("Failed to start test server: %v", err) - } - defer closeServer1() // Ensure the server is closed after the test - - // Start test server for Operator 2 - closeServer2, err := startTestGRPCServer("localhost:32009") // Let the OS assign a free port - if err != nil { - t.Fatalf("Failed to start test server: %v", err) - } - defer closeServer2() // Ensure the server is closed after the test - - r.GET("/v1/operators-info/deregistered-operators", testDataApiServer.FetchDeregisteredOperators) - - w := httptest.NewRecorder() - req := httptest.NewRequest(http.MethodGet, "/v1/operators-info/deregistered-operators?days=14", nil) - r.ServeHTTP(w, req) - - res := w.Result() - defer res.Body.Close() - - data, err := io.ReadAll(res.Body) - assert.NoError(t, err) - - var response dataapi.QueriedStateOperatorsResponse - err = json.Unmarshal(data, &response) - assert.NoError(t, err) - assert.NotNil(t, response) - - assert.Equal(t, http.StatusOK, res.StatusCode) - assert.Equal(t, 2, response.Meta.Size) - assert.Equal(t, 2, len(response.Data)) - - operator1Data := response.Data[0] - operator2Data := response.Data[1] - - assert.Equal(t, "0xe22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311", operator1Data.OperatorId) - assert.Equal(t, uint(22), operator1Data.BlockNumber) - assert.Equal(t, "localhost:32007", operator1Data.Socket) - assert.Equal(t, true, operator1Data.IsOnline) - - assert.Equal(t, "0xe23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312", operator2Data.OperatorId) - assert.Equal(t, uint(24), operator2Data.BlockNumber) - assert.Equal(t, "localhost:32009", operator2Data.Socket) - assert.Equal(t, true, operator2Data.IsOnline) - - // Reset the mock - mockSubgraphApi.ExpectedCalls = nil - mockSubgraphApi.Calls = nil -} - -func TestFetchDeregisteredOperatorsMultipleOfflineSameBlock(t *testing.T) { - - defer goleak.VerifyNone(t) - - r := setUpRouter() - - indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) - indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo - indexedOperatorStates[core.OperatorID{1}] = subgraphDeregisteredOperatorInfo2 - indexedOperatorStates[core.OperatorID{2}] = subgraphDeregisteredOperatorInfo3 - - mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphThreeOperatorsDeregistered, nil) - - // Set up the mock calls for the three operators - mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() - mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo3, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - - mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) - - r.GET("/v1/operators-info/deregistered-operators", testDataApiServer.FetchDeregisteredOperators) - - w := httptest.NewRecorder() - req := httptest.NewRequest(http.MethodGet, "/v1/operators-info/deregistered-operators?days=14", nil) - r.ServeHTTP(w, req) - - res := w.Result() - defer res.Body.Close() - - data, err := io.ReadAll(res.Body) - assert.NoError(t, err) - - var response dataapi.QueriedStateOperatorsResponse - err = json.Unmarshal(data, &response) - assert.NoError(t, err) - assert.NotNil(t, response) - - assert.Equal(t, http.StatusOK, res.StatusCode) - assert.Equal(t, 3, response.Meta.Size) - assert.Equal(t, 3, len(response.Data)) - - operator1Data := response.Data[0] - - assert.Equal(t, "0xe22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311", operator1Data.OperatorId) - assert.Equal(t, uint(22), operator1Data.BlockNumber) - assert.Equal(t, "localhost:32007", operator1Data.Socket) - assert.Equal(t, false, operator1Data.IsOnline) - - operator2Data := getOperatorData(response.Data, "0xe23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312") - operator3Data := getOperatorData(response.Data, "0xe24cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568313") - - assert.Equal(t, "0xe23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312", operator2Data.OperatorId) - assert.Equal(t, uint(24), operator2Data.BlockNumber) - assert.Equal(t, "localhost:32009", operator2Data.Socket) - assert.Equal(t, false, operator1Data.IsOnline) - - assert.Equal(t, "0xe24cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568313", operator3Data.OperatorId) - assert.Equal(t, uint(24), operator3Data.BlockNumber) - assert.Equal(t, "localhost:32011", operator3Data.Socket) - assert.Equal(t, false, operator3Data.IsOnline) - - // Reset the mock - mockSubgraphApi.ExpectedCalls = nil - mockSubgraphApi.Calls = nil -} - -func TestFetchRegisteredOperatorOnline(t *testing.T) { - - defer goleak.VerifyNone(t) - - r := setUpRouter() - indexedOperatorState := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorState[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo mockSubgraphApi.On("QueryRegisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorRegistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, blobMetadataStore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil)