From 764704032758540df3dc3ff98bf08ad54234740a Mon Sep 17 00:00:00 2001 From: Ares <75481906+ice-ares@users.noreply.github.com> Date: Thu, 30 Mar 2023 13:04:35 +0300 Subject: [PATCH] replaced update to upsert for EXTRA_BONUS_PROCESSING_WORKER_xxx operations --- tokenomics/extra_bonus.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/tokenomics/extra_bonus.go b/tokenomics/extra_bonus.go index 8395b9f..4c75368 100644 --- a/tokenomics/extra_bonus.go +++ b/tokenomics/extra_bonus.go @@ -170,12 +170,14 @@ func (r *repository) mustPopulateExtraBonusWorker(workerIndex uint64, extraBonus } type ( + //nolint:govet // We can't update the order of fields in space. extraBonusProcessingWorker struct { - _msgpack struct{} `msgpack:",asArray"` //nolint:unused,tagliatelle,revive,nosnakecase // . - ExtraBonusStartedAt, ExtraBonusEndedAt *time.Time - UserID string - UTCOffset int64 - NewsSeen, ExtraBonus, LastExtraBonusIndexNotified uint64 // LastExtraBonusIndexNotified might be null, which is != 0. + _msgpack struct{} `msgpack:",asArray"` //nolint:unused,tagliatelle,revive,nosnakecase // . + ExtraBonusStartedAt, ExtraBonusEndedAt *time.Time + UserID string + UTCOffset int64 + NewsSeen, ExtraBonus uint64 + LastExtraBonusIndexNotified *uint64 } ) @@ -202,10 +204,10 @@ func (s *deviceMetadataTableSource) Process(ctx context.Context, msg *messagebro return errors.Wrapf(err, "failed to getWorkerIndex for userID:%v", dm.UserID) } space := fmt.Sprintf("EXTRA_BONUS_PROCESSING_WORKER_%v", workerIndex) - pkIndex := fmt.Sprintf("pk_unnamed_%v_1", space) - ops := append(make([]tarantool.Op, 0, 1), tarantool.Op{Op: "=", Field: 3, Arg: duration / stdlibtime.Minute}) //nolint:gomnd // `utc_offset` column index. + tuple := &extraBonusProcessingWorker{UserID: dm.UserID, UTCOffset: int64(duration / stdlibtime.Minute)} + ops := append(make([]tarantool.Op, 0, 1), tarantool.Op{Op: "=", Field: 3, Arg: tuple.UTCOffset}) //nolint:gomnd // `utc_offset` column index. - return errors.Wrapf(storage.CheckNoSQLDMLErr(s.db.UpdateTyped(space, pkIndex, tarantool.StringKey{S: dm.UserID}, ops, &[]*extraBonusProcessingWorker{})), + return errors.Wrapf(storage.CheckNoSQLDMLErr(s.db.UpsertTyped(space, tuple, ops, &[]*extraBonusProcessingWorker{})), "failed to update users' timezone for %#v", &dm) } @@ -231,9 +233,9 @@ func (s *viewedNewsSource) Process(ctx context.Context, msg *messagebroker.Messa return errors.Wrapf(err, "failed to getWorkerIndex for userID:%v", vn.UserID) } space := fmt.Sprintf("EXTRA_BONUS_PROCESSING_WORKER_%v", workerIndex) - pkIndex := fmt.Sprintf("pk_unnamed_%v_1", space) - ops := append(make([]tarantool.Op, 0, 1), tarantool.Op{Op: "+", Field: 4, Arg: uint64(1)}) //nolint:gomnd // `news_seen` column index. - if err = storage.CheckNoSQLDMLErr(s.db.UpdateTyped(space, pkIndex, tarantool.StringKey{S: vn.UserID}, ops, &[]*extraBonusProcessingWorker{})); err != nil { + tuple := &extraBonusProcessingWorker{UserID: vn.UserID, NewsSeen: 1} + ops := append(make([]tarantool.Op, 0, 1), tarantool.Op{Op: "+", Field: 4, Arg: tuple.NewsSeen}) //nolint:gomnd // `news_seen` column index. + if err = storage.CheckNoSQLDMLErr(s.db.UpsertTyped(space, tuple, ops, &[]*extraBonusProcessingWorker{})); err != nil { return multierror.Append( //nolint:wrapcheck // Not needed. errors.Wrapf(err, "failed to update users' newsSeen count for %#v", &vn), errors.Wrapf(storage.CheckNoSQLDMLErr(s.db.DeleteTyped("PROCESSED_SEEN_NEWS", "pk_unnamed_PROCESSED_SEEN_NEWS_1", []any{vn.UserID, vn.NewsID}, &[]*viewedNews{})), //nolint:lll // .