diff --git a/kafkaesque/__main__.py b/kafkaesque/__main__.py index 42a63a9..aa9bc7f 100644 --- a/kafkaesque/__main__.py +++ b/kafkaesque/__main__.py @@ -72,10 +72,12 @@ def consume(topic, follow, fetch_size): start = time.time() cursor = 0 + n = 0 try: while True: + lower = cursor cursor, batch = topic.consume(cursor, fetch_size) - logger.debug('Retrieved %s items from %s to %s.', len(batch), cursor, cursor + len(batch)) + logger.debug('Retrieved %s items from %s to %s.', len(batch), lower, cursor) if not batch: if not follow: logger.debug('Retrieved empty batch (end of stream.)') @@ -84,7 +86,7 @@ def consume(topic, follow, fetch_size): logger.debug('Retrieved empty batch.') time.sleep(0.1) - for offset, item in batch: + for n, (offset, item) in enumerate(batch, 1): print offset, item except KeyboardInterrupt: pass @@ -92,9 +94,9 @@ def consume(topic, follow, fetch_size): stop = time.time() logger.info( 'Consumed %s records in %s seconds (%s records/second.)', - cursor, + n, stop - start, - cursor / (stop - start), + n / (stop - start), )