Skip to content

Commit

Permalink
refactor/fix the repository and migration
Browse files Browse the repository at this point in the history
  • Loading branch information
OlehParyshkura2 committed Apr 16, 2024
1 parent e2ceac1 commit 401fa1d
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 170 deletions.
22 changes: 15 additions & 7 deletions services/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,28 @@ func historicalNotificationMigration(db *mongo.Client, dbName string, logger *za
return nil
}

// Create index { watcher_id: 1, notification.timestamp: -1 }

indexModel := mongo.IndexModel{
Keys: bson.D{
{Key: "watcher_id", Value: 1},
{Key: "notification.timestamp", Value: -1},
{Key: "timestamp", Value: -1},
},
}
_, err := historyCollection.Indexes().CreateOne(context.Background(), indexModel)
if err != nil {
return err
}

pushTokenIndex := mongo.IndexModel{
Keys: bson.D{
{Key: "push_token", Value: 1},
},
}

_, err = watcherCollection.Indexes().CreateOne(context.Background(), pushTokenIndex)
if err != nil {
return err
}

cursor, err := watcherCollection.Find(context.Background(), bson.M{})
if err != nil {
return err
Expand All @@ -81,10 +90,9 @@ func historicalNotificationMigration(db *mongo.Client, dbName string, logger *za

// Iterate over historical notifications only if it's not nil
for _, notification := range *watcher.HistoricalNotifications {
_, err := historyCollection.InsertOne(context.Background(), &HistoryNotificationDocument{
WatcherID: watcher.ID,
Notification: notification,
})
notification.ID = primitive.NewObjectID()
notification.WatcherID = watcher.ID
_, err := historyCollection.InsertOne(context.Background(), notification)
if err != nil {
return err
}
Expand Down
234 changes: 76 additions & 158 deletions services/watcher/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package watcher
import (
"context"
"errors"
"runtime"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
Expand All @@ -28,12 +26,6 @@ type Repository interface {
DeleteWatchersWithStaleData(ctx context.Context) error
}

type HistoryNotificationDocument struct {
ID primitive.ObjectID `json:"id" bson:"_id"`
WatcherID primitive.ObjectID `json:"watcher_id" bson:"watcher_id"`
Notification *HistoryNotification `json:"notification" bson:"notification"`
}

type repository struct {
db *mongo.Client
dbName string
Expand All @@ -58,154 +50,77 @@ func NewRepository(db *mongo.Client, dbName string, logger *zap.SugaredLogger) (

func (r *repository) GetWatcher(ctx context.Context, filters bson.M) (*Watcher, error) {
r.logger.Info("GetWatcher is called")

collection := r.db.Database(r.dbName).Collection(r.watchersCollectionName)

var watcher Watcher
if err := r.db.Database(r.dbName).Collection(r.watchersCollectionName).FindOne(ctx, filters).Decode(&watcher); err != nil {
err := collection.FindOne(ctx, filters).Decode(&watcher)
if err != nil {
if err == mongo.ErrNoDocuments {
r.logger.Errorf("unable to find watcher: %v", err)
// return nil, errors.New("watcher not found")
r.logger.Errorf("watcher not found")
return nil, nil
}

r.logger.Errorf("unable to find watcher due to internal error: %v", err)
return nil, err
}
r.logger.Info(" GetWatcher got watcher, now fetching history data")

if err := r.attachHistory(ctx, &watcher); err != nil {
r.logger.Errorf("unable to fetch and set history notifications: %v", err)
return nil, err
}
r.logger.Info("GetWatcher got watcher history data")

return &watcher, nil
}

func (r *repository) GetAllWatchers(ctx context.Context) ([]*Watcher, error) {
r.logger.Info("GetAllWatchers is called")
var stats runtime.MemStats
runtime.ReadMemStats(&stats)
r.logger.Infof("Allocated memory before getAll: %d MB", stats.Alloc/1024/1024)
//time.Sleep(time.Second)

collection := r.db.Database(r.dbName).Collection(r.watchersCollectionName)

filter := bson.D{{}}

cursor, err := collection.Find(ctx, filter)
if err != nil {
r.logger.Errorf("unable to find watcher due to internal error: %v", err)
return nil, nil
}

r.logger.Info("GetAllWatchers got all watchers")

defer cursor.Close(ctx)

var watchers []*Watcher
for cursor.Next(ctx) {
watcher := new(Watcher)
if err := cursor.Decode(watcher); err != nil {
r.logger.Errorf("Unable to decode watcher document: %v", err)
return nil, nil
page := 1
watchers := make([]*Watcher, 0)
for {
watchersPage, err := r.GetWatcherList(ctx, bson.M{}, page)
if err != nil {
r.logger.Errorf("unable to get GetAllWatchers watchers: %v page %v", err, page)
return nil, err
}
watchers = append(watchers, watcher)
}

// Check for any errors that occurred during iteration
if err := cursor.Err(); err != nil {
r.logger.Errorf("cursor iteration error: %v", err)
return nil, nil
}

// attach notifications history to each watcher
for _, watcher := range watchers {
if err := r.attachHistory(ctx, watcher); err != nil {
r.logger.Errorf("unable to fetch and set history notifications: %v", err)
return nil, nil
if len(watchersPage) == 0 {
break
}
r.logger.Info("GetAllWatchers got all watchers history data")

var stats2 runtime.MemStats
runtime.ReadMemStats(&stats2)
r.logger.Infof("Allocated memory while attachHistory getAll: %d MB", stats2.Alloc/1024/1024)
watchers = append(watchers, watchersPage...)
page++
}

return watchers, nil
}

// Fetches and assigns historical notifications to the watcher.
func (r *repository) attachHistory(ctx context.Context, watcher *Watcher) error {
r.logger.Info("attachHistory is called")
historyNotifications := make([]*HistoryNotification, 0)

sortField := bson.E{
Key: "notification.timestamp",
Value: -1,
}
sortCriteria := bson.D{sortField}

cursor, err := r.db.Database(r.dbName).Collection(r.historyNotificationCollectionName).Find(ctx, bson.M{"watcher_id": watcher.ID}, options.Find().SetSort(sortCriteria))
func (r *repository) DeleteWatchersWithStaleData(ctx context.Context) error {
r.logger.Info("DeleteWatchersWithStaleData is called")
watchers, err := r.GetAllWatchers(ctx)
if err != nil {
r.logger.Errorf("unable to find history notifications due to internal error: %v", err)
return err
}
defer cursor.Close(ctx)

for cursor.Next(ctx) {
// fetch history notification document, set notification field in HistoryNotification struct
historyNotificationDocument := new(HistoryNotificationDocument)
if err := cursor.Decode(historyNotificationDocument); err != nil {
r.logger.Errorf("Unable to decode history notification document: %v", err)
return err
}

historyNotifications = append(historyNotifications, historyNotificationDocument.Notification)
r.logger.Errorf("unable to get all watchers: %v", err)
}
r.logger.Info("attachHistory got history notifications")

if err := cursor.Err(); err != nil {
r.logger.Errorf("cursor iteration error: %v", err)
return err
}

if len(historyNotifications) > 0 {
watcher.HistoricalNotifications = &historyNotifications
if watchers == nil {
r.logger.Info("no watchers found")
return nil
}

return nil
}

func (r *repository) DeleteWatchersWithStaleData(ctx context.Context) error {
page := 1
for {
watchers, err := r.GetWatcherList(ctx, bson.M{}, page)
if err != nil {
return err
}

if len(watchers) == 0 {
break
}

for _, watcher := range watchers {
r.logger.Info("checking watcher")
if watcher.LastFailDate.Before(watcher.LastSuccessDate.Add(-7 * 24 * time.Hour)) {
r.logger.Info("Delete watcher with ID %s \n", watcher.ID.Hex())
filter := bson.M{"_id": watcher.ID}
if err := r.DeleteWatcher(ctx, filter); err != nil {
return err
}
r.logger.Info("Watcher with ID %s deleted due to stale data\n", watcher.ID.Hex())
for _, watcher := range watchers {
r.logger.Info("checking watcher")
if watcher.LastFailDate.Before(watcher.LastSuccessDate.Add(-7 * 24 * time.Hour)) {
r.logger.Infof("Delete watcher with ID %s \n", watcher.ID.Hex())
filter := bson.M{"_id": watcher.ID}
if err := r.DeleteWatcher(ctx, filter); err != nil {
return err
}
r.logger.Infof("Watcher with ID %s deleted due to stale data\n", watcher.ID.Hex())
}

page++
}

return nil
}

func (r *repository) GetWatcherList(ctx context.Context, filters bson.M, page int) ([]*Watcher, error) {
r.logger.Info("GetWatcherList is called")
r.logger.Info("GetPaginatedWatchers is called")

pageSize := 100

skip := (page - 1) * pageSize
Expand All @@ -214,41 +129,51 @@ func (r *repository) GetWatcherList(ctx context.Context, filters bson.M, page in

cur, err := r.db.Database(r.dbName).Collection(r.watchersCollectionName).Find(ctx, filters, findOptions)
if err != nil {
r.logger.Errorf("unable to find watcher due to internal error: %v", err)
r.logger.Errorf("unable to find watchers due to internal error: %v", err)
return nil, err
}
defer cur.Close(ctx)

r.logger.Info("GetWatcherList got all watchers, now fetching history data")
var watchers []*Watcher
for cur.Next(ctx) {
watcher := new(Watcher)
if err := cur.Decode(&watcher); err != nil {
r.logger.Errorf("unable to decode watcher document: %v", err)
return nil, err
}

watchers = append(watchers, watcher)
}

// Check for any errors that occurred during iteration
if err := cur.Err(); err != nil {
r.logger.Errorf("cursor iteration error: %v", err)
if err := cur.All(ctx, &watchers); err != nil {
r.logger.Errorf("unable to decode watchers: %v", err)
return nil, err
}

// attach notifications history to each watcher
for _, watcher := range watchers {
if err := r.attachHistory(ctx, watcher); err != nil {
r.logger.Errorf("unable to fetch and set history notifications: %v", err)
return nil, err
}
r.logger.Info("GetWatcherList got all watchers history data")
r.logger.Info("GetWatcherList got watcher history data")
}

return watchers, nil
}

func (r *repository) attachHistory(ctx context.Context, watcher *Watcher) error {
r.logger.Info("attachHistory is called")
sortField := bson.E{
Key: "timestamp",
Value: -1,
}
sortCriteria := bson.D{sortField}

cursor, err := r.db.Database(r.dbName).Collection(r.historyNotificationCollectionName).Find(ctx, bson.M{"watcher_id": watcher.ID}, options.Find().SetSort(sortCriteria))
if err != nil {
r.logger.Errorf("unable to find history notifications due to internal error: %v", err)
return err
}
defer cursor.Close(ctx)
var historyNotifications []*HistoryNotification
if err := cursor.All(ctx, &historyNotifications); err != nil {
r.logger.Errorf("unable to decode watchers: %v", err)
return err
}
watcher.HistoricalNotifications = &historyNotifications

return nil
}

func (r *repository) CreateWatcher(ctx context.Context, watcher *Watcher) error {
r.logger.Info("CreateWatcher is called")
historicalNotifications := watcher.HistoricalNotifications
Expand All @@ -270,12 +195,8 @@ func (r *repository) CreateWatcher(ctx context.Context, watcher *Watcher) error
// add new history notifications to history notifications collection
if historicalNotifications != nil {
for _, historyNotification := range *historicalNotifications {
historyNotificationDocument := &HistoryNotificationDocument{
ID: primitive.NewObjectID(),
WatcherID: watcher.ID,
Notification: historyNotification,
}
_, err := r.db.Database(r.dbName).Collection(r.historyNotificationCollectionName).InsertOne(ctx, historyNotificationDocument)
historyNotification.WatcherID = watcher.ID
_, err := r.db.Database(r.dbName).Collection(r.historyNotificationCollectionName).InsertOne(ctx, historyNotification)
if err != nil {
r.logger.Errorf("failed to insert history notification to db: %s", err)
return errors.New("failed to create history notification")
Expand Down Expand Up @@ -334,22 +255,19 @@ func (r *repository) UpdateWatcher(ctx context.Context, watcher *Watcher) error

r.logger.Info("UpdateWatcher deleted history notifications")

// add updated history notifications to history notifications collection
if historicalNotifications != nil {
for _, historyNotification := range *historicalNotifications {
historyNotificationDocument := &HistoryNotificationDocument{
ID: primitive.NewObjectID(),
WatcherID: watcher.ID,
Notification: historyNotification,
}
_, err := r.db.Database(r.dbName).Collection(r.historyNotificationCollectionName).InsertOne(ctx, historyNotificationDocument)
if err != nil {
r.logger.Errorf("failed to insert history notification to db: %s", err)
return errors.New("failed to create history notification")
}
}
if historicalNotifications == nil || len(*historicalNotifications) == 0 {
return nil
}
docs := make([]any, len(*historicalNotifications))
for i, v := range *historicalNotifications {
docs[i] = v
}

_, err = r.db.Database(r.dbName).Collection(r.historyNotificationCollectionName).InsertMany(ctx, docs)
if err != nil {
r.logger.Errorf("failed to insert history notifications: %v", err)
return err
}
r.logger.Info("UpdateWatcher inserted history data")

return nil
Expand Down
Loading

0 comments on commit 401fa1d

Please sign in to comment.