Skip to content

Commit

Permalink
fix up tests and organize
Browse files Browse the repository at this point in the history
  • Loading branch information
bgentry committed Sep 20, 2024
1 parent eeecef5 commit ab57b0f
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `rivertest.WorkContext`, a test function that can be used to initialize a context to test a `JobArgs.Work` implementation; the context will have a client set in it for use with `river.ClientFromContext`. [PR #526](https://github.com/riverqueue/river/pull/526).
- A new `river migrate-list` command is available which lists available migrations and which version a target database is migrated to. [PR #534](https://github.com/riverqueue/river/pull/534).
- `river version` or `river --version` now prints River version information. [PR #537](https://github.com/riverqueue/river/pull/537).
- `Config.JobCleanerTimeout` was added to allow configuration of the job cleaner query timeout. In some deployments with millions of stale jobs, the cleaner may not be able to complete its query within the default 30 seconds.

### Changed

Expand Down
14 changes: 8 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ type Config struct {
// Defaults to 7 days.
DiscardedJobRetentionPeriod time.Duration

// JobCleanerTimeout is the timeout of the job cleaner's runner.
//
// Defaults to 30 seconds.
JobCleanerTimeout time.Duration

// ErrorHandler can be configured to be invoked in case of an error or panic
// occurring in a job. This is often useful for logging and exception
// tracking, but can also be used to customize retry behavior.
Expand Down Expand Up @@ -136,6 +131,13 @@ type Config struct {
// If in doubt, leave this property empty.
ID string

// JobCleanerTimeout is the timeout of the individual queries within the job
// cleaner.
//
// Defaults to 30 seconds, which should be more than enough time for most
// deployments.
JobCleanerTimeout time.Duration

// JobTimeout is the maximum amount of time a job is allowed to run before its
// context is cancelled. A timeout of zero means JobTimeoutDefault will be
// used, whereas a value of -1 means the job's context will not be cancelled
Expand Down Expand Up @@ -563,7 +565,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
CancelledJobRetentionPeriod: config.CancelledJobRetentionPeriod,
CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
JobCleanerTimeout: config.JobCleanerTimeout,
Timeout: config.JobCleanerTimeout,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, jobCleaner)
client.testSignals.jobCleaner = &jobCleaner.TestSignals
Expand Down
1 change: 1 addition & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4627,6 +4627,7 @@ func Test_NewClient_Defaults(t *testing.T) {
require.Equal(t, maintenance.CancelledJobRetentionPeriodDefault, jobCleaner.Config.CancelledJobRetentionPeriod)
require.Equal(t, maintenance.CompletedJobRetentionPeriodDefault, jobCleaner.Config.CompletedJobRetentionPeriod)
require.Equal(t, maintenance.DiscardedJobRetentionPeriodDefault, jobCleaner.Config.DiscardedJobRetentionPeriod)
require.Equal(t, maintenance.JobCleanerTimeoutDefault, jobCleaner.Config.Timeout)
require.False(t, jobCleaner.StaggerStartupIsDisabled())

enqueuer := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)
Expand Down
11 changes: 7 additions & 4 deletions internal/maintenance/job_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type JobCleanerConfig struct {
// Interval is the amount of time to wait between runs of the cleaner.
Interval time.Duration

// JobCleanerTimeout is the timeout of the job cleaner runner.
JobCleanerTimeout time.Duration
// Timeout of the individual queries in the job cleaner.
Timeout time.Duration
}

func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig {
Expand All @@ -67,6 +67,9 @@ func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig {
if c.Interval <= 0 {
panic("JobCleanerConfig.Interval must be above zero")
}
if c.Timeout <= 0 {
panic("JobCleanerConfig.Timeout must be above zero")
}

return c
}
Expand All @@ -92,7 +95,7 @@ func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, e
CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, CompletedJobRetentionPeriodDefault),
DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, DiscardedJobRetentionPeriodDefault),
Interval: valutil.ValOrDefault(config.Interval, JobCleanerIntervalDefault),
JobCleanerTimeout: valutil.ValOrDefault(config.JobCleanerTimeout, JobCleanerTimeoutDefault),
Timeout: valutil.ValOrDefault(config.Timeout, JobCleanerTimeoutDefault),
}).mustValidate(),

batchSize: BatchSizeDefault,
Expand Down Expand Up @@ -152,7 +155,7 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err
for {
// Wrapped in a function so that defers run as expected.
numDeleted, err := func() (int, error) {
ctx, cancelFunc := context.WithTimeout(ctx, s.Config.JobCleanerTimeout)
ctx, cancelFunc := context.WithTimeout(ctx, s.Config.Timeout)
defer cancelFunc()

numDeleted, err := s.exec.JobDeleteBefore(ctx, &riverdriver.JobDeleteBeforeParams{
Expand Down
37 changes: 1 addition & 36 deletions internal/maintenance/job_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestJobCleaner(t *testing.T) {
require.Equal(t, CompletedJobRetentionPeriodDefault, cleaner.Config.CompletedJobRetentionPeriod)
require.Equal(t, DiscardedJobRetentionPeriodDefault, cleaner.Config.DiscardedJobRetentionPeriod)
require.Equal(t, JobCleanerIntervalDefault, cleaner.Config.Interval)
require.Equal(t, JobCleanerTimeoutDefault, cleaner.Config.JobCleanerTimeout)
require.Equal(t, JobCleanerTimeoutDefault, cleaner.Config.Timeout)
})

t.Run("StartStopStress", func(t *testing.T) {
Expand Down Expand Up @@ -163,41 +163,6 @@ func TestJobCleaner(t *testing.T) {
}
})

t.Run("DeletesCompletedJobsWithTimeout", func(t *testing.T) {
t.Parallel()

cleaner, bundle := setup(t)
cleaner.Config.JobCleanerTimeout = 1 * time.Nanosecond

// none of these get removed
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)})
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled)})

testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))})
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Minute))})
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(1 * time.Minute))}) // won't be deleted

testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))})
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Minute))})
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(1 * time.Minute))}) // won't be deleted

testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))})
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Minute))})
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(1 * time.Minute))}) // won't be deleted

require.NoError(t, cleaner.Start(ctx))

timeout := riversharedtest.WaitTimeout()

select {
case <-cleaner.TestSignals.DeletedBatch.WaitC():
t.Error("That supposed to have timed out")
case <-time.After(timeout):
t.Log("Expected timeout")
}
})

t.Run("CustomizableInterval", func(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit ab57b0f

Please sign in to comment.