From 6e2a9242b5926be81c622a7d97359e895a1cd133 Mon Sep 17 00:00:00 2001 From: David Hall Date: Tue, 7 Jan 2025 08:59:38 -0800 Subject: [PATCH 1/3] retry tokenizer backoff --- src/levanter/store/cache.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/levanter/store/cache.py b/src/levanter/store/cache.py index 2a5afd947..c271be9c7 100644 --- a/src/levanter/store/cache.py +++ b/src/levanter/store/cache.py @@ -1237,10 +1237,22 @@ async def _copy_one_array(dest_array: JaggedArrayStore, source_array: JaggedArra source_offsets = source_array.offsets[1 : source_num_rows + 1][ts.d[:].translate_to[0]] source_offsets = _virtual_offset(source_offsets, data_offset) - async with ts.Transaction() as txn: - dest_offsets = dest_array.offsets - out_end = row_offset + 1 + source_num_rows - offset_future = dest_offsets.with_transaction(txn)[row_offset + 1 : out_end].write(source_offsets) + delay = 1 + while True: + try: + async with ts.Transaction() as txn: + dest_offsets = dest_array.offsets + out_end = row_offset + 1 + source_num_rows + offset_future = dest_offsets.with_transaction(txn)[row_offset + 1 : out_end].write( + source_offsets + ) + + break + except ValueError as e: + if "Please reduce your request rate." in str(e): + logger.info("Rate limit exceeded. Retrying.") + await asyncio.sleep(delay) + delay *= 2 futures.append(offset_future) From 5d14efccc79081262309dc4d3862621b388f8768 Mon Sep 17 00:00:00 2001 From: David Hall Date: Tue, 7 Jan 2025 09:02:21 -0800 Subject: [PATCH 2/3] maybe set a cap --- src/levanter/store/cache.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/levanter/store/cache.py b/src/levanter/store/cache.py index c271be9c7..7543d4408 100644 --- a/src/levanter/store/cache.py +++ b/src/levanter/store/cache.py @@ -1253,6 +1253,8 @@ async def _copy_one_array(dest_array: JaggedArrayStore, source_array: JaggedArra logger.info("Rate limit exceeded. Retrying.") await asyncio.sleep(delay) delay *= 2 + if delay > 60: + raise futures.append(offset_future) From 796e4e06d88cb74dbbe7f6d3025a16b1bac2b5ff Mon Sep 17 00:00:00 2001 From: David Hall Date: Tue, 14 Jan 2025 22:21:35 -0800 Subject: [PATCH 3/3] more --- src/levanter/store/cache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/levanter/store/cache.py b/src/levanter/store/cache.py index 7543d4408..a7c88baef 100644 --- a/src/levanter/store/cache.py +++ b/src/levanter/store/cache.py @@ -1237,7 +1237,7 @@ async def _copy_one_array(dest_array: JaggedArrayStore, source_array: JaggedArra source_offsets = source_array.offsets[1 : source_num_rows + 1][ts.d[:].translate_to[0]] source_offsets = _virtual_offset(source_offsets, data_offset) - delay = 1 + delay = 4 while True: try: async with ts.Transaction() as txn: @@ -1253,7 +1253,7 @@ async def _copy_one_array(dest_array: JaggedArrayStore, source_array: JaggedArra logger.info("Rate limit exceeded. Retrying.") await asyncio.sleep(delay) delay *= 2 - if delay > 60: + if delay > 120: raise futures.append(offset_future)