diff --git a/benchmark.py b/benchmark.py index cbc825f..8a49fd1 100644 --- a/benchmark.py +++ b/benchmark.py @@ -9,30 +9,34 @@ @click.command() +@click.option('--batch-size', type=click.INT, default=1024) +@click.option('--page-size', type=click.INT, default=2 ** 16) +@click.option('--payload-length', type=click.INT, default=1024) @click.option('-c', '--count', type=click.INT, default=None) -@click.option('-p', '--payload-length', type=click.INT, default=1024) -@click.option('-s', '--size', type=click.INT, default=2 ** 16) @click.option('-t', '--topic', default='benchmark') -def benchmark(count, topic, size, payload_length): +def benchmark(count, topic, batch_size, page_size, payload_length): client = StrictRedis() - payload = "x" * payload_length + batch = ["x" * payload_length] * batch_size topic = Topic(client, topic) try: - topic.create(size) + topic.create(page_size) except Exception: pass gc.disable() start = time.time() + if count: + assert count % batch_size == 0 + generator = xrange(1, count + 1) if count else itertools.count(1) i = 0 try: for i in generator: - topic.produce(payload) + topic.produce(batch) if i % 10000 == 0: print 'Produced', i, 'records.' except KeyboardInterrupt: @@ -40,8 +44,8 @@ def benchmark(count, topic, size, payload_length): end = time.time() - print 'Produced', i, payload_length / 1024.0, 'KB records in', end - start, 'seconds' - print i / (end - start), 'messages/sec' + print 'Produced', i * batch_size, payload_length / 1024.0, 'KB records in', end - start, 'seconds' + print (i * batch_size) / (end - start), 'messages/sec' if __name__ == '__main__': diff --git a/kafkaesque.py b/kafkaesque.py index f334504..4da9f08 100644 --- a/kafkaesque.py +++ b/kafkaesque.py @@ -29,8 +29,8 @@ def create(self, size=1024, ttl=None): def consume(self, offset, limit=1024): return self.__pull((self.topic,), (offset, limit)) - def produce(self, record): - return self.__push((self.topic,), (record,)) + def produce(self, batch): + return self.__push((self.topic,), batch) def offset(self, consumer): score = self.client.zscore('{}/consumers'.format(self.topic), consumer) @@ -57,10 +57,21 @@ def create(topic, page_size, ttl): @cli.command(help="Write messages to a topic.") @click.argument('topic') @click.argument('input', type=click.File('rb'), default='-') -def produce(topic, input): +@click.option('--batch-size', type=click.INT, default=1) +def produce(topic, input, batch_size): topic = Topic(StrictRedis(), topic) + batch = [] + + def flush(): + print topic.produce(batch), batch + del batch[:] + for line in itertools.imap(operator.methodcaller('strip'), input): - print topic.produce(line), line + batch.append(line) + if len(batch) == batch_size: + flush() + + flush() @cli.command(help="Read messages from a topic.") diff --git a/scripts/push.lua b/scripts/push.lua index fb3cb7f..a31b823 100644 --- a/scripts/push.lua +++ b/scripts/push.lua @@ -21,6 +21,7 @@ local function close_page () redis.log(redis.LOG_DEBUG, string.format('Set %s#%s to expire in %s seconds.', topic, number, configuration['ttl'])) end number = number + 1 + length = 0 end local function check_page () @@ -28,6 +29,7 @@ local function check_page () close_page() start_page() end + return configuration['size'] - length end local last = redis.call('ZREVRANGE', topic .. '/pages', '0', '0', 'WITHSCORES') @@ -39,9 +41,13 @@ else start_page() end -for i=1,#items do - check_page() - redis.call('RPUSH', topic .. '/pages/' .. number, items[i]) +local cursor = 0 +while #items > cursor do + local remaining = check_page() + for i=1,remaining do + redis.call('RPUSH', topic .. '/pages/' .. number, items[cursor + i]) + end + cursor = cursor + remaining end return offset diff --git a/test_kafkaesque.py b/test_kafkaesque.py index 940454d..25d1f11 100644 --- a/test_kafkaesque.py +++ b/test_kafkaesque.py @@ -37,14 +37,14 @@ def test_produce(client): topic.create(size) payload, offset = generator.next() - topic.produce(payload) + topic.produce((payload,)) items.append((payload, offset)) assert client.zrangebyscore('{}/pages'.format(name), '-inf', 'inf', withscores=True) == [('0', 0.0)] assert list(enumerate(client.lrange('{}/pages/{}'.format(name, 0), 0, size))) == items for payload, offset in itertools.islice(generator, size): - topic.produce(payload) + topic.produce((payload,)) items.append((payload, offset)) assert client.zrangebyscore('{}/pages'.format(name), '-inf', 'inf', withscores=True) == [('0', 0.0), ('1', float(size))] @@ -66,7 +66,7 @@ def test_consume_page_sizes(client): topic.create(size) for offset, payload in itertools.islice(generator, size + 1): - topic.produce(payload) + topic.produce((payload,)) items.append([offset, payload]) offset, batch = list(topic.consume(0, limit=size)) @@ -90,7 +90,7 @@ def test_consume_across_pages(client): topic.create(size) for offset, payload in itertools.islice(generator, size + 1): - topic.produce(payload) + topic.produce((payload,)) items.append([offset, payload]) # Check with batches crossing pages. @@ -107,7 +107,7 @@ def test_ttl(client): topic.create(size, ttl=ttl) for i in xrange(0, size + 1): - topic.produce(i) + topic.produce((i,)) assert ttl - 1 <= client.ttl('{}/pages/{}'.format(name, 0)) <= ttl assert client.ttl('{}/pages/{}'.format(name, 1)) == -1