-
Notifications
You must be signed in to change notification settings - Fork 94
/
client.go
2080 lines (1823 loc) · 77.3 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package river
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"regexp"
"strings"
"sync"
"time"
"github.com/riverqueue/river/internal/dblist"
"github.com/riverqueue/river/internal/dbunique"
"github.com/riverqueue/river/internal/jobcompleter"
"github.com/riverqueue/river/internal/leadership"
"github.com/riverqueue/river/internal/maintenance"
"github.com/riverqueue/river/internal/notifier"
"github.com/riverqueue/river/internal/notifylimiter"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/internal/workunit"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/riverpilot"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/maputil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivershared/util/valutil"
"github.com/riverqueue/river/rivertype"
)
const (
FetchCooldownDefault = 100 * time.Millisecond
FetchCooldownMin = 1 * time.Millisecond
FetchPollIntervalDefault = 1 * time.Second
FetchPollIntervalMin = 1 * time.Millisecond
JobTimeoutDefault = 1 * time.Minute
MaxAttemptsDefault = rivercommon.MaxAttemptsDefault
PriorityDefault = rivercommon.PriorityDefault
QueueDefault = rivercommon.QueueDefault
QueueNumWorkersMax = 10_000
)
// Config is the configuration for a Client.
//
// Both Queues and Workers are required for a client to work jobs, but an
// insert-only client can be initialized by omitting Queues, and not calling
// Start for the client. Workers can also be omitted, but it's better to include
// it so River can check that inserted job kinds have a worker that can run
// them.
type Config struct {
// AdvisoryLockPrefix is a configurable 32-bit prefix that River will use
// when generating any key to acquire a Postgres advisory lock. All advisory
// locks share the same 64-bit number space, so this allows a calling
// application to guarantee that a River advisory lock will never conflict
// with one of its own by cordoning each type to its own prefix.
//
// If this value isn't set, River defaults to generating key hashes across
// the entire 64-bit advisory lock number space, which is large enough that
// conflicts are exceedingly unlikely. If callers don't strictly need this
// option then it's recommended to leave it unset because the prefix leaves
// only 32 bits of number space for advisory lock hashes, so it makes
// internally conflicting River-generated keys more likely.
//
// Advisory locks are currently only used for the deprecated fallback/slow
// path of unique job insertion when pending, scheduled, available, or running
// are omitted from a customized ByState configuration.
AdvisoryLockPrefix int32
// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
// around before they're removed permanently.
//
// Defaults to 24 hours.
CancelledJobRetentionPeriod time.Duration
// CompletedJobRetentionPeriod is the amount of time to keep completed jobs
// around before they're removed permanently.
//
// Defaults to 24 hours.
CompletedJobRetentionPeriod time.Duration
// DiscardedJobRetentionPeriod is the amount of time to keep discarded jobs
// around before they're removed permanently.
//
// Defaults to 7 days.
DiscardedJobRetentionPeriod time.Duration
// ErrorHandler can be configured to be invoked in case of an error or panic
// occurring in a job. This is often useful for logging and exception
// tracking, but can also be used to customize retry behavior.
ErrorHandler ErrorHandler
// FetchCooldown is the minimum amount of time to wait between fetches of new
// jobs. Jobs will only be fetched *at most* this often, but if no new jobs
// are coming in via LISTEN/NOTIFY then fetches may be delayed as long as
// FetchPollInterval.
//
// Throughput is limited by this value.
//
// Defaults to 100 ms.
FetchCooldown time.Duration
// FetchPollInterval is the amount of time between periodic fetches for new
// jobs. Typically new jobs will be picked up ~immediately after insert via
// LISTEN/NOTIFY, but this provides a fallback.
//
// Defaults to 1 second.
FetchPollInterval time.Duration
// ID is the unique identifier for this client. If not set, a random
// identifier will be generated.
//
// This is used to identify the client in job attempts and for leader election.
// This value must be unique across all clients in the same database and
// schema and there must not be more than one process running with the same
// ID at the same time.
//
// A client ID should differ between different programs and must be unique
// across all clients in the same database and schema. There must not be
// more than one process running with the same ID at the same time.
// Duplicate IDs between processes will lead to facilities like leader
// election or client statistics to fail in novel ways. However, the client
// ID is shared by all executors within any given client. (i.e. different
// Go processes have different IDs, but IDs are shared within any given
// process.)
//
// If in doubt, leave this property empty.
ID string
// JobCleanerTimeout is the timeout of the individual queries within the job
// cleaner.
//
// Defaults to 30 seconds, which should be more than enough time for most
// deployments.
JobCleanerTimeout time.Duration
// JobInsertMiddleware are optional functions that can be called around job
// insertion.
JobInsertMiddleware []rivertype.JobInsertMiddleware
// JobTimeout is the maximum amount of time a job is allowed to run before its
// context is cancelled. A timeout of zero means JobTimeoutDefault will be
// used, whereas a value of -1 means the job's context will not be cancelled
// unless the Client is shutting down.
//
// Defaults to 1 minute.
JobTimeout time.Duration
// Logger is the structured logger to use for logging purposes. If none is
// specified, logs will be emitted to STDOUT with messages at warn level
// or higher.
Logger *slog.Logger
// MaxAttempts is the default number of times a job will be retried before
// being discarded. This value is applied to all jobs by default, and can be
// overridden on individual job types on the JobArgs or on a per-job basis at
// insertion time.
//
// If not specified, defaults to 25 (MaxAttemptsDefault).
MaxAttempts int
// PeriodicJobs are a set of periodic jobs to run at the specified intervals
// in the client.
PeriodicJobs []*PeriodicJob
// PollOnly starts the client in "poll only" mode, which avoids issuing
// `LISTEN` statements to wait for events like a leadership resignation or
// new job available. The program instead polls periodically to look for
// changes (checking for new jobs on the period in FetchPollInterval).
//
// The downside of this mode of operation is that events will usually be
// noticed less quickly. A new job in the queue may have to wait up to
// FetchPollInterval to be locked for work. When a leader resigns, it will
// be up to five seconds before a new one elects itself.
//
// The upside is that it makes River compatible with systems where
// listen/notify isn't available. For example, PgBouncer in transaction
// pooling mode.
PollOnly bool
// Queues is a list of queue names for this client to operate on along with
// configuration for the queue like the maximum number of workers to run for
// each queue.
//
// This field may be omitted for a program that's only queueing jobs rather
// than working them. If it's specified, then Workers must also be given.
Queues map[string]QueueConfig
// ReindexerSchedule is the schedule for running the reindexer. If nil, the
// reindexer will run at midnight UTC every day.
ReindexerSchedule PeriodicSchedule
// RescueStuckJobsAfter is the amount of time a job can be running before it
// is considered stuck. A stuck job which has not yet reached its max attempts
// will be scheduled for a retry, while one which has exhausted its attempts
// will be discarded. This prevents jobs from being stuck forever if a worker
// crashes or is killed.
//
// Note that this can result in repeat or duplicate execution of a job that is
// not actually stuck but is still working. The value should be set higher
// than the maximum duration you expect your jobs to run. Setting a value too
// low will result in more duplicate executions, whereas too high of a value
// will result in jobs being stuck for longer than necessary before they are
// retried.
//
// RescueStuckJobsAfter must be greater than JobTimeout. Otherwise, jobs
// would become eligible for rescue while they're still running.
//
// Defaults to 1 hour, or in cases where JobTimeout has been configured and
// is greater than 1 hour, JobTimeout + 1 hour.
RescueStuckJobsAfter time.Duration
// RetryPolicy is a configurable retry policy for the client.
//
// Defaults to DefaultRetryPolicy.
RetryPolicy ClientRetryPolicy
// TestOnly can be set to true to disable certain features that are useful
// in production, but which may be harmful to tests, in ways like having the
// effect of making them slower. It should not be used outside of test
// suites.
//
// For example, queue maintenance services normally stagger their startup
// with a random jittered sleep so they don't all try to work at the same
// time. This is nice in production, but makes starting and stopping the
// client in a test case slower.
TestOnly bool
// Workers is a bundle of registered job workers.
//
// This field may be omitted for a program that's only enqueueing jobs
// rather than working them, but if it is configured the client can validate
// ahead of time that a worker is properly registered for an inserted job.
// (i.e. That it wasn't forgotten by accident.)
Workers *Workers
// WorkerMiddleware are optional functions that can be called around
// all job executions.
WorkerMiddleware []rivertype.WorkerMiddleware
// Scheduler run interval. Shared between the scheduler and producer/job
// executors, but not currently exposed for configuration.
schedulerInterval time.Duration
// Time generator to make time stubbable in tests.
time baseservice.TimeGenerator
}
func (c *Config) validate() error {
if c.CancelledJobRetentionPeriod < 0 {
return errors.New("CancelledJobRetentionPeriod time cannot be less than zero")
}
if c.CompletedJobRetentionPeriod < 0 {
return errors.New("CompletedJobRetentionPeriod cannot be less than zero")
}
if c.DiscardedJobRetentionPeriod < 0 {
return errors.New("DiscardedJobRetentionPeriod cannot be less than zero")
}
if c.FetchCooldown < FetchCooldownMin {
return fmt.Errorf("FetchCooldown must be at least %s", FetchCooldownMin)
}
if c.FetchPollInterval < FetchPollIntervalMin {
return fmt.Errorf("FetchPollInterval must be at least %s", FetchPollIntervalMin)
}
if c.FetchPollInterval < c.FetchCooldown {
return fmt.Errorf("FetchPollInterval cannot be shorter than FetchCooldown (%s)", c.FetchCooldown)
}
if len(c.ID) > 100 {
return errors.New("ID cannot be longer than 100 characters")
}
if c.JobTimeout < -1 {
return errors.New("JobTimeout cannot be negative, except for -1 (infinite)")
}
if c.MaxAttempts < 0 {
return errors.New("MaxAttempts cannot be less than zero")
}
if c.RescueStuckJobsAfter < 0 {
return errors.New("RescueStuckJobsAfter cannot be less than zero")
}
if c.RescueStuckJobsAfter < c.JobTimeout {
return errors.New("RescueStuckJobsAfter cannot be less than JobTimeout")
}
for queue, queueConfig := range c.Queues {
if err := queueConfig.validate(queue); err != nil {
return err
}
}
if c.Workers == nil && c.Queues != nil {
return errors.New("Workers must be set if Queues is set")
}
return nil
}
// Indicates whether with the given configuration, this client will be expected
// to execute jobs (rather than just being used to enqueue them). Executing jobs
// requires a set of configured queues.
func (c *Config) willExecuteJobs() bool {
return len(c.Queues) > 0
}
// QueueConfig contains queue-specific configuration.
type QueueConfig struct {
// MaxWorkers is the maximum number of workers to run for the queue, or put
// otherwise, the maximum parallelism to run.
//
// This is the maximum number of workers within this particular client
// instance, but note that it doesn't control the total number of workers
// across parallel processes. Installations will want to calculate their
// total number by multiplying this number by the number of parallel nodes
// running River clients configured to the same database and queue.
//
// Requires a minimum of 1, and a maximum of 10,000.
MaxWorkers int
}
func (c QueueConfig) validate(queueName string) error {
if c.MaxWorkers < 1 || c.MaxWorkers > QueueNumWorkersMax {
return fmt.Errorf("invalid number of workers for queue %q: %d", queueName, c.MaxWorkers)
}
if err := validateQueueName(queueName); err != nil {
return err
}
return nil
}
// Client is a single isolated instance of River. Your application may use
// multiple instances operating on different databases or Postgres schemas
// within a single database.
type Client[TTx any] struct {
// BaseService and BaseStartStop can't be embedded like on other services
// because their properties would leak to the external API.
baseService baseservice.BaseService
baseStartStop startstop.BaseStartStop
completer jobcompleter.JobCompleter
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector
insertNotifyLimiter *notifylimiter.Limiter
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
pilot riverpilot.Pilot
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
queues *QueueBundle
services []startstop.Service
stopped <-chan struct{}
subscriptionManager *subscriptionManager
testSignals clientTestSignals
// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
workCancel context.CancelCauseFunc
}
// Test-only signals.
type clientTestSignals struct {
electedLeader testsignal.TestSignal[struct{}] // notifies when elected leader
jobCleaner *maintenance.JobCleanerTestSignals
jobRescuer *maintenance.JobRescuerTestSignals
jobScheduler *maintenance.JobSchedulerTestSignals
periodicJobEnqueuer *maintenance.PeriodicJobEnqueuerTestSignals
queueCleaner *maintenance.QueueCleanerTestSignals
reindexer *maintenance.ReindexerTestSignals
}
func (ts *clientTestSignals) Init() {
ts.electedLeader.Init()
if ts.jobCleaner != nil {
ts.jobCleaner.Init()
}
if ts.jobRescuer != nil {
ts.jobRescuer.Init()
}
if ts.jobScheduler != nil {
ts.jobScheduler.Init()
}
if ts.periodicJobEnqueuer != nil {
ts.periodicJobEnqueuer.Init()
}
if ts.queueCleaner != nil {
ts.queueCleaner.Init()
}
if ts.reindexer != nil {
ts.reindexer.Init()
}
}
var (
// ErrNotFound is returned when a query by ID does not match any existing
// rows. For example, attempting to cancel a job that doesn't exist will
// return this error.
ErrNotFound = rivertype.ErrNotFound
errMissingConfig = errors.New("missing config")
errMissingDatabasePoolWithQueues = errors.New("must have a non-nil database pool to execute jobs (either use a driver with database pool or don't configure Queues)")
errMissingDriver = errors.New("missing database driver (try wrapping a Pgx pool with river/riverdriver/riverpgxv5.New)")
)
// NewClient creates a new Client with the given database driver and
// configuration.
//
// Currently only one driver is supported, which is Pgx v5. See package
// riverpgxv5.
//
// The function takes a generic parameter TTx representing a transaction type,
// but it can be omitted because it'll generally always be inferred from the
// driver. For example:
//
// import "github.com/riverqueue/river"
// import "github.com/riverqueue/river/riverdriver/riverpgxv5"
//
// ...
//
// dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
// if err != nil {
// // handle error
// }
// defer dbPool.Close()
//
// riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
// ...
// })
// if err != nil {
// // handle error
// }
func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client[TTx], error) {
if driver == nil {
return nil, errMissingDriver
}
if config == nil {
return nil, errMissingConfig
}
logger := config.Logger
if logger == nil {
logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelWarn,
}))
}
retryPolicy := config.RetryPolicy
if retryPolicy == nil {
retryPolicy = &DefaultClientRetryPolicy{}
}
// For convenience, in case the user's specified a large JobTimeout but no
// RescueStuckJobsAfter, since RescueStuckJobsAfter must be greater than
// JobTimeout, set a reasonable default value that's longer than JobTimeout.
rescueAfter := maintenance.JobRescuerRescueAfterDefault
if config.JobTimeout > 0 && config.RescueStuckJobsAfter < 1 && config.JobTimeout > config.RescueStuckJobsAfter {
rescueAfter = config.JobTimeout + maintenance.JobRescuerRescueAfterDefault
}
// Create a new version of config with defaults filled in. This replaces the
// original object, so everything that we care about must be initialized
// here, even if it's only carrying over the original value.
config = &Config{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
CancelledJobRetentionPeriod: valutil.ValOrDefault(config.CancelledJobRetentionPeriod, maintenance.CancelledJobRetentionPeriodDefault),
CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, maintenance.CompletedJobRetentionPeriodDefault),
DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, maintenance.DiscardedJobRetentionPeriodDefault),
ErrorHandler: config.ErrorHandler,
FetchCooldown: valutil.ValOrDefault(config.FetchCooldown, FetchCooldownDefault),
FetchPollInterval: valutil.ValOrDefault(config.FetchPollInterval, FetchPollIntervalDefault),
ID: valutil.ValOrDefaultFunc(config.ID, func() string { return defaultClientID(time.Now().UTC()) }),
JobInsertMiddleware: config.JobInsertMiddleware,
JobTimeout: valutil.ValOrDefault(config.JobTimeout, JobTimeoutDefault),
Logger: logger,
MaxAttempts: valutil.ValOrDefault(config.MaxAttempts, MaxAttemptsDefault),
PeriodicJobs: config.PeriodicJobs,
PollOnly: config.PollOnly,
Queues: config.Queues,
ReindexerSchedule: config.ReindexerSchedule,
RescueStuckJobsAfter: valutil.ValOrDefault(config.RescueStuckJobsAfter, rescueAfter),
RetryPolicy: retryPolicy,
TestOnly: config.TestOnly,
Workers: config.Workers,
WorkerMiddleware: config.WorkerMiddleware,
schedulerInterval: valutil.ValOrDefault(config.schedulerInterval, maintenance.JobSchedulerIntervalDefault),
time: config.time,
}
if err := config.validate(); err != nil {
return nil, err
}
archetype := baseservice.NewArchetype(config.Logger)
if config.time != nil {
archetype.Time = config.time
}
client := &Client[TTx]{
config: config,
driver: driver,
producersByQueueName: make(map[string]*producer),
testSignals: clientTestSignals{},
workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up
}
client.queues = &QueueBundle{addProducer: client.addProducer}
baseservice.Init(archetype, &client.baseService)
client.baseService.Name = "Client" // Have to correct the name because base service isn't embedded like it usually is
client.insertNotifyLimiter = notifylimiter.NewLimiter(archetype, config.FetchCooldown)
plugin, _ := driver.(driverPlugin[TTx])
if plugin != nil {
plugin.PluginInit(archetype, client)
client.pilot = plugin.PluginPilot()
}
if client.pilot == nil {
client.pilot = &riverpilot.StandardPilot{}
}
client.pilot.PilotInit(archetype)
// There are a number of internal components that are only needed/desired if
// we're actually going to be working jobs (as opposed to just enqueueing
// them):
if config.willExecuteJobs() {
if !driver.HasPool() {
return nil, errMissingDatabasePoolWithQueues
}
client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor(), client.pilot, nil)
client.subscriptionManager = newSubscriptionManager(archetype, nil)
client.services = append(client.services, client.completer, client.subscriptionManager)
if driver.SupportsListener() {
// In poll only mode, we don't try to initialize a notifier that
// uses listen/notify. Instead, each service polls for changes it's
// interested in. e.g. Elector polls to see if leader has expired.
if !config.PollOnly {
client.notifier = notifier.New(archetype, driver.GetListener())
client.services = append(client.services, client.notifier)
}
} else {
logger.Info("Driver does not support listener; entering poll only mode")
}
client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
ClientID: config.ID,
})
client.services = append(client.services, client.elector)
for queue, queueConfig := range config.Queues {
client.addProducer(queue, queueConfig)
}
client.services = append(client.services,
startstop.StartStopFunc(client.logStatsLoop))
client.services = append(client.services,
startstop.StartStopFunc(client.handleLeadershipChangeLoop))
if plugin != nil {
client.services = append(client.services, plugin.PluginServices()...)
}
//
// Maintenance services
//
maintenanceServices := []startstop.Service{}
{
jobCleaner := maintenance.NewJobCleaner(archetype, &maintenance.JobCleanerConfig{
CancelledJobRetentionPeriod: config.CancelledJobRetentionPeriod,
CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
Timeout: config.JobCleanerTimeout,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, jobCleaner)
client.testSignals.jobCleaner = &jobCleaner.TestSignals
}
{
jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{
ClientRetryPolicy: retryPolicy,
RescueAfter: config.RescueStuckJobsAfter,
WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
if workerInfo, ok := config.Workers.workersMap[kind]; ok {
return workerInfo.workUnitFactory
}
return nil
},
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, jobRescuer)
client.testSignals.jobRescuer = &jobRescuer.TestSignals
}
{
jobScheduler := maintenance.NewJobScheduler(archetype, &maintenance.JobSchedulerConfig{
Interval: config.schedulerInterval,
NotifyInsert: client.maybeNotifyInsertForQueues,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, jobScheduler)
client.testSignals.jobScheduler = &jobScheduler.TestSignals
}
{
periodicJobEnqueuer := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
Insert: client.insertMany,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, periodicJobEnqueuer)
client.testSignals.periodicJobEnqueuer = &periodicJobEnqueuer.TestSignals
client.periodicJobs = newPeriodicJobBundle(client.config, periodicJobEnqueuer)
client.periodicJobs.AddMany(config.PeriodicJobs)
}
{
queueCleaner := maintenance.NewQueueCleaner(archetype, &maintenance.QueueCleanerConfig{
RetentionPeriod: maintenance.QueueRetentionPeriodDefault,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, queueCleaner)
client.testSignals.queueCleaner = &queueCleaner.TestSignals
}
{
var scheduleFunc func(time.Time) time.Time
if config.ReindexerSchedule != nil {
scheduleFunc = config.ReindexerSchedule.Next
}
reindexer := maintenance.NewReindexer(archetype, &maintenance.ReindexerConfig{ScheduleFunc: scheduleFunc}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, reindexer)
client.testSignals.reindexer = &reindexer.TestSignals
}
if plugin != nil {
maintenanceServices = append(maintenanceServices, plugin.PluginMaintenanceServices()...)
}
// Not added to the main services list because the queue maintainer is
// started conditionally based on whether the client is the leader.
client.queueMaintainer = maintenance.NewQueueMaintainer(archetype, maintenanceServices)
if config.TestOnly {
client.queueMaintainer.StaggerStartupDisable(true)
}
}
return client, nil
}
// Start starts the client's job fetching and working loops. Once this is called,
// the client will run in a background goroutine until stopped. All jobs are
// run with a context inheriting from the provided context, but with a timeout
// deadline applied based on the job's settings.
//
// A graceful shutdown stops fetching new jobs but allows any previously fetched
// jobs to complete. This can be initiated with the Stop method.
//
// A more abrupt shutdown can be achieved by either cancelling the provided
// context or by calling StopAndCancel. This will not only stop fetching new
// jobs, but will also cancel the context for any currently-running jobs. If
// using StopAndCancel, there's no need to also call Stop.
func (c *Client[TTx]) Start(ctx context.Context) error {
fetchCtx, shouldStart, started, stopped := c.baseStartStop.StartInit(ctx)
if !shouldStart {
return nil
}
c.queues.startStopMu.Lock()
defer c.queues.startStopMu.Unlock()
// BaseStartStop will set its stopped channel to nil after it stops, so make
// sure to take a channel reference before finishing stopped.
c.stopped = c.baseStartStop.StoppedUnsafe()
producersAsServices := func() []startstop.Service {
return sliceutil.Map(
maputil.Values(c.producersByQueueName),
func(p *producer) startstop.Service { return p },
)
}
// Startup code. Wrapped in a closure so it doesn't have to remember to
// close the stopped channel if returning with an error.
if err := func() error {
if !c.config.willExecuteJobs() {
return errors.New("client Queues and Workers must be configured for a client to start working")
}
if c.config.Workers != nil && len(c.config.Workers.workersMap) < 1 {
return errors.New("at least one Worker must be added to the Workers bundle")
}
// Before doing anything else, make an initial connection to the database to
// verify that it appears healthy. Many of the subcomponents below start up
// in a goroutine and in case of initial failure, only produce a log line,
// so even in the case of a fundamental failure like the database not being
// available, the client appears to have started even though it's completely
// non-functional. Here we try to make an initial assessment of health and
// return quickly in case of an apparent problem.
_, err := c.driver.GetExecutor().Exec(fetchCtx, "SELECT 1")
if err != nil {
return fmt.Errorf("error making initial connection to database: %w", err)
}
// Each time we start, we need a fresh completer subscribe channel to
// send job completion events on, because the completer will close it
// each time it shuts down.
completerSubscribeCh := make(chan []jobcompleter.CompleterJobUpdated, 10)
c.completer.ResetSubscribeChan(completerSubscribeCh)
c.subscriptionManager.ResetSubscribeChan(completerSubscribeCh)
// In case of error, stop any services that might have started. This
// is safe because even services that were never started will still
// tolerate being stopped.
stopServicesOnError := func() {
startstop.StopAllParallel(c.services...)
}
// The completer is part of the services list below, but although it can
// stop gracefully along with all the other services, it needs to be
// started with a context that's _not_ cancelled if the user-provided
// context is cancelled. This ensures that even when fetch is cancelled on
// shutdown, the completer is still given a separate opportunity to start
// stopping only after the producers have finished up and returned.
if err := c.completer.Start(context.WithoutCancel(ctx)); err != nil {
stopServicesOnError()
return err
}
// We use separate contexts for fetching and working to allow for a graceful
// stop. Both inherit from the provided context, so if it's cancelled, a
// more aggressive stop will be initiated.
workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c))
if err := startstop.StartAll(fetchCtx, c.services...); err != nil {
stopServicesOnError()
return err
}
for _, producer := range c.producersByQueueName {
producer := producer
if err := producer.StartWorkContext(fetchCtx, workCtx); err != nil {
startstop.StopAllParallel(producersAsServices()...)
stopServicesOnError()
return err
}
}
c.queues.fetchCtx = fetchCtx //nolint:fatcontext
c.queues.workCtx = workCtx
c.workCancel = workCancel
return nil
}(); err != nil {
defer stopped()
if errors.Is(context.Cause(fetchCtx), startstop.ErrStop) {
return rivercommon.ErrShutdown
}
return err
}
// Generate producer services while c.queues.startStopMu.Lock() is still
// held. This is used for WaitAllStarted below, but don't use it elsewhere
// because new producers may have been added while the client is running.
producerServices := producersAsServices()
go func() {
// Wait for all subservices to start up before signaling our own start.
// This isn't strictly needed, but gives tests a way to fully confirm
// that all goroutines for subservices are spun up before continuing.
//
// Stop also cancels the "started" channel, so in case of a context
// cancellation, this statement will fall through. The client will
// briefly start, but then immediately stop again.
startstop.WaitAllStarted(append(
c.services,
producerServices..., // see comment on this variable
)...)
started()
defer stopped()
c.baseService.Logger.InfoContext(ctx, "River client started", slog.String("client_id", c.ID()))
defer c.baseService.Logger.InfoContext(ctx, "River client stopped", slog.String("client_id", c.ID()))
// The call to Stop cancels this context. Block here until shutdown.
<-fetchCtx.Done()
c.queues.startStopMu.Lock()
defer c.queues.startStopMu.Unlock()
// On stop, have the producers stop fetching first of all.
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers")
startstop.StopAllParallel(producersAsServices()...)
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": All producers stopped")
// Stop all mainline services where stop order isn't important.
startstop.StopAllParallel(append(
// This list of services contains the completer, which should always
// stop after the producers so that any remaining work that was enqueued
// will have a chance to have its state completed as it finishes.
//
// TODO: there's a risk here that the completer is stuck on a job that
// won't complete. We probably need a timeout or way to move on in those
// cases.
c.services,
// Will only be started if this client was leader, but can tolerate a
// stop without having been started.
c.queueMaintainer,
)...)
}()
return nil
}
// Stop performs a graceful shutdown of the Client. It signals all producers
// to stop fetching new jobs and waits for any fetched or in-progress jobs to
// complete before exiting. If the provided context is done before shutdown has
// completed, Stop will return immediately with the context's error.
//
// There's no need to call this method if a hard stop has already been initiated
// by cancelling the context passed to Start or by calling StopAndCancel.
func (c *Client[TTx]) Stop(ctx context.Context) error {
shouldStop, stopped, finalizeStop := c.baseStartStop.StopInit()
if !shouldStop {
return nil
}
select {
case <-ctx.Done(): // stop context cancelled
finalizeStop(false) // not stopped; allow Stop to be called again
return ctx.Err()
case <-stopped:
finalizeStop(true)
return nil
}
}
// StopAndCancel shuts down the client and cancels all work in progress. It is a
// more aggressive stop than Stop because the contexts for any in-progress jobs
// are cancelled. However, it still waits for jobs to complete before returning,
// even though their contexts are cancelled. If the provided context is done
// before shutdown has completed, Stop will return immediately with the
// context's error.
//
// This can also be initiated by cancelling the context passed to Run. There is
// no need to call this method if the context passed to Run is cancelled
// instead.
func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.workCancel(rivercommon.ErrShutdown)
shouldStop, stopped, finalizeStop := c.baseStartStop.StopInit()
if !shouldStop {
return nil
}
select {
case <-ctx.Done(): // stop context cancelled
finalizeStop(false) // not stopped; allow Stop to be called again
return ctx.Err()
case <-stopped:
finalizeStop(true)
return nil
}
}
// Stopped returns a channel that will be closed when the Client has stopped.
// It can be used to wait for a graceful shutdown to complete.
//
// It is not affected by any contexts passed to Stop or StopAndCancel.
func (c *Client[TTx]) Stopped() <-chan struct{} {
return c.stopped
}
// Subscribe subscribes to the provided kinds of events that occur within the
// client, like EventKindJobCompleted for when a job completes.
//
// Returns a channel over which to receive events along with a cancel function
// that can be used to cancel and tear down resources associated with the
// subscription. It's recommended but not necessary to invoke the cancel
// function. Resources will be freed when the client stops in case it's not.
//
// The event channel is buffered and sends on it are non-blocking. Consumers
// must process events in a timely manner or it's possible for events to be
// dropped. Any slow operations performed in a response to a receipt (e.g.
// persisting to a database) should be made asynchronous to avoid event loss.
//
// Callers must specify the kinds of events they're interested in. This allows
// for forward compatibility in case new kinds of events are added in future
// versions. If new event kinds are added, callers will have to explicitly add
// them to their requested list and ensure they can be handled correctly.
func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) {
return c.SubscribeConfig(&SubscribeConfig{Kinds: kinds})
}
// The default maximum size of the subscribe channel. Events that would overflow
// it will be dropped.
const subscribeChanSizeDefault = 1_000
// SubscribeConfig is more thorough subscription configuration used for
// Client.SubscribeConfig.
type SubscribeConfig struct {
// ChanSize is the size of the buffered channel that will be created for the
// subscription. Incoming events that overall this number because a listener
// isn't reading from the channel in a timely manner will be dropped.
//
// Defaults to 1000.
ChanSize int
// Kinds are the kinds of events that the subscription will receive.
// Requiring that kinds are specified explicitly allows for forward
// compatibility in case new kinds of events are added in future versions.
// If new event kinds are added, callers will have to explicitly add them to
// their requested list and ensure they can be handled correctly.
Kinds []EventKind
}
// Special internal variant that lets us inject an overridden size.
func (c *Client[TTx]) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) {
if c.subscriptionManager == nil {
panic("created a subscription on a client that will never work jobs (Workers not configured)")
}
return c.subscriptionManager.SubscribeConfig(config)
}
// Dump aggregate stats from job completions to logs periodically. These
// numbers don't mean much in themselves, but can give a rough idea of the
// proportions of each compared to each other, and may help flag outlying values
// indicative of a problem.
func (c *Client[TTx]) logStatsLoop(ctx context.Context, shouldStart bool, started, stopped func()) error {
if !shouldStart {
return nil
}
go func() {
started()
defer stopped() // this defer should come first so it's last out
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.subscriptionManager.logStats(ctx, c.baseService.Name)
}
}
}()
return nil
}
func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStart bool, started, stopped func()) error {
handleLeadershipChange := func(ctx context.Context, notification *leadership.Notification) {
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received",
slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader))
switch {
case notification.IsLeader:
// Starting the queue maintainer can take a little time so send to
// this test signal _first_ so tests waiting on it can finish,
// cancel the queue maintainer start, and overall run much faster.
c.testSignals.electedLeader.Signal(struct{}{})
if err := c.queueMaintainer.Start(ctx); err != nil {
c.baseService.Logger.ErrorContext(ctx, "Error starting queue maintainer", slog.String("err", err.Error()))
}
default:
c.queueMaintainer.Stop()
}
}
if !shouldStart {
return nil
}
go func() {
started()
defer stopped() // this defer should come first so it's last out
sub := c.elector.Listen()
defer sub.Unlisten()
for {
select {