Skip to content

Commit

Permalink
allow batch production
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming committed Dec 9, 2015
1 parent 1b434d1 commit df6aa5c
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 20 deletions.
20 changes: 12 additions & 8 deletions benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,43 @@


@click.command()
@click.option('--batch-size', type=click.INT, default=1024)
@click.option('--page-size', type=click.INT, default=2 ** 16)
@click.option('--payload-length', type=click.INT, default=1024)
@click.option('-c', '--count', type=click.INT, default=None)
@click.option('-p', '--payload-length', type=click.INT, default=1024)
@click.option('-s', '--size', type=click.INT, default=2 ** 16)
@click.option('-t', '--topic', default='benchmark')
def benchmark(count, topic, size, payload_length):
def benchmark(count, topic, batch_size, page_size, payload_length):
client = StrictRedis()

payload = "x" * payload_length
batch = ["x" * payload_length] * batch_size

topic = Topic(client, topic)
try:
topic.create(size)
topic.create(page_size)
except Exception:
pass

gc.disable()
start = time.time()

if count:
assert count % batch_size == 0

generator = xrange(1, count + 1) if count else itertools.count(1)

i = 0
try:
for i in generator:
topic.produce(payload)
topic.produce(batch)
if i % 10000 == 0:
print 'Produced', i, 'records.'
except KeyboardInterrupt:
pass

end = time.time()

print 'Produced', i, payload_length / 1024.0, 'KB records in', end - start, 'seconds'
print i / (end - start), 'messages/sec'
print 'Produced', i * batch_size, payload_length / 1024.0, 'KB records in', end - start, 'seconds'
print (i * batch_size) / (end - start), 'messages/sec'


if __name__ == '__main__':
Expand Down
19 changes: 15 additions & 4 deletions kafkaesque.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ def create(self, size=1024, ttl=None):
def consume(self, offset, limit=1024):
    """Read up to ``limit`` records from the topic, starting at ``offset``."""
    keys = (self.topic,)
    args = (offset, limit)
    return self.__pull(keys, args)

def produce(self, batch):
    """Append every record in ``batch`` to the topic via the push script."""
    keys = (self.topic,)
    return self.__push(keys, batch)

def offset(self, consumer):
score = self.client.zscore('{}/consumers'.format(self.topic), consumer)
Expand All @@ -57,10 +57,21 @@ def create(topic, page_size, ttl):
@cli.command(help="Write messages to a topic.")
@click.argument('topic')
@click.argument('input', type=click.File('rb'), default='-')
def produce(topic, input):
@click.option('--batch-size', type=click.INT, default=1)
def produce(topic, input, batch_size):
topic = Topic(StrictRedis(), topic)
batch = []

def flush():
print topic.produce(batch), batch
del batch[:]

for line in itertools.imap(operator.methodcaller('strip'), input):
print topic.produce(line), line
batch.append(line)
if len(batch) == batch_size:
flush()

flush()


@cli.command(help="Read messages from a topic.")
Expand Down
12 changes: 9 additions & 3 deletions scripts/push.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ local function close_page ()
redis.log(redis.LOG_DEBUG, string.format('Set %s#%s to expire in %s seconds.', topic, number, configuration['ttl']))
end
number = number + 1
length = 0
end

local function check_page ()
   -- Roll over to a fresh page when the current one is full, then report
   -- how many more items the current page can accept.
   if not (length < configuration['size']) then
      close_page()
      start_page()
   end
   return configuration['size'] - length
end

local last = redis.call('ZREVRANGE', topic .. '/pages', '0', '0', 'WITHSCORES')
Expand All @@ -39,9 +41,13 @@ else
start_page()
end

-- Write the batch across one or more pages.  check_page() rotates to a
-- fresh page when the current one is full and reports its free capacity.
local cursor = 0
while #items > cursor do
   local remaining = check_page()
   -- Never index past the end of the batch: the page may have more free
   -- slots than we have items left, and ``items[cursor + i]`` beyond
   -- #items yields nil, which redis.call('RPUSH', ...) rejects.
   local count = math.min(remaining, #items - cursor)
   for i = 1, count do
      -- RPUSH returns the new list length; record it so check_page sees
      -- how full the current page is and rotates mid-batch when needed.
      length = redis.call('RPUSH', topic .. '/pages/' .. number, items[cursor + i])
   end
   cursor = cursor + count
end

return offset
10 changes: 5 additions & 5 deletions test_kafkaesque.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ def test_produce(client):
topic.create(size)

payload, offset = generator.next()
topic.produce(payload)
topic.produce((payload,))
items.append((payload, offset))

assert client.zrangebyscore('{}/pages'.format(name), '-inf', 'inf', withscores=True) == [('0', 0.0)]
assert list(enumerate(client.lrange('{}/pages/{}'.format(name, 0), 0, size))) == items

for payload, offset in itertools.islice(generator, size):
topic.produce(payload)
topic.produce((payload,))
items.append((payload, offset))

assert client.zrangebyscore('{}/pages'.format(name), '-inf', 'inf', withscores=True) == [('0', 0.0), ('1', float(size))]
Expand All @@ -66,7 +66,7 @@ def test_consume_page_sizes(client):
topic.create(size)

for offset, payload in itertools.islice(generator, size + 1):
topic.produce(payload)
topic.produce((payload,))
items.append([offset, payload])

offset, batch = list(topic.consume(0, limit=size))
Expand All @@ -90,7 +90,7 @@ def test_consume_across_pages(client):
topic.create(size)

for offset, payload in itertools.islice(generator, size + 1):
topic.produce(payload)
topic.produce((payload,))
items.append([offset, payload])

# Check with batches crossing pages.
Expand All @@ -107,7 +107,7 @@ def test_ttl(client):
topic.create(size, ttl=ttl)

for i in xrange(0, size + 1):
topic.produce(i)
topic.produce((i,))

assert ttl - 1 <= client.ttl('{}/pages/{}'.format(name, 0)) <= ttl
assert client.ttl('{}/pages/{}'.format(name, 1)) == -1

0 comments on commit df6aa5c

Please sign in to comment.