Skip to content

Commit

Permalink
replaced update to upsert for EXTRA_BONUS_PROCESSING_WORKER_xxx opera…
Browse files Browse the repository at this point in the history
…tions
  • Loading branch information
ice-ares committed Mar 30, 2023
1 parent 8b61ada commit 7647040
Showing 1 changed file with 13 additions and 11 deletions.
24 changes: 13 additions & 11 deletions tokenomics/extra_bonus.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ func (r *repository) mustPopulateExtraBonusWorker(workerIndex uint64, extraBonus
}

type (
//nolint:govet // We can't update the order of fields in space.
extraBonusProcessingWorker struct {
_msgpack struct{} `msgpack:",asArray"` //nolint:unused,tagliatelle,revive,nosnakecase // .
ExtraBonusStartedAt, ExtraBonusEndedAt *time.Time
UserID string
UTCOffset int64
NewsSeen, ExtraBonus, LastExtraBonusIndexNotified uint64 // LastExtraBonusIndexNotified might be null, which is != 0.
_msgpack struct{} `msgpack:",asArray"` //nolint:unused,tagliatelle,revive,nosnakecase // .
ExtraBonusStartedAt, ExtraBonusEndedAt *time.Time
UserID string
UTCOffset int64
NewsSeen, ExtraBonus uint64
LastExtraBonusIndexNotified *uint64
}
)

Expand All @@ -202,10 +204,10 @@ func (s *deviceMetadataTableSource) Process(ctx context.Context, msg *messagebro
return errors.Wrapf(err, "failed to getWorkerIndex for userID:%v", dm.UserID)
}
space := fmt.Sprintf("EXTRA_BONUS_PROCESSING_WORKER_%v", workerIndex)
pkIndex := fmt.Sprintf("pk_unnamed_%v_1", space)
ops := append(make([]tarantool.Op, 0, 1), tarantool.Op{Op: "=", Field: 3, Arg: duration / stdlibtime.Minute}) //nolint:gomnd // `utc_offset` column index.
tuple := &extraBonusProcessingWorker{UserID: dm.UserID, UTCOffset: int64(duration / stdlibtime.Minute)}
ops := append(make([]tarantool.Op, 0, 1), tarantool.Op{Op: "=", Field: 3, Arg: tuple.UTCOffset}) //nolint:gomnd // `utc_offset` column index.

return errors.Wrapf(storage.CheckNoSQLDMLErr(s.db.UpdateTyped(space, pkIndex, tarantool.StringKey{S: dm.UserID}, ops, &[]*extraBonusProcessingWorker{})),
return errors.Wrapf(storage.CheckNoSQLDMLErr(s.db.UpsertTyped(space, tuple, ops, &[]*extraBonusProcessingWorker{})),
"failed to update users' timezone for %#v", &dm)
}

Expand All @@ -231,9 +233,9 @@ func (s *viewedNewsSource) Process(ctx context.Context, msg *messagebroker.Messa
return errors.Wrapf(err, "failed to getWorkerIndex for userID:%v", vn.UserID)
}
space := fmt.Sprintf("EXTRA_BONUS_PROCESSING_WORKER_%v", workerIndex)
pkIndex := fmt.Sprintf("pk_unnamed_%v_1", space)
ops := append(make([]tarantool.Op, 0, 1), tarantool.Op{Op: "+", Field: 4, Arg: uint64(1)}) //nolint:gomnd // `news_seen` column index.
if err = storage.CheckNoSQLDMLErr(s.db.UpdateTyped(space, pkIndex, tarantool.StringKey{S: vn.UserID}, ops, &[]*extraBonusProcessingWorker{})); err != nil {
tuple := &extraBonusProcessingWorker{UserID: vn.UserID, NewsSeen: 1}
ops := append(make([]tarantool.Op, 0, 1), tarantool.Op{Op: "+", Field: 4, Arg: tuple.NewsSeen}) //nolint:gomnd // `news_seen` column index.
if err = storage.CheckNoSQLDMLErr(s.db.UpsertTyped(space, tuple, ops, &[]*extraBonusProcessingWorker{})); err != nil {
return multierror.Append( //nolint:wrapcheck // Not needed.
errors.Wrapf(err, "failed to update users' newsSeen count for %#v", &vn),
errors.Wrapf(storage.CheckNoSQLDMLErr(s.db.DeleteTyped("PROCESSED_SEEN_NEWS", "pk_unnamed_PROCESSED_SEEN_NEWS_1", []any{vn.UserID, vn.NewsID}, &[]*viewedNews{})), //nolint:lll // .
Expand Down

0 comments on commit 7647040

Please sign in to comment.