Skip to content

Commit

Permalink
Telegram commands handling. Fixed logs for push notifications. (#58)
Browse files Browse the repository at this point in the history
Telegram commands handling. Concurrency for get polling updates for
different telegram bots. Fixed logs for push notifications.
  • Loading branch information
ice-myles authored Jul 15, 2024
1 parent 6228bd1 commit f49eaa6
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 102 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/ice-blockchain/eskimo v1.371.0
github.com/ice-blockchain/freezer v1.488.0
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb
github.com/ice-blockchain/wintr v1.144.0
github.com/ice-blockchain/wintr v1.145.0
github.com/imroc/req/v3 v3.43.7
github.com/pkg/errors v0.9.1
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
Expand Down Expand Up @@ -137,7 +137,7 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/quic-go v0.45.1 // indirect
github.com/redis/go-redis/v9 v9.5.3 // indirect
github.com/redis/go-redis/v9 v9.5.4 // indirect
github.com/refraction-networking/utls v1.6.7 // indirect
github.com/rs/zerolog v1.33.0 // indirect
github.com/sagikazarmark/locafero v0.6.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ github.com/ice-blockchain/freezer v1.488.0 h1:Q3XGS16eDQ4zeSTqfYLLwAt+H3TbDda9hI
github.com/ice-blockchain/freezer v1.488.0/go.mod h1:bSwFbBC95BBF+/WrPJp+l9qyrGruim0pnTpAmkkWoLQ=
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb h1:8TnFP3mc7O+tc44kv2e0/TpZKnEVUaKH+UstwfBwRkk=
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb/go.mod h1:ZsQU7i3mxhgBBu43Oev7WPFbIjP4TniN/b1UPNGbrq8=
github.com/ice-blockchain/wintr v1.144.0 h1:YQE0olkPdSI6AOlw7r/j5jGI6uLciZQrvXFIkN4C4l4=
github.com/ice-blockchain/wintr v1.144.0/go.mod h1:3HAl5nodsetqQN30q3gUvsxgfq2B7F86Os/II7/5GPQ=
github.com/ice-blockchain/wintr v1.145.0 h1:ObAgnS2Mqc2tbSM1pRpmi4dGUtkD/T+lacpaq+Ri8Mw=
github.com/ice-blockchain/wintr v1.145.0/go.mod h1:CgbY1UEyIx//+tM57Hz72Jd28UOs5tpOlJOByyQhi2c=
github.com/imroc/req/v3 v3.43.7 h1:dOcNb9n0X83N5/5/AOkiU+cLhzx8QFXjv5MhikazzQA=
github.com/imroc/req/v3 v3.43.7/go.mod h1:SQIz5iYop16MJxbo8ib+4LnostGCok8NQf8ToyQc2xA=
github.com/ip2location/ip2location-go/v9 v9.7.0 h1:ipwl67HOWcrw+6GOChkEXcreRQR37NabqBd2ayYa4Q0=
Expand Down Expand Up @@ -403,8 +403,8 @@ github.com/quic-go/quic-go v0.45.1 h1:tPfeYCk+uZHjmDRwHHQmvHRYL2t44ROTujLeFVBmjC
github.com/quic-go/quic-go v0.45.1/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU=
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/redis/go-redis/v9 v9.5.4 h1:vOFYDKKVgrI5u++QvnMT7DksSMYg7Aw/Np4vLJLKLwY=
github.com/redis/go-redis/v9 v9.5.4/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/refraction-networking/utls v1.6.7 h1:zVJ7sP1dJx/WtVuITug3qYUq034cDq9B2MR1K67ULZM=
github.com/refraction-networking/utls v1.6.7/go.mod h1:BC3O4vQzye5hqpmDTWUqi4P5DDhzJfkV1tdqtawQIH0=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
Expand Down
14 changes: 14 additions & 0 deletions notifications/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ const (
WeeklyStatsNotificationType NotificationType = "weekly_stats"
ReplyNotificationType NotificationType = "reply"
SocialsNotificationType NotificationType = "socials"
StartCommandNotificationType NotificationType = "start"
)

const (
StartTelegramCommand TelegramCommand = "/start"
SocialsTelegramCommand TelegramCommand = "/socials"
)

var (
Expand Down Expand Up @@ -129,6 +135,7 @@ var (
NewReferralNotificationType,
SocialsNotificationType,
ReplyNotificationType,
StartCommandNotificationType,
}
//nolint:gochecknoglobals // It's just for more descriptive validation messages.
AllNotificationDomains = map[NotificationChannel][]NotificationDomain{
Expand Down Expand Up @@ -160,6 +167,7 @@ type (
NotificationChannel string
NotificationDomain string
NotificationType string
TelegramCommand string
NotificationChannels struct {
NotificationChannels *users.Enum[NotificationChannel] `json:"notificationChannels,omitempty" swaggertype:"array,string" enums:"inapp,sms,email,push,push||email"` //nolint:lll // .
}
Expand Down Expand Up @@ -238,6 +246,11 @@ var (
internationalizedEmailDisplayNames = map[string]string{
"en": "ice: Decentralized Future",
}
//nolint:gochecknoglobals // It's just for more descriptive commands names.
availableBotCommands = []TelegramCommand{
StartTelegramCommand,
SocialsTelegramCommand,
}
)

type (
Expand Down Expand Up @@ -359,6 +372,7 @@ type (
}
telegramUserInfo struct {
UserID string `json:"userId,omitempty"`
Username string `json:"username,omitempty"`
TelegramUserID string `json:"telegramUserId,omitempty"`
Language string `json:"language,omitempty"`
}
Expand Down
1 change: 1 addition & 0 deletions notifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func StartProcessor(ctx context.Context, cancel context.CancelFunc) Processor {
prc.shutdown = closeAll(mbConsumer, prc.mb, prc.db, prc.pushNotificationsClient.Close, prc.freezerDB.Close)
go prc.startOldSentNotificationsCleaner(ctx)
go prc.startOldSentAnnouncementsCleaner(ctx)
go prc.startGetUpdatesTelegramLongPolling(ctx)

return prc
}
Expand Down
19 changes: 0 additions & 19 deletions notifications/push_notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,25 +168,6 @@ func (r *repository) sendPushNotification(ctx context.Context, pn *pushNotificat
return nil
}

func (r *repository) sendTelegramNotification(ctx context.Context, tn *telegramNotification) error {
if ctx.Err() != nil {
return errors.Wrap(ctx.Err(), "unexpected deadline")
}
if err := r.insertSentNotification(ctx, tn.sn); err != nil {
return errors.Wrapf(err, "failed to insert %#v", tn.sn)
}
responder := make(chan error, 1)
defer close(responder)
if err := r.telegramNotificationsClient.Send(ctx, tn.tn); err != nil {
return multierror.Append( //nolint:wrapcheck // Not needed.
errors.Wrapf(err, "failed to send telegram notification:%#v, desired to be sent:%#v", tn.sn, tn.sn),
errors.Wrapf(r.deleteSentNotification(ctx, tn.sn), "failed to delete SENT_NOTIFICATIONS as a rollback for %#v", tn.sn),
).ErrorOrNil()
}

return nil
}

func (r *repository) clearInvalidPushNotificationToken(ctx context.Context, userID string, token push.DeviceToken) error {
if ctx.Err() != nil {
return errors.Wrap(ctx.Err(), "unexpected deadline")
Expand Down
3 changes: 0 additions & 3 deletions notifications/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ func MustStartScheduler(ctx context.Context, cancel context.CancelFunc) *Schedul
schedulerTelegramNotificationsMX: &sync.Mutex{},
}
go sh.startWeeklyStatsUpdater(ctx)
if false {
go sh.startGetUpdatesTelegramLongPolling(ctx)
}
sh.wg = new(sync.WaitGroup)
sh.wg.Add(3 * int(schedulerWorkersCount)) //nolint:gomnd,mnd // .
sh.cancel = cancel
Expand Down
6 changes: 4 additions & 2 deletions notifications/scheduler_push_notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,16 @@ func (s *Scheduler) runPushNotificationsProcessor(ctx context.Context, workerNum
******************************************************************************************************************************************************/
if eErr := runConcurrentlyBatch(ctx, s.sendPushNotification, toSendPushNotifications, func(arg *pushNotification, err error) {
if errors.Is(err, push.ErrInvalidDeviceToken) {
log.Error(errors.Wrapf(err, "invalid token:%v for:%v", arg.pn.Target, arg.sn.UserID))

s.schedulerPushNotificationsMX.Lock()
invalidTokens = append(invalidTokens, &invalidToken{
UserID: arg.sn.UserID,
Token: arg.pn.Target,
})
s.schedulerPushNotificationsMX.Unlock()
} else {
log.Error(errors.Wrapf(err, "can't send notification for:%v", arg.sn.UserID))
log.Error(errors.Wrapf(err, "can't send notification:%+v", arg))
}
}, func(arg *pushNotification) {
s.schedulerPushNotificationsMX.Lock()
Expand Down Expand Up @@ -193,7 +195,7 @@ func (s *Scheduler) sendPushNotification(ctx context.Context, pn *pushNotificati
defer close(responder)
s.pushNotificationsClient.Send(ctx, pn.pn, responder)

return errors.Wrapf(<-responder, "can't send push notification for pn:%#v", pn)
return errors.Wrapf(<-responder, "can't send push notification for pn:%+v", pn)
}

func (s *Scheduler) clearInvalidPushNotificationTokens(ctx context.Context, pnInvalidTokens []*invalidToken) error {
Expand Down
Loading

0 comments on commit f49eaa6

Please sign in to comment.