diff --git a/.github/workflows/docker-publish-opr-node-images.yaml b/.github/workflows/docker-publish-opr-node-images.yaml deleted file mode 100644 index ed3aed9cc5..0000000000 --- a/.github/workflows/docker-publish-opr-node-images.yaml +++ /dev/null @@ -1,99 +0,0 @@ -name: Push Public Docker images to GHCR with Caching - -on: - workflow_dispatch: - inputs: - commit_sha: - description: 'Specific Commit SHA (Required)' - required: true - version: - description: 'Version (Required)' - required: true - gitcommit: - description: 'GitCommit (Required)' - required: true - release_tag: - description: 'Release Tag (Optional)' - required: false - default: '' - -env: - REGISTRY: ghcr.io - CACHE-FROM: /tmp/.buildx-cache - CACHE-TO: /tmp/.buildx-cache-new - -jobs: - build-and-push: - runs-on: ubuntu-latest - permissions: - contents: read - packages: write - steps: - - name: Checkout repository at specified commit - uses: actions/checkout@v3 - with: - ref: ${{ github.event.inputs.commit_sha }} - - name: Get Commit Date - id: get_date - run: | - GIT_DATE=$(git log -1 --format=%cd --date=format:'%Y-%m-%d' ${{ github.event.inputs.gitcommit }} || date '+%Y-%m-%d') - echo "GIT_DATE=$GIT_DATE" >> $GITHUB_ENV - echo "::set-output name=gitDate::$GIT_DATE" - - - name: Setup Buildx - uses: docker/setup-buildx-action@v1 - with: - install: true - driver-opts: image=moby/buildkit:master - - - name: Cache Docker layers - uses: actions/cache@v2 - with: - path: /tmp/.buildx-cache - key: ${{ runner.os }}-buildx-${{ github.event.inputs.commit_sha }} - restore-keys: | - ${{ runner.os }}-buildx- - if: ${{ success() }} - - - name: Log into registry ${{ env.REGISTRY }} - uses: docker/login-action@v2 - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - if: ${{ success() }} - - - name: Set Tag Name - id: set_tag - run: echo "tag=${{ github.event.inputs.release_tag || github.event.inputs.commit_sha }}" >> $GITHUB_OUTPUT - if: ${{ success() }} 
- - - name: Build and Push Opr Node Image - uses: docker/build-push-action@v2 - with: - file: ./node/cmd/Dockerfile - push: true - tags: ${{ env.REGISTRY }}/layr-labs/eigenda/opr-node:${{ steps.set_tag.outputs.tag }} - cache-from: type=local,src=/tmp/.buildx-cache - cache-to: type=local,dest=/tmp/.buildx-cache-new - build-args: SEMVER=${{ github.event.inputs.version }},GITCOMMIT=${{ github.event.inputs.gitcommit }},GITDATE=${{ steps.get_date.outputs.gitDate }} - if: ${{ success() }} - - - name: Build and Push NodePlugin Image - uses: docker/build-push-action@v2 - with: - context: . - file: ./node/plugin/cmd/Dockerfile - push: true - tags: ${{ env.REGISTRY }}/layr-labs/eigenda/opr-nodeplugin:${{ steps.set_tag.outputs.tag }} - cache-from: type=local,src=${{ env.CACHE-FROM }} - cache-to: type=local,dest=${{ env.CACHE-TO }} - if: ${{ success() }} - - - name: Update cache - uses: actions/cache@v2 - with: - path: ${{ env.CACHE-TO }} - key: ${{ runner.os }}-buildx-${{ steps.set_tag.outputs.tag }} - restore-keys: | - ${{ runner.os }}-buildx- \ No newline at end of file diff --git a/.github/workflows/docker-publish-release.yaml b/.github/workflows/docker-publish-release.yaml new file mode 100644 index 0000000000..60f19ed027 --- /dev/null +++ b/.github/workflows/docker-publish-release.yaml @@ -0,0 +1,78 @@ +name: docker-publish-release + +on: + push: + tags: + - v* + pull_request: + workflow_dispatch: + inputs: + force: + description: "Force untagged release (expert mode)" + required: false + default: false + type: boolean + +env: + REGISTRY: ghcr.io + CACHE-FROM: /tmp/.buildx-cache + CACHE-TO: /tmp/.buildx-cache-new + +jobs: + build-and-push: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Install GitVersion + uses: gittools/actions/gitversion/setup@v1.1.1 + with: + versionSpec: '5.x' + + - name: Determine SemVer + uses: 
gittools/actions/gitversion/execute@v1.1.1 + with: + useConfigFile: true + + - run: | + echo "SemVer ${{ env.fullSemVer }} Forced ${{ github.event.inputs.force }}" + name: Display SemVer + + - name: Setup Buildx + uses: docker/setup-buildx-action@v1 + with: + install: true + driver-opts: image=moby/buildkit:master + + - name: Cache docker layers + uses: actions/cache@v2 + with: + path: /tmp/.buildx-cache + key: ${{ runner.os }}-buildx-${{ github.sha }} + restore-keys: | + ${{ runner.os }}-buildx- + if: ${{ success() }} + + - name: Log into registry ${{ env.REGISTRY }} + uses: docker/login-action@v2 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + if: ${{ success() }} + + # Build And Push Image + - name: Build docker image release + run: make docker-release-build + if: ${{ success() }} + + # Publish if release is tagged or force == true + - name: Push docker image release + run: make docker-release-push + if: startsWith(github.ref, 'refs/tags/v') || github.event.inputs.force == 'true' diff --git a/GitVersion.yml b/GitVersion.yml new file mode 100644 index 0000000000..52981c4aca --- /dev/null +++ b/GitVersion.yml @@ -0,0 +1,29 @@ +increment: None +branches: + main: + mode: ContinuousDelivery + tag: pre + increment: Patch + prevent-increment-of-merged-branch-version: true + track-merge-target: false + regex: ^master$|^main$ + source-branches: + - release + tracks-release-branches: true + is-release-branch: false + is-mainline: true + pre-release-weight: 55000 + release: + mode: ContinuousDelivery + tag: rc + increment: None + prevent-increment-of-merged-branch-version: true + track-merge-target: false + regex: ^v*|^releases?[/-] + source-branches: + - main + - release + tracks-release-branches: false + is-release-branch: true + is-mainline: false + pre-release-weight: 30000 diff --git a/Makefile b/Makefile index 97420a3eed..c40ad271c5 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,22 @@ .PHONY: 
compile-el compile-dl clean protoc lint build unit-tests integration-tests-churner integration-tests-indexer integration-tests-inabox integration-tests-inabox-nochurner integration-tests-graph-indexer +ifeq ($(wildcard .git/*),) +$(warning semver disabled - building from release zip) +GITCOMMIT := "" +GITDATE := "" +SEMVER := $(shell basename $(CURDIR)) +else +GITCOMMIT := $(shell git rev-parse --short HEAD) +GITDATE := $(shell git log -1 --format=%cd --date=unix) +SEMVER := $(shell docker run --rm --volume "$(PWD):/repo" gittools/gitversion:5.12.0 /repo -output json -showvariable SemVer) +ifeq ($(SEMVER), ) +$(warning semver disabled - docker not installed) +SEMVER := "0.0.0" +endif +endif + +RELEASE_TAG := $(or $(RELEASE_TAG),latest) + PROTOS := ./api/proto PROTOS_DISPERSER := ./disperser/api/proto PROTO_GEN := ./api/grpc @@ -75,3 +92,12 @@ integration-tests-graph-indexer: integration-tests-dataapi: make dataapi-build go test -v ./disperser/dataapi + +docker-release-build: + RELEASE_TAG=${SEMVER} docker compose -f docker-compose-release.yaml build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} + +docker-release-push: + RELEASE_TAG=${SEMVER} docker compose -f docker-compose-release.yaml push + +semver: + echo "${SEMVER}" diff --git a/common/ratelimit.go b/common/ratelimit.go index 5d4950019e..64534ec6fb 100644 --- a/common/ratelimit.go +++ b/common/ratelimit.go @@ -15,17 +15,22 @@ import ( // ID is the authenticated Account ID. For retrieval requests, the requester ID will be the requester's IP address. type RequesterID = string +// RequesterName is the friendly name of the party making the request. In the case +// of a rollup making a dispersal request, the RequesterName is the name of the rollup. 
+type RequesterName = string + type RequestParams struct { - RequesterID RequesterID - BlobSize uint - Rate RateParam - Info interface{} + RequesterID RequesterID + RequesterName RequesterName + BlobSize uint + Rate RateParam + Info interface{} } type RateLimiter interface { // AllowRequest checks whether the request should be allowed. If the request is allowed, the function returns true. // If the request is not allowed, the function returns false and the RequestParams of the request that was not allowed. - // In order to for the request to be allowed, all of the requests represented by the RequestParams slice must be allowed. + // In order for the request to be allowed, all of the requests represented by the RequestParams slice must be allowed. // Each RequestParams object represents a single request. Each request is subjected to the same GlobalRateParams, but the // individual parameters of the request can differ. // @@ -37,7 +42,7 @@ type RateLimiter interface { type GlobalRateParams struct { // BucketSizes are the time scales at which the rate limit is enforced. - // For each time scale, the rate limiter will make sure that the give rate (possibly subject to a relaxation given + // For each time scale, the rate limiter will make sure that the given rate (possibly subject to a relaxation given // by one of the Multipliers) is observed when the request bandwidth is averaged at this time scale. // In terms of implementation, the rate limiter uses a set of "time buckets". 
A time bucket, i, is filled to a maximum of // `BucketSizes[i]` at a rate of 1, and emptied by an amount equal to `(size of request)/RateParam` each time a diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index 08387cf446..f522f9b64a 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -2,10 +2,13 @@ package ratelimit import ( "context" + "strconv" "time" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) type BucketStore = common.KVStore[common.RateBucketParams] @@ -15,13 +18,20 @@ type rateLimiter struct { bucketStore BucketStore logger logging.Logger + + // Prometheus metrics + bucketLevels *prometheus.GaugeVec } -func NewRateLimiter(rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter { +func NewRateLimiter(reg prometheus.Registerer, rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter { return &rateLimiter{ globalRateParams: rateParams, bucketStore: bucketStore, logger: logger.With("component", "RateLimiter"), + bucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "rate_limiter_bucket_levels", + Help: "Current level of each bucket for rate limiting", + }, []string{"requester_id", "requester_name", "bucket_index"}), } } @@ -109,7 +119,18 @@ func (d *rateLimiter) checkAllowed(ctx context.Context, params common.RequestPar bucketParams.BucketLevels[i] = getBucketLevel(bucketParams.BucketLevels[i], size, interval, deduction) allowed = allowed && bucketParams.BucketLevels[i] > 0 - d.logger.Debug("Bucket level", "key", params.RequesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) + d.logger.Debug("Bucket level updated", "key", params.RequesterID, "name", 
params.RequesterName, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) + + // Update metrics only if the requester name is provided. We're making + // an assumption that the requester name is only provided for authenticated + // requests so it should limit the cardinality of the requester_id label. + if params.RequesterName != "" { + d.bucketLevels.With(prometheus.Labels{ + "requester_id": params.RequesterID, + "requester_name": params.RequesterName, + "bucket_index": strconv.Itoa(i), + }).Set(float64(bucketParams.BucketLevels[i])) + } } return allowed, bucketParams diff --git a/common/ratelimit/ratelimit_test.go b/common/ratelimit/ratelimit_test.go index 0555f2e748..969d698dbf 100644 --- a/common/ratelimit/ratelimit_test.go +++ b/common/ratelimit/ratelimit_test.go @@ -9,6 +9,7 @@ import ( "github.com/Layr-Labs/eigenda/common/ratelimit" "github.com/Layr-Labs/eigenda/common/store" "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" ) @@ -25,7 +26,7 @@ func makeTestRatelimiter() (common.RateLimiter, error) { return nil, err } - ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logging.NewNoopLogger()) + ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logging.NewNoopLogger()) return ratelimiter, nil diff --git a/core/state.go b/core/state.go index 62fc4e2ff7..7f8092eaaf 100644 --- a/core/state.go +++ b/core/state.go @@ -2,8 +2,11 @@ package core import ( "context" + "crypto/md5" + "encoding/json" "fmt" "math/big" + "slices" "strings" ) @@ -60,6 +63,46 @@ type OperatorState struct { BlockNumber uint } +func (s *OperatorState) Hash() (map[QuorumID][16]byte, error) { + res := make(map[QuorumID][16]byte) + type operatorInfoWithID struct { + OperatorID string + Stake string + Index uint + } + for quorumID, opInfos := range s.Operators { + 
marshalable := struct { + Operators []operatorInfoWithID + Totals OperatorInfo + BlockNumber uint + }{ + Operators: make([]operatorInfoWithID, 0, len(opInfos)), + Totals: OperatorInfo{}, + BlockNumber: s.BlockNumber, + } + + for opID, opInfo := range opInfos { + marshalable.Operators = append(marshalable.Operators, operatorInfoWithID{ + OperatorID: opID.Hex(), + Stake: opInfo.Stake.String(), + Index: uint(opInfo.Index), + }) + } + slices.SortStableFunc(marshalable.Operators, func(a, b operatorInfoWithID) int { + return strings.Compare(a.OperatorID, b.OperatorID) + }) + + marshalable.Totals = *s.Totals[quorumID] + data, err := json.Marshal(marshalable) + if err != nil { + return nil, err + } + res[quorumID] = md5.Sum(data) + } + + return res, nil +} + // IndexedOperatorInfo contains information about an operator which is contained in events from the EigenDA smart contracts. Note that // this information does not depend on the quorum. type IndexedOperatorInfo struct { diff --git a/core/state_test.go b/core/state_test.go new file mode 100644 index 0000000000..f8f0697458 --- /dev/null +++ b/core/state_test.go @@ -0,0 +1,98 @@ +package core_test + +import ( + "encoding/hex" + "math/big" + "testing" + + "github.com/Layr-Labs/eigenda/core" + "github.com/stretchr/testify/assert" +) + +func TestOperatorStateHash(t *testing.T) { + s1 := core.OperatorState{ + Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{ + 0: { + [32]byte{0}: &core.OperatorInfo{ + Stake: big.NewInt(12), + Index: uint(2), + }, + [32]byte{1}: &core.OperatorInfo{ + Stake: big.NewInt(23), + Index: uint(3), + }, + }, + 1: { + [32]byte{1}: &core.OperatorInfo{ + Stake: big.NewInt(23), + Index: uint(3), + }, + [32]byte{2}: &core.OperatorInfo{ + Stake: big.NewInt(34), + Index: uint(4), + }, + }, + }, + Totals: map[core.QuorumID]*core.OperatorInfo{ + 0: { + Stake: big.NewInt(35), + Index: uint(2), + }, + 1: { + Stake: big.NewInt(57), + Index: uint(2), + }, + }, + BlockNumber: uint(123), + } + + 
hash1, err := s1.Hash() + assert.NoError(t, err) + q0 := hash1[0] + q1 := hash1[1] + assert.Equal(t, "3805338f34f77ff1fa23bbc23b1e86c4", hex.EncodeToString(q0[:])) + assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:])) + + s2 := core.OperatorState{ + Operators: map[core.QuorumID]map[core.OperatorID]*core.OperatorInfo{ + 0: { + [32]byte{0}: &core.OperatorInfo{ + Stake: big.NewInt(12), + Index: uint(3), // different from s1 + }, + [32]byte{1}: &core.OperatorInfo{ + Stake: big.NewInt(23), + Index: uint(3), + }, + }, + 1: { + [32]byte{1}: &core.OperatorInfo{ + Stake: big.NewInt(23), + Index: uint(3), + }, + [32]byte{2}: &core.OperatorInfo{ + Stake: big.NewInt(34), + Index: uint(4), + }, + }, + }, + Totals: map[core.QuorumID]*core.OperatorInfo{ + 0: { + Stake: big.NewInt(35), + Index: uint(2), + }, + 1: { + Stake: big.NewInt(57), + Index: uint(2), + }, + }, + BlockNumber: uint(123), + } + + hash2, err := s2.Hash() + assert.NoError(t, err) + q0 = hash2[0] + q1 = hash2[1] + assert.Equal(t, "1836448b57ae79decdcb77157cf31698", hex.EncodeToString(q0[:])) + assert.Equal(t, "2f110a29f2bdd8a19c2d87d05736be0a", hex.EncodeToString(q1[:])) +} diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 640758c3a7..e604b2a3ae 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -355,11 +355,14 @@ func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, qu rates.BlobRate = rateInfo.BlobRate } + if len(rateInfo.Name) > 0 { + rates.Name = rateInfo.Name + } + break } return rates, key, nil - } // Enum of rateTypes for the limiterInfo struct @@ -446,6 +449,9 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context s.metrics.HandleInternalFailureRpcRequest(apiMethodName) return api.NewInternalError(err.Error()) } + + // Note: There's an implicit assumption that an empty name means the account + // is not in the allow list. 
requesterName = accountRates.Name // Update the quorum rate @@ -458,9 +464,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context // System Level key := fmt.Sprintf("%s:%d-%s", systemAccountKey, param.QuorumID, SystemThroughputType.Plug()) requestParams = append(requestParams, common.RequestParams{ - RequesterID: key, - BlobSize: encodedSize, - Rate: globalRates.TotalUnauthThroughput, + RequesterID: key, + RequesterName: systemAccountKey, + BlobSize: encodedSize, + Rate: globalRates.TotalUnauthThroughput, Info: limiterInfo{ RateType: SystemThroughputType, QuorumID: param.QuorumID, @@ -469,9 +476,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context key = fmt.Sprintf("%s:%d-%s", systemAccountKey, param.QuorumID, SystemBlobRateType.Plug()) requestParams = append(requestParams, common.RequestParams{ - RequesterID: key, - BlobSize: blobRateMultiplier, - Rate: globalRates.TotalUnauthBlobRate, + RequesterID: key, + RequesterName: systemAccountKey, + BlobSize: blobRateMultiplier, + Rate: globalRates.TotalUnauthBlobRate, Info: limiterInfo{ RateType: SystemBlobRateType, QuorumID: param.QuorumID, @@ -481,9 +489,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context // Account Level key = fmt.Sprintf("%s:%d-%s", accountKey, param.QuorumID, AccountThroughputType.Plug()) requestParams = append(requestParams, common.RequestParams{ - RequesterID: key, - BlobSize: encodedSize, - Rate: accountRates.Throughput, + RequesterID: key, + RequesterName: requesterName, + BlobSize: encodedSize, + Rate: accountRates.Throughput, Info: limiterInfo{ RateType: AccountThroughputType, QuorumID: param.QuorumID, @@ -492,9 +501,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context key = fmt.Sprintf("%s:%d-%s", accountKey, param.QuorumID, AccountBlobRateType.Plug()) requestParams = append(requestParams, common.RequestParams{ - RequesterID: key, - BlobSize: 
blobRateMultiplier, - Rate: accountRates.BlobRate, + RequesterID: key, + RequesterName: requesterName, + BlobSize: blobRateMultiplier, + Rate: accountRates.BlobRate, Info: limiterInfo{ RateType: AccountBlobRateType, QuorumID: param.QuorumID, diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index d86beae396..b4c90c93de 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -20,6 +20,7 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" "github.com/urfave/cli" pb "github.com/Layr-Labs/eigenda/api/grpc/disperser" @@ -641,7 +642,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { if err != nil { panic("failed to create bucket store") } - ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logger) + ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logger) rateConfig := apiserver.RateConfig{ QuorumRateInfos: map[core.QuorumID]apiserver.QuorumRateInfo{ @@ -662,20 +663,24 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { Allowlist: apiserver.Allowlist{ "1.2.3.4": map[uint8]apiserver.PerUserRateInfo{ 0: { + Name: "eigenlabs", Throughput: 100 * 1024, BlobRate: 5 * 1e6, }, 1: { + Name: "eigenlabs", Throughput: 1024 * 1024, BlobRate: 5 * 1e6, }, }, "0x1aa8226f6d354380dDE75eE6B634875c4203e522": map[uint8]apiserver.PerUserRateInfo{ 0: { + Name: "eigenlabs", Throughput: 100 * 1024, BlobRate: 5 * 1e6, }, 1: { + Name: "eigenlabs", Throughput: 1024 * 1024, BlobRate: 5 * 1e6, }, @@ -693,7 +698,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { return apiserver.NewDispersalServer(disperser.ServerConfig{ GrpcPort: "51001", GrpcTimeout: 1 * time.Second, - }, queue, transactor, logger, disperser.NewMetrics("9001", logger), ratelimiter, 
rateConfig) + }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), ratelimiter, rateConfig) } func disperseBlob(t *testing.T, server *apiserver.DispersalServer, data []byte) (pb.BlobStatus, uint, []byte) { diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 56cf352f53..738deeb588 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -420,6 +420,15 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { stageTimer = time.Now() update := b.Dispatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader) log.Debug("DisperseBatch took", "duration", time.Since(stageTimer)) + h, err := batch.State.OperatorState.Hash() + if err != nil { + log.Error("HandleSingleBatch: error getting operator state hash", "err", err) + } + hStr := make([]string, 0, len(h)) + for q, hash := range h { + hStr = append(hStr, fmt.Sprintf("%d: %x", q, hash)) + } + log.Info("Dispatched encoded batch", "operatorStateHash", hStr) // Get the batch header hash log.Debug("Getting batch header hash...") diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 96f9fe12dc..533acb53ea 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/proto" ) type Config struct { @@ -130,7 +131,7 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, } opt := grpc.MaxCallSendMsgSize(60 * 1024 * 1024 * 1024) - c.logger.Debug("sending chunks to operator", "operator", op.Socket, "size", totalSize) + c.logger.Debug("sending chunks to operator", "operator", op.Socket, "size", totalSize, "request message size", proto.Size(request)) reply, err := gc.StoreChunks(ctx, request, opt) if err != nil { diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go 
index f8a8ac037f..e6fa92df06 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -10,6 +10,7 @@ import ( "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/disperser/apiserver" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" + "github.com/prometheus/client_golang/prometheus" "github.com/Layr-Labs/eigenda/common/aws/dynamodb" "github.com/Layr-Labs/eigenda/common/aws/s3" @@ -91,6 +92,8 @@ func RunDisperserServer(ctx *cli.Context) error { blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) blobStore := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger) + reg := prometheus.NewRegistry() + var ratelimiter common.RateLimiter if config.EnableRatelimiter { globalParams := config.RatelimiterConfig.GlobalRateParams @@ -108,12 +111,19 @@ func RunDisperserServer(ctx *cli.Context) error { return err } } - ratelimiter = ratelimit.NewRateLimiter(globalParams, bucketStore, logger) + ratelimiter = ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger) } - // TODO: create a separate metrics for batcher - metrics := disperser.NewMetrics(config.MetricsConfig.HTTPPort, logger) - server := apiserver.NewDispersalServer(config.ServerConfig, blobStore, transactor, logger, metrics, ratelimiter, config.RateConfig) + metrics := disperser.NewMetrics(reg, config.MetricsConfig.HTTPPort, logger) + server := apiserver.NewDispersalServer( + config.ServerConfig, + blobStore, + transactor, + logger, + metrics, + ratelimiter, + config.RateConfig, + ) // Enable Metrics Block if config.MetricsConfig.EnableMetrics { diff --git a/disperser/metrics.go b/disperser/metrics.go index f9ed4950b1..6a762344d9 100644 --- a/disperser/metrics.go +++ b/disperser/metrics.go @@ -37,9 +37,8 @@ const ( AccountRateLimitedFailure string = "ratelimited-account" // The request rate limited at account level 
) -func NewMetrics(httpPort string, logger logging.Logger) *Metrics { +func NewMetrics(reg *prometheus.Registry, httpPort string, logger logging.Logger) *Metrics { namespace := "eigenda_disperser" - reg := prometheus.NewRegistry() reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) diff --git a/docker-compose-release.yaml b/docker-compose-release.yaml new file mode 100644 index 0000000000..a6e74e4ecf --- /dev/null +++ b/docker-compose-release.yaml @@ -0,0 +1,12 @@ +# This file is used for building and pushing images +services: + node: + build: + context: . + dockerfile: node/cmd/Dockerfile + image: ghcr.io/layr-labs/eigenda/opr-node:${RELEASE_TAG} + nodeplugin: + build: + context: . + dockerfile: node/plugin/cmd/Dockerfile + image: ghcr.io/layr-labs/eigenda/opr-nodeplugin:${RELEASE_TAG} diff --git a/node/Makefile b/node/Makefile index 5b91ed1517..6b2715211d 100644 --- a/node/Makefile +++ b/node/Makefile @@ -1,12 +1,20 @@ +ifeq ($(wildcard ../.git/*),) +$(warning semver disabled - building from release zip) +GITCOMMIT := "" +GITDATE := "" +SEMVER := $(shell basename $(CURDIR)) +else GITCOMMIT := $(shell git rev-parse --short HEAD) GITDATE := $(shell git log -1 --format=%cd --date=unix) - -# GitVersion provides the semantic versioning for the project. 
-SEMVER := $(shell docker run --rm --volume "${PWD}/../:/repo" gittools/gitversion:5.12.0 /repo -output json -showvariable SemVer) +SEMVER := $(shell docker run --rm --volume "$(PWD)/../:/repo" gittools/gitversion:5.12.0 /repo -output json -showvariable SemVer) ifeq ($(SEMVER), ) -SEMVER = "0.0.0" # Fallback if docker is not installed or gitversion fails +$(warning semver disabled - docker not installed) +SEMVER := "0.0.0" +endif endif +RELEASE_TAG := $(or $(RELEASE_TAG),latest) + build: clean go mod tidy go build -o ./bin/node ./cmd @@ -17,10 +25,10 @@ clean: docker: docker-node docker-plugin docker-node: - cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . -t opr-node:${SEMVER} -t opr-node:latest -f node/cmd/Dockerfile + cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . -t opr-node:${SEMVER} -t opr-node:${RELEASE_TAG} -f node/cmd/Dockerfile docker-plugin: - cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . -t opr-nodeplugin:${SEMVER} -t opr-nodeplugin:latest -f node/plugin/cmd/Dockerfile + cd ../ && docker build --build-arg SEMVER=${SEMVER} --build-arg GITCOMMIT=${GITCOMMIT} --build-arg GITDATE=${GITDATE} . 
-t opr-nodeplugin:${SEMVER} -t opr-nodeplugin:${RELEASE_TAG} -f node/plugin/cmd/Dockerfile semver: echo "${SEMVER}" diff --git a/node/cmd/main.go b/node/cmd/main.go index 1fe3a9564c..00b959213d 100644 --- a/node/cmd/main.go +++ b/node/cmd/main.go @@ -8,12 +8,13 @@ import ( "time" "github.com/Layr-Labs/eigenda/common/pubip" + "github.com/Layr-Labs/eigenda/common/ratelimit" + "github.com/Layr-Labs/eigenda/common/store" + "github.com/prometheus/client_golang/prometheus" "github.com/urfave/cli" "github.com/Layr-Labs/eigenda/common" - "github.com/Layr-Labs/eigenda/common/ratelimit" - "github.com/Layr-Labs/eigenda/common/store" "github.com/Layr-Labs/eigenda/node" "github.com/Layr-Labs/eigenda/node/flags" "github.com/Layr-Labs/eigenda/node/grpc" @@ -56,18 +57,8 @@ func NodeMain(ctx *cli.Context) error { pubIPProvider := pubip.ProviderOrDefault(config.PubIPProvider) - // Create the node. - node, err := node.NewNode(config, pubIPProvider, logger) - if err != nil { - return err - } - - err = node.Start(context.Background()) - if err != nil { - node.Logger.Error("could not start node", "error", err) - return err - } - + // Rate limiter + reg := prometheus.NewRegistry() globalParams := common.GlobalRateParams{ BucketSizes: []time.Duration{bucketDuration}, Multipliers: []float32{bucketMultiplier}, @@ -79,7 +70,19 @@ func NodeMain(ctx *cli.Context) error { return err } - ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logger) + ratelimiter := ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger) + + // Create the node. + node, err := node.NewNode(reg, config, pubIPProvider, logger) + if err != nil { + return err + } + + err = node.Start(context.Background()) + if err != nil { + node.Logger.Error("could not start node", "error", err) + return err + } // Creates the GRPC server. 
server := grpc.NewServer(config, node, logger, ratelimiter) diff --git a/node/grpc/server.go b/node/grpc/server.go index c2862ed156..73dabc436f 100644 --- a/node/grpc/server.go +++ b/node/grpc/server.go @@ -200,6 +200,8 @@ func (s *Server) validateStoreChunkRequest(in *pb.StoreChunksRequest) error { func (s *Server) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) { start := time.Now() + s.node.Logger.Info("StoreChunks RPC request received", "request message size", proto.Size(in)) + // Validate the request. if err := s.validateStoreChunkRequest(in); err != nil { return nil, err } diff --git a/node/node.go b/node/node.go index a39f6dcc8b..d6ce9157c6 100644 --- a/node/node.go +++ b/node/node.go @@ -12,6 +12,7 @@ import ( "net/http" "net/url" "os" + "strings" "sync" "time" @@ -66,16 +67,15 @@ type Node struct { } // NewNode creates a new Node with the provided config. -func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) { +func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) { // Setup metrics // sdkClients, err := buildSdkClients(config, logger) // if err != nil { // return nil, err // } - promReg := prometheus.NewRegistry() - eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, promReg, logger.With("component", "EigenMetrics")) - rpcCallsCollector := rpccalls.NewCollector(AppName, promReg) + eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, reg, logger.With("component", "EigenMetrics")) + rpcCallsCollector := rpccalls.NewCollector(AppName, reg) // Generate BLS keys keyPair, err := core.MakeKeyPairFromString(config.PrivateBls) @@ -113,7 +113,7 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger // Setup Node Api nodeApi := nodeapi.NewNodeApi(AppName, SemVer, ":"+config.NodeApiPort, logger.With("component", "NodeApi")) - metrics := 
NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst) + metrics := NewMetrics(eigenMetrics, reg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst) // Make validator v, err := verifier.NewVerifier(&config.EncoderConfig, false) @@ -360,7 +360,7 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs log.Error("Failed to delete the invalid batch that should be rolled back", "batchHeaderHash", batchHeaderHashHex, "err", deleteKeysErr) } } - return nil, fmt.Errorf("failed to validate batch: %w", err) + return nil, err } n.Metrics.RecordStoreChunksStage("validated", batchSize, time.Since(stageTimer)) log.Debug("Validate batch took", "duration:", time.Since(stageTimer)) @@ -395,7 +395,20 @@ func (n *Node) ValidateBatch(ctx context.Context, header *core.BatchHeader, blob } pool := workerpool.New(n.Config.NumBatchValidators) - return n.Validator.ValidateBatch(header, blobs, operatorState, pool) + err = n.Validator.ValidateBatch(header, blobs, operatorState, pool) + if err != nil { + h, hashErr := operatorState.Hash() + if hashErr != nil { + n.Logger.Error("failed to get operator state hash", "err", hashErr) + } + + hStr := make([]string, 0, len(h)) + for q, hash := range h { + hStr = append(hStr, fmt.Sprintf("%d: %x", q, hash)) + } + return fmt.Errorf("failed to validate batch with operator state %s: %w", strings.Join(hStr, ","), err) + } + return nil } func (n *Node) updateSocketAddress(ctx context.Context, newSocketAddr string) { diff --git a/test/integration_test.go b/test/integration_test.go index fc5c8c4661..c75afeee0b 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -168,7 +168,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser } finalizer := batchermock.NewFinalizer() - disperserMetrics := disperser.NewMetrics("9100", logger) + disperserMetrics := 
disperser.NewMetrics(prometheus.NewRegistry(), "9100", logger) txnManager := batchermock.NewTxnManager() batcher, err := batcher.NewBatcher(batcherConfig, timeoutConfig, store, dispatcher, cst, asn, encoderClient, agg, &commonmock.MockEthClient{}, finalizer, transactor, txnManager, logger, batcherMetrics, handleBatchLivenessChan)