Comply with the new limits on request running time (#217)
* Comply with the new limits on request running time
Fixes #187

* Fixes for older Python versions

* More tests
leshchenko1979 authored Mar 2, 2024
1 parent 754de24 commit c1e8569
Showing 7 changed files with 207 additions and 290 deletions.
2 changes: 1 addition & 1 deletion fast_bitrix24/__version__.py
@@ -1 +1 @@
__version__ = "1.5.16"
__version__ = "1.6a1"
60 changes: 60 additions & 0 deletions fast_bitrix24/leaky_bucket.py
@@ -0,0 +1,60 @@
import asyncio
import collections
import contextlib
import time


RequestRecord = collections.namedtuple("RequestRecord", "when, duration")


class LeakyBucketLimiter:
"""The class emulates a leaky bucket where the consumer may only run requests
until he has used up X seconds of request running time in total
during a period of Y seconds.
When the consumer has hit the limit, he will have to wait.
"""

def __init__(self, max_request_running_time: float, measurement_period: float):
# how much time fits into the bucket before it starts failing
self.max_request_running_time = max_request_running_time

# over what period of time should the max_request_running_time be measured
self.measurement_period = measurement_period

# request register. left - most recent, right - least recent
self.request_register = collections.deque()

@contextlib.asynccontextmanager
async def acquire(self):
"""A context manager that will wait until it's safe to make the next request"""
await asyncio.sleep(self.get_needed_sleep_time())

try:
yield
finally:
self.clean_up()

def get_needed_sleep_time(self) -> float:
"""How much time to sleep before it's safe to make a request"""
acc = 0
for record in self.request_register:
acc += record.duration
if acc >= self.max_request_running_time:
return record.when + self.measurement_period - time.monotonic()
return 0

def clean_up(self):
"""Remove all stale records from the record register"""
if not self.request_register:
return

cut_off = time.monotonic() - self.measurement_period
while self.request_register[-1].when < cut_off:
self.request_register.pop()

def register(self, request_duration: float):
"""Register how long the last request has taken"""
self.request_register.appendleft(
RequestRecord(time.monotonic(), request_duration)
)
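A minimal usage sketch of the new limiter (not part of the commit): acquire() sleeps until the bucket has room, and register() records the server-reported running time of the completed request. The 480/600 figures mirror the BITRIX_MAX_REQUEST_RUNNING_TIME and BITRIX_MEASUREMENT_PERIOD constants introduced in srh.py below; the 1.5-second duration is a made-up stand-in for the value the server would report.

import asyncio

from fast_bitrix24.leaky_bucket import LeakyBucketLimiter


async def main():
    # allow 480 seconds of total request running time per 600-second window
    limiter = LeakyBucketLimiter(480, 600)

    for _ in range(3):
        async with limiter.acquire():  # sleeps first if the bucket is full
            # ... perform the actual request here ...
            limiter.register(1.5)  # hypothetical server-reported duration
        # stale records are cleaned up when the context manager exits


asyncio.run(main())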
18 changes: 8 additions & 10 deletions fast_bitrix24/logger.py
@@ -5,19 +5,17 @@
logger.setLevel(DEBUG)
logger.addHandler(NullHandler())

logger.debug(f"fast_bitrix24 version: {__version__}")

try:
from IPython import get_ipython

logger.debug(f"IPython: {get_ipython()}")
except ImportError:
logger.debug("No IPython found")


def log(func):
async def wrapper(*args, **kwargs):
logger.info(f"Starting {func.__name__}({args}, {kwargs})")
logger.debug(f"fast_bitrix24 version: {__version__}")
try:
from IPython import get_ipython

logger.debug(f"IPython: {get_ipython()}")
except ImportError:
logger.debug("No IPython found")

return await func(*args, **kwargs)

return wrapper
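For illustration, a hedged sketch (not from the repository) of how the reworked decorator behaves: the version banner and the IPython probe are now logged on every decorated call instead of once at module import.

import asyncio
import logging

from fast_bitrix24.logger import log

logging.basicConfig(level=logging.DEBUG)


@log
async def list_deals(count):
    return list(range(count))


# each call logs the call signature, the library version and the IPython status
asyncio.run(list_deals(3))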
86 changes: 35 additions & 51 deletions fast_bitrix24/srh.py
@@ -1,24 +1,23 @@
import time
from asyncio import Event, sleep, TimeoutError
from collections import deque
from asyncio import Event, TimeoutError, sleep
from contextlib import asynccontextmanager

import aiohttp
from aiohttp.client_exceptions import (
ClientConnectionError,
ClientPayloadError,
ClientResponseError,
ServerTimeoutError,
)

from .leaky_bucket import LeakyBucketLimiter
from .logger import logger
from .utils import _url_valid

BITRIX_POOL_SIZE = 50
BITRIX_RPS = 2.0
BITRIX_MAX_BATCH_SIZE = 50
BITRIX_MAX_CONCURRENT_REQUESTS = 50

BITRIX_MAX_REQUEST_RUNNING_TIME = 480
BITRIX_MEASUREMENT_PERIOD = 10 * 60

MAX_RETRIES = 10

RESTORE_CONNECTIONS_FACTOR = 1.3  # rate at which the request count is restored
@@ -33,6 +32,14 @@ class ServerError(Exception):
pass


RETRIED_ERRORS = (
ClientPayloadError,
ClientConnectionError,
ServerError,
TimeoutError,
)


class ServerRequestHandler:
"""
Used to control the rate of access to Bitrix servers.
@@ -48,19 +55,13 @@ def __init__(self, webhook, respect_velocity_policy, client):
self.webhook = self.standardize_webhook(webhook)
self.respect_velocity_policy = respect_velocity_policy

self.requests_per_second = BITRIX_RPS
self.pool_size = BITRIX_POOL_SIZE

self.active_runs = 0

# if the user passed in a client with their own settings at initialization,
# then use that client
self.client_provided_by_user = bool(client)
self.session = client

# rr - requests register - the list of requests sent to the server
self.rr = deque()

# the limit on the number of concurrent requests,
# set by the constructor or by the user
self.mcr_max = BITRIX_MAX_CONCURRENT_REQUESTS
@@ -76,6 +77,9 @@ def __init__(self, webhook, respect_velocity_policy, client):
# if negative, the number of consecutive errors received
self.successive_results = 0

# rate limiters by method
self.limiters = {} # dict[str, LeakyBucketLimiter]

@staticmethod
def standardize_webhook(webhook):
"""Приводит `webhook` к стандартному виду."""
@@ -120,36 +124,37 @@ async def handle_sessions(self):
if not self.active_runs and self.session and not self.session.closed:
await self.session.close()

async def single_request(self, method, params=None) -> dict:
async def single_request(self, method: str, params=None) -> dict:
"""Делает единичный запрос к серверу,
с повторными попытками при необходимости."""

while True:

try:
result = await self.request_attempt(method, params)
result = await self.request_attempt(method.strip().lower(), params)
self.success()
return result

except (
ClientPayloadError,
ClientConnectionError,
ServerError,
TimeoutError,
) as err:
except RETRIED_ERRORS as err: # all other exceptions will propagate
self.failure(err)

async def request_attempt(self, method, params=None) -> dict:
"""Делает попытку запроса к серверу, ожидая при необходимости."""

try:
async with self.acquire():
async with self.acquire(method):
logger.debug(f"Requesting {{'method': {method}, 'params': {params}}}")

async with self.session.post(
url=self.webhook + method, json=params
) as response:
json = await response.json(encoding="utf-8")

logger.debug("Response: %s", json)

request_run_time = json["time"]["operating"]
self.limiters[method].register(request_run_time)

return json

except ClientResponseError as error:
@@ -175,15 +180,21 @@ def failure(self, err: Exception):
) from err

@asynccontextmanager
async def acquire(self):
async def acquire(self, method: str):
"""Ожидает, пока не станет безопасно делать запрос к серверу."""

await self.autothrottle()

async with self.limit_concurrent_requests():
if self.respect_velocity_policy:
async with self.limit_request_velocity():
if method not in self.limiters:
self.limiters[method] = LeakyBucketLimiter(
BITRIX_MAX_REQUEST_RUNNING_TIME, BITRIX_MEASUREMENT_PERIOD
)

async with self.limiters[method].acquire():
yield

else:
yield

@@ -220,7 +231,7 @@ async def autothrottle(self):

@asynccontextmanager
async def limit_concurrent_requests(self):
"""Не позволяет оновременно выполнять
"""Не позволяет одновременно выполнять
более `self.mcr_cur_limit` запросов."""

while self.concurrent_requests > self.mcr_cur_limit:
@@ -235,30 +246,3 @@ async def limit_concurrent_requests(self):
finally:
self.concurrent_requests -= 1
self.request_complete.set()

@asynccontextmanager
async def limit_request_velocity(self):
"""Ограничивает скорость запросов к серверу."""

# if the pool is full, wait
while len(self.rr) >= self.pool_size:
time_from_last_request = time.monotonic() - self.rr[0]
time_to_wait = 1 / self.requests_per_second - time_from_last_request
if time_to_wait > 0:
await sleep(time_to_wait)
else:
break

# register the request in the queue
start_time = time.monotonic()
self.rr.appendleft(start_time)

# yield control
try:
yield

# clean up the pool
finally:
trim_time = start_time - self.pool_size / self.requests_per_second
while self.rr[-1] < trim_time:
self.rr.pop()
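The deleted limit_request_velocity throttled by request count (a pool of 50 requests at 2 requests per second); the new per-method leaky bucket throttles by accumulated server-side running time instead. A standalone sketch of the new arithmetic, using made-up durations (200 seconds each) and the 480/600 constants from above:

import collections
import time

RequestRecord = collections.namedtuple("RequestRecord", "when, duration")

register = collections.deque()  # left = most recent
now = time.monotonic()

# pretend three requests of 200 seconds each finished 5, 15 and 25 seconds ago
for age in (5, 15, 25):
    register.append(RequestRecord(now - age, 200.0))

MAX_RUNNING_TIME, PERIOD = 480, 600

acc = 0.0
for record in register:  # newest to oldest
    acc += record.duration
    if acc >= MAX_RUNNING_TIME:
        # wait until the record that tipped the bucket leaves the window
        print(f"sleep for {record.when + PERIOD - now:.0f} seconds")  # ~575
        break
else:
    print("safe to request now")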