Skip to content

Commit

Permalink
Add timeouts to yugabytedb operations
Browse files Browse the repository at this point in the history
  • Loading branch information
AchoArnold committed Dec 8, 2023
1 parent 37cae32 commit 033732a
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 8 deletions.
16 changes: 8 additions & 8 deletions api/pkg/di/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,14 @@ func (container *Container) YugaByteDB() (db *gorm.DB) {
sql.SetMaxIdleConns(3)
sql.SetConnMaxLifetime(time.Minute * 10)

container.logger.Debug(fmt.Sprintf("Running migrations for yugabyte [%T]", db))
if err = db.AutoMigrate(&entities.Heartbeat{}); err != nil {
container.logger.Fatal(stacktrace.Propagate(err, fmt.Sprintf("cannot migrate %T", &entities.Heartbeat{})))
}

if err = db.AutoMigrate(&entities.HeartbeatMonitor{}); err != nil {
container.logger.Fatal(stacktrace.Propagate(err, fmt.Sprintf("cannot migrate %T", &entities.HeartbeatMonitor{})))
}
//container.logger.Debug(fmt.Sprintf("Running migrations for yugabyte [%T]", db))
//if err = db.AutoMigrate(&entities.Heartbeat{}); err != nil {
// container.logger.Fatal(stacktrace.Propagate(err, fmt.Sprintf("cannot migrate %T", &entities.Heartbeat{})))
//}
//
//if err = db.AutoMigrate(&entities.HeartbeatMonitor{}); err != nil {
// container.logger.Fatal(stacktrace.Propagate(err, fmt.Sprintf("cannot migrate %T", &entities.HeartbeatMonitor{})))
//}

container.yugaByteDB = db
return container.yugaByteDB
Expand Down
18 changes: 18 additions & 0 deletions api/pkg/repositories/gorm_heartbeat_monitor_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func (repository *gormHeartbeatMonitorRepository) UpdateQueueID(ctx context.Cont
ctx, span := repository.tracer.Start(ctx)
defer span.End()

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

err := repository.db.
Model(&entities.HeartbeatMonitor{}).
Where("id = ?", monitorID).
Expand All @@ -45,6 +48,9 @@ func (repository *gormHeartbeatMonitorRepository) Delete(ctx context.Context, us
ctx, span := repository.tracer.Start(ctx)
defer span.End()

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

err := repository.db.WithContext(ctx).
Where("user_id = ?", userID).
Where("owner = ?", owner).
Expand Down Expand Up @@ -75,6 +81,9 @@ func (repository *gormHeartbeatMonitorRepository) Index(ctx context.Context, use
ctx, span := repository.tracer.Start(ctx)
defer span.End()

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

query := repository.db.WithContext(ctx).Where("user_id = ?", userID).Where("owner = ?", owner)
heartbeats := new([]entities.Heartbeat)
if err := query.Order("timestamp DESC").Limit(params.Limit).Offset(params.Skip).Find(&heartbeats).Error; err != nil {
Expand All @@ -90,6 +99,9 @@ func (repository *gormHeartbeatMonitorRepository) Store(ctx context.Context, hea
ctx, span := repository.tracer.Start(ctx)
defer span.End()

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

if err := repository.db.WithContext(ctx).Create(heartbeatMonitor).Error; err != nil {
msg := fmt.Sprintf("cannot save heartbeatMonitor monitor with ID [%s]", heartbeatMonitor.ID)
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
Expand All @@ -103,6 +115,9 @@ func (repository *gormHeartbeatMonitorRepository) Load(ctx context.Context, user
ctx, span := repository.tracer.Start(ctx)
defer span.End()

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

phone := new(entities.HeartbeatMonitor)
err := repository.db.WithContext(ctx).
Where("user_id = ?", userID).
Expand All @@ -127,6 +142,9 @@ func (repository *gormHeartbeatMonitorRepository) Exists(ctx context.Context, us
ctx, span := repository.tracer.Start(ctx)
defer span.End()

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

var exists bool
err := repository.db.WithContext(ctx).
Model(&entities.HeartbeatMonitor{}).
Expand Down
10 changes: 10 additions & 0 deletions api/pkg/repositories/gorm_heartbeat_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package repositories
import (
"context"
"fmt"
"time"

"github.com/pkg/errors"

Expand Down Expand Up @@ -36,6 +37,9 @@ func (repository *gormHeartbeatRepository) Last(ctx context.Context, userID enti
ctx, span := repository.tracer.Start(ctx)
defer span.End()

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

heartbeat := new(entities.Heartbeat)
err := repository.db.WithContext(ctx).
Where("user_id = ?", userID).
Expand All @@ -60,6 +64,9 @@ func (repository *gormHeartbeatRepository) Index(ctx context.Context, userID ent
ctx, span := repository.tracer.Start(ctx)
defer span.End()

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

query := repository.db.WithContext(ctx).Where("user_id = ?", userID).Where("owner = ?", owner)
if len(params.Query) > 0 {
queryPattern := "%" + params.Query + "%"
Expand All @@ -80,6 +87,9 @@ func (repository *gormHeartbeatRepository) Store(ctx context.Context, heartbeat
ctx, span := repository.tracer.Start(ctx)
defer span.End()

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

if err := repository.db.WithContext(ctx).Create(heartbeat).Error; err != nil {
msg := fmt.Sprintf("cannot save heartbeat with ID [%s]", heartbeat.ID)
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
Expand Down

0 comments on commit 033732a

Please sign in to comment.