Skip to content

Commit

Permalink
Remove one time mining notification functionality as no more needed. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ice-myles authored Aug 13, 2024
1 parent 44ddd6d commit 25e3d89
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 100 deletions.
1 change: 0 additions & 1 deletion notifications/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ type (
telemetryTelegramNotifications *telemetry
telemetryAnnouncements *telemetry
db *storage.DB
eskimoDB *storage.DB
wg *sync.WaitGroup
cancel context.CancelFunc
}
Expand Down
100 changes: 1 addition & 99 deletions notifications/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package notifications
import (
"context"
"fmt"
"math/rand"
"net/url"
"strings"
"sync"
Expand Down Expand Up @@ -33,11 +32,9 @@ func MustStartScheduler(ctx context.Context, cancel context.CancelFunc) *Schedul
loadTelegramNotificationTranslationTemplates(cfg.TenantName)

ddlWorkersParam := fmt.Sprintf(ddl, schedulerWorkersCount)
eskimoDB := storage.MustConnect(ctx, "", eskimoApplicationYamlKey)
sh := Scheduler{
cfg: &cfg,
db: storage.MustConnect(context.Background(), ddlWorkersParam, applicationYamlKey), //nolint:contextcheck // .
eskimoDB: eskimoDB,
pictureClient: picture.New(applicationYamlKey),
pushNotificationsClient: push.New(applicationYamlKey),
wg: new(sync.WaitGroup),
Expand All @@ -50,7 +47,6 @@ func MustStartScheduler(ctx context.Context, cancel context.CancelFunc) *Schedul
schedulerTelegramNotificationsMX: &sync.Mutex{},
}
go sh.startWeeklyStatsUpdater(ctx)
go sh.runOneTimeMiningExtendFeature(ctx)
sh.wg = new(sync.WaitGroup)
sh.wg.Add(3 * int(schedulerWorkersCount)) //nolint:gomnd,mnd // .
sh.cancel = cancel
Expand All @@ -76,9 +72,6 @@ func (s *Scheduler) CheckHealth(ctx context.Context) error {
if err := s.db.Ping(ctx); err != nil {
return errors.Wrap(err, "[health-check] failed to ping DB")
}
if err := s.eskimoDB.Ping(ctx); err != nil {
return errors.Wrap(err, "[health-check] failed to ping eskimo DB")
}

return nil
}
Expand All @@ -105,7 +98,7 @@ func (s *Scheduler) Close() error {
s.cancel()
s.wg.Wait()

return errors.Wrap(multierror.Append(nil, s.db.Close(), s.eskimoDB.Close()).ErrorOrNil(), "can't close one of postgres connection")
return errors.Wrap(s.db.Close(), "can't close postgres connection")
}

//nolint:funlen // .
Expand Down Expand Up @@ -255,94 +248,3 @@ func (s *Scheduler) deleteScheduledNotifications(ctx context.Context, notificati

return errors.Wrapf(err, "failed to delete scheduled notifications %#v", notifications)
}

//nolint:funlen // .
func (s *Scheduler) runOneTimeMiningExtendFeature(ctx context.Context) {
type notActiveUser struct {
ID string
Language string
}
const limit = 1000
offset := uint64(0)
now := time.Now()
for {
sql := fmt.Sprintf(`SELECT
id,
language
FROM users
WHERE last_mining_ended_at IS NOT NULL AND last_mining_ended_at < $1
ORDER BY created_at ASC
LIMIT %v OFFSET %v`, limit, offset)
resp, err := storage.Select[notActiveUser](ctx, s.eskimoDB, sql, now)
if err != nil {
log.Error(err, "can't get users to send the single mining extend message")
stdlibtime.Sleep(100 * stdlibtime.Millisecond) //nolint:gomnd,mnd // .

continue
}
if len(resp) == 0 {
break
}
var randMax int
if s.cfg.Development {
randMax = 2
} else {
randMax = 240
}
scheduled := make([]*scheduledNotification, 0, len(resp))
for _, usr := range resp {
scheduled = append(scheduled, &scheduledNotification{
ScheduledAt: now,
ScheduledFor: time.New(time.Now().Add(stdlibtime.Duration(1+rand.Intn(randMax)) * stdlibtime.Minute)), //nolint:gosec // Not an issue.
Language: usr.Language,
UserID: usr.ID,
NotificationType: string(MiningEndingSoonNotificationType),
Uniqueness: fmt.Sprintf("%v_%v_single", usr.ID, MiningEndingSoonNotificationType),
NotificationChannel: string(TelegramNotificationChannel),
NotificationChannelValue: usr.ID,
Data: &users.JSON{
"TenantName": s.cfg.TenantName,
"TokenName": strings.ToUpper(s.cfg.TokenName),
},
})
}
if iErr := insertScheduledNotificationsWithSentCheck(ctx, s.db, scheduled); iErr != nil {
log.Error(iErr, "can't insert scheduled notifications for single mining ending soon call", resp)
}

offset += limit
stdlibtime.Sleep(100 * stdlibtime.Millisecond) //nolint:gomnd,mnd // .
}
diff := stdlibtime.Since(*now.Time)
log.Info("Finished handling one time mining extend notifications generation in", diff)
}

func insertScheduledNotificationsWithSentCheck(ctx context.Context, db *storage.DB, scheduled []*scheduledNotification) error {
const numFields = 9
values := make([]string, 0, numFields*len(scheduled))
params := make([]any, 0, numFields*len(scheduled))
for ix, sch := range scheduled {
values = append(values, fmt.Sprintf("(to_timestamp($%[1]v, 'YYYY-MM-DD HH24:MI:SS'), to_timestamp($%[2]v, 'YYYY-MM-DD HH24:MI:SS'), $%[3]v::jsonb, $%[4]v, $%[5]v, $%[6]v, $%[7]v, $%[8]v, $%[9]v)", ix*numFields+1, ix*numFields+2, ix*numFields+3, ix*numFields+4, ix*numFields+5, ix*numFields+6, ix*numFields+7, ix*numFields+8, ix*numFields+9)) //nolint:lll,gomnd,mnd // .
params = append(params, sch.ScheduledAt.Time, sch.ScheduledFor.Time, sch.Data, sch.Language, sch.UserID, sch.Uniqueness, sch.NotificationType, sch.NotificationChannel, sch.NotificationChannelValue) //nolint:lll // .
}
//nolint:lll // .
sql := fmt.Sprintf(`
INSERT INTO scheduled_notifications(
scheduled_at,
scheduled_for,
data,
language,
user_id,
uniqueness,
notification_type,
notification_channel,
notification_channel_value
)
SELECT * FROM (VALUES %[1]v) AS t(scheduled_at, scheduled_for, data, language, user_id, uniqueness, notification_type, notification_channel, notification_channel_value)
WHERE t.user_id NOT IN (SELECT user_id FROM sent_notifications WHERE uniqueness = user_id || '_%[2]v' || '_single')
ON CONFLICT (user_id,uniqueness,notification_type,notification_channel,notification_channel_value)
DO NOTHING`, strings.Join(values, ","), MiningEndingSoonNotificationType)
_, err := storage.Exec(ctx, db, sql, params...)

return errors.Wrapf(err, "can't execute insert scheduled notifications records:%#v", values)
}

0 comments on commit 25e3d89

Please sign in to comment.