diff --git a/fast_bitrix24/__version__.py b/fast_bitrix24/__version__.py index 35ddfd4..5bb6d59 100644 --- a/fast_bitrix24/__version__.py +++ b/fast_bitrix24/__version__.py @@ -1 +1 @@ -__version__ = "1.5.16" +__version__ = "1.6a1" diff --git a/fast_bitrix24/leaky_bucket.py b/fast_bitrix24/leaky_bucket.py new file mode 100644 index 0000000..db832f9 --- /dev/null +++ b/fast_bitrix24/leaky_bucket.py @@ -0,0 +1,60 @@ +import asyncio +import collections +import contextlib +import time + + +RequestRecord = collections.namedtuple("RequestRecord", "when, duration") + + +class LeakyBucketLimiter: + """The class emulates a leaky bucket where the consumer may only run requests + until he has used up X seconds of request running time in total + during a period of Y seconds. + + When the consumer has hit the limit, he will have to wait. + """ + + def __init__(self, max_request_running_time: float, measurement_period: float): + # how much time fits into the bucket before it starts failing + self.max_request_running_time = max_request_running_time + + # over what period of time should the max_request_running_time be measured + self.measurement_period = measurement_period + + # request register. left - most recent, right - least recent + self.request_register = collections.deque() + + @contextlib.asynccontextmanager + async def acquire(self): + """A context manager that will wait until it's safe to make the next request""" + await asyncio.sleep(self.get_needed_sleep_time()) + + try: + yield + finally: + self.clean_up() + + def get_needed_sleep_time(self) -> float: + """How much time to sleep before it's safe to make a request""" + acc = 0 + for record in self.request_register: + acc += record.duration + if acc >= self.max_request_running_time: + return record.when + self.measurement_period - time.monotonic() + return 0 + + def clean_up(self): + """Remove all stale records from the record register""" + if not self.request_register: + return + + cut_off = time.monotonic() - self.measurement_period + while self.request_register[-1].when < cut_off: + self.request_register.pop() + + def register(self, request_duration: float): + """Register how long the last request has taken""" + self.request_register.appendleft( + RequestRecord(time.monotonic(), request_duration) + ) diff --git a/fast_bitrix24/logger.py b/fast_bitrix24/logger.py index c29be76..9326210 100644 --- a/fast_bitrix24/logger.py +++ b/fast_bitrix24/logger.py @@ -5,19 +5,17 @@ logger.setLevel(DEBUG) logger.addHandler(NullHandler()) -logger.debug(f"fast_bitrix24 version: {__version__}") - -try: - from IPython import get_ipython - - logger.debug(f"IPython: {get_ipython()}") -except ImportError: - logger.debug("No IPython found") - - def log(func): async def wrapper(*args, **kwargs): logger.info(f"Starting {func.__name__}({args}, {kwargs})") + logger.debug(f"fast_bitrix24 version: {__version__}") + try: + from IPython import get_ipython + + logger.debug(f"IPython: {get_ipython()}") + except ImportError: + logger.debug("No IPython found") + return await func(*args, **kwargs) return wrapper diff --git a/fast_bitrix24/srh.py b/fast_bitrix24/srh.py index 032fa01..d640712 100644 --- a/fast_bitrix24/srh.py +++ b/fast_bitrix24/srh.py @@ -1,6 +1,4 @@ -import time -from asyncio import Event, sleep, TimeoutError -from collections import deque +from asyncio import Event, TimeoutError, sleep from contextlib import asynccontextmanager import aiohttp @@ -8,17 +6,18 @@ ClientConnectionError, ClientPayloadError, ClientResponseError, - ServerTimeoutError, ) +from .leaky_bucket import LeakyBucketLimiter from .logger import logger from .utils import _url_valid -BITRIX_POOL_SIZE = 50 -BITRIX_RPS = 2.0 BITRIX_MAX_BATCH_SIZE = 50 BITRIX_MAX_CONCURRENT_REQUESTS = 50 +BITRIX_MAX_REQUEST_RUNNING_TIME = 480 +BITRIX_MEASUREMENT_PERIOD = 10 * 60 + MAX_RETRIES = 10 RESTORE_CONNECTIONS_FACTOR = 1.3 # скорость восстановления количества запросов @@ -33,6 +32,14 @@ class ServerError(Exception): pass +RETRIED_ERRORS = ( + ClientPayloadError, + ClientConnectionError, + ServerError, + TimeoutError, +) + + class ServerRequestHandler: """ Используется для контроля скорости доступа к серверам Битрикс. @@ -48,9 +55,6 @@ def __init__(self, webhook, respect_velocity_policy, client): self.webhook = self.standardize_webhook(webhook) self.respect_velocity_policy = respect_velocity_policy - self.requests_per_second = BITRIX_RPS - self.pool_size = BITRIX_POOL_SIZE - self.active_runs = 0 # если пользователь при инициализации передал клиента со своими настройками, @@ -58,9 +62,6 @@ def __init__(self, webhook, respect_velocity_policy, client): self.client_provided_by_user = bool(client) self.session = client - # rr - requests register - список отправленных запросов к серверу - self.rr = deque() - # лимит количества одновременных запросов, # установленный конструктором или пользователем self.mcr_max = BITRIX_MAX_CONCURRENT_REQUESTS @@ -76,6 +77,9 @@ def __init__(self, webhook, respect_velocity_policy, client): # если отрицательное - количество последовательно полученных ошибок self.successive_results = 0 + # rate limiters by method + self.limiters = {} # dict[str, LeakyBucketLimiter] + @staticmethod def standardize_webhook(webhook): """Приводит `webhook` к стандартному виду.""" @@ -120,36 +124,37 @@ async def handle_sessions(self): if not self.active_runs and self.session and not self.session.closed: await self.session.close() - async def single_request(self, method, params=None) -> dict: + async def single_request(self, method: str, params=None) -> dict: """Делает единичный запрос к серверу, с повторными попытками при необходимости.""" while True: try: - result = await self.request_attempt(method, params) + result = await self.request_attempt(method.strip().lower(), params) self.success() return result - except ( - ClientPayloadError, - ClientConnectionError, - ServerError, - TimeoutError, - ) as err: + except RETRIED_ERRORS as err: # all other exceptions will propagate self.failure(err) async def request_attempt(self, method, params=None) -> dict: """Делает попытку запроса к серверу, ожидая при необходимости.""" try: - async with self.acquire(): + async with self.acquire(method): logger.debug(f"Requesting {{'method': {method}, 'params': {params}}}") + async with self.session.post( url=self.webhook + method, json=params ) as response: json = await response.json(encoding="utf-8") + logger.debug("Response: %s", json) + + request_run_time = json["time"]["operating"] + self.limiters[method].register(request_run_time) + return json except ClientResponseError as error: @@ -175,15 +180,21 @@ def failure(self, err: Exception): ) from err @asynccontextmanager - async def acquire(self): + async def acquire(self, method: str): """Ожидает, пока не станет безопасно делать запрос к серверу.""" await self.autothrottle() async with self.limit_concurrent_requests(): if self.respect_velocity_policy: - async with self.limit_request_velocity(): + if method not in self.limiters: + self.limiters[method] = LeakyBucketLimiter( + BITRIX_MAX_REQUEST_RUNNING_TIME, BITRIX_MEASUREMENT_PERIOD + ) + + async with self.limiters[method].acquire(): yield + else: yield @@ -220,7 +231,7 @@ async def autothrottle(self): @asynccontextmanager async def limit_concurrent_requests(self): - """Не позволяет оновременно выполнять + """Не позволяет одновременно выполнять более `self.mcr_cur_limit` запросов.""" while self.concurrent_requests > self.mcr_cur_limit: @@ -235,30 +246,3 @@ async def limit_concurrent_requests(self): finally: self.concurrent_requests -= 1 self.request_complete.set() - - @asynccontextmanager - async def limit_request_velocity(self): - """Ограничивает скорость запросов к серверу.""" - - # если пул заполнен, ждать - while len(self.rr) >= self.pool_size: - time_from_last_request = time.monotonic() - self.rr[0] - time_to_wait = 1 / self.requests_per_second - time_from_last_request - if time_to_wait > 0: - await sleep(time_to_wait) - else: - break - - # зарегистрировать запрос в очереди - start_time = time.monotonic() - self.rr.appendleft(start_time) - - # отдать управление - try: - yield - - # подчистить пул - finally: - trim_time = start_time - self.pool_size / self.requests_per_second - while self.rr[-1] < trim_time: - self.rr.pop() diff --git a/tests/test_async.py b/tests/test_async.py index 4088adf..8cce052 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -6,7 +6,7 @@ import pytest from fast_bitrix24 import BitrixAsync -from fast_bitrix24.srh import BITRIX_POOL_SIZE, BITRIX_RPS, ServerRequestHandler +from fast_bitrix24.srh import ServerRequestHandler @pytest.mark.skipif( @@ -37,229 +37,3 @@ async def test_simultaneous_calls(self, create_100_leads_async): assert len(result) == 3 assert result[0] == result[1] == result[2] assert all(len(r) >= 100 for r in result) - - -def get_custom_bitrix(pool_size, requests_per_second, respect_velocity_policy=True): - bitrix = BitrixAsync( - "http://www.bitrix24.ru/path", respect_velocity_policy=respect_velocity_policy - ) - - bitrix.srh.pool_size = pool_size - bitrix.srh.requests_per_second = requests_per_second - - return bitrix - - -async def assert_time_acquire(bitrix, acquire_amount, time_expected): - t1 = monotonic() - - for _ in range(acquire_amount): - async with bitrix.srh.acquire(): - pass - - t2 = monotonic() - - assert time_expected <= t2 - t1 < time_expected + 0.2 - - -@pytest.mark.asyncio -class TestAcquire: - async def test_acquire_sequential(self): - - await assert_time_acquire(get_custom_bitrix(1, 1), 1, 0) - await assert_time_acquire(get_custom_bitrix(10, 1), 10, 0) - await assert_time_acquire(get_custom_bitrix(1, 5), 5, 0.8) - await assert_time_acquire(get_custom_bitrix(50, 10), 60, 1) - - await assert_time_acquire(get_custom_bitrix(1, 1, False), 100, 0) - - async def test_acquire_intermittent(self): - - bitrix = get_custom_bitrix(10, 10) - - await assert_time_acquire(bitrix, 10, 0) - await sleep(0.3) - await assert_time_acquire(bitrix, 10, 0.7) - - async def test_acquire_speed(self): - CYCLES = 100 - POOL_SIZE = 50 - INTERMITTENT_TIME = 0 - RPS = 100 - - i = CYCLES - bitrix = get_custom_bitrix(POOL_SIZE, RPS) - - start = monotonic() - - while i > POOL_SIZE: - async with bitrix.srh.acquire(): - i -= 1 - - elapsed = monotonic() - start - assert elapsed < 1 - - await sleep(INTERMITTENT_TIME) - - while i: - async with bitrix.srh.acquire(): - i -= 1 - - elapsed = monotonic() - start - expected = INTERMITTENT_TIME + (CYCLES - POOL_SIZE) / RPS - assert expected - elapsed < 1 - - -class MockStaticResponse(object): - def __init__(self, stored_json=None): - self.stored_json = stored_json - - async def json(self, **args): - return self.stored_json - - -class MockSession(object): - def __init__(self, post_callback): - self.post_callback = post_callback - self.pool = BITRIX_POOL_SIZE - self.rps = BITRIX_RPS - self.num_requests = 0 - - @asynccontextmanager - async def post(self, url, json): - self.pool -= 1 - self.num_requests += 1 - - if self.pool < 0: - raise RuntimeError(f"Pool exhausted after {self.num_requests} requests") - - yield self.post_callback(self, url, json) - - -class MockSRH(ServerRequestHandler): - def __init__(self, post_callback): - super().__init__( - "http://www.google.com/", respect_velocity_policy=True, client=None - ) - self.post_callback = post_callback - - @asynccontextmanager - async def handle_sessions(self): - self.session = MockSession(self.post_callback) - yield - - async def restore_pool(self): - while True: - if self.session.pool < BITRIX_POOL_SIZE: - self.session.pool += 1 - await sleep(1 / self.session.rps) - - -@pytest.mark.asyncio -class TestMocks: - async def test_mock(self): - - bitrix = BitrixAsync("http://www.google.com/") - bitrix.srh = MockSRH( - lambda *args: MockStaticResponse({"result": ["OK"], "total": 1}) - ) - - assert await bitrix.get_all("abc") == ["OK"] - - async def test_mock_get_all(self): - - record_ID = iter(range(1_000_000)) - - def post_callback(self: ServerRequestHandler, url: str, json: dict): - - if "batch" not in url: - page = [{"ID": next(record_ID)} for _ in range(50)] - response = {"result": page, "total": 5000} - - else: - cmds = { - command: [{"ID": next(record_ID)} for _ in range(50)] - for command in json["cmd"] - } - response = {"result": {"result": cmds, "total": 5000}} - - return MockStaticResponse(response) - - bitrix = get_custom_bitrix(10_000, 10_000) - bitrix.srh = MockSRH(post_callback) - - result = await bitrix.get_all("abc") - assert bitrix.srh.session.num_requests == 3 - assert len(result) == 5000 - - async def test_get_by_ID(self): - - ParsedCommand = namedtuple("ParsedCommand", ["metod", "params"]) - - def post_callback(self: ServerRequestHandler, url: str, json: dict): - def parse_command(value: str): - split = value.split("?") - method, param_str = split[0], split[1] - pairs = param_str.split("&") - split = (pair.split("=") for pair in pairs if pair) - params = dict(split) - return ParsedCommand(method, params) - - commands = {key: parse_command(value) for key, value in json["cmd"].items()} - - items = { - label: {"ID": parsed.params["ID"]} for label, parsed in commands.items() - } - - response = {"result": {"result": items}} - - return MockStaticResponse(response) - - POOL_SIZE = 50 - RPS = 100 - PAGE_SIZE = 50 - SIZE = POOL_SIZE * PAGE_SIZE + POOL_SIZE * 2 - print(SIZE) - - COMPUTATION_TIME = 2 - timeout = max(SIZE / 50 - POOL_SIZE, 0) / RPS + COMPUTATION_TIME - print(timeout) - assert 0 < timeout < 3 # мы не хотим, чтобы тест шел вечно - - bitrix = get_custom_bitrix(POOL_SIZE, RPS) - bitrix.srh = MockSRH(post_callback) - - bitrix_task = create_task(bitrix.get_by_ID("abc", list(range(SIZE)))) - restore_pool_task = create_task(bitrix.srh.restore_pool()) - - await wait({bitrix_task, restore_pool_task}, timeout=timeout) - - result = bitrix_task.result() - restore_pool_task.cancel() - - assert len(result) == SIZE - - async def test_limit_request_velocity(self): - async def mock_request(srh: ServerRequestHandler): - async with srh.limit_request_velocity(): - print(len(srh.rr), min(srh.rr), max(srh.rr), max(srh.rr) - min(srh.rr)) - - srh = MockSRH(None) - tasks = set() - - SIZE = 70 - srh.requests_per_second = 100 - - for _ in range(SIZE): - tasks |= {ensure_future(mock_request(srh))} - - timeout = (SIZE - srh.pool_size) / srh.requests_per_second + 1 - - start = monotonic() - - await wait(tasks, timeout=timeout) - - elapsed = monotonic() - start - print(elapsed) - - assert timeout - 1 <= elapsed < timeout diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py index 429872b..d39e620 100644 --- a/tests/test_exceptions.py +++ b/tests/test_exceptions.py @@ -3,6 +3,7 @@ import aiohttp.client_exceptions as exc import pytest + from fast_bitrix24 import Bitrix @@ -19,4 +20,4 @@ async def test_retries(exception): # должна исчерпать все попытки и выдать RuntimeError with pytest.raises(RuntimeError): - await srh.single_request(None) + await srh.single_request("abc") diff --git a/tests/test_leaky_bucket.py b/tests/test_leaky_bucket.py new file mode 100644 index 0000000..3880fb3 --- /dev/null +++ b/tests/test_leaky_bucket.py @@ -0,0 +1,100 @@ +import asyncio +import math +import time + +import pytest + +from fast_bitrix24.leaky_bucket import LeakyBucketLimiter, RequestRecord + + +@pytest.mark.parametrize( + "max_request_running_time, measurement_period, requests, measurements", + [ + [2, 10, [], [[0, 0]]], + [2, 10, [], [[5, 0]]], + [2, 10, [], [[15, 0]]], + [2, 10, [[0, 1], [2, 1]], [[5, 5]]], + [2, 10, [[0, 1], [2, 1]], [[7, 3]]], + [2, 10, [[0, 1], [2, 1], [3, 1]], [[5, 7]]], + [ + 2, + 10, + [[0, 1], [2, 1], [3, 1], [10, 0.9]], + [[0, 0], [1, 0], [2.1, 7.9], [10.1, 1.9]], + ], + ], +) +def test_needed_sleep_time( + max_request_running_time, + measurement_period, + requests, + measurements, + monkeypatch, +): + limiter = LeakyBucketLimiter(max_request_running_time, measurement_period) + + while requests or measurements: + if (requests and measurements and requests[0][0] < measurements[0][0]) or ( + requests and not measurements + ): + when, duration = requests.pop(0) + monkeypatch.setattr("time.monotonic", lambda: when) + limiter.register(duration) + else: + call_point, expected = measurements.pop(0) + monkeypatch.setattr("time.monotonic", lambda: call_point) + print("Request record:", limiter.request_register) + print("Time", call_point) + assert math.isclose(limiter.get_needed_sleep_time(), expected) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "max_request_running_time, measurement_period, request_durations, expected_sleep_time, test_id", + [ + # Happy path tests + (10, 20, [1, 2, 3], 0, "happy-1"), + (10, 20, [5, 5.1, 5], 9.9, "happy-2"), + # Edge cases + (10, 20, [10], 10, "edge-1"), + (10, 20, [10, 0.1], 9.9, "edge-2"), + ], +) +async def test_leaky_bucket_limiter( + max_request_running_time, + measurement_period, + request_durations, + expected_sleep_time, + test_id, + monkeypatch, +): + # Set up mocks + start_time = time.monotonic() + + def fake_time(): + return start_time + + monkeypatch.setattr(time, "monotonic", fake_time) + + sleep_log = [] + + async def fake_sleep(duration): + sleep_log.append(duration) + + monkeypatch.setattr(asyncio, "sleep", fake_sleep) + + # Arrange + limiter = LeakyBucketLimiter(max_request_running_time, measurement_period) + + # Act + for duration in request_durations: + async with limiter.acquire(): + pass + limiter.register(duration) + start_time += duration + await asyncio.sleep(duration) + + # Assert + assert math.isclose( + limiter.get_needed_sleep_time(), expected_sleep_time + ), f"Test failed for {test_id}"