diff --git a/client/broadcaster/serial.go b/client/broadcaster/serial.go
index e5f82aa27d..f3ea0d65d4 100644
--- a/client/broadcaster/serial.go
+++ b/client/broadcaster/serial.go
@@ -2,34 +2,40 @@ package broadcaster
 
 import (
 	"context"
+	"encoding/hex"
 	"errors"
 	"fmt"
-	"regexp"
-	"strconv"
+	"strings"
 	"time"
 
-	"github.com/ovrclk/akash/sdkutil"
-
 	"github.com/boz/go-lifecycle"
 	sdkclient "github.com/cosmos/cosmos-sdk/client"
 	"github.com/cosmos/cosmos-sdk/client/tx"
 	"github.com/cosmos/cosmos-sdk/crypto/keyring"
 	sdk "github.com/cosmos/cosmos-sdk/types"
+	sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
+	authtx "github.com/cosmos/cosmos-sdk/x/auth/tx"
 	"github.com/tendermint/tendermint/libs/log"
+	ttypes "github.com/tendermint/tendermint/types"
+
+	"github.com/ovrclk/akash/sdkutil"
 )
 
 const (
-	syncDuration    = 10 * time.Second
-	errCodeMismatch = 32
-	// invalid group - 7
+	broadcastBlockRetryPeriod = time.Second
 )
 
 var (
 	ErrNotRunning = errors.New("not running")
+	// sadface.
+
+	// Only way to detect the timeout error.
+	// https://github.com/tendermint/tendermint/blob/46e06c97320bc61c4d98d3018f59d47ec69863c9/rpc/core/mempool.go#L124
+	timeoutErrorMessage = "timed out waiting for tx to be included in a block"
 
-	// errors are of the form:
-	// "account sequence mismatch, expected 25, got 27: incorrect account sequence"
-	recoverRegexp = regexp.MustCompile(`^account sequence mismatch, expected (\d+), got (\d+):`)
+	// Only way to check for tx not found error.
+	// https://github.com/tendermint/tendermint/blob/46e06c97320bc61c4d98d3018f59d47ec69863c9/rpc/core/tx.go#L31-L33
+	notFoundErrorMessageSuffix = ") not found"
 )
 
 type SerialClient interface {
@@ -37,30 +43,39 @@ type SerialClient interface {
 	Close()
 }
 
-type serialBroadcaster struct {
-	cctx        sdkclient.Context
-	txf         tx.Factory
-	info        keyring.Info
-	broadcastch chan broadcastRequest
-	lc          lifecycle.Lifecycle
-	log         log.Logger
+type seqreq struct {
+	curr uint64
+	ch   chan<- uint64
 }
 
-func NewSerialClient(log log.Logger, cctx sdkclient.Context, txf tx.Factory, info keyring.Info) (SerialClient, error) {
+type serialBroadcaster struct {
+	cctx             sdkclient.Context
+	txf              tx.Factory
+	info             keyring.Info
+	broadcastTimeout time.Duration
+	broadcastch      chan broadcastRequest
+	seqreqch         chan seqreq
+	lc               lifecycle.Lifecycle
+	log              log.Logger
+}
+
+func NewSerialClient(log log.Logger, cctx sdkclient.Context, timeout time.Duration, txf tx.Factory, info keyring.Info) (SerialClient, error) {
 	// populate account number, current sequence number
 	poptxf, err := sdkutil.PrepareFactory(cctx, txf)
 	if err != nil {
 		return nil, err
 	}
 
+	poptxf = poptxf.WithSimulateAndExecute(true)
 	client := &serialBroadcaster{
-		cctx:        cctx,
-		txf:         poptxf,
-		info:        info,
-		lc:          lifecycle.New(),
-		broadcastch: make(chan broadcastRequest),
-		log:         log.With("cmp", "client/broadcaster"),
+		cctx:             cctx,
+		txf:              poptxf,
+		info:             info,
+		broadcastTimeout: timeout,
+		lc:               lifecycle.New(),
+		broadcastch:      make(chan broadcastRequest),
+		seqreqch:         make(chan seqreq),
+		log:              log.With("cmp", "client/broadcaster"),
 	}
 
 	go client.run()
@@ -85,7 +100,6 @@ func (c *serialBroadcaster) Broadcast(ctx context.Context, msgs ...sdk.Msg) erro
 	}
 
 	select {
-	// request received, return response
 	case c.broadcastch <- request:
 		return <-responsech
 
@@ -97,22 +111,17 @@ func (c *serialBroadcaster) Broadcast(ctx context.Context, msgs ...sdk.Msg) erro
 	// loop shutting down, return error
 	case <-c.lc.ShuttingDown():
 		return ErrNotRunning
-
 	}
 }
 
 func (c *serialBroadcaster) run() {
 	defer c.lc.ShutdownCompleted()
 
-	var (
-		txf    = c.txf
-		synch  = make(chan uint64)
-		donech = make(chan struct{})
-	)
+	donech := make(chan struct{})
 
 	go func() {
 		defer close(donech)
-		c.syncLoop(synch)
+		c.syncLoop()
 	}()
 
 	defer func() { <-donech }()
@@ -125,126 +134,136 @@ loop:
 			break loop
 
 		case req := <-c.broadcastch:
 			// broadcast the message
-			var err error
-			txf, err = c.doBroadcast(txf, false, req.msgs...)
-
+			txf, err := c.broadcast(c.txf, false, req.msgs...)
 			// send response
 			req.responsech <- err
-
-		case seqno := <-synch:
-
-			c.log.Info("syncing sequence", "local", txf.Sequence(), "remote", seqno)
-
-			// fast-forward current sequence if necessary
-			if seqno > txf.Sequence() {
-				txf = txf.WithSequence(seqno)
-			}
+			c.txf = txf
 		}
 	}
 }
 
-func (c *serialBroadcaster) syncLoop(ch chan<- uint64) {
-	// TODO: add jitter, force update on "sequence mismatch"-type errors.
-	ticker := time.NewTicker(syncDuration)
-	defer ticker.Stop()
-
+func (c *serialBroadcaster) syncLoop() {
 	for {
 		select {
 		case <-c.lc.ShuttingDown():
 			return
-		case <-ticker.C:
-
+		case req := <-c.seqreqch:
 			// query sequence number
-			_, seq, err := c.cctx.AccountRetriever.
-				GetAccountNumberSequence(c.cctx, c.info.GetAddress())
+			_, seq, err := c.cctx.AccountRetriever.GetAccountNumberSequence(c.cctx, c.info.GetAddress())
 
-			// send to main loop if no error
 			if err != nil {
 				c.log.Error("error requesting account", "err", err)
-				break
+				seq = req.curr
 			}
 
 			select {
-			case ch <- seq:
+			case req.ch <- seq:
			case <-c.lc.ShuttingDown():
 			}
-
 		}
 	}
 }
 
-func (c *serialBroadcaster) doBroadcast(txf tx.Factory, retried bool, msgs ...sdk.Msg) (tx.Factory, error) {
-	txf, err := sdkutil.AdjustGas(c.cctx, txf, msgs...)
-	if err != nil {
-		return txf, err
-	}
+func (c *serialBroadcaster) broadcast(txf tx.Factory, retry bool, msgs ...sdk.Msg) (tx.Factory, error) {
+	var err error
 
-	response, err := doBroadcast(c.cctx, txf, c.info.GetName(), msgs...)
-
-	c.log.Info("broadcast response", "response", response, "err", err)
+	if !retry {
+		txf, err = sdkutil.AdjustGas(c.cctx, txf, msgs...)
+		if err != nil {
+			return txf, err
+		}
+	}
 
+	response, err := c.doBroadcast(c.cctx, txf, c.broadcastTimeout, c.info.GetName(), msgs...)
 	if err != nil {
+		c.log.Error("broadcast response", "response", response, "err", err)
 		return txf, err
 	}
 
-	// if no error, increment sequence.
 	if response.Code == 0 {
-		return txf.WithSequence(txf.Sequence() + 1), nil
+		txf = txf.WithSequence(txf.Sequence() + 1)
+		return txf, nil
 	}
 
-	// if not mismatch error, don't increment sequence and return
-	if response.Code != errCodeMismatch {
-		return txf, fmt.Errorf("%w: response code %d - (%#v)", ErrBroadcastTx, response.Code, response)
+	c.log.Error("broadcast response", "response", response)
+	// transaction has failed, perform the query of account sequence to make sure correct one is used
+	// for the next transaction
+	ch := make(chan uint64)
+	c.seqreqch <- seqreq{
+		curr: txf.Sequence(),
+		ch:   ch,
 	}
 
-	// if we're retrying a parsed sequence (see below), don't try to fix it again.
-	if retried {
-		return txf, fmt.Errorf("%w: retried response code %d - (%#v)", ErrBroadcastTx, response.Code, response)
+	select {
+	case curseq := <-ch:
+		txf = txf.WithSequence(curseq)
+	case <-c.lc.ShuttingDown():
+		return txf, ErrNotRunning
 	}
 
-	// attempt to parse correct next sequence
-	nextseq, ok := parseNextSequence(txf.Sequence(), response.RawLog)
-
-	if !ok {
-		return txf, fmt.Errorf("%w: response code %d - (%#v)", ErrBroadcastTx, response.Code, response)
+	if retry || (response.Code != sdkerrors.ErrWrongSequence.ABCICode()) {
+		return txf, fmt.Errorf("%w: response code %d", ErrBroadcastTx, response.Code)
 	}
 
-	txf = txf.WithSequence(nextseq)
-
-	// try again
-	return c.doBroadcast(txf, true, msgs...)
-
+	return c.broadcast(txf, retry, msgs...)
 }
 
-func parseNextSequence(current uint64, message string) (uint64, bool) {
-
-	// errors are of the form:
-	// "account sequence mismatch, expected 25, got 27: incorrect account sequence"
-
-	matches := recoverRegexp.FindStringSubmatch(message)
-
-	if len(matches) != 3 {
-		return 0, false
+func (c *serialBroadcaster) doBroadcast(cctx sdkclient.Context, txf tx.Factory, timeout time.Duration, keyName string, msgs ...sdk.Msg) (*sdk.TxResponse, error) {
+	txn, err := tx.BuildUnsignedTx(txf, msgs...)
+	if err != nil {
+		return nil, err
 	}
 
-	if len(matches[1]) == 0 || len(matches[2]) == 0 {
-		return 0, false
+	txn.SetFeeGranter(cctx.GetFeeGranterAddress())
+	err = tx.Sign(txf, keyName, txn, true)
+	if err != nil {
+		return nil, err
 	}
 
-	expected, err := strconv.ParseUint(matches[1], 10, 64)
-	if err != nil || expected == 0 {
-		return 0, false
+	bytes, err := cctx.TxConfig.TxEncoder()(txn.GetTx())
+	if err != nil {
+		return nil, err
 	}
 
-	received, err := strconv.ParseUint(matches[2], 10, 64)
-	if err != nil || received == 0 {
-		return 0, false
+	txb := ttypes.Tx(bytes)
+	hash := hex.EncodeToString(txb.Hash())
+
+	// broadcast-mode=block
+	// submit with mode commit/block
+	cres, err := cctx.BroadcastTxCommit(txb)
+	if err == nil {
+		// good job
+		return cres, nil
+	} else if !strings.HasSuffix(err.Error(), timeoutErrorMessage) {
+		return cres, err
 	}
 
-	if received != current {
-		// XXX not sure wtf todo.
-		return expected, true
+	// timeout error, continue on to retry
+
+	// loop
+	lctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	for lctx.Err() == nil {
+		// wait up to one second
+		select {
+		case <-lctx.Done():
+			return cres, err
+		case <-time.After(broadcastBlockRetryPeriod):
+		}
+
+		// check transaction
+		// https://github.com/cosmos/cosmos-sdk/pull/8734
+		res, err := authtx.QueryTx(cctx, hash)
+		if err == nil {
+			return res, nil
+		}
+
+		// if it's not a "not found" error, return
+		if !strings.HasSuffix(err.Error(), notFoundErrorMessageSuffix) {
+			return res, err
+		}
 	}
 
-	return expected, true
+	return cres, lctx.Err()
 }
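The rewrite above swaps two mechanisms: instead of scraping the "account sequence mismatch, expected X, got Y" raw log with a regexp, `broadcast` recognizes `sdkerrors.ErrWrongSequence` by its ABCI code, re-queries the account sequence through `seqreqch`, and goes around again; and `doBroadcast` submits in commit mode, falling back to polling `authtx.QueryTx` when tendermint reports its mempool timeout, until the tx is found or `broadcastTimeout` elapses. A minimal, self-contained sketch of those two shapes follows; every name in it (`chain`, `txResult`, `waitForTx`, ...) is a hypothetical stand-in rather than the repo's API, and it assumes the single-retry semantics the `retry ||` guard implies (note the recursion above passes `retry`, which is always false at that point, so as written the wrong-sequence retry is unbounded):

    package sketch

    import (
    	"context"
    	"errors"
    	"fmt"
    	"time"
    )

    // Hypothetical stand-ins: none of these names exist in the repo; only the
    // control flow mirrors the patch.
    type txResult struct{ code uint32 }

    const (
    	codeOK            uint32 = 0
    	codeWrongSequence uint32 = 32 // ABCI code of sdkerrors.ErrWrongSequence
    )

    var errNotFound = errors.New("tx not found") // stand-in for the ") not found" RPC error

    type chain interface {
    	Broadcast(seq uint64) (txResult, error) // sign and submit with the given sequence
    	QuerySequence() (uint64, error)         // fetch the account sequence from the node
    	QueryTx() (txResult, error)             // look the tx up by its hash
    }

    // broadcast submits at seq; on a wrong-sequence response it re-syncs the
    // sequence from the node and retries exactly once. It returns the next
    // sequence number the caller should use.
    func broadcast(c chain, seq uint64) (uint64, error) {
    	for retried := false; ; retried = true {
    		res, err := c.Broadcast(seq)
    		if err != nil {
    			return seq, err
    		}
    		if res.code == codeOK {
    			return seq + 1, nil // success: advance the local counter
    		}
    		if retried || res.code != codeWrongSequence {
    			return seq, fmt.Errorf("broadcast failed: response code %d", res.code)
    		}
    		// wrong sequence: trust the node over the local counter and go again
    		if seq, err = c.QuerySequence(); err != nil {
    			return seq, err
    		}
    	}
    }

    // waitForTx mirrors doBroadcast's fallback: after a commit-mode broadcast
    // times out, poll for the tx until it is found, a different error occurs,
    // or the overall timeout expires.
    func waitForTx(c chain, timeout, every time.Duration) (txResult, error) {
    	ctx, cancel := context.WithTimeout(context.Background(), timeout)
    	defer cancel()

    	for {
    		select {
    		case <-ctx.Done():
    			return txResult{}, ctx.Err()
    		case <-time.After(every):
    		}

    		res, err := c.QueryTx()
    		if err == nil {
    			return res, nil
    		}
    		if !errors.Is(err, errNotFound) {
    			return res, err
    		}
    	}
    }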
diff --git a/client/broadcaster/serial_test.go b/client/broadcaster/serial_test.go
deleted file mode 100644
index 31779cdd68..0000000000
--- a/client/broadcaster/serial_test.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package broadcaster
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func Test_parseNextSequence(t *testing.T) {
-	const (
-		rawlog = "account sequence mismatch, expected 25, got 27: incorrect account sequence"
-	)
-
-	const (
-		expected uint64 = 25
-		current  uint64 = 27
-	)
-
-	nextseq, ok := parseNextSequence(current, rawlog)
-	assert.True(t, ok)
-
-	assert.Equal(t, expected, nextseq)
-
-}
diff --git a/provider/balance_checker.go b/provider/balance_checker.go
index 6d338d0111..fc77e82bd8 100644
--- a/provider/balance_checker.go
+++ b/provider/balance_checker.go
@@ -12,14 +12,15 @@ import (
 	tmrpc "github.com/tendermint/tendermint/rpc/core/types"
 
 	"github.com/ovrclk/akash/client"
-	"github.com/ovrclk/akash/provider/event"
-	"github.com/ovrclk/akash/provider/session"
 	"github.com/ovrclk/akash/pubsub"
 	"github.com/ovrclk/akash/util/runner"
 	dtypes "github.com/ovrclk/akash/x/deployment/types/v1beta2"
 	"github.com/ovrclk/akash/x/escrow/client/util"
 	mtypes "github.com/ovrclk/akash/x/market/types/v1beta2"
 
+	"github.com/ovrclk/akash/provider/event"
+	"github.com/ovrclk/akash/provider/session"
+
 	aclient "github.com/ovrclk/akash/client"
 	netutil "github.com/ovrclk/akash/util/network"
 )
@@ -182,7 +183,32 @@ func (bc *balanceChecker) doEscrowCheck(ctx context.Context, lid mtypes.LeaseID,
 }
 
 func (bc *balanceChecker) startWithdraw(lid mtypes.LeaseID) error {
-	return bc.bus.Publish(event.LeaseWithdraw{LeaseID: lid})
+	msg := &mtypes.MsgWithdrawLease{
+		LeaseID: lid,
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	errch := make(chan error, 1)
+
+	go func(ch chan<- error) {
+		ch <- bc.session.Client().Tx().Broadcast(ctx, msg)
+	}(errch)
+
+	select {
+	case <-bc.lc.Done():
+		// give request extra 30s to finish before force canceling
+		select {
+		case <-time.After(30 * time.Second):
+			cancel()
+			return bc.lc.Error()
+		case err := <-errch:
+			return err
+		}
+	case err := <-errch:
+		return err
+	}
 }
 
 func (bc *balanceChecker) run(startCh chan<- error) {
@@ -202,15 +228,16 @@ func (bc *balanceChecker) run(startCh chan<- error) {
 	}()
 
 	leaseCheckCh := make(chan leaseCheckResponse, 1)
-	resultCh := make(chan runner.Result, 1)
+	var resultch chan runner.Result
 
 	subscriber, err := bc.bus.Subscribe()
-
 	startCh <- err
 	if err != nil {
 		return
 	}
 
+	resultch = make(chan runner.Result, 1)
+
 loop:
 	for {
 		select {
@@ -234,7 +261,7 @@ loop:
 
 				bc.leases[ev.LeaseID] = lState
 
-				// if there was provider restart with bunch of active leases
+				// if there was provider restart with a bunch of active leases
 				// spread their requests across 1min interval
 				// to reduce pressure on the RPC
 				if !ev.IsNewLease {
@@ -266,14 +293,16 @@ loop:
 			switch res.state {
 			case respStateScheduledWithdraw:
+				withdraw = true
+				bc.log.Debug("sending scheduled withdraw", "lease", res.lid)
 				// reschedule periodic withdraw if configured
 				if bc.cfg.WithdrawalPeriod > 0 {
 					lState.scheduledWithdrawAt = time.Now().Add(bc.cfg.WithdrawalPeriod)
 				}
-				fallthrough
 			case respStateOutOfFunds:
 				bc.log.Debug("lease is out of fund. sending withdraw", "lease", res.lid)
 				withdraw = true
+				// reschedule the funds check; if the lease is not topped up, the network will close it
 				fallthrough
 			case respStateNextCheck:
 				timerPeriod := res.checkAfter
@@ -282,14 +311,12 @@ loop:
 					bc.log.Info("couldn't check lease balance. retrying in 1m", "leaseId", res.lid, "error", res.err.Error())
 					timerPeriod = time.Minute
 				} else {
-					if !lState.scheduledWithdrawAt.IsZero() {
-						withdrawIn := lState.scheduledWithdrawAt.Sub(time.Now())
+					if !withdraw && !lState.scheduledWithdrawAt.IsZero() {
+						withdrawIn := time.Until(lState.scheduledWithdrawAt)
 						if timerPeriod >= withdrawIn {
 							timerPeriod = withdrawIn
 							scheduledWithdraw = true
 						}
-					} else {
-						bc.log.Debug("lease is out of fund. sending withdraw event", "lease", res.lid)
 					}
 				}
 
@@ -300,11 +327,11 @@ loop:
 				go func() {
 					select {
 					case <-bc.ctx.Done():
-					case resultCh <- runner.NewResult(res.lid, bc.startWithdraw(res.lid)):
+					case resultch <- runner.NewResult(res.lid, bc.startWithdraw(res.lid)):
 					}
 				}()
 			}
 
-		case res := <-resultCh:
+		case res := <-resultch:
 			if err := res.Error(); err != nil {
 				bc.log.Error("failed to do lease withdrawal", "err", err, "LeaseID", res.Value().(mtypes.LeaseID))
 			}
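With the event-driven withdrawal monitor going away (see the deletion below), `startWithdraw` now broadcasts `MsgWithdrawLease` itself, and it deliberately lets an in-flight withdrawal outlive a shutdown request for up to 30 seconds before cancelling. A generic sketch of that pattern, under hypothetical names (`runWithGrace`, `stop`, and `op` are all illustrative, not the repo's API):

    package sketch

    import (
    	"context"
    	"errors"
    	"time"
    )

    // runWithGrace mirrors the new startWithdraw: run op asynchronously and,
    // if stop fires first, give the in-flight request a grace period to
    // finish before cancelling its context.
    func runWithGrace(stop <-chan struct{}, grace time.Duration, op func(context.Context) error) error {
    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()

    	errch := make(chan error, 1) // buffered so the goroutine never blocks on send

    	go func() { errch <- op(ctx) }()

    	select {
    	case err := <-errch:
    		return err
    	case <-stop:
    		// shutting down: allow the request a little extra time to land
    		select {
    		case err := <-errch:
    			return err
    		case <-time.After(grace):
    			cancel()
    			return errors.New("withdraw abandoned at shutdown")
    		}
    	}
    }

The one-slot buffer on `errch` is what keeps the broadcast goroutine from leaking: once the grace period gives up, the goroutine can still complete its send without a receiver and be collected.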
diff --git a/provider/cluster/lease_withdraw.go b/provider/cluster/lease_withdraw.go
deleted file mode 100644
index b4d46d4fa1..0000000000
--- a/provider/cluster/lease_withdraw.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package cluster
-
-import (
-	"context"
-
-	"github.com/boz/go-lifecycle"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
-	"github.com/tendermint/tendermint/libs/log"
-
-	"github.com/ovrclk/akash/provider/event"
-	"github.com/ovrclk/akash/provider/session"
-	"github.com/ovrclk/akash/pubsub"
-	metricsutils "github.com/ovrclk/akash/util/metrics"
-	"github.com/ovrclk/akash/util/runner"
-	mtypes "github.com/ovrclk/akash/x/market/types/v1beta2"
-)
-
-type deploymentWithdrawal struct {
-	session session.Session
-	lease   mtypes.LeaseID
-	log     log.Logger
-	lc      lifecycle.Lifecycle
-}
-
-var (
-	leaseWithdrawalCounter = promauto.NewCounterVec(prometheus.CounterOpts{
-		Name: "provider_lease_withdrawal",
-	}, []string{"result"})
-)
-
-func newDeploymentWithdrawal(dm *deploymentManager) (*deploymentWithdrawal, error) {
-	m := &deploymentWithdrawal{
-		session: dm.session,
-		lease:   dm.lease,
-		log:     dm.log.With("cmp", "deployment-withdrawal"),
-		lc:      lifecycle.New(),
-	}
-
-	events, err := dm.bus.Subscribe()
-	if err != nil {
-		return nil, err
-	}
-
-	go m.lc.WatchChannel(dm.lc.ShuttingDown())
-	go m.run(events)
-
-	return m, nil
-}
-
-func (dw *deploymentWithdrawal) doWithdrawal(ctx context.Context) error {
-	msg := &mtypes.MsgWithdrawLease{
-		LeaseID: dw.lease,
-	}
-
-	result := dw.session.Client().Tx().Broadcast(ctx, msg)
-
-	label := metricsutils.SuccessLabel
-	if result != nil {
-		label = metricsutils.FailLabel
-	}
-	leaseWithdrawalCounter.WithLabelValues(label).Inc()
-	return result
-}
-
-func (dw *deploymentWithdrawal) run(events pubsub.Subscriber) {
-	defer func() {
-		dw.lc.ShutdownCompleted()
-		events.Close()
-	}()
-	ctx, cancel := context.WithCancel(context.Background())
-
-	var result <-chan runner.Result
-
-loop:
-	for {
-		select {
-		case err := <-dw.lc.ShutdownRequest():
-			dw.log.Debug("shutting down")
-			dw.lc.ShutdownInitiated(err)
-			break loop
-		case ev := <-events.Events():
-			if evt, valid := ev.(event.LeaseWithdraw); valid {
-				if !evt.LeaseID.Equals(dw.lease) {
-					continue loop
-				}
-
-				// do the withdrawal
-				result = runner.Do(func() runner.Result {
-					return runner.NewResult(nil, dw.doWithdrawal(ctx))
-				})
-			}
-		case r := <-result:
-			result = nil
-			if err := r.Error(); err != nil {
-				dw.log.Error("failed to do withdrawal", "err", err)
-			}
-		}
-	}
-	cancel()
-
-	if result != nil {
-		// The context has been cancelled, so wait for the result now and discard it
-		<-result
-	}
-
-	dw.log.Debug("shutdown complete")
-}
diff --git a/provider/cluster/manager.go b/provider/cluster/manager.go
index db39f3f180..f2dba98afa 100644
--- a/provider/cluster/manager.go
+++ b/provider/cluster/manager.go
@@ -60,7 +60,6 @@ type deploymentManager struct {
 	mgroup *manifest.Group
 
 	monitor    *deploymentMonitor
-	withdrawal *deploymentWithdrawal
 	wg         sync.WaitGroup
 	updatech   chan *manifest.Group
 	teardownch chan struct{}
@@ -96,13 +95,8 @@ func newDeploymentManager(s *service, lease mtypes.LeaseID, mgroup *manifest.Gro
 		currentHostnames: make(map[string]struct{}),
 	}
 
-	startCh := make(chan struct{}, 1)
 	go dm.lc.WatchChannel(dm.serviceShuttingDown)
-	go dm.run(startCh)
-
-	// ensures lease withdraw monitor is started and subscribed to the bus prior
-	// sending LeaseAddFundsMonitor event
-	<-startCh
+	go dm.run()
 
 	go func() {
 		<-dm.lc.Done()
@@ -150,7 +144,7 @@ func (dm *deploymentManager) handleUpdate() <-chan error {
 	return nil
 }
 
-func (dm *deploymentManager) run(startCh chan<- struct{}) {
+func (dm *deploymentManager) run() {
 	defer dm.lc.ShutdownCompleted()
 	var shutdownErr error
 
@@ -164,12 +158,6 @@ func (dm *deploymentManager) run(startCh chan<- struct{}) {
 		dm.log.Debug("hostnames released")
 	}()
 
-	startCh <- struct{}{}
-
-	if err := dm.startWithdrawal(); err != nil {
-		dm.log.Error("couldn't start if withdraw monitor", "err", err, "lease", dm.lease)
-	}
-
 loop:
 	for {
 		select {
@@ -239,27 +227,12 @@ loop:
 		dm.log.Debug("read from runch during shutdown")
 	}
 
-	if nil != dm.withdrawal {
-		dm.log.Debug("waiting on withdrawal")
-		<-dm.withdrawal.lc.Done()
-	}
-
 	dm.log.Debug("waiting on dm.wg")
 	dm.wg.Wait()
 	dm.log.Info("shutdown complete")
 }
 
-func (dm *deploymentManager) startWithdrawal() error {
-	var err error
-	dm.withdrawal, err = newDeploymentWithdrawal(dm)
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
-
 func (dm *deploymentManager) startMonitor() {
 	dm.wg.Add(1)
 	dm.monitor = newDeploymentMonitor(dm)
diff --git a/provider/cmd/run.go b/provider/cmd/run.go
index 2a1654c5c1..ab7f576657 100644
--- a/provider/cmd/run.go
+++ b/provider/cmd/run.go
@@ -92,6 +92,7 @@ const (
 	FlagProviderConfig     = "provider-config"
 	FlagCachedResultMaxAge = "cached-result-max-age"
 	FlagRPCQueryTimeout    = "rpc-query-timeout"
+	FlagTxBroadcastTimeout = "tx-broadcast-timeout"
 )
 
 var (
@@ -321,6 +322,11 @@ func RunCmd() *cobra.Command {
 		return nil
 	}
 
+	cmd.Flags().Duration(FlagTxBroadcastTimeout, 30*time.Second, "tx broadcast timeout. defaults to 30s")
+	if err := viper.BindPFlag(FlagTxBroadcastTimeout, cmd.Flags().Lookup(FlagTxBroadcastTimeout)); err != nil {
+		return nil
+	}
+
 	return cmd
 }
@@ -433,6 +439,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
 	providerConfig := viper.GetString(FlagProviderConfig)
 	cachedResultMaxAge := viper.GetDuration(FlagCachedResultMaxAge)
 	rpcQueryTimeout := viper.GetDuration(FlagRPCQueryTimeout)
+	txTimeout := viper.GetDuration(FlagTxBroadcastTimeout)
 
 	var metricsRouter http.Handler
 	if len(metricsListener) != 0 {
@@ -499,7 +506,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
 
 	logger := openLogger()
 
-	broadcasterClient, err := broadcaster.NewSerialClient(logger, cctx, txFactory, info)
+	broadcasterClient, err := broadcaster.NewSerialClient(logger, cctx, txTimeout, txFactory, info)
 	if err != nil {
 		return err
 	}
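The new flag plumbs straight through viper into `NewSerialClient`, mirroring the existing `--rpc-query-timeout`. Operators on congested chains can raise it past the 30s default; for example (invocation shape assumed from the usual `akash provider run` entry point, the value is illustrative):

    akash provider run --tx-broadcast-timeout 60s

Because it is viper-bound, the same setting should also be reachable through the provider's config file or environment, like the other bound flags.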
diff --git a/provider/event/events.go b/provider/event/events.go
index 6ea2828830..79507f80fa 100644
--- a/provider/event/events.go
+++ b/provider/event/events.go
@@ -54,11 +54,6 @@ type ClusterDeployment struct {
 	Status ClusterDeploymentStatus
 }
 
-// LeaseWithdraw Empty type used as a marker to indicate specified lease should be withdrawn now
-type LeaseWithdraw struct {
-	mtypes.LeaseID
-}
-
 type LeaseAddFundsMonitor struct {
 	mtypes.LeaseID
 	IsNewLease bool
diff --git a/provider/service.go b/provider/service.go
index 240caef9af..efc7af9492 100644
--- a/provider/service.go
+++ b/provider/service.go
@@ -4,15 +4,14 @@ import (
 	"context"
 	"time"
 
-	clustertypes "github.com/ovrclk/akash/provider/cluster/types/v1beta2"
-
-	"github.com/cosmos/cosmos-sdk/client"
-	sdk "github.com/cosmos/cosmos-sdk/types"
 	bankTypes "github.com/cosmos/cosmos-sdk/x/bank/types"
 
 	aclient "github.com/ovrclk/akash/client"
+	clustertypes "github.com/ovrclk/akash/provider/cluster/types/v1beta2"
 
 	"github.com/boz/go-lifecycle"
+	"github.com/cosmos/cosmos-sdk/client"
+	sdk "github.com/cosmos/cosmos-sdk/types"
 	"github.com/pkg/errors"
 
 	"github.com/ovrclk/akash/provider/bidengine"
@@ -75,9 +74,17 @@ func NewService(ctx context.Context, cctx client.Context, accAddr sdk.AccAddress
 	clusterConfig.DeploymentIngressDomain = cfg.DeploymentIngressDomain
 	clusterConfig.ClusterSettings = cfg.ClusterSettings
 
+	bc, err := newBalanceChecker(ctx, bankTypes.NewQueryClient(cctx), aclient.NewQueryClientFromCtx(cctx), accAddr, session, bus, cfg.BalanceCheckerCfg)
+	if err != nil {
+		session.Log().Error("starting balance checker", "err", err)
+		cancel()
+		return nil, err
+	}
+
 	cluster, err := cluster.NewService(ctx, session, bus, cclient, clusterConfig)
 	if err != nil {
 		cancel()
+		<-bc.lc.Done()
 		return nil, err
 	}
@@ -87,6 +94,7 @@ func NewService(ctx context.Context, cctx client.Context, accAddr sdk.AccAddress
 		session.Log().Error(ErrClusterReadTimedout.Error())
 		cancel()
 		<-cluster.Done()
+		<-bc.lc.Done()
 		return nil, ErrClusterReadTimedout
 	}
@@ -102,6 +110,7 @@ func NewService(ctx context.Context, cctx client.Context, accAddr sdk.AccAddress
 		session.Log().Error(errmsg, "err", err)
 		cancel()
 		<-cluster.Done()
+		<-bc.lc.Done()
 		return nil, errors.Wrap(err, errmsg)
 	}
@@ -118,15 +127,7 @@ func NewService(ctx context.Context, cctx client.Context, accAddr sdk.AccAddress
 		cancel()
 		<-cluster.Done()
 		<-bidengine.Done()
-		return nil, err
-	}
-
-	bc, err := newBalanceChecker(ctx, bankTypes.NewQueryClient(cctx), aclient.NewQueryClientFromCtx(cctx), accAddr, session, bus, cfg.BalanceCheckerCfg)
-	if err != nil {
-		session.Log().Error("starting balance checker", "err", err)
-		cancel()
-		<-cluster.Done()
-		<-bidengine.Done()
+		<-bc.lc.Done()
 		return nil, err
 	}
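The `NewService` reordering means the balance checker now starts before cluster and bidengine, so every later failure path must also drain `<-bc.lc.Done()` before returning. The underlying idiom — cancel, then wait for each already-started component in start order so nothing leaks past the constructor — generalizes; a small sketch with illustrative names (`component`, `startAll` are not the repo's API):

    package sketch

    import "context"

    // component stands in for the service pieces (balance checker, cluster,
    // bid engine) that expose a done channel once started.
    type component interface {
    	Done() <-chan struct{}
    }

    // startAll starts components one by one; when a later one fails, it
    // cancels and waits for every component already running before
    // returning, mirroring the error paths in NewService above.
    func startAll(ctx context.Context, cancel context.CancelFunc, starters []func(context.Context) (component, error)) ([]component, error) {
    	started := make([]component, 0, len(starters))
    	for _, start := range starters {
    		c, err := start(ctx)
    		if err != nil {
    			cancel()
    			for _, s := range started {
    				<-s.Done() // drain in start order, as the diff does with bc, cluster, bidengine
    			}
    			return nil, err
    		}
    		started = append(started, c)
    	}
    	return started, nil
    }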