-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_kafkaesque.py
113 lines (80 loc) · 2.7 KB
/
test_kafkaesque.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import itertools
import pytest
from redis.client import StrictRedis
from kafkaesque.topic import Topic
@pytest.yield_fixture
def client():
client = StrictRedis()
try:
yield client
finally:
client.flushdb()
def test_create(client):
topic = Topic(client, 'topic')
topic.create()
with pytest.raises(Exception):
topic.create()
def test_produce(client):
name = 'example'
size = 10
items = []
generator = itertools.imap(
lambda i: (i, str(i)),
itertools.count(),
)
topic = Topic(client, name)
topic.create(size)
payload, offset = generator.next()
topic.produce((payload,))
items.append((payload, offset))
assert client.zrangebyscore('{}/pages'.format(name), '-inf', 'inf', withscores=True) == [('0', 0.0)]
assert list(enumerate(client.lrange('{}/pages/{}'.format(name, 0), 0, size))) == items
for payload, offset in itertools.islice(generator, size):
topic.produce((payload,))
items.append((payload, offset))
assert client.zrangebyscore('{}/pages'.format(name), '-inf', 'inf', withscores=True) == [('0', 0.0), ('1', float(size))]
assert list(enumerate(client.lrange('{}/pages/{}'.format(name, 0), 0, size))) == items[:size]
assert list(enumerate(client.lrange('{}/pages/{}'.format(name, 1), 0, size), size)) == items[size:]
def test_consume_page_sizes(client):
name = 'example'
size = 10
items = []
generator = itertools.imap(
lambda i: [i, str(i)],
itertools.count(),
)
topic = Topic(client, name)
topic.create(size)
for offset, payload in itertools.islice(generator, size + 1):
topic.produce((payload,))
items.append([offset, payload])
offset, batch = list(topic.consume(0, limit=size))
assert items[:size] == batch
offset, batch = list(topic.consume(offset, limit=size))
assert items[size:] == batch
def test_consume_across_pages(client):
name = 'example'
size = 10
items = []
generator = itertools.imap(
lambda i: [i, str(i)],
itertools.count(),
)
topic = Topic(client, name)
topic.create(size)
for offset, payload in itertools.islice(generator, size + 1):
topic.produce((payload,))
items.append([offset, payload])
# Check with batches crossing pages.
offset, batch = topic.consume(5)
assert batch == items[5:]
def test_ttl(client):
name = 'example'
size = 10
ttl = 60
topic = Topic(client, name)
topic.create(size, ttl=ttl)
for i in xrange(0, size + 1):
topic.produce((i,))
assert ttl - 1 <= client.ttl('{}/pages/{}'.format(name, 0)) <= ttl
assert client.ttl('{}/pages/{}'.format(name, 1)) == -1