Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add delete stale watchers functionality #38

Merged
merged 9 commits into from
Dec 4, 2023
18 changes: 18 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"log"
"os/signal"
"syscall"
"time"

"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
Expand Down Expand Up @@ -102,6 +103,23 @@ func main() {
ServerHeader: "AIRDAO-Mobile-Api", // add custom server header
}

// Run DeleteWatchersWithStaleData on start for check and delete stale data
if err := watcherService.DeleteWatchersWithStaleData(context.Background()); err != nil {
zapLogger.Errorf("failed to delete watchers with stale data - %v", err)
}

// Run DeleteWatchersWithStaleData every 24 hours for check and delete stale data
go func() {
for {
err := watcherService.DeleteWatchersWithStaleData(context.Background())
if err != nil {
zapLogger.Errorf("failed to delete watchers with stale data - %v", err)
}

time.Sleep(24 * time.Hour)
}
}()

// Create fiber app
app := fiber.New(config)

Expand Down
58 changes: 56 additions & 2 deletions services/watcher/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package watcher
import (
"context"
"errors"
"fmt"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
Expand All @@ -13,11 +15,13 @@ import (
//go:generate mockgen -source=repository.go -destination=mocks/repository_mock.go
type Repository interface {
GetWatcher(ctx context.Context, filters bson.M) (*Watcher, error)
GetAllWatchers(ctx context.Context) ([]*Watcher, error)
GetWatcherList(ctx context.Context, filters bson.M, page int) ([]*Watcher, error)

CreateWatcher(ctx context.Context, watcher *Watcher) error
UpdateWatcher(ctx context.Context, watcher *Watcher) error
DeleteWatcher(ctx context.Context, filters bson.M) error
DeleteWatchersWithStaleData(ctx context.Context) error
}

type repository struct {
Expand Down Expand Up @@ -58,6 +62,56 @@ func (r *repository) GetWatcher(ctx context.Context, filters bson.M) (*Watcher,
return &watcher, nil
}

func (r *repository) GetAllWatchers(ctx context.Context) ([]*Watcher, error) {
collection := r.db.Database(r.dbName).Collection(r.dbCollectionName)

filter := bson.D{{}}

cursor, err := collection.Find(ctx, filter)
if err != nil {
r.logger.Errorf("unable to find watcher due to internal error: %v", err)
return nil, nil
}

defer cursor.Close(ctx)

var watchers []*Watcher
for cursor.Next(ctx) {
watcher := new(Watcher)
if err := cursor.Decode(watcher); err != nil {
r.logger.Errorf("Unable to decode watcher document: %v", err)
return nil, nil
}
watchers = append(watchers, watcher)
}

if err := cursor.Err(); err != nil {
r.logger.Errorf("cursor iteration error: %v", err)
return nil, nil
}

return watchers, nil
}

func (r *repository) DeleteWatchersWithStaleData(ctx context.Context) error {
watchers, err := r.GetAllWatchers(context.Background())
if err != nil {
return err
}

for _, watcher := range watchers {
if watcher.LastFailDate.Before(watcher.LastSuccessDate.Add(-7 * 24 * time.Hour)) {
filter := bson.M{"_id": watcher.ID}
if err := r.DeleteWatcher(ctx, filter); err != nil {
return err
}
fmt.Printf("Watcher with ID %s deleted due to stale data\n", watcher.ID.Hex())
}
}

return nil
}

func (r *repository) GetWatcherList(ctx context.Context, filters bson.M, page int) ([]*Watcher, error) {
pageSize := 100

Expand All @@ -74,12 +128,12 @@ func (r *repository) GetWatcherList(ctx context.Context, filters bson.M, page in

var watchers []*Watcher
for cur.Next(ctx) {
var watcher Watcher
watcher := new(Watcher)
if err := cur.Decode(&watcher); err != nil {
r.logger.Errorf("unable to decode watcher document: %v", err)
return nil, err
}
watchers = append(watchers, &watcher)
watchers = append(watchers, watcher)
}

// Check for any errors that occurred during iteration
Expand Down
35 changes: 34 additions & 1 deletion services/watcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Service interface {
UpdateWatcher(ctx context.Context, pushToken string, addresses *[]string, threshold *float64, txNotification, priceNotification *string) error
DeleteWatcher(ctx context.Context, pushToken string) error
DeleteWatcherAddresses(ctx context.Context, pushToken string, addresses []string) error
DeleteWatchersWithStaleData(ctx context.Context) error
UpdateWatcherPushToken(ctx context.Context, olpPushToken string, newPushToken string) error
}

Expand Down Expand Up @@ -298,6 +299,8 @@ func (s *service) PriceWatch(ctx context.Context, watcherId string, stopChan cha
if err != nil {
s.logger.Errorf("PriceWatch (Up) cloudMessagingSvc.SendMessage error %v\n", err)
if err.Error() == "http error status: 404; reason: app instance has been unregistered; code: registration-token-not-registered; details: Requested entity was not found." {
// Set date of fail and remove watcher if success date more than 7 days earlier than this date
watcher.SetLastFailDate(time.Now())
return
}
}
Expand All @@ -306,6 +309,8 @@ func (s *service) PriceWatch(ctx context.Context, watcherId string, stopChan cha
sent = true
}

// Set date of success to compare with date of fail
watcher.SetLastSuccessDate(time.Now())
watcher.AddNotification(title, body, sent, time.Now())
}

Expand All @@ -327,6 +332,8 @@ func (s *service) PriceWatch(ctx context.Context, watcherId string, stopChan cha
if err != nil {
s.logger.Errorf("PriceWatch (Down) cloudMessagingSvc.SendMessage error %v\n", err)
if err.Error() == "http error status: 404; reason: app instance has been unregistered; code: registration-token-not-registered; details: Requested entity was not found." {
// Set date of fail and remove watcher if success date more than 7 days earlier than this date
watcher.SetLastFailDate(time.Now())
return
}
}
Expand All @@ -335,6 +342,8 @@ func (s *service) PriceWatch(ctx context.Context, watcherId string, stopChan cha
sent = true
}

// Set date of success to compare with date of fail
watcher.SetLastSuccessDate(time.Now())
watcher.AddNotification(title, body, sent, time.Now())
}

Expand Down Expand Up @@ -419,8 +428,11 @@ func (s *service) TransactionWatch(ctx context.Context, address string, txHash s
if err != nil {
s.logger.Errorf("TransactionWatch cloudMessagingSvc.SendMessage error %v\n", err)
if err.Error() == "http error status: 404; reason: app instance has been unregistered; code: registration-token-not-registered; details: Requested entity was not found." {
// Set date of fail and remove watcher if success date more than 7 days earlier than this date
watcher.SetLastFailDate(time.Now())

s.mx.RLock()
watchers.Remove(watcher.PushToken) //TODO: check if this is needed
watchers.Remove(watcher.PushToken)
s.mx.RUnlock()
}
}
Expand All @@ -429,6 +441,8 @@ func (s *service) TransactionWatch(ctx context.Context, address string, txHash s
sent = true
}

// Set date of success to compare with date of fail
watcher.SetLastSuccessDate(time.Now())
watcher.AddNotification(title, body, sent, time.Now())

cache[itemId] = true
Expand Down Expand Up @@ -691,6 +705,25 @@ func (s *service) DeleteWatcherAddresses(ctx context.Context, pushToken string,
return nil
}

func (s *service) DeleteWatchersWithStaleData(ctx context.Context) error {
if err := s.repository.DeleteWatchersWithStaleData(ctx); err != nil {
s.logger.Errorf("DeleteWatchersWithStaleData repository.DeleteWatchersWithStaleData error %v\n", err)
return err
}

// Delete from cache
s.mx.RLock()
for pushToken, watcher := range s.cachedWatcher {
if watcher.LastFailDate.Before(watcher.LastSuccessDate.Add(-7 * 24 * time.Hour)) {
delete(s.cachedWatcher, pushToken)
delete(s.cachedChan, pushToken)
}
}
s.mx.RUnlock()

return nil
}

func (s *service) UpdateWatcherPushToken(ctx context.Context, olpPushToken string, newPushToken string) error {
encodePushToken := base64.StdEncoding.EncodeToString([]byte(olpPushToken))

Expand Down
16 changes: 16 additions & 0 deletions services/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type Watcher struct {

HistoricalNotifications *[]*HistoryNotification `json:"historical_notifications" bson:"historical_notifications"`

LastSuccessDate time.Time `json:"last_success_date" bson:"last_success_date"`
LastFailDate time.Time `json:"last_fail_date" bson:"last_fail_date"`

CreatedAt time.Time `json:"created_at" bson:"created_at"`
UpdatedAt time.Time `json:"updated_at" bson:"updated_at"`
}
Expand All @@ -54,6 +57,9 @@ func NewWatcher(pushToken string) (*Watcher, error) {
Addresses: nil,
HistoricalNotifications: nil,

LastSuccessDate: time.Time{},
LastFailDate: time.Time{},

CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}, nil
Expand Down Expand Up @@ -130,6 +136,16 @@ func (w *Watcher) SetPriceNotification(v string) {
w.UpdatedAt = time.Now()
}

func (w *Watcher) SetLastFailDate(date time.Time) {
w.LastFailDate = date
w.UpdatedAt = time.Now()
}

func (w *Watcher) SetLastSuccessDate(date time.Time) {
w.LastSuccessDate = date
w.UpdatedAt = time.Now()
}

func (w *Watcher) SetPushToken(v string) {
w.PushToken = v
w.UpdatedAt = time.Now()
Expand Down