Skip to content

Commit

Permalink
fix: bump pubsub package with fix for deadlock
Browse files Browse the repository at this point in the history
refs troian/pubsub#3

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian committed Jan 29, 2025
1 parent 8b0f59b commit 089667d
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 108 deletions.
36 changes: 21 additions & 15 deletions cluster/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import (
inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1"
provider "github.com/akash-network/akash-api/go/provider/v1"
"github.com/boz/go-lifecycle"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/desertbit/timer"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
tpubsub "github.com/troian/pubsub"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/tendermint/tendermint/libs/log"
tpubsub "github.com/troian/pubsub"

dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3"
mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4"
Expand Down Expand Up @@ -486,23 +485,23 @@ func (is *inventoryService) run(ctx context.Context, reservationsArg []*reservat
invupch := make(chan ctypes.Inventory, 1)

invch := is.clients.inventory.ResultChan()
var reserveChLocal <-chan inventoryRequest
var reservech <-chan inventoryRequest

resumeProcessingReservations := func() {
reserveChLocal = is.reservech
reservech = is.reservech
}

t := timer.NewStoppedTimer()

updateIPs := func() {
if is.clients.ip != nil {
reserveChLocal = nil
reservech = nil
if runch == nil {
t.Stop()
runch = is.runCheck(rctx, state)
}
} else if reserveChLocal == nil && state.inventory != nil {
reserveChLocal = is.reservech
} else if reservech == nil && state.inventory != nil {
reservech = is.reservech
}
}

Expand All @@ -516,7 +515,6 @@ func (is *inventoryService) run(ctx context.Context, reservationsArg []*reservat
default:
}
}

loop:
for {
select {
Expand Down Expand Up @@ -566,7 +564,7 @@ loop:
}
case <-t.C:
updateIPs()
case req := <-reserveChLocal:
case req := <-reservech:
is.handleRequest(req, state)
case req := <-is.lookupch:
// lookup registration
Expand Down Expand Up @@ -612,14 +610,21 @@ loop:
inventoryRequestsCounter.WithLabelValues("unreserve", "not-found").Inc()
req.ch <- inventoryResponse{err: errReservationNotFound}
case responseCh := <-is.statusch:
responseCh <- is.getStatus(state)
select {
case responseCh <- is.getStatus(state):
default:
}
inventoryRequestsCounter.WithLabelValues("status", "success").Inc()
case responseCh := <-is.statusV1ch:
resp, err := is.getStatusV1(state)
responseCh <- invSnapshotResp{
select {
case responseCh <- invSnapshotResp{
res: resp,
err: err,
}:
default:
}

if err == nil {
inventoryRequestsCounter.WithLabelValues("status", "success").Inc()
} else {
Expand All @@ -631,11 +636,11 @@ loop:
}

select {
case invupch <- inv:
case <-invupch:
default:
<-invupch
invupch <- inv
}

invupch <- inv
case inv := <-invupch:
currinv = inv.Dup()
state.inventory = inv
Expand Down Expand Up @@ -692,6 +697,7 @@ loop:
if err != nil {
continue
}

bus.Pub(inv, []string{ptypes.PubSubTopicInventoryStatus}, tpubsub.WithRetain())
}

Expand Down
186 changes: 97 additions & 89 deletions cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/boz/go-lifecycle"
sdkquery "github.com/cosmos/cosmos-sdk/types/query"
tpubsub "github.com/troian/pubsub"

"github.com/pkg/errors"
Expand All @@ -13,12 +12,11 @@ import (

"github.com/tendermint/tendermint/libs/log"

sdktypes "github.com/cosmos/cosmos-sdk/types"

aclient "github.com/akash-network/akash-api/go/node/client/v1beta2"
dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3"
mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4"
provider "github.com/akash-network/akash-api/go/provider/v1"
sdktypes "github.com/cosmos/cosmos-sdk/types"

"github.com/akash-network/node/pubsub"

Expand Down Expand Up @@ -485,92 +483,102 @@ func findDeployments(
aqc aclient.QueryClient,

Check failure on line 483 in cluster/service.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'aqc' seems to be unused, consider removing or renaming it as _ (revive)

Check failure on line 483 in cluster/service.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'aqc' seems to be unused, consider removing or renaming it as _ (revive)
accAddr sdktypes.AccAddress,

Check failure on line 484 in cluster/service.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'accAddr' seems to be unused, consider removing or renaming it as _ (revive)

Check failure on line 484 in cluster/service.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'accAddr' seems to be unused, consider removing or renaming it as _ (revive)
) ([]ctypes.IDeployment, error) {
var presp *sdkquery.PageResponse
var leases []mtypes.QueryLeaseResponse
bids := make(map[string]mtypes.Bid)

limit := uint64(100)
errorCnt := 0

for {
preq := &sdkquery.PageRequest{
Key: nil,
Limit: limit,
}

if presp != nil {
preq.Key = presp.NextKey
}

resp, err := aqc.Bids(ctx, &mtypes.QueryBidsRequest{
Filters: mtypes.BidFilters{
Provider: accAddr.String(),
State: mtypes.BidActive.String(),
}})
if err != nil {
if errorCnt > 1 {
return nil, err
}

errorCnt++

continue
}
errorCnt = 0

for _, resp := range resp.Bids {
bids[resp.Bid.BidID.DeploymentID().String()] = resp.Bid
}

if uint64(len(resp.Bids)) < limit {
break
}

presp = resp.Pagination
}

presp = nil
for {
preq := &sdkquery.PageRequest{
Key: nil,
Limit: limit,
}

if presp != nil {
preq.Key = presp.NextKey
}

resp, err := aqc.Leases(ctx, &mtypes.QueryLeasesRequest{
Filters: mtypes.LeaseFilters{
Provider: accAddr.String(),
State: mtypes.LeaseActive.String(),
}})
if err != nil {
if errorCnt > 1 {
return nil, err
}

errorCnt++

continue
}

errorCnt = 0

leases = append(leases, resp.Leases...)
if uint64(len(resp.Leases)) < limit {
break
}

presp = resp.Pagination
}

for _, resp := range leases {
did := resp.Lease.LeaseID.DeploymentID().String()
if _, exists := bids[did]; !exists {
delete(bids, did)
}
}
// TODO @troian
// var presp *sdkquery.PageResponse
// var leases []mtypes.QueryLeaseResponse
// bids := make(map[string]mtypes.Bid)
//
// limit := uint64(10)
// errorCnt := 0
//
// log.Info("detecting active leases")
//
// for {
// preq := &sdkquery.PageRequest{
// Key: nil,
// Limit: limit,
// }
//
// if presp != nil {
// preq.Key = presp.NextKey
// }
//
// resp, err := aqc.Bids(ctx, &mtypes.QueryBidsRequest{
// Filters: mtypes.BidFilters{
// Provider: accAddr.String(),
// State: mtypes.BidActive.String(),
// },
// Pagination: preq,
// })
// if err != nil {
// if errorCnt > 1 {
// return nil, err
// }
//
// errorCnt++
//
// continue
// }
// errorCnt = 0
//
// log.Info("found active bids", "active", len(resp.Bids))
// for _, resp := range resp.Bids {
// bids[resp.Bid.BidID.DeploymentID().String()] = resp.Bid
// }
//
// if uint64(len(resp.Bids)) < limit {
// break
// }
//
// presp = resp.Pagination
// }
//
// presp = nil
// for {
// preq := &sdkquery.PageRequest{
// Key: nil,
// Limit: limit,
// }
//
// if presp != nil {
// preq.Key = presp.NextKey
// }
//
// resp, err := aqc.Leases(ctx, &mtypes.QueryLeasesRequest{
// Filters: mtypes.LeaseFilters{
// Provider: accAddr.String(),
// State: mtypes.LeaseActive.String(),
// },
// Pagination: preq,
// })
// if err != nil {
// if errorCnt > 1 {
// return nil, err
// }
//
// errorCnt++
//
// continue
// }
//
// errorCnt = 0
//
// log.Info("found active leases", "active", len(resp.Leases))
//
// leases = append(leases, resp.Leases...)
// if uint64(len(resp.Leases)) < limit {
// break
// }
//
// presp = resp.Pagination
// }
//
// for _, resp := range leases {
// did := resp.Lease.LeaseID.DeploymentID().String()
// if _, exists := bids[did]; !exists {
// delete(bids, did)
// }
// }

deployments, err := client.Deployments(ctx)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
github.com/tendermint/tendermint v0.34.27
github.com/troian/pubsub v0.1.1
github.com/troian/pubsub v0.1.2
github.com/vektra/mockery/v2 v2.40.2
go.uber.org/zap v1.27.0
golang.org/x/net v0.30.0
Expand All @@ -51,7 +51,10 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.4.2
)

retract v0.6.0
retract (
v0.6.0
v0.6.5
)

replace (
// use cosmos fork of keyring
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1495,8 +1495,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/troian/hid v0.13.2 h1:O7PWZQm5YGyg0nVvknFVLVrNTPillz4ZXvxJOtoyteE=
github.com/troian/hid v0.13.2/go.mod h1:n6adloQ1876oEXZr6fFsthy4FDHxwJhh7QYQspm30Ds=
github.com/troian/pubsub v0.1.1 h1:huc5qneo0rtSKKsrkroyyMu+b8bw0talql2tt7GXl98=
github.com/troian/pubsub v0.1.1/go.mod h1:fOUAEWXes/SkyWPTdBpW3L/ovyg74N+eBxRpWKik+2Q=
github.com/troian/pubsub v0.1.2 h1:XPS8Y5nawdNRyPyhfFw/SFhPKO1SadY83ZlOVlaxadE=
github.com/troian/pubsub v0.1.2/go.mod h1:fOUAEWXes/SkyWPTdBpW3L/ovyg74N+eBxRpWKik+2Q=
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=
Expand Down

0 comments on commit 089667d

Please sign in to comment.