diff --git a/cluster/inventory.go b/cluster/inventory.go index 8fe4ce66..ce191c45 100644 --- a/cluster/inventory.go +++ b/cluster/inventory.go @@ -11,13 +11,12 @@ import ( inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1" provider "github.com/akash-network/akash-api/go/provider/v1" "github.com/boz/go-lifecycle" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/desertbit/timer" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - tpubsub "github.com/troian/pubsub" - - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/tendermint/tendermint/libs/log" + tpubsub "github.com/troian/pubsub" dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" @@ -486,23 +485,23 @@ func (is *inventoryService) run(ctx context.Context, reservationsArg []*reservat invupch := make(chan ctypes.Inventory, 1) invch := is.clients.inventory.ResultChan() - var reserveChLocal <-chan inventoryRequest + var reservech <-chan inventoryRequest resumeProcessingReservations := func() { - reserveChLocal = is.reservech + reservech = is.reservech } t := timer.NewStoppedTimer() updateIPs := func() { if is.clients.ip != nil { - reserveChLocal = nil + reservech = nil if runch == nil { t.Stop() runch = is.runCheck(rctx, state) } - } else if reserveChLocal == nil && state.inventory != nil { - reserveChLocal = is.reservech + } else if reservech == nil && state.inventory != nil { + reservech = is.reservech } } @@ -516,7 +515,6 @@ func (is *inventoryService) run(ctx context.Context, reservationsArg []*reservat default: } } - loop: for { select { @@ -566,7 +564,7 @@ loop: } case <-t.C: updateIPs() - case req := <-reserveChLocal: + case req := <-reservech: is.handleRequest(req, state) case req := <-is.lookupch: // lookup registration @@ -612,14 +610,21 @@ loop: inventoryRequestsCounter.WithLabelValues("unreserve", "not-found").Inc() req.ch <- inventoryResponse{err: errReservationNotFound} case responseCh := <-is.statusch: - responseCh <- is.getStatus(state) + select { + case responseCh <- is.getStatus(state): + default: + } inventoryRequestsCounter.WithLabelValues("status", "success").Inc() case responseCh := <-is.statusV1ch: resp, err := is.getStatusV1(state) - responseCh <- invSnapshotResp{ + select { + case responseCh <- invSnapshotResp{ res: resp, err: err, + }: + default: } + if err == nil { inventoryRequestsCounter.WithLabelValues("status", "success").Inc() } else { @@ -631,11 +636,11 @@ loop: } select { - case invupch <- inv: + case <-invupch: default: - <-invupch - invupch <- inv } + + invupch <- inv case inv := <-invupch: currinv = inv.Dup() state.inventory = inv @@ -692,6 +697,7 @@ loop: if err != nil { continue } + bus.Pub(inv, []string{ptypes.PubSubTopicInventoryStatus}, tpubsub.WithRetain()) } diff --git a/cluster/service.go b/cluster/service.go index fd560bc8..0fe80996 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -4,7 +4,6 @@ import ( "context" "github.com/boz/go-lifecycle" - sdkquery "github.com/cosmos/cosmos-sdk/types/query" tpubsub "github.com/troian/pubsub" "github.com/pkg/errors" @@ -13,12 +12,11 @@ import ( "github.com/tendermint/tendermint/libs/log" - sdktypes "github.com/cosmos/cosmos-sdk/types" - aclient "github.com/akash-network/akash-api/go/node/client/v1beta2" dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" provider "github.com/akash-network/akash-api/go/provider/v1" + sdktypes "github.com/cosmos/cosmos-sdk/types" "github.com/akash-network/node/pubsub" @@ -485,92 +483,102 @@ func findDeployments( aqc aclient.QueryClient, accAddr sdktypes.AccAddress, ) ([]ctypes.IDeployment, error) { - var presp *sdkquery.PageResponse - var leases []mtypes.QueryLeaseResponse - bids := make(map[string]mtypes.Bid) - - limit := uint64(100) - errorCnt := 0 - - for { - preq := &sdkquery.PageRequest{ - Key: nil, - Limit: limit, - } - - if presp != nil { - preq.Key = presp.NextKey - } - - resp, err := aqc.Bids(ctx, &mtypes.QueryBidsRequest{ - Filters: mtypes.BidFilters{ - Provider: accAddr.String(), - State: mtypes.BidActive.String(), - }}) - if err != nil { - if errorCnt > 1 { - return nil, err - } - - errorCnt++ - - continue - } - errorCnt = 0 - - for _, resp := range resp.Bids { - bids[resp.Bid.BidID.DeploymentID().String()] = resp.Bid - } - - if uint64(len(resp.Bids)) < limit { - break - } - - presp = resp.Pagination - } - - presp = nil - for { - preq := &sdkquery.PageRequest{ - Key: nil, - Limit: limit, - } - - if presp != nil { - preq.Key = presp.NextKey - } - - resp, err := aqc.Leases(ctx, &mtypes.QueryLeasesRequest{ - Filters: mtypes.LeaseFilters{ - Provider: accAddr.String(), - State: mtypes.LeaseActive.String(), - }}) - if err != nil { - if errorCnt > 1 { - return nil, err - } - - errorCnt++ - - continue - } - - errorCnt = 0 - - leases = append(leases, resp.Leases...) - if uint64(len(resp.Leases)) < limit { - break - } - - presp = resp.Pagination - } - - for _, resp := range leases { - did := resp.Lease.LeaseID.DeploymentID().String() - if _, exists := bids[did]; !exists { - delete(bids, did) - } - } + // TODO @troian + // var presp *sdkquery.PageResponse + // var leases []mtypes.QueryLeaseResponse + // bids := make(map[string]mtypes.Bid) + // + // limit := uint64(10) + // errorCnt := 0 + // + // log.Info("detecting active leases") + // + // for { + // preq := &sdkquery.PageRequest{ + // Key: nil, + // Limit: limit, + // } + // + // if presp != nil { + // preq.Key = presp.NextKey + // } + // + // resp, err := aqc.Bids(ctx, &mtypes.QueryBidsRequest{ + // Filters: mtypes.BidFilters{ + // Provider: accAddr.String(), + // State: mtypes.BidActive.String(), + // }, + // Pagination: preq, + // }) + // if err != nil { + // if errorCnt > 1 { + // return nil, err + // } + // + // errorCnt++ + // + // continue + // } + // errorCnt = 0 + // + // log.Info("found active bids", "active", len(resp.Bids)) + // for _, resp := range resp.Bids { + // bids[resp.Bid.BidID.DeploymentID().String()] = resp.Bid + // } + // + // if uint64(len(resp.Bids)) < limit { + // break + // } + // + // presp = resp.Pagination + // } + // + // presp = nil + // for { + // preq := &sdkquery.PageRequest{ + // Key: nil, + // Limit: limit, + // } + // + // if presp != nil { + // preq.Key = presp.NextKey + // } + // + // resp, err := aqc.Leases(ctx, &mtypes.QueryLeasesRequest{ + // Filters: mtypes.LeaseFilters{ + // Provider: accAddr.String(), + // State: mtypes.LeaseActive.String(), + // }, + // Pagination: preq, + // }) + // if err != nil { + // if errorCnt > 1 { + // return nil, err + // } + // + // errorCnt++ + // + // continue + // } + // + // errorCnt = 0 + // + // log.Info("found active leases", "active", len(resp.Leases)) + // + // leases = append(leases, resp.Leases...) + // if uint64(len(resp.Leases)) < limit { + // break + // } + // + // presp = resp.Pagination + // } + // + // for _, resp := range leases { + // did := resp.Lease.LeaseID.DeploymentID().String() + // if _, exists := bids[did]; !exists { + // delete(bids, did) + // } + // } deployments, err := client.Deployments(ctx) if err != nil { diff --git a/go.mod b/go.mod index e2cf936c..3b7e8921 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.9.0 github.com/tendermint/tendermint v0.34.27 - github.com/troian/pubsub v0.1.1 + github.com/troian/pubsub v0.1.2 github.com/vektra/mockery/v2 v2.40.2 go.uber.org/zap v1.27.0 golang.org/x/net v0.30.0 @@ -51,7 +51,10 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.4.2 ) -retract v0.6.0 +retract ( + v0.6.0 + v0.6.5 +) replace ( // use cosmos fork of keyring diff --git a/go.sum b/go.sum index 25d49b33..cfc52290 100644 --- a/go.sum +++ b/go.sum @@ -1495,8 +1495,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/troian/hid v0.13.2 h1:O7PWZQm5YGyg0nVvknFVLVrNTPillz4ZXvxJOtoyteE= github.com/troian/hid v0.13.2/go.mod h1:n6adloQ1876oEXZr6fFsthy4FDHxwJhh7QYQspm30Ds= -github.com/troian/pubsub v0.1.1 h1:huc5qneo0rtSKKsrkroyyMu+b8bw0talql2tt7GXl98= -github.com/troian/pubsub v0.1.1/go.mod h1:fOUAEWXes/SkyWPTdBpW3L/ovyg74N+eBxRpWKik+2Q= +github.com/troian/pubsub v0.1.2 h1:XPS8Y5nawdNRyPyhfFw/SFhPKO1SadY83ZlOVlaxadE= +github.com/troian/pubsub v0.1.2/go.mod h1:fOUAEWXes/SkyWPTdBpW3L/ovyg74N+eBxRpWKik+2Q= github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=