Skip to content

Commit

Permalink
Fix merge conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
Siddharth More authored and Siddharth More committed May 1, 2024
2 parents a846eb1 + 06f794b commit 8c7a90b
Show file tree
Hide file tree
Showing 86 changed files with 2,778 additions and 456 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/docker-publish-opr-node-images.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ on:
commit_sha:
description: 'Specific Commit SHA (Required)'
required: true
version:
description: 'Version (Required)'
required: true
gitcommit:
description: 'GitCommit (Required)'
required: true
release_tag:
description: 'Release Tag (Optional)'
required: false
Expand All @@ -27,6 +33,12 @@ jobs:
uses: actions/checkout@v3
with:
ref: ${{ github.event.inputs.commit_sha }}
- name: Get Commit Date
id: get_date
run: |
GIT_DATE=$(git log -1 --format=%cd --date=format:'%Y-%m-%d' ${{ github.event.inputs.gitcommit }} || date '+%Y-%m-%d')
echo "GIT_DATE=$GIT_DATE" >> $GITHUB_ENV
echo "::set-output name=gitDate::$GIT_DATE"
- name: Setup Buildx
uses: docker/setup-buildx-action@v1
Expand Down Expand Up @@ -64,6 +76,7 @@ jobs:
tags: ${{ env.REGISTRY }}/layr-labs/eigenda/opr-node:${{ steps.set_tag.outputs.tag }}
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache-new
build-args: SEMVER=${{ github.event.inputs.version }},GITCOMMIT=${{ github.event.inputs.gitcommit }},GITDATE=${{ steps.get_date.outputs.gitDate }}
if: ${{ success() }}

- name: Build and Push NodePlugin Image
Expand Down
2 changes: 1 addition & 1 deletion api/docs/disperser.md
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ Disperser defines the public APIs for dispersing blobs.

| Method Name | Request Type | Response Type | Description |
| ----------- | ------------ | ------------- | ------------|
| DisperseBlob | [DisperseBlobRequest](#disperser-DisperseBlobRequest) | [DisperseBlobReply](#disperser-DisperseBlobReply) | This API accepts blob to disperse from clients. This executes the dispersal async, i.e. it returns once the request is accepted. The client could use GetBlobStatus() API to poll the the processing status of the blob. |
| DisperseBlob | [DisperseBlobRequest](#disperser-DisperseBlobRequest) | [DisperseBlobReply](#disperser-DisperseBlobReply) | This API accepts blob to disperse from clients. This executes the dispersal async, i.e. it returns once the request is accepted. The client could use GetBlobStatus() API to poll the processing status of the blob. |
| DisperseBlobAuthenticated | [AuthenticatedRequest](#disperser-AuthenticatedRequest) stream | [AuthenticatedReply](#disperser-AuthenticatedReply) stream | DisperseBlobAuthenticated is similar to DisperseBlob, except that it requires the client to authenticate itself via the AuthenticationData message. The protoco is as follows: 1. The client sends a DisperseBlobAuthenticated request with the DisperseBlobRequest message 2. The Disperser sends back a BlobAuthHeader message containing information for the client to verify and sign. 3. The client verifies the BlobAuthHeader and sends back the signed BlobAuthHeader in an AuthenticationData message. 4. The Disperser verifies the signature and returns a DisperseBlobReply message. |
| GetBlobStatus | [BlobStatusRequest](#disperser-BlobStatusRequest) | [BlobStatusReply](#disperser-BlobStatusReply) | This API is meant to be polled for the blob status. |
| RetrieveBlob | [RetrieveBlobRequest](#disperser-RetrieveBlobRequest) | [RetrieveBlobReply](#disperser-RetrieveBlobReply) | This retrieves the requested blob from the Disperser's backend. This is a more efficient way to retrieve blobs than directly retrieving from the DA Nodes (see detail about this approach in api/proto/retriever/retriever.proto). The blob should have been initially dispersed via this Disperser service for this API to work. |
Expand Down
23 changes: 17 additions & 6 deletions api/grpc/disperser/disperser.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 13 additions & 6 deletions api/proto/disperser/disperser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,26 @@ message RetrieveBlobReply {

// Data Types

// BlobStatus represents the status of a blob.
// The status of a blob is updated as the blob is processed by the disperser.
// The status of a blob can be queried by the client using the GetBlobStatus API.
// Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:
// - PROCESSING
// - DISPERSING
// - CONFIRMED
// Terminal states are states that will not be updated to a different state:
// - FAILED
// - FINALIZED
// - INSUFFICIENT_SIGNATURES
enum BlobStatus {
UNKNOWN = 0;

// Intermediate states

// PROCESSING means that the blob is currently being processed by the disperser
PROCESSING = 1;
// CONFIRMED means that the blob has been dispersed to DA Nodes and the dispersed
// batch containing the blob has been confirmed onchain
CONFIRMED = 2;

// Terminal states

// FAILED means that the blob has failed permanently (for reasons other than insufficient
// signatures, which is a separate state)
FAILED = 3;
Expand All @@ -150,8 +157,8 @@ enum BlobStatus {
// for at least one quorum.
INSUFFICIENT_SIGNATURES = 5;

// CONFIRMING means that the blob has been dispersed to DA nodes and it's waiting for the confirmation onchain
CONFIRMING = 6;
// DISPERSING means that the blob is currently being dispersed to DA Nodes and being confirmed onchain
DISPERSING = 6;
}

// Types below correspond to the types necessary to verify a blob
Expand Down
2 changes: 1 addition & 1 deletion common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (c *Client) QueryIndexCount(ctx context.Context, tableName string, indexNam

// QueryIndexWithPagination returns all items in the index that match the given key
// Results are limited to the given limit and the pagination token is returned
// When limit is is 0, all items are returned
// When limit is 0, all items are returned
func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues, limit int32, exclusiveStartKey map[string]types.AttributeValue) (QueryResult, error) {
var queryInput *dynamodb.QueryInput

Expand Down
12 changes: 12 additions & 0 deletions common/fireblocks_config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package common

import (
"time"

"github.com/urfave/cli"
)

Expand All @@ -12,6 +14,7 @@ const (
FireblocksWalletAddressFlagName = "fireblocks-wallet-address"
FireblocksSecretManagerRegion = "fireblocks-secret-manager-region"
FireblocksDisable = "fireblocks-disable"
FireblocksAPITimeoutFlagName = "fireblocks-api-timeout"
)

type FireblocksConfig struct {
Expand All @@ -22,6 +25,7 @@ type FireblocksConfig struct {
WalletAddress string
Region string
Disable bool
APITimeout time.Duration
}

func FireblocksCLIFlags(envPrefix string, flagPrefix string) []cli.Flag {
Expand Down Expand Up @@ -68,6 +72,13 @@ func FireblocksCLIFlags(envPrefix string, flagPrefix string) []cli.Flag {
Required: false,
EnvVar: PrefixEnvVar(envPrefix, "FIREBLOCKS_DISABLE"),
},
cli.DurationFlag{
Name: PrefixFlag(flagPrefix, FireblocksAPITimeoutFlagName),
Usage: "Timeout for Fireblocks API requests",
Required: false,
Value: 2 * time.Minute,
EnvVar: PrefixEnvVar(envPrefix, "FIREBLOCKS_API_TIMEOUT"),
},
}
}

Expand All @@ -80,5 +91,6 @@ func ReadFireblocksCLIConfig(ctx *cli.Context, flagPrefix string) FireblocksConf
WalletAddress: ctx.GlobalString(PrefixFlag(flagPrefix, FireblocksWalletAddressFlagName)),
Region: ctx.GlobalString(PrefixFlag(flagPrefix, FireblocksSecretManagerRegion)),
Disable: ctx.GlobalBool(PrefixFlag(flagPrefix, FireblocksDisable)),
APITimeout: ctx.GlobalDuration(PrefixFlag(flagPrefix, FireblocksAPITimeoutFlagName)),
}
}
4 changes: 2 additions & 2 deletions common/geth/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewFailoverController(logger logging.Logger, rpcUrls []string) (*FailoverCo

// ProcessError attributes the error and updates total number of fault for RPC
// It returns if RPC should immediately give up
func (f *FailoverController) ProcessError(err error, rpcIndex int) bool {
func (f *FailoverController) ProcessError(err error, rpcIndex int, funcName string) bool {
f.mu.Lock()
defer f.mu.Unlock()
if err == nil {
Expand All @@ -47,7 +47,7 @@ func (f *FailoverController) ProcessError(err error, rpcIndex int) bool {
urlDomain = f.UrlDomains[rpcIndex]
}

nextEndpoint, action := f.handleError(err, urlDomain)
nextEndpoint, action := f.handleError(err, urlDomain, funcName)

if nextEndpoint == NewRPC {
f.numberRpcFault += 1
Expand Down
12 changes: 6 additions & 6 deletions common/geth/handle_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ const (

// handleHttpError returns a boolean indicating if the current RPC should be rotated
// the second boolean indicating if should giveup immediately
func (f *FailoverController) handleHttpError(httpRespError rpc.HTTPError, urlDomain string) (NextEndpoint, ImmediateAction) {
func (f *FailoverController) handleHttpError(httpRespError rpc.HTTPError, urlDomain string, funcName string) (NextEndpoint, ImmediateAction) {
sc := httpRespError.StatusCode
// Default to rotation the current RPC, because it allows a higher chance to get the query completed.
f.Logger.Info("[HTTP Response Error]", "urlDomain", urlDomain, "statusCode", sc, "err", httpRespError)
f.Logger.Info("[HTTP Response Error]", "urlDomain", urlDomain, "statusCode", sc, "funcName", funcName, "err", httpRespError)

if sc >= 200 && sc < 300 {
// 2xx error, however it should not be reachable
Expand All @@ -51,28 +51,28 @@ func (f *FailoverController) handleHttpError(httpRespError rpc.HTTPError, urlDom
// If the error is http, non2xx error would generate HTTP error, https://github.com/ethereum/go-ethereum/blob/master/rpc/http.go#L233
// but a 2xx http response could contain JSON RPC error, https://github.com/ethereum/go-ethereum/blob/master/rpc/http.go#L181
// If the error is Websocket or IPC, we only look for JSON error, https://github.com/ethereum/go-ethereum/blob/master/rpc/json.go#L67
func (f *FailoverController) handleError(err error, urlDomain string) (NextEndpoint, ImmediateAction) {
func (f *FailoverController) handleError(err error, urlDomain string, funcName string) (NextEndpoint, ImmediateAction) {

var httpRespError rpc.HTTPError
if errors.As(err, &httpRespError) {
// if error is http error, i.e. non 2xx error, it is handled here
// if it is 2xx error, the error message is nil, https://github.com/ethereum/go-ethereum/blob/master/rpc/http.go,
// execution does not enter here.
return f.handleHttpError(httpRespError, urlDomain)
return f.handleHttpError(httpRespError, urlDomain, funcName)
} else {
// it might be http2xx error, websocket error or ipc error. Parse json error code
var rpcError rpc.Error
if errors.As(err, &rpcError) {
ec := rpcError.ErrorCode()
f.Logger.Warn("[JSON RPC Response Error]", "urlDomain", urlDomain, "errorCode", ec, "err", rpcError)
f.Logger.Warn("[JSON RPC Response Error]", "urlDomain", urlDomain, "errorCode", ec, "funcName", funcName, "err", rpcError)
// we always attribute JSON RPC error as receiver's fault, i.e new connection rotation
return NewRPC, Return
}

// If no http response or no rpc response is returned, it is a connection issue,
// since we can't accurately attribute the network issue to neither sender nor receiver
// side. Optimistically, switch rpc client
f.Logger.Warn("[Default Response Error]", "urlDomain", urlDomain, "err", err)
f.Logger.Warn("[Default Response Error]", "urlDomain", urlDomain, "funcName", funcName, "err", err)
return NewRPC, Retry
}
}
Loading

0 comments on commit 8c7a90b

Please sign in to comment.