Skip to content

Commit

Permalink
added retries for viewedNewsSource, addBalanceCommandsSource, deviceM…
Browse files Browse the repository at this point in the history
…etadataTableSource when fetching the user's worker index, so it has time for the user to be propagated
  • Loading branch information
ice-ares committed Mar 30, 2023
1 parent 7647040 commit a3cc169
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 12 deletions.
8 changes: 6 additions & 2 deletions tokenomics/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,13 @@ func (s *addBalanceCommandsSource) Process(ctx context.Context, message *message
if err = storage.CheckNoSQLDMLErr(s.db.InsertTyped("PROCESSED_ADD_BALANCE_COMMANDS", tuple, &[]*processedAddBalanceCommand{})); err != nil {
return errors.Wrapf(err, "failed to insert PROCESSED_ADD_BALANCE_COMMAND:%#v)", tuple)
}
workerIndex, err := s.getWorkerIndex(ctx, val.UserID)
if err != nil {
var workerIndex uint64
if err = retry(ctx, func() error {
workerIndex, err = s.getWorkerIndex(ctx, val.UserID)

return errors.Wrapf(err, "failed to getWorkerIndex for userID:%v", val.UserID)
}); err != nil {
return errors.Wrapf(err, "permanently failed to getWorkerIndex for userID:%v", val.UserID)
}
err = errors.Wrapf(retry(ctx, func() error {
if err = s.insertOrReplaceBalances(ctx, workerIndex, true, time.New(message.Timestamp), bal); err != nil {
Expand Down
25 changes: 16 additions & 9 deletions tokenomics/extra_bonus.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,38 +199,45 @@ func (s *deviceMetadataTableSource) Process(ctx context.Context, msg *messagebro
if err != nil {
return errors.Wrapf(err, "invalid timezone:%#v", &dm)
}
workerIndex, err := s.getWorkerIndex(ctx, dm.UserID)
if err != nil {
var workerIndex uint64
if err = retry(ctx, func() error {
workerIndex, err = s.getWorkerIndex(ctx, dm.UserID)

return errors.Wrapf(err, "failed to getWorkerIndex for userID:%v", dm.UserID)
}); err != nil {
return errors.Wrapf(err, "permanently failed to getWorkerIndex for userID:%v", dm.UserID)
}
space := fmt.Sprintf("EXTRA_BONUS_PROCESSING_WORKER_%v", workerIndex)
tuple := &extraBonusProcessingWorker{UserID: dm.UserID, UTCOffset: int64(duration / stdlibtime.Minute)}
ops := append(make([]tarantool.Op, 0, 1), tarantool.Op{Op: "=", Field: 3, Arg: tuple.UTCOffset}) //nolint:gomnd // `utc_offset` column index.

return errors.Wrapf(storage.CheckNoSQLDMLErr(s.db.UpsertTyped(space, tuple, ops, &[]*extraBonusProcessingWorker{})),
"failed to update users' timezone for %#v", &dm)
return errors.Wrapf(storage.CheckNoSQLDMLErr(s.db.UpsertTyped(space, tuple, ops, &[]*extraBonusProcessingWorker{})), "failed to update users' timezone for %#v", &dm) //nolint:lll // .
}

func (s *viewedNewsSource) Process(ctx context.Context, msg *messagebroker.Message) error { //nolint:funlen // .
func (s *viewedNewsSource) Process(ctx context.Context, msg *messagebroker.Message) (err error) { //nolint:funlen // .
if ctx.Err() != nil {
return errors.Wrap(ctx.Err(), "unexpected deadline while processing message")
}
if len(msg.Value) == 0 {
return nil
}
var vn viewedNews
if err := json.UnmarshalContext(ctx, msg.Value, &vn); err != nil {
if err = json.UnmarshalContext(ctx, msg.Value, &vn); err != nil {
return errors.Wrapf(err, "process: cannot unmarshall %v into %#v", string(msg.Value), &vn)
}
if vn.UserID == "" {
return nil
}
if err := storage.CheckNoSQLDMLErr(s.db.InsertTyped("PROCESSED_SEEN_NEWS", &vn, &[]*viewedNews{})); err != nil {
if err = storage.CheckNoSQLDMLErr(s.db.InsertTyped("PROCESSED_SEEN_NEWS", &vn, &[]*viewedNews{})); err != nil {
return errors.Wrapf(err, "failed to insert PROCESSED_SEEN_NEWS:%#v)", &vn)
}
workerIndex, err := s.getWorkerIndex(ctx, vn.UserID)
if err != nil {
var workerIndex uint64
if err = retry(ctx, func() error {
workerIndex, err = s.getWorkerIndex(ctx, vn.UserID)

return errors.Wrapf(err, "failed to getWorkerIndex for userID:%v", vn.UserID)
}); err != nil {
return errors.Wrapf(err, "permanently failed to getWorkerIndex for userID:%v", vn.UserID)
}
space := fmt.Sprintf("EXTRA_BONUS_PROCESSING_WORKER_%v", workerIndex)
tuple := &extraBonusProcessingWorker{UserID: vn.UserID, NewsSeen: 1}
Expand Down
2 changes: 1 addition & 1 deletion tokenomics/mining_sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ return ''`,
if err := r.db.EvalTyped(script, []any{}, &resp); err != nil {
return errors.Wrapf(err, "failed to eval script to insert mining session for %#v", ms)
} else if errMessage := resp[0]; errMessage != "" {
if strings.Contains(errMessage, `race condition`) {
if strings.Contains(errMessage, `race condition`) || strings.Contains(errMessage, "Transaction has been aborted by conflict") {
return ErrRaceCondition
}

Expand Down

0 comments on commit a3cc169

Please sign in to comment.