forked from seomoz/qless-py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
executable file
·2541 lines (2326 loc) · 111 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#! /usr/bin/env python
import time
import math
import qless
import redis
import unittest
import time as _time
class FooJob(qless.Job):
pass
# Let's mock up time
time._frozen = False
# Save out the original time function
time._time = time.time
def _freeze():
time._frozen = True
time._when = time._time()
def _unfreeze():
time._frozen = False
def _advance(increment):
time._when += increment
def _time():
return (time._frozen and time._when) or time._time()
time.freeze = _freeze
time.unfreeze = _unfreeze
time.advance = _advance
time.time = _time
class TestQless(unittest.TestCase):
def setUp(self):
self.redis = redis.Redis()
assert(len(self.redis.keys('*')) == 0)
# Clear the script cache, and nuke everything
self.redis.execute_command('script', 'flush')
# The qless client we're using
self.client = qless.client()
# Our main queue
self.q = self.client.queues['testing']
# This represents worker 'a'
tmp = qless.client(); tmp.worker_name = 'worker-a'
self.a = tmp.queues['testing']
# This represents worker b
tmp = qless.client(); tmp.worker_name = 'worker-b'
self.b = tmp.queues['testing']
# This is just a second queue
self.other = self.client.queues['other']
def tearDown(self):
self.redis.flushdb()
time.unfreeze()
class TestEvents(TestQless):
def setUp(self):
TestQless.setUp(self)
import threading
from collections import defaultdict
self.tracked = self.client.jobs[self.q.put(qless.Job, {'tracked': True })]
self.untracked = self.client.jobs[self.q.put(qless.Job, {'tracked': False})]
self.tracked.track()
self.events = defaultdict(set)
self.pubsub = qless.client(socket_timeout=0.01)
# We need to make sure this gets lazily loaded first in this thread
self.pubsub.events
self.t = threading.Thread(target=self._listen)
def tearDown(self):
TestQless.tearDown(self)
def _events(self, evt, jid):
self.events[evt].add(jid)
def _listen(self):
from functools import partial
for evt in ('canceled', 'completed', 'failed', 'popped', 'put', 'stalled', 'track', 'untrack'):
self.pubsub.events.on(evt, partial(self._events, evt))
self.pubsub.events.listen()
def test_cancel(self):
# We should be able to see when tracked jobs are canceled
self.t.start()
self.tracked.cancel()
self.untracked.cancel()
self.t.join()
self.assertEqual(self.events['canceled'], set([self.tracked.jid]))
def test_complete(self):
# And when they've completed
self.t.start()
# Complete both jobs
r = [j.complete() for j in self.q.pop(2)]
self.t.join()
self.assertEqual(self.events['completed'], set([self.tracked.jid]))
def test_failed(self):
# And when they've completed
self.t.start()
# Fail both jobs
r = [j.fail('foo', 'bar') for j in self.q.pop(2)]
self.t.join()
self.assertEqual(self.events['failed'], set([self.tracked.jid]))
def test_pop(self):
# And when they've completed
self.t.start()
# Pop both jobs
jobs = self.q.pop(2)
self.t.join()
self.assertEqual(self.events['popped'], set([self.tracked.jid]))
def test_put(self):
# And when they've completed
self.t.start()
# Move both jobs
self.untracked.move('other')
self.tracked.move('other')
self.t.join()
self.assertEqual(self.events['put'], set([self.tracked.jid]))
def test_stalled(self):
# And when they've completed
self.t.start()
# Stall both jobs
time.freeze()
jobs = self.q.pop(2)
time.advance(600)
jobs = self.q.pop(2)
# Make sure they in fact stalled
self.assertEqual(len(jobs), 2)
self.assertEqual(jobs[0].original_retries - jobs[0].retries_left, 1)
self.assertEqual(jobs[1].original_retries - jobs[1].retries_left, 1)
self.t.join()
self.assertEqual(self.events['stalled'], set([self.tracked.jid]))
def test_track(self):
# And when they've completed
self.t.start()
# Start tracking the untracked job, and untrack the tracked job
self.tracked.untrack()
self.untracked.track()
self.t.join()
self.assertEqual(self.events['track'], set([self.untracked.jid]))
self.assertEqual(self.events['untrack'], set([self.tracked.jid]))
class TestRecurring(TestQless):
def test_recur_on(self):
# In this test, we want to enqueue a job and make sure that
# we can get some jobs from it in the most basic way. We should
# get jobs out of the queue every _k_ seconds
time.freeze()
self.q.recur(qless.Job, {'test':'test_recur_on'}, interval=1800)
self.assertEqual(self.q.pop().complete(), 'complete')
self.assertEqual(self.q.pop(), None)
time.advance(1799)
self.assertEqual(self.q.pop(), None)
time.advance(2)
job = self.q.pop()
self.assertNotEqual(job, None)
self.assertEqual(job.data, {'test':'test_recur_on'})
job.complete()
# We should not be able to pop a second job
self.assertEqual(self.q.pop(), None)
# Let's advance almost to the next one, and then check again
time.advance(1798)
self.assertEqual(self.q.pop(), None)
time.advance(2)
self.assertNotEqual(self.q.pop(), None)
time.unfreeze()
def test_recur_on_by_string(self):
time.freeze()
self.q.recur('qless.Job', {'test':'test_recur_on'}, interval=1800)
self.assertEqual(self.q.pop().complete(), 'complete')
self.assertEqual(self.q.pop(), None)
time.advance(1799)
self.assertEqual(self.q.pop(), None)
time.advance(2)
job = self.q.pop()
self.assertNotEqual(job, None)
self.assertEqual(job.data, {'test':'test_recur_on'})
job.complete()
# We should not be able to pop a second job
self.assertEqual(self.q.pop(), None)
# Let's advance almost to the next one, and then check again
time.advance(1798)
self.assertEqual(self.q.pop(), None)
time.advance(2)
self.assertNotEqual(self.q.pop(), None)
time.unfreeze()
def test_recur_attributes(self):
# Popped jobs should have the same priority, tags, etc. that the
# recurring job has
time.freeze()
self.q.recur(qless.Job, {'test':'test_recur_attributes'}, interval=100, priority=-10, tags=['foo', 'bar'], retries=2)
self.assertEqual(self.q.pop().complete(), 'complete')
for i in range(10):
time.advance(100)
job = self.q.pop()
self.assertNotEqual(job, None)
self.assertEqual(job.priority, -10)
self.assertEqual(job.tags, ['foo', 'bar'])
self.assertEqual(job.original_retries, 2)
self.assertIn( job.jid, self.client.jobs.tagged('foo')['jobs'])
self.assertIn( job.jid, self.client.jobs.tagged('bar')['jobs'])
self.assertNotIn(job.jid, self.client.jobs.tagged('hey')['jobs'])
job.complete()
self.assertEqual(self.q.pop(), None)
time.unfreeze()
def test_recur_offset(self):
# In this test, we should get a job after offset and interval
# have passed
time.freeze()
self.q.recur(qless.Job, {'test':'test_recur_offset'}, interval=100, offset=50)
self.assertEqual(self.q.pop(), None)
time.advance(30)
self.assertEqual(self.q.pop(), None)
time.advance(20)
job = self.q.pop()
self.assertNotEqual(job, None)
job.complete()
# And henceforth we should have jobs periodically at 100 seconds
time.advance(99)
self.assertEqual(self.q.pop(), None)
time.advance(2)
self.assertNotEqual(self.q.pop(), None)
def test_recur_off(self):
# In this test, we want to make sure that we can stop recurring
# jobs
# We should see these recurring jobs crop up under queues when
# we request them
time.freeze()
jid = self.q.recur(qless.Job, {'test':'test_recur_off'}, interval=100)
self.assertEqual(self.q.pop().complete(), 'complete')
self.assertEqual(self.client.queues['testing'].counts['recurring'], 1)
self.assertEqual(self.client.queues.counts[0]['recurring'], 1)
# Now, let's pop off a job, and then cancel the thing
time.advance(110)
self.assertEqual(self.q.pop().complete(), 'complete')
job = self.client.jobs[jid]
self.assertEqual(job.__class__, qless.RecurringJob)
job.cancel()
self.assertEqual(self.client.queues['testing'].counts['recurring'], 0)
self.assertEqual(self.client.queues.counts[0]['recurring'], 0)
time.advance(1000)
self.assertEqual(self.q.pop(), None)
def test_jobs_recur(self):
# We should be able to list the jids of all the recurring jobs
# in a queue
jids = [self.q.recur(qless.Job, {'test':'test_jobs_recur'}, interval=i * 10) for i in range(1, 10)]
self.assertEqual(self.q.jobs.recurring(), jids)
for jid in jids:
self.assertEqual(self.client.jobs[jid].__class__, qless.RecurringJob)
def test_recur_get(self):
# We should be able to get the data for a recurring job
time.freeze()
jid = self.q.recur(qless.Job, {'test':'test_recur_get'}, interval=100, priority=-10, tags=['foo', 'bar'], retries=2)
job = self.client.jobs[jid]
self.assertEqual(job.__class__ , qless.RecurringJob)
self.assertEqual(job.priority , -10)
self.assertEqual(job.queue_name, 'testing')
self.assertEqual(job.data , {'test':'test_recur_get'})
self.assertEqual(job.tags , ['foo', 'bar'])
self.assertEqual(job.interval , 100)
self.assertEqual(job.retries , 2)
self.assertEqual(job.count , 0)
self.assertEqual(job.klass_name, 'qless.job.Job')
# Now let's pop a job
self.q.pop()
self.assertEqual(self.client.jobs[jid].count, 1)
def test_passed_interval(self):
# We should get multiple jobs if we've passed the interval time
# several times.
time.freeze()
jid = self.q.recur(qless.Job, {'test':'test_passed_interval'}, interval=100)
self.assertEqual(self.q.pop().complete(), 'complete')
time.advance(850)
jobs = self.q.pop(100)
self.assertEqual(len(jobs), 8)
for job in jobs:
job.complete()
# If we are popping fewer jobs than the number of jobs that would have
# been scheduled, it should only make that many available
time.advance(800)
jobs = self.q.pop(5)
self.assertEqual(len(jobs), 5)
self.assertEqual(len(self.q), 5)
for job in jobs:
job.complete()
# Even if there are several recurring jobs, both of which need jobs
# scheduled, it only pops off the needed number
jid = self.q.recur(qless.Job, {'test': 'test_passed_interval_2'}, 10)
time.advance(500)
jobs = self.q.pop(5)
self.assertEqual(len(jobs), 5)
self.assertEqual(len(self.q), 5)
for job in jobs:
job.complete()
# And if there are other jobs that are there, it should only move over
# as many recurring jobs as needed
jid = self.q.put(qless.Job, {'foo': 'bar'}, priority = 10)
jobs = self.q.pop(5)
self.assertEqual(len(jobs), 5)
# Not sure why this is 6, but it's not a huge deal in my opinion
self.assertEqual(len(self.q), 6)
def test_queues_endpoint(self):
# We should see these recurring jobs crop up under queues when
# we request them
jid = self.q.recur(qless.Job, {'test':'test_queues_endpoint'}, interval=100)
self.assertEqual(self.client.queues['testing'].counts['recurring'], 1)
self.assertEqual(self.client.queues.counts[0]['recurring'], 1)
def test_change_attributes(self):
# We should be able to change the attributes of a recurring job,
# and future spawned jobs should be affected appropriately. In
# addition, when we change the interval, the effect should be
# immediate (evaluated from the last time it was run)
time.freeze()
jid = self.q.recur(qless.Job, {'test':'test_change_attributes'}, interval=1)
self.assertEqual(self.q.pop().complete(), 'complete')
job = self.client.jobs[jid]
# First, test priority
time.advance(1)
self.assertNotEqual(self.q.pop().priority , -10)
self.assertNotEqual(self.client.jobs[jid].priority, -10)
job.priority = -10
time.advance(1)
self.assertEqual( self.q.pop().priority , -10)
self.assertEqual( self.client.jobs[jid].priority, -10)
# And data
time.advance(1)
self.assertNotEqual(self.q.pop().data , {'foo': 'bar'})
self.assertNotEqual(self.client.jobs[jid].data , {'foo': 'bar'})
job.data = {'foo': 'bar'}
time.advance(1)
self.assertEqual( self.q.pop().data , {'foo': 'bar'})
self.assertEqual( self.client.jobs[jid].data , {'foo': 'bar'})
# And retries
time.advance(1)
self.assertNotEqual(self.q.pop().original_retries , 10)
self.assertNotEqual(self.client.jobs[jid].retries , 10)
job.retries = 10
time.advance(1)
self.assertEqual( self.q.pop().original_retries , 10)
self.assertEqual( self.client.jobs[jid].retries , 10)
# And klass
time.advance(1)
self.assertNotEqual(self.q.peek().klass_name , 'qless.job.RecurringJob')
self.assertNotEqual(self.q.pop().klass , qless.RecurringJob)
self.assertNotEqual(self.client.jobs[jid].klass_name, 'qless.job.RecurringJob')
self.assertNotEqual(self.client.jobs[jid].klass , qless.RecurringJob)
job.klass = qless.RecurringJob
time.advance(1)
self.assertEqual( self.q.peek().klass_name , 'qless.job.RecurringJob')
self.assertEqual( self.q.pop().klass , qless.RecurringJob)
self.assertEqual( self.client.jobs[jid].klass_name, 'qless.job.RecurringJob')
self.assertEqual( self.client.jobs[jid].klass , qless.RecurringJob)
def test_change_interval(self):
# If we update a recurring job's interval, then we should get
# jobs from it as if it had been scheduled this way from the
# last time it had a job popped
time.freeze()
jid = self.q.recur(qless.Job, {'test':'test_change_interval'}, interval=100)
self.assertEqual(self.q.pop().complete(), 'complete')
time.advance(100)
self.assertEqual(self.q.pop().complete(), 'complete')
time.advance(50)
# Now, let's update the interval to make it more frequent
self.client.jobs[jid].interval = 10
jobs = self.q.pop(100)
self.assertEqual(len(jobs), 5)
results = [job.complete() for job in jobs]
# Now let's make the interval much longer
time.advance(49) ; self.client.jobs[jid].interval = 1000; self.assertEqual(self.q.pop(), None)
time.advance(100); self.client.jobs[jid].interval = 1000; self.assertEqual(self.q.pop(), None)
time.advance(849); self.client.jobs[jid].interval = 1000; self.assertEqual(self.q.pop(), None)
time.advance(1) ; self.client.jobs[jid].interval = 1000; self.assertEqual(self.q.pop(), None)
def test_move(self):
# If we move a recurring job from one queue to another, then
# all future spawned jobs should be popped from that queue
time.freeze()
jid = self.q.recur(qless.Job, {'test':'test_move'}, interval = 100)
self.assertEqual(self.q.pop().complete(), 'complete')
time.advance(110)
self.assertEqual(self.q.pop().complete(), 'complete')
self.assertEqual(self.other.pop(), None)
# Now let's move it to another queue
self.client.jobs[jid].move('other')
self.assertEqual(self.q.pop() , None)
self.assertEqual(self.other.pop(), None)
time.advance(100)
self.assertEqual(self.q.pop() , None)
self.assertEqual(self.other.pop().complete(), 'complete')
self.assertEqual(self.client.jobs[jid].queue_name, 'other')
def test_change_tags(self):
# We should be able to add and remove tags from a recurring job,
# and see the impact in all the jobs it subsequently spawns
time.freeze()
jid = self.q.recur(qless.Job, {'test':'test_change_tags'}, tags = ['foo', 'bar'], interval = 1)
self.assertEqual(self.q.pop().complete(), 'complete')
time.advance(1)
self.assertEqual(self.q.pop().tags, ['foo', 'bar'])
# Now let's untag the job
self.client.jobs[jid].untag('foo')
self.assertEqual(self.client.jobs[jid].tags, ['bar'])
time.advance(1)
self.assertEqual(self.q.pop().tags, ['bar'])
# Now let's add 'foo' back in, and also add 'hey'
self.client.jobs[jid].tag('foo', 'hey')
self.assertEqual(self.client.jobs[jid].tags, ['bar', 'foo', 'hey'])
time.advance(1)
self.assertEqual(self.q.pop().tags, ['bar', 'foo', 'hey'])
def test_peek(self):
# When we peek at jobs in a queue, it should take recurring jobs
# into account
time.freeze()
jid = self.q.recur(qless.Job, {'test':'test_peek'}, interval = 100)
self.assertEqual(self.q.pop().complete(), 'complete')
self.assertEqual(self.q.peek(), None)
time.advance(110)
self.assertNotEqual(self.q.peek(), None)
self.q.pop().complete()
# If we are popping fewer jobs than the number of jobs that would have
# been scheduled, it should only make that many available
time.advance(800)
jobs = self.q.peek(5)
self.assertEqual(len(jobs), 5)
self.assertEqual(len(self.q), 5)
for job in self.q.pop(100):
job.complete()
self.assertEqual(len(self.q), 0)
# Even if there are several recurring jobs, both of which need jobs
# scheduled, it only pops off the needed number
jid = self.q.recur(qless.Job, {'test': 'test_peek'}, interval = 10)
time.advance(800)
jobs = self.q.peek(5)
self.assertEqual(len(jobs), 5)
self.assertEqual(len(self.q), 5)
for job in self.q.pop(100):
job.complete()
self.assertEqual(len(self.q), 0)
# And if there are other jobs that are there, it should only move over
# as many recurring jobs as needed
time.advance(800)
jid = self.q.put(qless.Job, {'foo': 'bar'}, priority=10)
jobs = self.q.peek(5)
self.assertEqual(len(jobs), 5)
# Not sure why this is 6, but it's not a huge deal in my opinion
self.assertEqual(len(self.q), 6)
def test_jid_counter(self):
# When we re-recur a job, it should not reset the jid counter
jid = self.q.recur(qless.Job, {'test': 'test_jid_counter'}, 10,
jid='my_recurring_job')
job = self.q.pop()
time.freeze()
jid = self.q.recur(qless.Job, {'test': 'test_jid_counter'}, 10,
jid='my_recurring_job')
time.advance(15)
self.assertNotEqual(self.q.pop().jid, job.jid)
def test_idempotent_recur(self):
# When we re-recur a job, it should update the attributes
jid = self.q.recur(qless.Job, {'test': 'test_idempotent_recur'}, 10,
jid='my_recurring_job', priority=10, tags=['foo'], retries=5)
# Now, let's /re-recur/ the thing, and make sure that its properties
# have indeed updated as expected
jid = self.q.recur(qless.Job, {'test': 'test_idempotent_recur_2'}, 20,
jid='my_recurring_job', priority=20, tags=['bar'], retries=10)
job = self.client.jobs[jid]
self.assertEqual(job.data['test'], 'test_idempotent_recur_2')
self.assertEqual(job.interval, 20)
self.assertEqual(job.priority, 20)
self.assertEqual(job.retries, 10)
self.assertEqual(job.tags, ['bar'])
def test_reput(self):
# If a recurring job exits, and you try to put it into the queue again,
# then it should behave as a 'move'
jid = self.q.recur(qless.Job, {'test': 'test_recur_update'}, 10,
jid='my_recurring_job')
counts = self.client.queues.counts
stats = [c for c in counts if c['name'] == self.q.name]
self.assertEqual(len(stats), 1)
self.assertEqual(stats[0]['recurring'], 1)
# And we'll reput it into another queue
jid = self.other.recur(qless.Job, {'test': 'test_recur_update'}, 10,
jid='my_recurring_job')
counts = self.client.queues.counts
stats = [c for c in counts if c['name'] == self.q.name]
self.assertEqual(len(stats), 1)
self.assertEqual(stats[0]['recurring'], 0)
# Make sure it's in the queue
counts = self.client.queues.counts
stats = [c for c in counts if c['name'] == self.other.name]
self.assertEqual(len(stats), 1)
self.assertEqual(stats[0]['recurring'], 1)
class TestDependencies(TestQless):
def test_depends_put(self):
# In this test, we want to put a job, and put a second job
# that depends on it. We'd then like to verify that it's
# only available for popping once its dependency has completed
jid = self.q.put(qless.Job, {'test': 'depends_put'})
job = self.q.pop()
jid = self.q.put(qless.Job, {'test': 'depends_put'}, depends=[job.jid])
self.assertEqual(self.q.pop(), None)
self.assertEqual(self.client.jobs[jid].state, 'depends')
job.complete()
self.assertEqual(self.client.jobs[jid].state, 'waiting')
self.assertEqual(self.q.pop().jid, jid)
# Let's try this dance again, but with more job dependencies
jids = [self.q.put(qless.Job, {'test': 'depends_put'}) for i in range(10)]
jid = self.q.put(qless.Job, {'test': 'depends_put'}, depends=jids)
# Pop more than we put on
jobs = self.q.pop(20)
self.assertEqual(len(jobs), 10)
# Complete them, and then make sure the last one's available
for job in jobs:
self.assertEqual(self.q.pop(), None)
job.complete()
# It's only when all the dependencies have been completed that
# we should be able to pop this job off
self.assertEqual(self.q.pop().jid, jid)
def test_depends_complete(self):
# In this test, we want to put a job, put a second job, and
# complete the first job, making it dependent on the second
# job. This should test the ability to add dependency during
# completion
a = self.q.put(qless.Job, {'test': 'depends_complete'})
b = self.q.put(qless.Job, {'test': 'depends_complete'})
job = self.q.pop()
job.complete('testing', depends=[b])
self.assertEqual(self.client.jobs[a].state, 'depends')
jobs = self.q.pop(20)
self.assertEqual(len(jobs), 1)
jobs[0].complete()
self.assertEqual(self.client.jobs[a].state, 'waiting')
job = self.q.pop()
self.assertEqual(job.jid, a)
# Like above, let's try this dance again with more dependencies
jids = [self.q.put(qless.Job, {'test': 'depends_put'}) for i in range(10)]
jid = job.jid
job.complete('testing', depends=jids)
# Pop more than we put on
jobs = self.q.pop(20)
self.assertEqual(len(jobs), 10)
# Complete them, and then make sure the last one's available
for job in jobs:
j = self.q.pop()
self.assertEqual(j, None)
job.complete()
# It's only when all the dependencies have been completed that
# we should be able to pop this job off
self.assertEqual(self.q.pop().jid, jid)
def test_depends_state(self):
# Put a job, and make it dependent on a canceled job, and a
# non-existent job, and a complete job. It should be available
# from the start.
jids = ['foobar',
self.q.put(qless.Job, {'test': 'test_depends_state'}),
self.q.put(qless.Job, {'test': 'test_depends_state'})]
# Cancel one, complete one
self.q.pop().cancel()
self.q.pop().complete()
# Ensure there are none in the queue, then put one, should pop right off
self.assertEqual(len(self.q), 0)
jid = self.q.put(qless.Job, {'test': 'test_depends_state'}, depends=jids)
self.assertEqual(self.q.pop().jid, jid)
def test_depends_canceled(self):
# B is dependent on A, but then we cancel B, then A is still
# able to complete without any problems. If you try to cancel
# a job that others depend on, you should have an exception thrown
a = self.q.put(qless.Job, {'test': 'test_depends_canceled'})
b = self.q.put(qless.Job, {'test': 'test_depends_canceled'}, depends=[a])
self.client.jobs[b].cancel()
job = self.q.pop()
self.assertEqual(job.jid, a)
self.assertEqual(job.complete(), 'complete')
self.assertEqual(self.q.pop(), None)
a = self.q.put(qless.Job, {'test': 'cancel_dependency'})
b = self.q.put(qless.Job, {'test': 'cancel_dependency'}, depends=[a])
try:
self.assertTrue(self.client.jobs[a].cancel(), 'We should not be able to cancel jobs with dependencies')
except Exception as e:
self.assertTrue('Cancel()' in e.message, 'Cancel() threw the wrong error')
# When canceling a job, we should remove that job from the jobs' list
# of dependents.
self.client.jobs[b].cancel()
self.assertEqual(self.client.jobs[a].dependents, [])
# We should also just be able to cancel a now
self.client.jobs[a].cancel()
def test_depends_complete_advance(self):
# If we make B depend on A, and then move A through several
# queues, then B should only be availble once A has finished
# its whole run.
a = self.q.put(qless.Job, {'test': 'test_depends_advance'})
b = self.q.put(qless.Job, {'test': 'test_depends_advance'}, depends=[a])
for i in range(10):
job = self.q.pop()
self.assertEqual(job.jid, a)
job.complete('testing')
self.q.pop().complete()
self.assertEqual(self.q.pop().jid, b)
def test_cascading_dependency(self):
# If we make a dependency chain, then we validate that we can
# only access them one at a time, in the order of their dependency
jids = [self.q.put(qless.Job, {'test': 'cascading_depencency'})]
for i in range(10):
jids.append(self.q.put(qless.Job, {'test': 'cascading_dependency'}, depends=[jids[-1]]))
# Pop off the first 10 dependencies, ensuring only one comes off at a time
for i in range(11):
jobs = self.q.pop(10)
self.assertEqual(len(jobs), 1)
self.assertEqual(jobs[0].jid, jids[i])
jobs[0].complete()
def test_move_dependency(self):
# If we put a job into a queue with dependencies, and then
# move it to another queue, then all the original dependencies
# should be honored. The reason for this is that dependencies
# can always be removed after the fact, but this prevents us
# from the running the risk of moving a job, and it getting
# popped before we can describe its dependencies
a = self.q.put(qless.Job, {'test': 'move_dependency'})
b = self.q.put(qless.Job, {'test': 'move_dependency'}, depends=[a])
self.client.jobs[b].move('other')
self.assertEqual(self.client.jobs[b].state, 'depends')
self.assertEqual(self.other.pop(), None)
self.q.pop().complete()
self.assertEqual(self.client.jobs[b].state, 'waiting')
self.assertEqual(self.other.pop().jid, b)
def test_add_dependency(self):
# If we have a job that already depends on on other jobs, then
# we should be able to add more dependencies. If it's not, then
# we can't
a = self.q.put(qless.Job, {'test': 'add_dependency'})
b = self.client.jobs[self.q.put(qless.Job, {'test': 'add_dependency'}, depends=[a])]
c = self.q.put(qless.Job, {'test': 'add_dependency'})
self.assertEqual(b.depend(c), True)
jobs = self.q.pop(20)
self.assertEqual(len(jobs), 2)
self.assertEqual(jobs[0].jid, a)
self.assertEqual(jobs[1].jid, c)
jobs[0].complete(); jobs[1].complete()
job = self.q.pop()
self.assertEqual(job.jid, b.jid)
job.complete()
# If the job's put, but waiting, we can't add dependencies
a = self.q.put(qless.Job, {'test': 'add_dependency'})
b = self.q.put(qless.Job, {'test': 'add_dependency'})
self.assertEqual(self.client.jobs[a].depend(b), False)
job = self.q.pop()
self.assertEqual(job.depend(b), False)
job.fail('what', 'something')
self.assertEqual(self.client.jobs[job.jid].depend(b), False)
def test_remove_dependency(self):
# If we have a job that already depends on others, then we should
# we able to remove them. If it's not dependent on any, then we can't.
a = self.q.put(qless.Job, {'test': 'remove_dependency'})
b = self.client.jobs[self.q.put(qless.Job, {'test': 'remove_dependency'}, depends=[a])]
self.assertEqual(len(self.q.pop(20)), 1)
b.undepend(a)
self.assertEqual(self.q.pop().jid, b.jid)
# Make sure we removed the dependents from the first one, as well
self.assertEqual(self.client.jobs[a].dependents, [])
# Let's try removing /all/ dependencies
jids = [self.q.put(qless.Job, {'test': 'remove_dependency'}) for i in range(10)]
b = self.client.jobs[self.q.put(qless.Job, {'test': 'remove_dependency'}, depends=jids)]
self.assertEqual(len(self.q.pop(20)), 10)
b.undepend(all=True)
self.assertEqual(self.client.jobs[b.jid].state, 'waiting')
# Let's make sure that each of the jobs we removed as dependencies also go their
# dependencies removed, too.
for jid in jids:
self.assertEqual(self.client.jobs[jid].dependents, [])
# If the job's put, but waiting, we can't add dependencies
a = self.q.put(qless.Job, {'test': 'add_dependency'})
b = self.q.put(qless.Job, {'test': 'add_dependency'})
self.assertEqual(self.client.jobs[a].undepend(b), False)
job = self.q.pop()
self.assertEqual(job.undepend(b), False)
job.fail('what', 'something')
self.assertEqual(self.client.jobs[job.jid].undepend(b), False)
def test_jobs_depends(self):
# When we have jobs that have dependencies, we should be able to
# get access to them.
a = self.q.put(qless.Job, {'test': 'jobs_depends'})
b = self.q.put(qless.Job, {'test': 'jobs_depends'}, depends=[a])
self.assertEqual(self.client.queues.counts[0]['depends'], 1)
self.assertEqual(self.client.queues['testing'].counts['depends'], 1)
self.assertEqual(self.q.jobs.depends(), [b])
# When we remove a dependency, we should no longer see that job as a dependency
self.client.jobs[b].undepend(a)
self.assertEqual(self.client.queues.counts[0]['depends'], 0)
self.assertEqual(self.client.queues['testing'].counts['depends'], 0)
self.assertEqual(self.q.jobs.depends(), [])
# When we move a job that has a dependency, we should no longer
# see it in the depends() of the original job
a = self.q.put(qless.Job, {'test': 'jobs_depends'})
b = self.q.put(qless.Job, {'test': 'jobs_depends'}, depends=[a])
self.assertEqual(self.client.queues.counts[0]['depends'], 1)
self.assertEqual(self.client.queues['testing'].counts['depends'], 1)
self.assertEqual(self.q.jobs.depends(), [b])
# Now, move the job
self.client.jobs[b].move('other')
self.assertEqual(self.client.queues.counts[0]['depends'], 0)
self.assertEqual(self.client.queues['testing'].counts['depends'], 0)
self.assertEqual(self.q.jobs.depends(), [])
class TestRetry(TestQless):
# It should decrement retries, and put it back in the queue. If retries
# have been exhausted, then it should be marked as failed.
# Prohibitions:
# 1) We can't retry from another worker
# 2) We can't retry if it's not running
def test_retry(self):
jid = self.q.put(qless.Job, {'test': 'test_retry'})
job = self.q.pop()
self.assertEqual(job.original_retries, job.retries_left)
job.retry()
# Pop it off again
self.assertEqual(self.q.jobs.scheduled(), [])
self.assertEqual(self.client.jobs[job.jid].state, 'waiting')
job = self.q.pop()
self.assertNotEqual(job, None)
self.assertEqual(job.original_retries, job.retries_left + 1)
# Retry it again, with a backoff
job.retry(60)
self.assertEqual(self.q.pop(), None)
self.assertEqual(self.q.jobs.scheduled(), [jid])
job = self.client.jobs[jid]
self.assertEqual(job.original_retries, job.retries_left + 2)
self.assertEqual(job.state, 'scheduled')
def test_retry_fail(self):
# Make sure that if we exhaust a job's retries, that it fails
jid = self.q.put(qless.Job, {'test': 'test_retry_fail'}, retries=2)
self.assertEqual(self.client.jobs.failed(), {})
self.assertEqual(self.q.pop().retry(), 1)
self.assertEqual(self.q.pop().retry(), 0)
self.assertEqual(self.q.pop().retry(), -1)
self.assertEqual(self.client.jobs.failed(), {
'failed-retries-testing': 1
})
def test_retry_error(self):
# These are some of the conditions under which we cannot retry a job
job = self.client.jobs[self.q.put(qless.Job, {'test': 'test_retry_error'})]
self.assertEqual(job.retry(), False)
self.q.pop().fail('foo', 'bar')
self.assertEqual(self.client.jobs[job.jid].retry(), False)
self.client.jobs[job.jid].move('testing')
job = self.q.pop(); job.worker_name = 'foobar'
self.assertEqual(job.retry(), False)
job.worker_name = self.client.worker_name
job.complete()
self.assertEqual(job.retry(), False)
def test_retry_workers(self):
# When we retry a job, it shouldn't be reported as belonging to that worker
# any longer
jid = self.q.put(qless.Job, {'test': 'test_retry_workers'})
job = self.q.pop()
self.assertEqual(self.client.workers[self.client.worker_name], {'jobs': [jid], 'stalled': []})
self.assertEqual(job.retry(), 4)
self.assertEqual(self.client.workers[self.client.worker_name], {'jobs': [], 'stalled': []})
def test_retry_complete(self):
# We shouldn't be able to complete a job that has been selected for
# retry
jid = self.q.put(qless.Job, {'test': 'test_retry_complete'})
job = self.q.pop()
job.retry()
self.assertEqual(job.complete(), False)
self.assertEqual(job.retry(), False)
job = self.client.jobs[jid]
self.assertEqual(job.state, 'waiting')
class TestPriority(TestQless):
# Basically all we need to test:
# 1) If the job doesn't exist, then attempts to set the priority should
# return false. This doesn't really matter for us since we're using the
# __setattr__ magic method
# 2) If the job's in a queue, but not yet popped, we should update its
# priority in that queue.
# 3) If a job's in a queue, but already popped, then we just update the
# job's priority.
def test_priority(self):
a = self.q.put(qless.Job, {'test': 'test_priority'}, priority = 10)
b = self.q.put(qless.Job, {'test': 'test_priority'})
self.assertEqual(self.q.peek().jid, a)
job = self.client.jobs[b]
job.priority = 20
self.assertEqual(len(self.q), 2)
self.assertEqual(self.q.peek().jid, b)
job = self.q.pop()
self.assertEqual(len(self.q), 2)
self.assertEqual(job.jid, b)
job = self.q.pop()
self.assertEqual(len(self.q), 2)
self.assertEqual(job.jid, a)
job.priority = 30
# Make sure it didn't get doubly-inserted in the queue
self.assertEqual(len(self.q), 2)
self.assertEqual(self.q.peek(), None)
self.assertEqual(self.q.pop(), None)
class TestTag(TestQless):
# 1) Should make sure that when we double-tag an item, that we don't
# see it show up twice when we get it back with the job
# 2) Should also preserve tags in the order in which they were inserted
# 3) When a job expires or is canceled, it should be removed from the
# set of jobs with that tag
def test_tag(self):
job = self.client.jobs[self.q.put(qless.Job, {'test': 'tag'})]
self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}})
self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}})
job.tag('foo')
self.assertEqual(self.client.jobs.tagged('foo'), {'total': 1, 'jobs': [job.jid]})
self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}})
job.tag('bar')
self.assertEqual(self.client.jobs.tagged('foo'), {'total': 1, 'jobs': [job.jid]})
self.assertEqual(self.client.jobs.tagged('bar'), {'total': 1, 'jobs': [job.jid]})
job.untag('foo')
self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}})
self.assertEqual(self.client.jobs.tagged('bar'), {'total': 1, 'jobs': [job.jid]})
job.untag('bar')
self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}})
self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}})
def test_preserve_order(self):
job = self.client.jobs[self.q.put(qless.Job, {'test': 'preserve_order'})]
tags = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
for i in range(len(tags)):
job.tag(tags[i])
self.assertEqual(self.client.jobs[job.jid].tags, tags[0:i+1])
# Now let's take a select few out
job.untag('a', 'c', 'e', 'g')
self.assertEqual(self.client.jobs[job.jid].tags, ['b', 'd', 'f', 'h'])
def test_cancel_expire(self):
# First, we'll cancel a job
job = self.client.jobs[self.q.put(qless.Job, {'test': 'cancel_expire'})]
job.tag('foo', 'bar')
self.assertEqual(self.client.jobs.tagged('foo'), {'total': 1, 'jobs': [job.jid]})
self.assertEqual(self.client.jobs.tagged('bar'), {'total': 1, 'jobs': [job.jid]})
job.cancel()
self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}})
self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}})
# Now, we'll have a job expire from completion
self.client.config['jobs-history-count'] = 0
self.q.put(qless.Job, {'test': 'cancel_expire'})
job = self.q.pop()
self.assertNotEqual(job, None)
job.tag('foo', 'bar')
self.assertEqual(self.client.jobs.tagged('foo'), {'total': 1, 'jobs': [job.jid]})
self.assertEqual(self.client.jobs.tagged('bar'), {'total': 1, 'jobs': [job.jid]})
job.complete()
self.assertEqual(self.client.jobs[job.jid], None)
self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}})
self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}})
# If the job no longer exists, attempts to tag it should not add to the set
job.tag('foo', 'bar')
self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}})
self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}})
def test_tag_put(self):
# We should make sure that we can tag a job when we initially put it, too
self.assertEqual(self.client.jobs.tagged('foo'), {'total': 0, 'jobs': {}})
self.assertEqual(self.client.jobs.tagged('bar'), {'total': 0, 'jobs': {}})
jid = self.q.put(qless.Job, {'test': 'tag_put'}, tags=['foo', 'bar'])
self.assertEqual(self.client.jobs.tagged('foo'), {'total': 1, 'jobs': [jid]})
self.assertEqual(self.client.jobs.tagged('bar'), {'total': 1, 'jobs': [jid]})
def test_tag_top(self):
# 1) Make sure that it only includes tags with more than one job associated with it
# 2) Make sure that when jobs are untagged, it decrements the count
# 3) When we tag a job, it increments the count
# 4) When jobs complete and expire, it decrements the count
# 5) When jobs are put, make sure it shows up in the tags
# 6) When canceled, decrements
self.assertEqual(self.client.tags(), {})
jids = [self.q.put(qless.Job, {}, tags=['foo']) for i in range(10)]
self.assertEqual(self.client.tags(), ['foo'])
jobs = [self.client.jobs[jid].cancel() for jid in jids]
self.assertEqual(self.client.tags(), {})
# Add only one back
a = self.q.put(qless.Job, {}, tags=['foo'])
self.assertEqual(self.client.tags(), {})
# Add a second, and then tag it
b = self.client.jobs[self.q.put(qless.Job, {})]
b.tag('foo')
self.assertEqual(self.client.tags(), ['foo'])
b.untag('foo')
self.assertEqual(self.client.tags(), {})
b.tag('foo')
# Test job expiration
self.client.config['jobs-history-count'] = 0
self.assertEqual(len(self.q), 2)
self.q.pop().complete()
self.assertEqual(self.client.tags(), {})
class TestFail(TestQless):
def test_fail_failed(self):
# In this test, we want to make sure that we can correctly
# fail a job
# 1) Put a job
# 2) Fail a job
# 3) Ensure the queue is empty, and that there's something
# in the failed endpoint
# 4) Ensure that the job still has its original queue
self.assertEqual(len(self.q), 0, 'Start with an empty queue')
self.assertEqual(len(self.client.jobs.failed()), 0)
jid = self.q.put(qless.Job, {'test': 'fail_failed'})
job = self.q.pop()
job.fail('foo', 'Some sort of message')
self.assertEqual(self.q.pop(), None)
self.assertEqual(self.client.jobs.failed(), {
'foo': 1
})
results = self.client.jobs.failed('foo')
self.assertEqual(results['total'], 1)
job = results['jobs'][0]
self.assertEqual(job.jid , jid)
self.assertEqual(job.queue_name , 'testing')
self.assertEqual(job.queue.name , 'testing')
self.assertEqual(job.data , {'test': 'fail_failed'})
self.assertEqual(job.worker_name, '')
self.assertEqual(job.state , 'failed')
self.assertEqual(job.retries_left , 5)
self.assertEqual(job.original_retries , 5)
self.assertEqual(job.klass_name , 'qless.job.Job')
self.assertEqual(job.klass , qless.Job)
self.assertEqual(job.tags , [])