Skip to content

Commit

Permalink
Block port checks for private/loopback/unspecified/invalid operator IPs
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork committed Apr 27, 2024
1 parent 1063b0a commit 07e5a6f
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 40 deletions.
67 changes: 62 additions & 5 deletions disperser/dataapi/docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,63 @@ const docTemplate = `{
"host": "{{.Host}}",
"basePath": "{{.BasePath}}",
"paths": {
"/ejector/ejection": {
"get": {
"produces": [
"application/json"
],
"tags": [
"Ejector"
],
"summary": "Eject operators who violate the SLAs during the given time interval",
"parameters": [
{
"type": "integer",
"description": "Lookback window for operator ejection [default: 86400]",
"name": "interval",
"in": "query"
},
{
"type": "integer",
"description": "End time for evaluating operator ejection [default: now]",
"name": "end",
"in": "query"
},
{
"type": "string",
"description": "Whether it's periodic or urgent ejection request [default: periodic]",
"name": "mode",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/dataapi.BlobMetadataResponse"
}
},
"400": {
"description": "error: Bad request",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"404": {
"description": "error: Not found",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"500": {
"description": "error: Server error",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
}
}
}
},
"/feed/blobs": {
"get": {
"produces": [
Expand Down Expand Up @@ -723,19 +780,19 @@ const docTemplate = `{
"dataapi.OperatorPortCheckResponse": {
"type": "object",
"properties": {
"disperser_online": {
"dispersal_online": {
"type": "boolean"
},
"disperser_socket": {
"dispersal_socket": {
"type": "string"
},
"operator_id": {
"type": "string"
},
"retriever_online": {
"retrieval_online": {
"type": "boolean"
},
"retriever_socket": {
"retrieval_socket": {
"type": "string"
}
}
Expand Down Expand Up @@ -850,7 +907,7 @@ const docTemplate = `{
"Failed",
"Finalized",
"InsufficientSignatures",
"Confirming"
"Dispersing"
]
},
"github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2": {
Expand Down
67 changes: 62 additions & 5 deletions disperser/dataapi/docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,63 @@
"version": "1"
},
"paths": {
"/ejector/ejection": {
"get": {
"produces": [
"application/json"
],
"tags": [
"Ejector"
],
"summary": "Eject operators who violate the SLAs during the given time interval",
"parameters": [
{
"type": "integer",
"description": "Lookback window for operator ejection [default: 86400]",
"name": "interval",
"in": "query"
},
{
"type": "integer",
"description": "End time for evaluating operator ejection [default: now]",
"name": "end",
"in": "query"
},
{
"type": "string",
"description": "Whether it's periodic or urgent ejection request [default: periodic]",
"name": "mode",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/dataapi.BlobMetadataResponse"
}
},
"400": {
"description": "error: Bad request",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"404": {
"description": "error: Not found",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"500": {
"description": "error: Server error",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
}
}
}
},
"/feed/blobs": {
"get": {
"produces": [
Expand Down Expand Up @@ -719,19 +776,19 @@
"dataapi.OperatorPortCheckResponse": {
"type": "object",
"properties": {
"disperser_online": {
"dispersal_online": {
"type": "boolean"
},
"disperser_socket": {
"dispersal_socket": {
"type": "string"
},
"operator_id": {
"type": "string"
},
"retriever_online": {
"retrieval_online": {
"type": "boolean"
},
"retriever_socket": {
"retrieval_socket": {
"type": "string"
}
}
Expand Down Expand Up @@ -846,7 +903,7 @@
"Failed",
"Finalized",
"InsufficientSignatures",
"Confirming"
"Dispersing"
]
},
"github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2": {
Expand Down
48 changes: 43 additions & 5 deletions disperser/dataapi/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,15 @@ definitions:
type: object
dataapi.OperatorPortCheckResponse:
properties:
disperser_online:
dispersal_online:
type: boolean
disperser_socket:
dispersal_socket:
type: string
operator_id:
type: string
retriever_online:
retrieval_online:
type: boolean
retriever_socket:
retrieval_socket:
type: string
type: object
dataapi.OperatorsNonsigningPercentage:
Expand Down Expand Up @@ -226,7 +226,7 @@ definitions:
- Failed
- Finalized
- InsufficientSignatures
- Confirming
- Dispersing
github_com_consensys_gnark-crypto_ecc_bn254_internal_fptower.E2:
properties:
a0:
Expand All @@ -240,6 +240,44 @@ info:
title: EigenDA Data Access API
version: "1"
paths:
/ejector/ejection:
get:
parameters:
- description: 'Lookback window for operator ejection [default: 86400]'
in: query
name: interval
type: integer
- description: 'End time for evaluating operator ejection [default: now]'
in: query
name: end
type: integer
- description: 'Whether it''s periodic or urgent ejection request [default:
periodic]'
in: query
name: mode
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/dataapi.BlobMetadataResponse'
"400":
description: 'error: Bad request'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
"404":
description: 'error: Not found'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
"500":
description: 'error: Server error'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
summary: Eject operators who violate the SLAs during the given time interval
tags:
- Ejector
/feed/blobs:
get:
parameters:
Expand Down
48 changes: 33 additions & 15 deletions disperser/dataapi/operator_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat
var socket string
if operatorStatus.IndexedOperatorInfo != nil {
socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket()
isOnline = checkIsOperatorOnline(socket)
isOnline = checkIsOperatorOnline(socket, 10, logger)
}

// Log the online status
Expand All @@ -104,42 +104,60 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat
operatorOnlineStatusresultsChan <- metadata
}

// validOperatorIP reports whether socketString names an operator IP address
// that is acceptable as a port-check target.
//
// It returns false when core.ParseOperatorSocket rejects the socket, when the
// parsed host is not a literal IP address, or when the address is private,
// unspecified or loopback. It returns true only for addresses that pass all
// of those checks. checkIsOperatorOnline refuses to dial any socket for which
// this function returns false.
//
// NOTE(review): callers in this file pass the result of GetRetrievalSocket /
// GetDispersalSocket; confirm core.ParseOperatorSocket accepts that form.
func validOperatorIP(socketString string) bool {
	ip, _, _, err := core.ParseOperatorSocket(socketString)
	if err != nil {
		// An unparseable socket must be rejected, not probed.
		return false
	}
	ipAddr := net.ParseIP(ip)
	if ipAddr == nil {
		// Host is not a literal IP address.
		return false
	}
	// All three conditions must hold; joining them with || would accept every
	// address because no IP is both unspecified and loopback.
	return !ipAddr.IsPrivate() && !ipAddr.IsUnspecified() && !ipAddr.IsLoopback()
}

// probeOperatorPorts looks up the operator identified by operatorId through
// the subgraph client, derives its retrieval and dispersal sockets from the
// registered socket string, and checks each one with checkIsOperatorOnline.
//
// On success it returns an OperatorPortCheckResponse carrying the operator id,
// both sockets and both online flags. If the operator lookup fails, it logs a
// warning and returns an empty response together with a "not found" error.
//
// NOTE(review): ctx is not referenced in the visible body; the subgraph query
// is issued with context.Background() — confirm whether ctx should be used.
func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) {
operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(context.Background(), operatorId)
if err != nil {
s.logger.Warn("Failed to fetch operator info", "error", err)
s.logger.Warn("failed to fetch operator info", "error", err)
return &OperatorPortCheckResponse{}, errors.New("not found")
}

retrieverSocket := core.OperatorSocket(operatorInfo.Socket).GetRetrievalSocket()
retrieverOnline := checkIsOperatorOnline(retrieverSocket)
operatorSocket := core.OperatorSocket(operatorInfo.Socket)
retrievalSocket := operatorSocket.GetRetrievalSocket()
retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, s.logger)

disperserSocket := core.OperatorSocket(operatorInfo.Socket).GetDispersalSocket()
disperserOnline := checkIsOperatorOnline(disperserSocket)

// Log the online status
s.logger.Info("Operator port status", "retrieverOnline", retrieverOnline, "retrieverSocket", retrieverSocket, "disperserOnline", disperserOnline, "disperserSocket", disperserSocket)
dispersalSocket := operatorSocket.GetDispersalSocket()
dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, s.logger)

// Create the response regardless of online status
portCheckResponse := &OperatorPortCheckResponse{
OperatorId: operatorId,
DisperserSocket: disperserSocket,
RetrieverSocket: retrieverSocket,
DisperserOnline: disperserOnline,
RetrieverOnline: retrieverOnline,
DispersalSocket: dispersalSocket,
RetrievalSocket: retrievalSocket,
DispersalOnline: dispersalOnline,
RetrievalOnline: retrievalOnline,
}

// Log the online status
// NOTE(review): portCheckResponse is passed without a preceding key, unlike
// the other logger calls in this file which use key/value pairs — confirm
// the logger accepts this form.
s.logger.Info("operator port check response", portCheckResponse)

// Return the response to the caller
return portCheckResponse, nil
}

// Method to check if an operator is online.
// Note: This method is the least intrusive way to check if an operator is online.
// Alternate solution: should we add an endpoint to check if an operator is online?
func checkIsOperatorOnline(socket string) bool {
timeout := time.Second * 10
func checkIsOperatorOnline(socket string, timeoutSecs int, logger logging.Logger) bool {
if !validOperatorIP(socket) {
logger.Error("port check blocked invalid operator IP", "socket", socket)
return false
}
timeout := time.Second * time.Duration(timeoutSecs)
conn, err := net.DialTimeout("tcp", socket, timeout)
if err != nil {
logger.Warn("port check timeout", "socket", socket, "timeout", timeoutSecs, "error", err)
return false
}
defer conn.Close() // Close the connection after checking
Expand Down
20 changes: 10 additions & 10 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ type (

OperatorPortCheckResponse struct {
OperatorId string `json:"operator_id"`
DisperserSocket string `json:"disperser_socket"`
RetrieverSocket string `json:"retriever_socket"`
DisperserOnline bool `json:"disperser_online"`
RetrieverOnline bool `json:"retriever_online"`
DispersalSocket string `json:"dispersal_socket"`
RetrievalSocket string `json:"retrieval_socket"`
DispersalOnline bool `json:"dispersal_online"`
RetrievalOnline bool `json:"retrieval_online"`
}
ErrorResponse struct {
Error string `json:"error"`
Expand Down Expand Up @@ -319,9 +319,9 @@ func (s *server) Shutdown() error {
// @Summary Eject operators who violate the SLAs during the given time interval
// @Tags Ejector
// @Produce json
// @Param interval query int false "Lookback window for operator ejection [default: 86400]"
// @Param end query int false "End time for evaluating operator ejection [default: now]"
// @Param mode query string "Whether it's periodic or urgent ejection request [default: periodic]"
// @Param interval query int false "Lookback window for operator ejection [default: 86400]"
// @Param end query int false "End time for evaluating operator ejection [default: now]"
// @Param mode query string false "Whether it's periodic or urgent ejection request [default: periodic]"
// @Success 200 {object} BlobMetadataResponse
// @Failure 400 {object} ErrorResponse "error: Bad request"
// @Failure 404 {object} ErrorResponse "error: Not found"
Expand Down Expand Up @@ -688,15 +688,15 @@ func (s *server) OperatorPortCheck(c *gin.Context) {
defer timer.ObserveDuration()

operatorId := c.DefaultQuery("operator_id", "")
s.logger.Info("Checking operator ports", "operatorId", operatorId)
s.logger.Info("checking operator ports", "operatorId", operatorId)
portCheckResponse, err := s.probeOperatorPorts(c.Request.Context(), operatorId)
if err != nil {
if strings.Contains(err.Error(), "not found") {
err = errNotFound
s.logger.Warn("Operator not found", "operatorId", operatorId)
s.logger.Warn("operator not found", "operatorId", operatorId)
s.metrics.IncrementNotFoundRequestNum("OperatorPortCheck")
} else {
s.logger.Error("Operator port check failed", "error", err)
s.logger.Error("operator port check failed", "error", err)
s.metrics.IncrementFailedRequestNum("OperatorPortCheck")
}
errorResponse(c, err)
Expand Down

0 comments on commit 07e5a6f

Please sign in to comment.