diff --git a/disperser/cmd/dataapi/config.go b/disperser/cmd/dataapi/config.go index 2dfcca2e2b..9154e5578a 100644 --- a/disperser/cmd/dataapi/config.go +++ b/disperser/cmd/dataapi/config.go @@ -37,6 +37,8 @@ type Config struct { DisperserHostname string ChurnerHostname string BatcherHealthEndpt string + + BlobMetadataV2TableName string } func NewConfig(ctx *cli.Context) (Config, error) { @@ -77,10 +79,11 @@ func NewConfig(ctx *cli.Context) (Config, error) { HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name), EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name), }, - DisperserHostname: ctx.GlobalString(flags.DisperserHostnameFlag.Name), - ChurnerHostname: ctx.GlobalString(flags.ChurnerHostnameFlag.Name), - BatcherHealthEndpt: ctx.GlobalString(flags.BatcherHealthEndptFlag.Name), - ChainStateConfig: thegraph.ReadCLIConfig(ctx), + DisperserHostname: ctx.GlobalString(flags.DisperserHostnameFlag.Name), + ChurnerHostname: ctx.GlobalString(flags.ChurnerHostnameFlag.Name), + BatcherHealthEndpt: ctx.GlobalString(flags.BatcherHealthEndptFlag.Name), + ChainStateConfig: thegraph.ReadCLIConfig(ctx), + BlobMetadataV2TableName: ctx.GlobalString(flags.DynamoV2TableNameFlag.Name), } return config, nil } diff --git a/disperser/cmd/dataapi/flags/flags.go b/disperser/cmd/dataapi/flags/flags.go index b38070dfb2..8bf51b44a4 100644 --- a/disperser/cmd/dataapi/flags/flags.go +++ b/disperser/cmd/dataapi/flags/flags.go @@ -20,6 +20,12 @@ var ( Required: true, EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMO_TABLE_NAME"), } + DynamoV2TableNameFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "dynamo-v2-table-name"), + Usage: "Name of the dynamo table to store v2 blob metadata", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMO_V2_TABLE_NAME"), + } S3BucketNameFlag = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "s3-bucket-name"), Usage: "Name of the bucket to store blobs", @@ -143,6 +149,7 @@ var ( var requiredFlags = []cli.Flag{ DynamoTableNameFlag, + DynamoV2TableNameFlag, SocketAddrFlag, S3BucketNameFlag, SubgraphApiBatchMetadataAddrFlag, diff --git a/disperser/cmd/dataapi/main.go b/disperser/cmd/dataapi/main.go index 7053639e0c..df3a5bb657 100644 --- a/disperser/cmd/dataapi/main.go +++ b/disperser/cmd/dataapi/main.go @@ -91,15 +91,16 @@ func RunDataApi(ctx *cli.Context) error { } var ( - promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster) - blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0) - sharedStorage = blobstore.NewSharedStorage(config.BlobstoreConfig.BucketName, s3Client, blobMetadataStore, logger) - subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr) - subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger) - chainState = coreeth.NewChainState(tx, client) - indexedChainState = thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger) - metrics = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger) - server = dataapi.NewServer( + promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster) + blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0) + blobMetadataStoreV2 = blobstorev2.NewBlobMetadataStore(dynamoClient, logger, config.BlobMetadataV2TableName) + sharedStorage = blobstore.NewSharedStorage(config.BlobstoreConfig.BucketName, s3Client, blobMetadataStore, logger) + subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr) + subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger) + chainState = coreeth.NewChainState(tx, client) + indexedChainState = thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger) + metrics = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger) + server = dataapi.NewServer( dataapi.Config{ ServerMode: config.ServerMode, SocketAddr: config.SocketAddr, @@ -119,6 +120,7 @@ func RunDataApi(ctx *cli.Context) error { nil, nil, nil, + blobMetadataStoreV2, ) ) @@ -129,29 +131,6 @@ func RunDataApi(ctx *cli.Context) error { logger.Info("Enabled metrics for Data Access API", "socket", httpSocket) } - if config.ServerVersion == 2 { - blobMetadataStorev2 := blobstorev2.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName) - serverv2 := dataapi.NewServerV2( - dataapi.Config{ - ServerMode: config.ServerMode, - SocketAddr: config.SocketAddr, - AllowOrigins: config.AllowOrigins, - DisperserHostname: config.DisperserHostname, - ChurnerHostname: config.ChurnerHostname, - BatcherHealthEndpt: config.BatcherHealthEndpt, - }, - blobMetadataStorev2, - promClient, - subgraphClient, - tx, - chainState, - indexedChainState, - logger, - metrics, - ) - return runServer(serverv2, logger) - } - return runServer(server, logger) } diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index 79e490408e..39b08aef55 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -15,99 +15,7 @@ const docTemplate = `{ "host": "{{.Host}}", "basePath": "{{.BasePath}}", "paths": { - "/batches/{batch_header_hash}": { - "get": { - "produces": [ - "application/json" - ], - "tags": [ - "Feed" - ], - "summary": "Fetch batch by the batch header hash", - "parameters": [ - { - "type": "string", - "description": "Batch header hash in hex string", - "name": "batch_header_hash", - "in": "path", - "required": true - } - ], - "responses": { - "200": { - "description": "OK", - "schema": { - "$ref": "#/definitions/dataapi.BlobResponse" - } - }, - "400": { - "description": "error: Bad request", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - }, - "404": { - "description": "error: Not found", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - }, - "500": { - "description": "error: Server error", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - } - } - } - }, - "/blobs/{blob_key}": { - "get": { - "produces": [ - "application/json" - ], - "tags": [ - "Feed" - ], - "summary": "Fetch blob metadata by blob key", - "parameters": [ - { - "type": "string", - "description": "Blob key in hex string", - "name": "blob_key", - "in": "path", - "required": true - } - ], - "responses": { - "200": { - "description": "OK", - "schema": { - "$ref": "#/definitions/dataapi.BlobResponse" - } - }, - "400": { - "description": "error: Bad request", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - }, - "404": { - "description": "error: Not found", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - }, - "500": { - "description": "error: Server error", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - } - } - } - }, - "/feed/batches/{batch_header_hash}/blobs": { + "/v1/feed/batches/{batch_header_hash}/blobs": { "get": { "produces": [ "application/json" @@ -165,7 +73,7 @@ const docTemplate = `{ } } }, - "/feed/blobs": { + "/v1/feed/blobs": { "get": { "produces": [ "application/json" @@ -210,7 +118,7 @@ const docTemplate = `{ } } }, - "/feed/blobs/{blob_key}": { + "/v1/feed/blobs/{blob_key}": { "get": { "produces": [ "application/json" @@ -256,7 +164,7 @@ const docTemplate = `{ } } }, - "/metrics": { + "/v1/metrics": { "get": { "produces": [ "application/json" @@ -313,13 +221,13 @@ const docTemplate = `{ } } }, - "/metrics/batcher-service-availability": { + "/v1/metrics/batcher-service-availability": { "get": { "produces": [ "application/json" ], "tags": [ - "Batcher Availability" + "Service Availability" ], "summary": "Get status of EigenDA batcher.", "responses": { @@ -350,13 +258,13 @@ const docTemplate = `{ } } }, - "/metrics/churner-service-availability": { + "/v1/metrics/churner-service-availability": { "get": { "produces": [ "application/json" ], "tags": [ - "Churner ServiceAvailability" + "Service Availability" ], "summary": "Get status of EigenDA churner service.", "responses": { @@ -387,13 +295,13 @@ const docTemplate = `{ } } }, - "/metrics/disperser-service-availability": { + "/v1/metrics/disperser-service-availability": { "get": { "produces": [ "application/json" ], "tags": [ - "ServiceAvailability" + "Service Availability" ], "summary": "Get status of EigenDA Disperser service.", "responses": { @@ -424,7 +332,7 @@ const docTemplate = `{ } } }, - "/metrics/non-signers": { + "/v1/metrics/non-signers": { "get": { "produces": [ "application/json" @@ -472,7 +380,7 @@ const docTemplate = `{ } } }, - "/metrics/operator-nonsigning-percentage": { + "/v1/metrics/operator-nonsigning-percentage": { "get": { "produces": [ "application/json" @@ -529,7 +437,7 @@ const docTemplate = `{ } } }, - "/metrics/throughput": { + "/v1/metrics/throughput": { "get": { "produces": [ "application/json" @@ -583,7 +491,7 @@ const docTemplate = `{ } } }, - "/operators-info/deregistered-operators": { + "/v1/operators-info/deregistered-operators": { "get": { "produces": [ "application/json" @@ -620,7 +528,7 @@ const docTemplate = `{ } } }, - "/operators-info/operator-ejections": { + "/v1/operators-info/operator-ejections": { "get": { "produces": [ "application/json" @@ -683,13 +591,13 @@ const docTemplate = `{ } } }, - "/operators-info/operators-stake": { + "/v1/operators-info/operators-stake": { "get": { "produces": [ "application/json" ], "tags": [ - "OperatorsStake" + "OperatorsInfo" ], "summary": "Operator stake distribution query", "parameters": [ @@ -729,7 +637,7 @@ const docTemplate = `{ } } }, - "/operators-info/port-check": { + "/v1/operators-info/port-check": { "get": { "produces": [ "application/json" @@ -775,7 +683,7 @@ const docTemplate = `{ } } }, - "/operators-info/registered-operators": { + "/v1/operators-info/registered-operators": { "get": { "produces": [ "application/json" @@ -812,7 +720,7 @@ const docTemplate = `{ } } }, - "/operators-info/semver-scan": { + "/v1/operators-info/semver-scan": { "get": { "produces": [ "application/json" @@ -837,13 +745,105 @@ const docTemplate = `{ } } }, - "/operators/nodeinfo": { + "/v2/batches/{batch_header_hash}": { "get": { "produces": [ "application/json" ], "tags": [ - "OperatorsNodeInfo" + "Feed" + ], + "summary": "Fetch batch by the batch header hash", + "parameters": [ + { + "type": "string", + "description": "Batch header hash in hex string", + "name": "batch_header_hash", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.BlobResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, + "/v2/blobs/{blob_key}": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Feed" + ], + "summary": "Fetch blob metadata by blob key", + "parameters": [ + { + "type": "string", + "description": "Blob key in hex string", + "name": "blob_key", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.BlobResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, + "/v2/operators/nodeinfo": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Operators" ], "summary": "Active operator semver", "responses": { @@ -862,13 +862,13 @@ const docTemplate = `{ } } }, - "/operators/reachability": { + "/v2/operators/reachability": { "get": { "produces": [ "application/json" ], "tags": [ - "OperatorsReachability" + "Operators" ], "summary": "Operator node reachability check", "parameters": [ @@ -907,13 +907,13 @@ const docTemplate = `{ } } }, - "/operators/stake": { + "/v2/operators/stake": { "get": { "produces": [ "application/json" ], "tags": [ - "OperatorsStake" + "Operators" ], "summary": "Operator stake distribution query", "parameters": [ @@ -1375,6 +1375,12 @@ const docTemplate = `{ "items": { "type": "integer" } + }, + "y": { + "type": "array", + "items": { + "type": "integer" + } } } }, @@ -1383,6 +1389,9 @@ const docTemplate = `{ "properties": { "x": { "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" + }, + "y": { + "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" } } }, @@ -1391,6 +1400,9 @@ const docTemplate = `{ "properties": { "x": { "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" + }, + "y": { + "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" } } }, @@ -1454,6 +1466,12 @@ const docTemplate = `{ "items": { "type": "integer" } + }, + "a1": { + "type": "array", + "items": { + "type": "integer" + } } } }, diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index 82011b9bbb..f80df67710 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ -11,99 +11,7 @@ "version": "1" }, "paths": { - "/batches/{batch_header_hash}": { - "get": { - "produces": [ - "application/json" - ], - "tags": [ - "Feed" - ], - "summary": "Fetch batch by the batch header hash", - "parameters": [ - { - "type": "string", - "description": "Batch header hash in hex string", - "name": "batch_header_hash", - "in": "path", - "required": true - } - ], - "responses": { - "200": { - "description": "OK", - "schema": { - "$ref": "#/definitions/dataapi.BlobResponse" - } - }, - "400": { - "description": "error: Bad request", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - }, - "404": { - "description": "error: Not found", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - }, - "500": { - "description": "error: Server error", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - } - } - } - }, - "/blobs/{blob_key}": { - "get": { - "produces": [ - "application/json" - ], - "tags": [ - "Feed" - ], - "summary": "Fetch blob metadata by blob key", - "parameters": [ - { - "type": "string", - "description": "Blob key in hex string", - "name": "blob_key", - "in": "path", - "required": true - } - ], - "responses": { - "200": { - "description": "OK", - "schema": { - "$ref": "#/definitions/dataapi.BlobResponse" - } - }, - "400": { - "description": "error: Bad request", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - }, - "404": { - "description": "error: Not found", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - }, - "500": { - "description": "error: Server error", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - } - } - } - }, - "/feed/batches/{batch_header_hash}/blobs": { + "/v1/feed/batches/{batch_header_hash}/blobs": { "get": { "produces": [ "application/json" @@ -161,7 +69,7 @@ } } }, - "/feed/blobs": { + "/v1/feed/blobs": { "get": { "produces": [ "application/json" @@ -206,7 +114,7 @@ } } }, - "/feed/blobs/{blob_key}": { + "/v1/feed/blobs/{blob_key}": { "get": { "produces": [ "application/json" @@ -252,7 +160,7 @@ } } }, - "/metrics": { + "/v1/metrics": { "get": { "produces": [ "application/json" @@ -309,13 +217,13 @@ } } }, - "/metrics/batcher-service-availability": { + "/v1/metrics/batcher-service-availability": { "get": { "produces": [ "application/json" ], "tags": [ - "Batcher Availability" + "Service Availability" ], "summary": "Get status of EigenDA batcher.", "responses": { @@ -346,13 +254,13 @@ } } }, - "/metrics/churner-service-availability": { + "/v1/metrics/churner-service-availability": { "get": { "produces": [ "application/json" ], "tags": [ - "Churner ServiceAvailability" + "Service Availability" ], "summary": "Get status of EigenDA churner service.", "responses": { @@ -383,13 +291,13 @@ } } }, - "/metrics/disperser-service-availability": { + "/v1/metrics/disperser-service-availability": { "get": { "produces": [ "application/json" ], "tags": [ - "ServiceAvailability" + "Service Availability" ], "summary": "Get status of EigenDA Disperser service.", "responses": { @@ -420,7 +328,7 @@ } } }, - "/metrics/non-signers": { + "/v1/metrics/non-signers": { "get": { "produces": [ "application/json" @@ -468,7 +376,7 @@ } } }, - "/metrics/operator-nonsigning-percentage": { + "/v1/metrics/operator-nonsigning-percentage": { "get": { "produces": [ "application/json" @@ -525,7 +433,7 @@ } } }, - "/metrics/throughput": { + "/v1/metrics/throughput": { "get": { "produces": [ "application/json" @@ -579,7 +487,7 @@ } } }, - "/operators-info/deregistered-operators": { + "/v1/operators-info/deregistered-operators": { "get": { "produces": [ "application/json" @@ -616,7 +524,7 @@ } } }, - "/operators-info/operator-ejections": { + "/v1/operators-info/operator-ejections": { "get": { "produces": [ "application/json" @@ -679,13 +587,13 @@ } } }, - "/operators-info/operators-stake": { + "/v1/operators-info/operators-stake": { "get": { "produces": [ "application/json" ], "tags": [ - "OperatorsStake" + "OperatorsInfo" ], "summary": "Operator stake distribution query", "parameters": [ @@ -725,7 +633,7 @@ } } }, - "/operators-info/port-check": { + "/v1/operators-info/port-check": { "get": { "produces": [ "application/json" @@ -771,7 +679,7 @@ } } }, - "/operators-info/registered-operators": { + "/v1/operators-info/registered-operators": { "get": { "produces": [ "application/json" @@ -808,7 +716,7 @@ } } }, - "/operators-info/semver-scan": { + "/v1/operators-info/semver-scan": { "get": { "produces": [ "application/json" @@ -833,13 +741,105 @@ } } }, - "/operators/nodeinfo": { + "/v2/batches/{batch_header_hash}": { "get": { "produces": [ "application/json" ], "tags": [ - "OperatorsNodeInfo" + "Feed" + ], + "summary": "Fetch batch by the batch header hash", + "parameters": [ + { + "type": "string", + "description": "Batch header hash in hex string", + "name": "batch_header_hash", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.BlobResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, + "/v2/blobs/{blob_key}": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Feed" + ], + "summary": "Fetch blob metadata by blob key", + "parameters": [ + { + "type": "string", + "description": "Blob key in hex string", + "name": "blob_key", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.BlobResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, + "/v2/operators/nodeinfo": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Operators" ], "summary": "Active operator semver", "responses": { @@ -858,13 +858,13 @@ } } }, - "/operators/reachability": { + "/v2/operators/reachability": { "get": { "produces": [ "application/json" ], "tags": [ - "OperatorsReachability" + "Operators" ], "summary": "Operator node reachability check", "parameters": [ @@ -903,13 +903,13 @@ } } }, - "/operators/stake": { + "/v2/operators/stake": { "get": { "produces": [ "application/json" ], "tags": [ - "OperatorsStake" + "Operators" ], "summary": "Operator stake distribution query", "parameters": [ @@ -1371,6 +1371,12 @@ "items": { "type": "integer" } + }, + "y": { + "type": "array", + "items": { + "type": "integer" + } } } }, @@ -1379,6 +1385,9 @@ "properties": { "x": { "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" + }, + "y": { + "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" } } }, @@ -1387,6 +1396,9 @@ "properties": { "x": { "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" + }, + "y": { + "$ref": "#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2" } } }, @@ -1450,6 +1462,12 @@ "items": { "type": "integer" } + }, + "a1": { + "type": "array", + "items": { + "type": "integer" + } } } }, diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index 84b9f71826..96a7d91754 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -284,16 +284,24 @@ definitions: items: type: integer type: array + "y": + items: + type: integer + type: array type: object encoding.G2Commitment: properties: x: $ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2' + "y": + $ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2' type: object encoding.LengthProof: properties: x: $ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2' + "y": + $ref: '#/definitions/github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2' type: object github_com_Layr-Labs_eigenda_core_v2.BlobHeader: properties: @@ -339,6 +347,10 @@ definitions: items: type: integer type: array + a1: + items: + type: integer + type: array type: object semver.SemverMetrics: properties: @@ -361,67 +373,7 @@ info: title: EigenDA Data Access API version: "1" paths: - /batches/{batch_header_hash}: - get: - parameters: - - description: Batch header hash in hex string - in: path - name: batch_header_hash - required: true - type: string - produces: - - application/json - responses: - "200": - description: OK - schema: - $ref: '#/definitions/dataapi.BlobResponse' - "400": - description: 'error: Bad request' - schema: - $ref: '#/definitions/dataapi.ErrorResponse' - "404": - description: 'error: Not found' - schema: - $ref: '#/definitions/dataapi.ErrorResponse' - "500": - description: 'error: Server error' - schema: - $ref: '#/definitions/dataapi.ErrorResponse' - summary: Fetch batch by the batch header hash - tags: - - Feed - /blobs/{blob_key}: - get: - parameters: - - description: Blob key in hex string - in: path - name: blob_key - required: true - type: string - produces: - - application/json - responses: - "200": - description: OK - schema: - $ref: '#/definitions/dataapi.BlobResponse' - "400": - description: 'error: Bad request' - schema: - $ref: '#/definitions/dataapi.ErrorResponse' - "404": - description: 'error: Not found' - schema: - $ref: '#/definitions/dataapi.ErrorResponse' - "500": - description: 'error: Server error' - schema: - $ref: '#/definitions/dataapi.ErrorResponse' - summary: Fetch blob metadata by blob key - tags: - - Feed - /feed/batches/{batch_header_hash}/blobs: + /v1/feed/batches/{batch_header_hash}/blobs: get: parameters: - description: Batch Header Hash @@ -459,7 +411,7 @@ paths: summary: Fetch blob metadata by batch header hash tags: - Feed - /feed/blobs: + /v1/feed/blobs: get: parameters: - description: 'Limit [default: 10]' @@ -488,7 +440,7 @@ paths: summary: Fetch blobs metadata list tags: - Feed - /feed/blobs/{blob_key}: + /v1/feed/blobs/{blob_key}: get: parameters: - description: Blob Key @@ -518,7 +470,7 @@ paths: summary: Fetch blob metadata by blob key tags: - Feed - /metrics: + /v1/metrics: get: parameters: - description: 'Start unix timestamp [default: 1 hour ago]' @@ -555,7 +507,7 @@ paths: summary: Fetch metrics tags: - Metrics - /metrics/batcher-service-availability: + /v1/metrics/batcher-service-availability: get: produces: - application/json @@ -578,8 +530,8 @@ paths: $ref: '#/definitions/dataapi.ErrorResponse' summary: Get status of EigenDA batcher. tags: - - Batcher Availability - /metrics/churner-service-availability: + - Service Availability + /v1/metrics/churner-service-availability: get: produces: - application/json @@ -602,8 +554,8 @@ paths: $ref: '#/definitions/dataapi.ErrorResponse' summary: Get status of EigenDA churner service. tags: - - Churner ServiceAvailability - /metrics/disperser-service-availability: + - Service Availability + /v1/metrics/disperser-service-availability: get: produces: - application/json @@ -626,8 +578,8 @@ paths: $ref: '#/definitions/dataapi.ErrorResponse' summary: Get status of EigenDA Disperser service. tags: - - ServiceAvailability - /metrics/non-signers: + - Service Availability + /v1/metrics/non-signers: get: parameters: - description: 'Interval to query for non signers in seconds [default: 3600]' @@ -658,7 +610,7 @@ paths: summary: Fetch non signers tags: - Metrics - /metrics/operator-nonsigning-percentage: + /v1/metrics/operator-nonsigning-percentage: get: parameters: - description: 'Interval to query for operators nonsigning percentage [default: @@ -697,7 +649,7 @@ paths: summary: Fetch operators non signing percentage tags: - Metrics - /metrics/throughput: + /v1/metrics/throughput: get: parameters: - description: 'Start unix timestamp [default: 1 hour ago]' @@ -732,7 +684,7 @@ paths: summary: Fetch throughput time series tags: - Metrics - /operators-info/deregistered-operators: + /v1/operators-info/deregistered-operators: get: produces: - application/json @@ -757,7 +709,7 @@ paths: is a query parameter with a default value of 14 and max value of 30. tags: - OperatorsInfo - /operators-info/operator-ejections: + /v1/operators-info/operator-ejections: get: parameters: - description: 'Lookback in days [default: 1]' @@ -798,7 +750,7 @@ paths: summary: Fetch list of operator ejections over last N days. tags: - OperatorsInfo - /operators-info/operators-stake: + /v1/operators-info/operators-stake: get: parameters: - description: Operator ID @@ -827,8 +779,8 @@ paths: $ref: '#/definitions/dataapi.ErrorResponse' summary: Operator stake distribution query tags: - - OperatorsStake - /operators-info/port-check: + - OperatorsInfo + /v1/operators-info/port-check: get: parameters: - description: Operator ID @@ -858,7 +810,7 @@ paths: summary: Operator node reachability port check tags: - OperatorsInfo - /operators-info/registered-operators: + /v1/operators-info/registered-operators: get: produces: - application/json @@ -883,7 +835,7 @@ paths: a query parameter with a default value of 14 and max value of 30. tags: - OperatorsInfo - /operators-info/semver-scan: + /v1/operators-info/semver-scan: get: produces: - application/json @@ -899,7 +851,67 @@ paths: summary: Active operator semver scan tags: - OperatorsInfo - /operators/nodeinfo: + /v2/batches/{batch_header_hash}: + get: + parameters: + - description: Batch header hash in hex string + in: path + name: batch_header_hash + required: true + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.BlobResponse' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Fetch batch by the batch header hash + tags: + - Feed + /v2/blobs/{blob_key}: + get: + parameters: + - description: Blob key in hex string + in: path + name: blob_key + required: true + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.BlobResponse' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Fetch blob metadata by blob key + tags: + - Feed + /v2/operators/nodeinfo: get: produces: - application/json @@ -914,8 +926,8 @@ paths: $ref: '#/definitions/dataapi.ErrorResponse' summary: Active operator semver tags: - - OperatorsNodeInfo - /operators/reachability: + - Operators + /v2/operators/reachability: get: parameters: - description: 'Operator ID in hex string [default: all operators if unspecified]' @@ -943,8 +955,8 @@ paths: $ref: '#/definitions/dataapi.ErrorResponse' summary: Operator node reachability check tags: - - OperatorsReachability - /operators/stake: + - Operators + /v2/operators/stake: get: parameters: - description: 'Operator ID in hex string [default: all operators if unspecified]' @@ -972,7 +984,7 @@ paths: $ref: '#/definitions/dataapi.ErrorResponse' summary: Operator stake distribution query tags: - - OperatorsStake + - Operators schemes: - https - http diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index eb6ee4f7e5..b6a7a71edc 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -17,11 +17,14 @@ import ( "time" "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigensdk-go/logging" "google.golang.org/grpc/health/grpc_health_v1" "github.com/Layr-Labs/eigenda/disperser" + blobstorev2 "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/Layr-Labs/eigenda/disperser/common/semver" "github.com/Layr-Labs/eigenda/disperser/dataapi/docs" "github.com/gin-contrib/cors" @@ -189,26 +192,51 @@ type ( Error string `json:"error"` } + SignedBatch struct { + BatchHeader *corev2.BatchHeader `json:"batch_header"` + Attestation *corev2.Attestation `json:"attestation"` + } + + BlobResponse struct { + BlobHeader *corev2.BlobHeader `json:"blob_header"` + Status string `json:"status"` + DispersedAt uint64 `json:"dispersed_at"` + BlobSizeBytes uint64 `json:"blob_size_bytes"` + } + + BatchResponse struct { + BatchHeaderHash string `json:"batch_header_hash"` + SignedBatch *SignedBatch `json:"signed_batch"` + BlobVerificationInfos []*corev2.BlobVerificationInfo `json:"blob_verification_infos"` + } + + ServerInterface interface { + Start() error + Shutdown() error + } server struct { serverMode string socketAddr string allowOrigins []string logger logging.Logger - blobstore disperser.BlobStore + metrics *Metrics promClient PrometheusClient subgraphClient SubgraphClient - transactor core.Reader chainState core.ChainState indexedChainState core.IndexedChainState + operatorHandler *operatorHandler - metrics *Metrics + // v1 + blobstore disperser.BlobStore + transactor core.Reader disperserHostName string churnerHostName string batcherHealthEndpt string eigenDAGRPCServiceChecker EigenDAGRPCServiceChecker eigenDAHttpServiceChecker EigenDAHttpServiceChecker - operatorHandler *operatorHandler + // v2 + blobMetadataStore *blobstorev2.BlobMetadataStore } ) @@ -225,6 +253,7 @@ func NewServer( grpcConn GRPCConn, eigenDAGRPCServiceChecker EigenDAGRPCServiceChecker, eigenDAHttpServiceChecker EigenDAHttpServiceChecker, + blobMetadataStore *blobstorev2.BlobMetadataStore, ) *server { // Initialize the health checker service for EigenDA services @@ -260,6 +289,7 @@ func NewServer( eigenDAGRPCServiceChecker: eigenDAGRPCServiceChecker, eigenDAHttpServiceChecker: eigenDAHttpServiceChecker, operatorHandler: newOperatorHandler(logger, metrics, transactor, chainState, indexedChainState, subgraphClient), + blobMetadataStore: blobMetadataStore, } } @@ -270,10 +300,14 @@ func (s *server) Start() error { } router := gin.New() - basePath := "/api/v1" + basePath := "/api" docs.SwaggerInfo.BasePath = basePath docs.SwaggerInfo.Host = os.Getenv("SWAGGER_HOST") - v1 := router.Group(basePath) + + // Move swagger route to root api level + router.GET("/api/swagger/*any", ginswagger.WrapHandler(swaggerfiles.Handler)) + + v1 := router.Group(basePath + "/v1") { feed := v1.Group("/feed") { @@ -300,9 +334,30 @@ func (s *server) Start() error { metrics.GET("/churner-service-availability", s.FetchChurnerServiceAvailability) metrics.GET("/batcher-service-availability", s.FetchBatcherAvailability) } - swagger := v1.Group("/swagger") + } + v2 := router.Group(basePath + "/v2") + { + blob := v2.Group("/blob") + { + blob.GET("/blobs/feed", s.FetchBlobFeedHandler) + blob.GET("/blobs/:blob_key", s.FetchBlobHandlerV2) + } + batch := v2.Group("/batch") { - swagger.GET("/*any", ginswagger.WrapHandler(swaggerfiles.Handler)) + batch.GET("/batches/feed", s.FetchBatchFeedHandler) + batch.GET("/batches/:batch_header_hash", s.FetchBatchHandler) + } + operators := v2.Group("/operators") + { + operators.GET("/non-signers", s.FetchNonSingers) + operators.GET("/stake", s.FetchOperatorsStake) + operators.GET("/nodeinfo", s.FetchOperatorsNodeInfo) + operators.GET("/reachability", s.CheckOperatorsReachability) + } + metrics := v2.Group("/metrics") + { + metrics.GET("/overview", s.FetchMetricsOverviewHandler) + metrics.GET("/throughput", s.FetchMetricsThroughputHandler) } } @@ -361,7 +416,7 @@ func (s *server) Shutdown() error { // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /feed/blobs/{blob_key} [get] +// @Router /v1/feed/blobs/{blob_key} [get] func (s *server) FetchBlobHandler(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchBlob", f*1000) // make milliseconds @@ -394,7 +449,7 @@ func (s *server) FetchBlobHandler(c *gin.Context) { // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /feed/batches/{batch_header_hash}/blobs [get] +// @Router /v1/feed/batches/{batch_header_hash}/blobs [get] func (s *server) FetchBlobsFromBatchHeaderHash(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchBlobsFromBatchHeaderHash", f*1000) // make milliseconds @@ -500,7 +555,7 @@ func encodeNextToken(key *disperser.BatchIndexExclusiveStartKey) (string, error) // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /feed/blobs [get] +// @Router /v1/feed/blobs [get] func (s *server) FetchBlobsHandler(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchBlobs", f*1000) // make milliseconds @@ -548,7 +603,7 @@ func (s *server) FetchBlobsHandler(c *gin.Context) { // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /metrics [get] +// @Router /v1/metrics [get] func (s *server) FetchMetricsHandler(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchMetrics", f*1000) // make milliseconds @@ -589,7 +644,7 @@ func (s *server) FetchMetricsHandler(c *gin.Context) { // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /metrics/throughput [get] +// @Router /v1/metrics/throughput [get] func (s *server) FetchMetricsThroughputHandler(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchMetricsTroughput", f*1000) // make milliseconds @@ -629,7 +684,7 @@ func (s *server) FetchMetricsThroughputHandler(c *gin.Context) { // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /metrics/non-signers [get] +// @Router /v1/metrics/non-signers [get] func (s *server) FetchNonSigners(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchNonSigners", f*1000) // make milliseconds @@ -664,7 +719,7 @@ func (s *server) FetchNonSigners(c *gin.Context) { // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /metrics/operator-nonsigning-percentage [get] +// @Router /v1/metrics/operator-nonsigning-percentage [get] func (s *server) FetchOperatorsNonsigningPercentageHandler(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchOperatorsNonsigningPercentageHandler", f*1000) // make milliseconds @@ -713,14 +768,14 @@ func (s *server) FetchOperatorsNonsigningPercentageHandler(c *gin.Context) { // OperatorsStake godoc // // @Summary Operator stake distribution query -// @Tags OperatorsStake +// @Tags OperatorsInfo // @Produce json // @Param operator_id query string true "Operator ID" // @Success 200 {object} OperatorsStakeResponse // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /operators-info/operators-stake [get] +// @Router /v1/operators-info/operators-stake [get] func (s *server) OperatorsStake(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("OperatorsStake", f*1000) // make milliseconds @@ -751,7 +806,7 @@ func (s *server) OperatorsStake(c *gin.Context) { // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /operators-info/deregistered-operators [get] +// @Router /v1/operators-info/deregistered-operators [get] func (s *server) FetchDeregisteredOperators(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchDeregisteredOperators", f*1000) // make milliseconds @@ -801,7 +856,7 @@ func (s *server) FetchDeregisteredOperators(c *gin.Context) { // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /operators-info/registered-operators [get] +// @Router /v1/operators-info/registered-operators [get] func (s *server) FetchRegisteredOperators(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchRegisteredOperators", f*1000) // make milliseconds @@ -854,7 +909,7 @@ func (s *server) FetchRegisteredOperators(c *gin.Context) { // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /operators-info/operator-ejections [get] +// @Router /v1/operators-info/operator-ejections [get] func (s *server) FetchOperatorEjections(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchOperatorEjections", f*1000) // make milliseconds @@ -912,7 +967,7 @@ func (s *server) FetchOperatorEjections(c *gin.Context) { // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /operators-info/port-check [get] +// @Router /v1/operators-info/port-check [get] func (s *server) OperatorPortCheck(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("OperatorPortCheck", f*1000) // make milliseconds @@ -946,7 +1001,7 @@ func (s *server) OperatorPortCheck(c *gin.Context) { // @Produce json // @Success 200 {object} SemverReportResponse // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /operators-info/semver-scan [get] +// @Router /v1/operators-info/semver-scan [get] func (s *server) SemverScan(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("SemverScan", f*1000) // make milliseconds @@ -966,13 +1021,13 @@ func (s *server) SemverScan(c *gin.Context) { // FetchDisperserServiceAvailability godoc // // @Summary Get status of EigenDA Disperser service. -// @Tags ServiceAvailability +// @Tags Service Availability // @Produce json // @Success 200 {object} ServiceAvailabilityResponse // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /metrics/disperser-service-availability [get] +// @Router /v1/metrics/disperser-service-availability [get] func (s *server) FetchDisperserServiceAvailability(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchDisperserServiceAvailability", f*1000) // make milliseconds @@ -1020,13 +1075,13 @@ func (s *server) FetchDisperserServiceAvailability(c *gin.Context) { // FetchChurnerServiceAvailability godoc // // @Summary Get status of EigenDA churner service. -// @Tags Churner ServiceAvailability +// @Tags Service Availability // @Produce json // @Success 200 {object} ServiceAvailabilityResponse // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /metrics/churner-service-availability [get] +// @Router /v1/metrics/churner-service-availability [get] func (s *server) FetchChurnerServiceAvailability(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchChurnerServiceAvailability", f*1000) // make milliseconds @@ -1074,13 +1129,13 @@ func (s *server) FetchChurnerServiceAvailability(c *gin.Context) { // FetchBatcherAvailability godoc // // @Summary Get status of EigenDA batcher. -// @Tags Batcher Availability +// @Tags Service Availability // @Produce json // @Success 200 {object} ServiceAvailabilityResponse // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /metrics/batcher-service-availability [get] +// @Router /v1/metrics/batcher-service-availability [get] func (s *server) FetchBatcherAvailability(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchBatcherAvailability", f*1000) // make milliseconds @@ -1125,6 +1180,191 @@ func (s *server) FetchBatcherAvailability(c *gin.Context) { }) } +func (s *server) FetchBlobFeedHandler(c *gin.Context) { + errorResponse(c, errors.New("FetchBlobFeedHandler unimplemented")) +} + +// FetchBlobHandler godoc +// +// @Summary Fetch blob metadata by blob key +// @Tags Feed +// @Produce json +// @Param blob_key path string true "Blob key in hex string" +// @Success 200 {object} BlobResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /v2/blobs/{blob_key} [get] +func (s *server) FetchBlobHandlerV2(c *gin.Context) { + start := time.Now() + blobKey, err := corev2.HexToBlobKey(c.Param("blob_key")) + if err != nil { + s.metrics.IncrementInvalidArgRequestNum("FetchBlob") + errorResponse(c, err) + return + } + metadata, err := s.blobMetadataStore.GetBlobMetadata(c.Request.Context(), blobKey) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchBlob") + errorResponse(c, err) + return + } + response := &BlobResponse{ + BlobHeader: metadata.BlobHeader, + Status: metadata.BlobStatus.String(), + DispersedAt: metadata.RequestedAt, + BlobSizeBytes: metadata.BlobSize, + } + s.metrics.IncrementSuccessfulRequestNum("FetchBlob") + s.metrics.ObserveLatency("FetchBlob", float64(time.Since(start).Milliseconds())) + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxFeedBlobAge)) + c.JSON(http.StatusOK, response) +} + +func (s *server) FetchBatchFeedHandler(c *gin.Context) { + errorResponse(c, errors.New("FetchBatchFeedHandler unimplemented")) +} + +// FetchBatchHandler godoc +// +// @Summary Fetch batch by the batch header hash +// @Tags Feed +// @Produce json +// @Param batch_header_hash path string true "Batch header hash in hex string" +// @Success 200 {object} BlobResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /v2/batches/{batch_header_hash} [get] +func (s *server) FetchBatchHandler(c *gin.Context) { + start := time.Now() + batchHeaderHashHex := c.Param("batch_header_hash") + batchHeaderHash, err := ConvertHexadecimalToBytes([]byte(batchHeaderHashHex)) + if err != nil { + s.metrics.IncrementInvalidArgRequestNum("FetchBatch") + errorResponse(c, errors.New("invalid batch header hash")) + return + } + batchHeader, attestation, err := s.blobMetadataStore.GetSignedBatch(c.Request.Context(), batchHeaderHash) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchBatch") + errorResponse(c, err) + return + } + // TODO: support fetch of blob verification info + batchResponse := &BatchResponse{ + BatchHeaderHash: batchHeaderHashHex, + SignedBatch: &SignedBatch{ + BatchHeader: batchHeader, + Attestation: attestation, + }, + } + s.metrics.IncrementSuccessfulRequestNum("FetchBatch") + s.metrics.ObserveLatency("FetchBatch", float64(time.Since(start).Milliseconds())) + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxFeedBlobAge)) + c.JSON(http.StatusOK, batchResponse) +} + +// FetchOperatorsStake godoc +// +// @Summary Operator stake distribution query +// @Tags Operators +// @Produce json +// @Param operator_id query string false "Operator ID in hex string [default: all operators if unspecified]" +// @Success 200 {object} OperatorsStakeResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /v2/operators/stake [get] +func (s *server) FetchOperatorsStake(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchOperatorsStake", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + operatorId := c.DefaultQuery("operator_id", "") + s.logger.Info("getting operators stake distribution", "operatorId", operatorId) + + operatorsStakeResponse, err := s.operatorHandler.getOperatorsStake(c.Request.Context(), operatorId) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchOperatorsStake") + errorResponse(c, fmt.Errorf("failed to get operator stake - %s", err)) + return + } + + s.metrics.IncrementSuccessfulRequestNum("FetchOperatorsStake") + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorsStakeAge)) + c.JSON(http.StatusOK, operatorsStakeResponse) +} + +// FetchOperatorsNodeInfo godoc +// +// @Summary Active operator semver +// @Tags Operators +// @Produce json +// @Success 200 {object} SemverReportResponse +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /v2/operators/nodeinfo [get] +func (s *server) FetchOperatorsNodeInfo(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchOperatorsNodeInfo", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + report, err := s.operatorHandler.scanOperatorsHostInfo(c.Request.Context()) + if err != nil { + s.logger.Error("failed to scan operators host info", "error", err) + s.metrics.IncrementFailedRequestNum("FetchOperatorsNodeInfo") + errorResponse(c, err) + } + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorPortCheckAge)) + c.JSON(http.StatusOK, report) +} + +// CheckOperatorsReachability godoc +// +// @Summary Operator node reachability check +// @Tags Operators +// @Produce json +// @Param operator_id query string false "Operator ID in hex string [default: all operators if unspecified]" +// @Success 200 {object} OperatorPortCheckResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /v2/operators/reachability [get] +func (s *server) CheckOperatorsReachability(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("OperatorPortCheck", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + operatorId := c.DefaultQuery("operator_id", "") + s.logger.Info("checking operator ports", "operatorId", operatorId) + portCheckResponse, err := s.operatorHandler.probeOperatorHosts(c.Request.Context(), operatorId) + if err != nil { + if strings.Contains(err.Error(), "not found") { + err = errNotFound + s.logger.Warn("operator not found", "operatorId", operatorId) + s.metrics.IncrementNotFoundRequestNum("OperatorPortCheck") + } else { + s.logger.Error("operator port check failed", "error", err) + s.metrics.IncrementFailedRequestNum("OperatorPortCheck") + } + errorResponse(c, err) + return + } + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorPortCheckAge)) + c.JSON(http.StatusOK, portCheckResponse) +} + +func (s *server) FetchNonSingers(c *gin.Context) { + errorResponse(c, errors.New("FetchNonSingers unimplemented")) +} + +func (s *server) FetchMetricsOverviewHandler(c *gin.Context) { + errorResponse(c, errors.New("FetchMetricsOverviewHandler unimplemented")) +} + func errorResponse(c *gin.Context, err error) { _ = c.Error(err) var code int diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 50c5723954..01762da410 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -19,6 +19,7 @@ import ( coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/disperser/common/inmem" + blobstorev2 "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/disperser/dataapi" prommock "github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus/mock" "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph" @@ -45,6 +46,7 @@ var ( expectedBlobCommitment *encoding.BlobCommitments mockLogger = logging.NewNoopLogger() blobstore = inmem.NewBlobStore() + blobMetadataStoreV2 *blobstorev2.BlobMetadataStore mockPrometheusApi = &prommock.MockPrometheusApi{} prometheusClient = dataapi.NewPrometheusClient(mockPrometheusApi, "test-cluster") mockSubgraphApi = &subgraphmock.MockSubgraphApi{} @@ -71,7 +73,7 @@ var ( 1: 10, 2: 10, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockIndexedChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockIndexedChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) expectedRequestedAt = uint64(5567830000000000000) expectedDataLength = 32 expectedBatchId = uint32(99) @@ -548,7 +550,7 @@ func TestPortCheck(t *testing.T) { func TestCheckBatcherHealthExpectServing(t *testing.T) { r := setUpRouter() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}, blobMetadataStoreV2) r.GET("/v1/metrics/batcher-service-availability", testDataApiServer.FetchBatcherAvailability) @@ -581,7 +583,7 @@ func TestCheckBatcherHealthExpectServing(t *testing.T) { func TestCheckBatcherHealthExpectNotServing(t *testing.T) { r := setUpRouter() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: false}) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: false}, blobMetadataStoreV2) r.GET("/v1/metrics/batcher-service-availability", testDataApiServer.FetchBatcherAvailability) @@ -619,7 +621,7 @@ func TestFetchDisperserServiceAvailabilityHandler(t *testing.T) { Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil, blobMetadataStoreV2) r.GET("/v1/metrics/disperser-service-availability", testDataApiServer.FetchDisperserServiceAvailability) @@ -657,7 +659,7 @@ func TestChurnerServiceAvailabilityHandler(t *testing.T) { Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil, blobMetadataStoreV2) r.GET("/v1/metrics/churner-service-availability", testDataApiServer.FetchChurnerServiceAvailability) @@ -701,7 +703,7 @@ func TestFetchDeregisteredOperatorNoSocketInfoOneOperatorHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfoNoSocketInfo, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -746,7 +748,7 @@ func TestFetchDeregisteredMultipleOperatorsOneWithNoSocketInfoHandler(t *testing // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfoNoSocketInfo, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -810,7 +812,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -851,7 +853,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampTwoOperatorsHandler(t *tes // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -903,7 +905,7 @@ func TestFetchMetricsDeregisteredOperatorHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -964,7 +966,7 @@ func TestFetchDeregisteredOperatorOffline(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) @@ -1013,7 +1015,7 @@ func TestFetchDeregisteredOperatorsWithoutDaysQueryParam(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1067,7 +1069,7 @@ func TestFetchDeregisteredOperatorInvalidDaysQueryParam(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1105,7 +1107,7 @@ func TestFetchDeregisteredOperatorQueryDaysGreaterThan30(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) @@ -1147,7 +1149,7 @@ func TestFetchDeregisteredOperatorsMultipleOffline(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1201,7 +1203,7 @@ func TestFetchDeregisteredOperatorOnline(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) @@ -1254,7 +1256,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineOnline(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1317,7 +1319,7 @@ func TestFetchDeregisteredOperatorsMultipleOnline(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphTwoOperatorsDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1388,7 +1390,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineSameBlock(t *testing.T) { mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo3, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1445,7 +1447,7 @@ func TestFetchRegisteredOperatorOnline(t *testing.T) { indexedOperatorState[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo mockSubgraphApi.On("QueryRegisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorRegistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockIndexedChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) diff --git a/disperser/dataapi/server_v2.go b/disperser/dataapi/server_v2.go deleted file mode 100644 index 74d12e4a80..0000000000 --- a/disperser/dataapi/server_v2.go +++ /dev/null @@ -1,356 +0,0 @@ -package dataapi - -import ( - "errors" - "fmt" - "net/http" - "os" - "strings" - "time" - - "github.com/Layr-Labs/eigenda/core" - corev2 "github.com/Layr-Labs/eigenda/core/v2" - "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" - "github.com/Layr-Labs/eigenda/disperser/dataapi/docs" - "github.com/Layr-Labs/eigensdk-go/logging" - "github.com/gin-contrib/cors" - "github.com/gin-contrib/logger" - "github.com/gin-gonic/gin" - "github.com/prometheus/client_golang/prometheus" - swaggerfiles "github.com/swaggo/files" - ginswagger "github.com/swaggo/gin-swagger" -) - -type ( - SignedBatch struct { - BatchHeader *corev2.BatchHeader `json:"batch_header"` - Attestation *corev2.Attestation `json:"attestation"` - } - - BlobResponse struct { - BlobHeader *corev2.BlobHeader `json:"blob_header"` - Status string `json:"status"` - DispersedAt uint64 `json:"dispersed_at"` - BlobSizeBytes uint64 `json:"blob_size_bytes"` - } - - BatchResponse struct { - BatchHeaderHash string `json:"batch_header_hash"` - SignedBatch *SignedBatch `json:"signed_batch"` - BlobVerificationInfos []*corev2.BlobVerificationInfo `json:"blob_verification_infos"` - } -) - -type ServerInterface interface { - Start() error - Shutdown() error -} - -type ServerV2 struct { - serverMode string - socketAddr string - allowOrigins []string - logger logging.Logger - - blobMetadataStore *blobstore.BlobMetadataStore - subgraphClient SubgraphClient - chainReader core.Reader - chainState core.ChainState - indexedChainState core.IndexedChainState - promClient PrometheusClient - metrics *Metrics - - operatorHandler *operatorHandler -} - -func NewServerV2( - config Config, - blobMetadataStore *blobstore.BlobMetadataStore, - promClient PrometheusClient, - subgraphClient SubgraphClient, - chainReader core.Reader, - chainState core.ChainState, - indexedChainState core.IndexedChainState, - logger logging.Logger, - metrics *Metrics, -) *ServerV2 { - l := logger.With("component", "DataAPIServerV2") - return &ServerV2{ - logger: l, - serverMode: config.ServerMode, - socketAddr: config.SocketAddr, - allowOrigins: config.AllowOrigins, - blobMetadataStore: blobMetadataStore, - promClient: promClient, - subgraphClient: subgraphClient, - chainReader: chainReader, - chainState: chainState, - indexedChainState: indexedChainState, - metrics: metrics, - operatorHandler: newOperatorHandler(l, metrics, chainReader, chainState, indexedChainState, subgraphClient), - } -} - -func (s *ServerV2) Start() error { - if s.serverMode == gin.ReleaseMode { - // optimize performance and disable debug features. - gin.SetMode(gin.ReleaseMode) - } - - router := gin.New() - basePath := "/api/v2" - docs.SwaggerInfo.BasePath = basePath - docs.SwaggerInfo.Host = os.Getenv("SWAGGER_HOST") - v2 := router.Group(basePath) - { - blob := v2.Group("/blob") - { - blob.GET("/blobs/feed", s.FetchBlobFeedHandler) - blob.GET("/blobs/:blob_key", s.FetchBlobHandler) - } - batch := v2.Group("/batch") - { - batch.GET("/batches/feed", s.FetchBatchFeedHandler) - batch.GET("/batches/:batch_header_hash", s.FetchBatchHandler) - } - operators := v2.Group("/operators") - { - operators.GET("/non-signers", s.FetchNonSingers) - operators.GET("/stake", s.FetchOperatorsStake) - operators.GET("/nodeinfo", s.FetchOperatorsNodeInfo) - operators.GET("/reachability", s.CheckOperatorsReachability) - } - metrics := v2.Group("/metrics") - { - metrics.GET("/overview", s.FetchMetricsOverviewHandler) - metrics.GET("/throughput", s.FetchMetricsThroughputHandler) - } - swagger := v2.Group("/swagger") - { - swagger.GET("/*any", ginswagger.WrapHandler(swaggerfiles.Handler)) - } - } - - router.GET("/", func(g *gin.Context) { - g.JSON(http.StatusAccepted, gin.H{"status": "OK"}) - }) - - router.Use(logger.SetLogger( - logger.WithSkipPath([]string{"/"}), - )) - - config := cors.DefaultConfig() - config.AllowOrigins = s.allowOrigins - config.AllowCredentials = true - config.AllowMethods = []string{"GET", "POST", "HEAD", "OPTIONS"} - - if s.serverMode != gin.ReleaseMode { - config.AllowOrigins = []string{"*"} - } - router.Use(cors.New(config)) - - srv := &http.Server{ - Addr: s.socketAddr, - Handler: router, - ReadTimeout: 5 * time.Second, - ReadHeaderTimeout: 5 * time.Second, - WriteTimeout: 20 * time.Second, - IdleTimeout: 120 * time.Second, - } - - errChan := run(s.logger, srv) - return <-errChan -} - -func (s *ServerV2) Shutdown() error { - return nil -} - -func (s *ServerV2) FetchBlobFeedHandler(c *gin.Context) { - errorResponse(c, errors.New("FetchBlobFeedHandler unimplemented")) -} - -// FetchBlobHandler godoc -// -// @Summary Fetch blob metadata by blob key -// @Tags Feed -// @Produce json -// @Param blob_key path string true "Blob key in hex string" -// @Success 200 {object} BlobResponse -// @Failure 400 {object} ErrorResponse "error: Bad request" -// @Failure 404 {object} ErrorResponse "error: Not found" -// @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /blobs/{blob_key} [get] -func (s *ServerV2) FetchBlobHandler(c *gin.Context) { - start := time.Now() - blobKey, err := corev2.HexToBlobKey(c.Param("blob_key")) - if err != nil { - s.metrics.IncrementInvalidArgRequestNum("FetchBlob") - errorResponse(c, err) - return - } - metadata, err := s.blobMetadataStore.GetBlobMetadata(c.Request.Context(), blobKey) - if err != nil { - s.metrics.IncrementFailedRequestNum("FetchBlob") - errorResponse(c, err) - return - } - response := &BlobResponse{ - BlobHeader: metadata.BlobHeader, - Status: metadata.BlobStatus.String(), - DispersedAt: metadata.RequestedAt, - BlobSizeBytes: metadata.BlobSize, - } - s.metrics.IncrementSuccessfulRequestNum("FetchBlob") - s.metrics.ObserveLatency("FetchBlob", float64(time.Since(start).Milliseconds())) - c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxFeedBlobAge)) - c.JSON(http.StatusOK, response) -} - -func (s *ServerV2) FetchBatchFeedHandler(c *gin.Context) { - errorResponse(c, errors.New("FetchBatchFeedHandler unimplemented")) -} - -// FetchBatchHandler godoc -// -// @Summary Fetch batch by the batch header hash -// @Tags Feed -// @Produce json -// @Param batch_header_hash path string true "Batch header hash in hex string" -// @Success 200 {object} BlobResponse -// @Failure 400 {object} ErrorResponse "error: Bad request" -// @Failure 404 {object} ErrorResponse "error: Not found" -// @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /batches/{batch_header_hash} [get] -func (s *ServerV2) FetchBatchHandler(c *gin.Context) { - start := time.Now() - batchHeaderHashHex := c.Param("batch_header_hash") - batchHeaderHash, err := ConvertHexadecimalToBytes([]byte(batchHeaderHashHex)) - if err != nil { - s.metrics.IncrementInvalidArgRequestNum("FetchBatch") - errorResponse(c, errors.New("invalid batch header hash")) - return - } - batchHeader, attestation, err := s.blobMetadataStore.GetSignedBatch(c.Request.Context(), batchHeaderHash) - if err != nil { - s.metrics.IncrementFailedRequestNum("FetchBatch") - errorResponse(c, err) - return - } - // TODO: support fetch of blob verification info - batchResponse := &BatchResponse{ - BatchHeaderHash: batchHeaderHashHex, - SignedBatch: &SignedBatch{ - BatchHeader: batchHeader, - Attestation: attestation, - }, - } - s.metrics.IncrementSuccessfulRequestNum("FetchBatch") - s.metrics.ObserveLatency("FetchBatch", float64(time.Since(start).Milliseconds())) - c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxFeedBlobAge)) - c.JSON(http.StatusOK, batchResponse) -} - -// FetchOperatorsStake godoc -// -// @Summary Operator stake distribution query -// @Tags OperatorsStake -// @Produce json -// @Param operator_id query string false "Operator ID in hex string [default: all operators if unspecified]" -// @Success 200 {object} OperatorsStakeResponse -// @Failure 400 {object} ErrorResponse "error: Bad request" -// @Failure 404 {object} ErrorResponse "error: Not found" -// @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /operators/stake [get] -func (s *ServerV2) FetchOperatorsStake(c *gin.Context) { - timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { - s.metrics.ObserveLatency("FetchOperatorsStake", f*1000) // make milliseconds - })) - defer timer.ObserveDuration() - - operatorId := c.DefaultQuery("operator_id", "") - s.logger.Info("getting operators stake distribution", "operatorId", operatorId) - - operatorsStakeResponse, err := s.operatorHandler.getOperatorsStake(c.Request.Context(), operatorId) - if err != nil { - s.metrics.IncrementFailedRequestNum("FetchOperatorsStake") - errorResponse(c, fmt.Errorf("failed to get operator stake - %s", err)) - return - } - - s.metrics.IncrementSuccessfulRequestNum("FetchOperatorsStake") - c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorsStakeAge)) - c.JSON(http.StatusOK, operatorsStakeResponse) -} - -// FetchOperatorsNodeInfo godoc -// -// @Summary Active operator semver -// @Tags OperatorsNodeInfo -// @Produce json -// @Success 200 {object} SemverReportResponse -// @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /operators/nodeinfo [get] -func (s *ServerV2) FetchOperatorsNodeInfo(c *gin.Context) { - timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { - s.metrics.ObserveLatency("FetchOperatorsNodeInfo", f*1000) // make milliseconds - })) - defer timer.ObserveDuration() - - report, err := s.operatorHandler.scanOperatorsHostInfo(c.Request.Context()) - if err != nil { - s.logger.Error("failed to scan operators host info", "error", err) - s.metrics.IncrementFailedRequestNum("FetchOperatorsNodeInfo") - errorResponse(c, err) - } - c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorPortCheckAge)) - c.JSON(http.StatusOK, report) -} - -// CheckOperatorsReachability godoc -// -// @Summary Operator node reachability check -// @Tags OperatorsReachability -// @Produce json -// @Param operator_id query string false "Operator ID in hex string [default: all operators if unspecified]" -// @Success 200 {object} OperatorPortCheckResponse -// @Failure 400 {object} ErrorResponse "error: Bad request" -// @Failure 404 {object} ErrorResponse "error: Not found" -// @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /operators/reachability [get] -func (s *ServerV2) CheckOperatorsReachability(c *gin.Context) { - timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { - s.metrics.ObserveLatency("OperatorPortCheck", f*1000) // make milliseconds - })) - defer timer.ObserveDuration() - - operatorId := c.DefaultQuery("operator_id", "") - s.logger.Info("checking operator ports", "operatorId", operatorId) - portCheckResponse, err := s.operatorHandler.probeOperatorHosts(c.Request.Context(), operatorId) - if err != nil { - if strings.Contains(err.Error(), "not found") { - err = errNotFound - s.logger.Warn("operator not found", "operatorId", operatorId) - s.metrics.IncrementNotFoundRequestNum("OperatorPortCheck") - } else { - s.logger.Error("operator port check failed", "error", err) - s.metrics.IncrementFailedRequestNum("OperatorPortCheck") - } - errorResponse(c, err) - return - } - c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorPortCheckAge)) - c.JSON(http.StatusOK, portCheckResponse) -} - -func (s *ServerV2) FetchNonSingers(c *gin.Context) { - errorResponse(c, errors.New("FetchNonSingers unimplemented")) -} - -func (s *ServerV2) FetchMetricsOverviewHandler(c *gin.Context) { - errorResponse(c, errors.New("FetchMetricsOverviewHandler unimplemented")) -} - -func (s *ServerV2) FetchMetricsThroughputHandler(c *gin.Context) { - errorResponse(c, errors.New("FetchMetricsThroughputHandler unimplemented")) -} diff --git a/disperser/dataapi/server_v2_test.go b/disperser/dataapi/server_v2_test.go index 17458e5718..061df10c6a 100644 --- a/disperser/dataapi/server_v2_test.go +++ b/disperser/dataapi/server_v2_test.go @@ -35,7 +35,7 @@ import ( var ( blobMetadataStore *blobstorev2.BlobMetadataStore - testDataApiServerV2 *dataapi.ServerV2 + testDataApiServerV2 *dataapi.server logger = logging.NewNoopLogger() @@ -94,7 +94,9 @@ func setup(m *testing.M) { panic("failed to create dynamodb client: " + err.Error()) } blobMetadataStore = blobstorev2.NewBlobMetadataStore(dynamoClient, logger, metadataTableName) - testDataApiServerV2 = dataapi.NewServerV2(config, blobMetadataStore, prometheusClient, subgraphClient, mockTx, mockChainState, mockIndexedChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger)) + //testDataApiServerV2 = dataapi.NewServerV2(config, blobMetadataStore, prometheusClient, subgraphClient, mockTx, mockChainState, mockIndexedChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger)) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockIndexedChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil, blobMetadataStoreV2) + } // makeCommitment returns a test hardcoded BlobCommitments