Skip to content

Commit

Permalink
Add a new dataapi server (#955)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Dec 5, 2024
1 parent 9041f72 commit 44680a6
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 1 deletion.
9 changes: 9 additions & 0 deletions disperser/cmd/dataapi/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"fmt"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/geth"
Expand All @@ -13,6 +15,7 @@ import (
)

type Config struct {
ServerVersion uint
AwsClientConfig aws.ClientConfig
BlobstoreConfig blobstore.Config
EthClientConfig geth.EthClientConfig
Expand All @@ -37,6 +40,11 @@ type Config struct {
}

func NewConfig(ctx *cli.Context) (Config, error) {
version := ctx.GlobalUint(flags.DataApiServerVersionFlag.Name)
if version != 1 && version != 2 {
return Config{}, fmt.Errorf("unknown server version %d, must be in [1, 2]", version)
}

loggerConfig, err := common.ReadLoggerCLIConfig(ctx, flags.FlagPrefix)
if err != nil {
return Config{}, err
Expand All @@ -56,6 +64,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
ServerMode: ctx.GlobalString(flags.ServerModeFlag.Name),
ServerVersion: version,
PrometheusConfig: prometheus.Config{
ServerURL: ctx.GlobalString(flags.PrometheusServerURLFlag.Name),
Username: ctx.GlobalString(flags.PrometheusServerUsernameFlag.Name),
Expand Down
8 changes: 8 additions & 0 deletions disperser/cmd/dataapi/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ var (
Value: "9100",
EnvVar: common.PrefixEnvVar(envVarPrefix, "METRICS_HTTP_PORT"),
}
DataApiServerVersionFlag = cli.UintFlag{
Name: common.PrefixFlag(FlagPrefix, "dataapi-version"),
Usage: "DataApi server version. Options are 1 and 2.",
Required: false,
Value: 1,
EnvVar: common.PrefixEnvVar(envVarPrefix, "DATA_API_VERSION"),
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -156,6 +163,7 @@ var requiredFlags = []cli.Flag{
var optionalFlags = []cli.Flag{
ServerModeFlag,
MetricsHTTPPort,
DataApiServerVersionFlag,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
31 changes: 30 additions & 1 deletion disperser/cmd/dataapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import (
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/disperser/cmd/dataapi/flags"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
blobstorev2 "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/dataapi"
"github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus"
"github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph"
"github.com/Layr-Labs/eigensdk-go/logging"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/urfave/cli"
Expand Down Expand Up @@ -127,6 +129,33 @@ func RunDataApi(ctx *cli.Context) error {
logger.Info("Enabled metrics for Data Access API", "socket", httpSocket)
}

if config.ServerVersion == 2 {
blobMetadataStorev2 := blobstorev2.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName)
serverv2 := dataapi.NewServerV2(
dataapi.Config{
ServerMode: config.ServerMode,
SocketAddr: config.SocketAddr,
AllowOrigins: config.AllowOrigins,
DisperserHostname: config.DisperserHostname,
ChurnerHostname: config.ChurnerHostname,
BatcherHealthEndpt: config.BatcherHealthEndpt,
},
blobMetadataStorev2,
promClient,
subgraphClient,
tx,
chainState,
indexedChainState,
logger,
metrics,
)
return runServer(serverv2, logger)
}

return runServer(server, logger)
}

func runServer[T dataapi.ServerInterface](server T, logger logging.Logger) error {
// Setup channel to listen for termination signals
quit := make(chan os.Signal, 1)
// catch SIGINT (Ctrl+C) and SIGTERM (e.g., from `kill`)
Expand All @@ -142,7 +171,7 @@ func RunDataApi(ctx *cli.Context) error {
// Block until a signal is received.
<-quit
logger.Info("Shutting down server...")
err = server.Shutdown()
err := server.Shutdown()

if err != nil {
logger.Errorf("Failed to shutdown server: %v", err)
Expand Down
178 changes: 178 additions & 0 deletions disperser/dataapi/server_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package dataapi

import (
"errors"
"net/http"
"os"
"time"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/dataapi/docs"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/gin-contrib/cors"
"github.com/gin-contrib/logger"
"github.com/gin-gonic/gin"
swaggerfiles "github.com/swaggo/files"
ginswagger "github.com/swaggo/gin-swagger"
)

type ServerInterface interface {
Start() error
Shutdown() error
}

type serverv2 struct {
serverMode string
socketAddr string
allowOrigins []string
logger logging.Logger

blobMetadataStore *blobstore.BlobMetadataStore
subgraphClient SubgraphClient
chainReader core.Reader
chainState core.ChainState
indexedChainState core.IndexedChainState
promClient PrometheusClient
metrics *Metrics
}

func NewServerV2(
config Config,
blobMetadataStore *blobstore.BlobMetadataStore,
promClient PrometheusClient,
subgraphClient SubgraphClient,
chainReader core.Reader,
chainState core.ChainState,
indexedChainState core.IndexedChainState,
logger logging.Logger,
metrics *Metrics,
) *serverv2 {
return &serverv2{
logger: logger.With("component", "DataAPIServerV2"),
serverMode: config.ServerMode,
socketAddr: config.SocketAddr,
allowOrigins: config.AllowOrigins,
blobMetadataStore: blobMetadataStore,
promClient: promClient,
subgraphClient: subgraphClient,
chainReader: chainReader,
chainState: chainState,
indexedChainState: indexedChainState,
metrics: metrics,
}
}

func (s *serverv2) Start() error {
if s.serverMode == gin.ReleaseMode {
// optimize performance and disable debug features.
gin.SetMode(gin.ReleaseMode)
}

router := gin.New()
basePath := "/api/v2"
docs.SwaggerInfo.BasePath = basePath
docs.SwaggerInfo.Host = os.Getenv("SWAGGER_HOST")
v2 := router.Group(basePath)
{
feed := v2.Group("/feed")
{
// Blob feed
feed.GET("/blobs", s.FetchBlobsHandler)
feed.GET("/blobs/:blob_key", s.FetchBlobHandler)
// Batch feed
feed.GET("/batches", s.FetchBatchesHandler)
feed.GET("/batches/:batch_header_hash", s.FetchBatchHandler)
}
operators := v2.Group("/operators")
{
operators.GET("/non-signers", s.FetchNonSingers)
operators.GET("/stake", s.FetchOperatorsStake)
operators.GET("/nodeinfo", s.FetchOperatorsNodeInfo)
operators.GET("/reachability", s.CheckOperatorsReachability)
}
metrics := v2.Group("/metrics")
{
metrics.GET("/overview", s.FetchMetricsOverviewHandler)
metrics.GET("/throughput", s.FetchMetricsThroughputHandler)
}
swagger := v2.Group("/swagger")
{
swagger.GET("/*any", ginswagger.WrapHandler(swaggerfiles.Handler))
}
}

router.GET("/", func(g *gin.Context) {
g.JSON(http.StatusAccepted, gin.H{"status": "OK"})
})

router.Use(logger.SetLogger(
logger.WithSkipPath([]string{"/"}),
))

config := cors.DefaultConfig()
config.AllowOrigins = s.allowOrigins
config.AllowCredentials = true
config.AllowMethods = []string{"GET", "POST", "HEAD", "OPTIONS"}

if s.serverMode != gin.ReleaseMode {
config.AllowOrigins = []string{"*"}
}
router.Use(cors.New(config))

srv := &http.Server{
Addr: s.socketAddr,
Handler: router,
ReadTimeout: 5 * time.Second,
ReadHeaderTimeout: 5 * time.Second,
WriteTimeout: 20 * time.Second,
IdleTimeout: 120 * time.Second,
}

errChan := run(s.logger, srv)
return <-errChan
}

func (s *serverv2) Shutdown() error {
return nil
}

func (s *serverv2) FetchBlobsHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchBlobsHandler unimplemented"))
}

func (s *serverv2) FetchBlobHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchBlobHandler unimplemented"))
}

func (s *serverv2) FetchBatchesHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchBatchesHandler unimplemented"))
}

func (s *serverv2) FetchBatchHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchBatchHandler unimplemented"))
}

func (s *serverv2) FetchOperatorsStake(c *gin.Context) {
errorResponse(c, errors.New("FetchOperatorsStake unimplemented"))
}

func (s *serverv2) FetchOperatorsNodeInfo(c *gin.Context) {
errorResponse(c, errors.New("FetchOperatorsNodeInfo unimplemented"))
}

func (s *serverv2) CheckOperatorsReachability(c *gin.Context) {
errorResponse(c, errors.New("CheckOperatorsReachability unimplemented"))
}

func (s *serverv2) FetchNonSingers(c *gin.Context) {
errorResponse(c, errors.New("FetchNonSingers unimplemented"))
}

func (s *serverv2) FetchMetricsOverviewHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchMetricsOverviewHandler unimplemented"))
}

func (s *serverv2) FetchMetricsThroughputHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchMetricsThroughputHandler unimplemented"))
}

0 comments on commit 44680a6

Please sign in to comment.