Compliance with the new limits on request running time #217

Merged: 3 commits, Mar 2, 2024
2 changes: 1 addition & 1 deletion fast_bitrix24/__version__.py
@@ -1 +1 @@
__version__ = "1.5.16"
__version__ = "1.6a1"
60 changes: 60 additions & 0 deletions fast_bitrix24/leaky_bucket.py
@@ -0,0 +1,60 @@
import asyncio
import collections
import contextlib
import time


RequestRecord = collections.namedtuple("RequestRecord", "when, duration")


class LeakyBucketLimiter:
"""The class emulates a leaky bucket where the consumer may only run requests
until he has used up X seconds of request running time in total
during a period of Y seconds.

When the consumer has hit the limit, he will have to wait.
"""

    def __init__(self, max_request_running_time: float, measurement_period: float):
        # how much time fits into the bucket before it starts failing
        self.max_request_running_time = max_request_running_time

        # over what period of time should the max_request_running_time be measured
        self.measurement_period = measurement_period

        # request register. left - most recent, right - least recent
        self.request_register = collections.deque()

    @contextlib.asynccontextmanager
    async def acquire(self):
        """A context manager that will wait until it's safe to make the next request"""
        await asyncio.sleep(self.get_needed_sleep_time())

        try:
            yield
        finally:
            self.clean_up()

    def get_needed_sleep_time(self) -> float:
        """How much time to sleep before it's safe to make a request"""
        acc = 0
        for record in self.request_register:
            acc += record.duration
            if acc >= self.max_request_running_time:
                return record.when + self.measurement_period - time.monotonic()
        return 0

    def clean_up(self):
        """Remove all stale records from the record register"""
        if not self.request_register:
            return

        cut_off = time.monotonic() - self.measurement_period
        # guard against popping from a register that has just been emptied
        while self.request_register and self.request_register[-1].when < cut_off:
            self.request_register.pop()


    def register(self, request_duration: float):
        """Register how long the last request has taken"""
        self.request_register.appendleft(
            RequestRecord(time.monotonic(), request_duration)
        )
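For orientation, here is a minimal usage sketch of the limiter above. The loop and the 5-second duration are invented for illustration; inside the library the limiter is driven by ServerRequestHandler (see srh.py below), which feeds it the server-reported operating time.

import asyncio

from fast_bitrix24.leaky_bucket import LeakyBucketLimiter


async def main():
    # allow at most 480 seconds of request running time per 600-second window,
    # matching the BITRIX_* constants introduced in srh.py below
    limiter = LeakyBucketLimiter(480, 600)

    for _ in range(3):
        async with limiter.acquire():  # sleeps first if the time budget is spent
            # ... run the actual request here ...
            duration = 5.0  # hypothetical operating time reported by the server
            limiter.register(duration)  # recorded durations throttle later calls


asyncio.run(main())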
18 changes: 8 additions & 10 deletions fast_bitrix24/logger.py
@@ -5,19 +5,17 @@
logger.setLevel(DEBUG)
logger.addHandler(NullHandler())

logger.debug(f"fast_bitrix24 version: {__version__}")

try:
    from IPython import get_ipython

    logger.debug(f"IPython: {get_ipython()}")
except ImportError:
    logger.debug("No IPython found")


def log(func):
    async def wrapper(*args, **kwargs):
        logger.info(f"Starting {func.__name__}({args}, {kwargs})")
        logger.debug(f"fast_bitrix24 version: {__version__}")
        try:
            from IPython import get_ipython

            logger.debug(f"IPython: {get_ipython()}")

        except ImportError:
            logger.debug("No IPython found")

        return await func(*args, **kwargs)

    return wrapper
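As a quick illustration of the decorator above, a minimal sketch; fetch_deals is a hypothetical coroutine, not part of the library.

import asyncio

from fast_bitrix24.logger import log


@log
async def fetch_deals():
    # stand-in for a real API call
    await asyncio.sleep(0)
    return []


# Logs "Starting fetch_deals(...)" plus the library version and IPython state,
# then awaits the wrapped coroutine.
asyncio.run(fetch_deals())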
86 changes: 35 additions & 51 deletions fast_bitrix24/srh.py
@@ -1,24 +1,23 @@
import time
from asyncio import Event, sleep, TimeoutError
from collections import deque
from asyncio import Event, TimeoutError, sleep
from contextlib import asynccontextmanager

import aiohttp
from aiohttp.client_exceptions import (
    ClientConnectionError,
    ClientPayloadError,
    ClientResponseError,
    ServerTimeoutError,
)

from .leaky_bucket import LeakyBucketLimiter
from .logger import logger
from .utils import _url_valid

BITRIX_POOL_SIZE = 50
BITRIX_RPS = 2.0
BITRIX_MAX_BATCH_SIZE = 50
BITRIX_MAX_CONCURRENT_REQUESTS = 50

BITRIX_MAX_REQUEST_RUNNING_TIME = 480
BITRIX_MEASUREMENT_PERIOD = 10 * 60

MAX_RETRIES = 10

RESTORE_CONNECTIONS_FACTOR = 1.3  # the rate at which the number of requests is restored
@@ -33,6 +32,14 @@
    pass


RETRIED_ERRORS = (
    ClientPayloadError,
    ClientConnectionError,
    ServerError,
    TimeoutError,
)


class ServerRequestHandler:
    """
    Used to control the rate of access to Bitrix servers.
@@ -48,19 +55,13 @@
        self.webhook = self.standardize_webhook(webhook)
        self.respect_velocity_policy = respect_velocity_policy

        self.requests_per_second = BITRIX_RPS
        self.pool_size = BITRIX_POOL_SIZE

        self.active_runs = 0

        # if the user passed in a client with their own settings at initialization,
        # we will use that client
        self.client_provided_by_user = bool(client)
        self.session = client

        # rr - requests register - the list of requests sent to the server
        self.rr = deque()

        # the limit on the number of concurrent requests,
        # set by the constructor or by the user
        self.mcr_max = BITRIX_MAX_CONCURRENT_REQUESTS
@@ -76,6 +77,9 @@
        # if negative - the number of consecutive errors received
        self.successive_results = 0

        # rate limiters by method
        self.limiters = {}  # dict[str, LeakyBucketLimiter]

    @staticmethod
    def standardize_webhook(webhook):
        """Brings `webhook` into a standard form."""
@@ -120,36 +124,37 @@
        if not self.active_runs and self.session and not self.session.closed:
            await self.session.close()

    async def single_request(self, method, params=None) -> dict:
    async def single_request(self, method: str, params=None) -> dict:
        """Makes a single request to the server,
        retrying if necessary."""

        while True:

            try:
                result = await self.request_attempt(method, params)
                result = await self.request_attempt(method.strip().lower(), params)
                self.success()
                return result

            except (
                ClientPayloadError,
                ClientConnectionError,
                ServerError,
                TimeoutError,
            ) as err:
            except RETRIED_ERRORS as err:  # all other exceptions will propagate
                self.failure(err)

    async def request_attempt(self, method, params=None) -> dict:
        """Makes one request attempt to the server, waiting if necessary."""

        try:
            async with self.acquire():
            async with self.acquire(method):

logger.debug(f"Requesting {{'method': {method}, 'params': {params}}}")

async with self.session.post(
url=self.webhook + method, json=params
) as response:
json = await response.json(encoding="utf-8")

logger.debug("Response: %s", json)

request_run_time = json["time"]["operating"]
self.limiters[method].register(request_run_time)


                    return json

        except ClientResponseError as error:
@@ -175,15 +180,21 @@
            ) from err

    @asynccontextmanager
    async def acquire(self):
    async def acquire(self, method: str):
"""Ожидает, пока не станет безопасно делать запрос к серверу."""

await self.autothrottle()

async with self.limit_concurrent_requests():
if self.respect_velocity_policy:
async with self.limit_request_velocity():
if method not in self.limiters:
self.limiters[method] = LeakyBucketLimiter(

                        BITRIX_MAX_REQUEST_RUNNING_TIME, BITRIX_MEASUREMENT_PERIOD
                    )

                async with self.limiters[method].acquire():

                    yield

            else:
                yield

@@ -220,7 +231,7 @@

    @asynccontextmanager
    async def limit_concurrent_requests(self):
        """Не позволяет оновременно выполнять
        """Не позволяет одновременно выполнять
        более `self.mcr_cur_limit` запросов."""

        while self.concurrent_requests > self.mcr_cur_limit:
@@ -235,30 +246,3 @@
        finally:
            self.concurrent_requests -= 1
            self.request_complete.set()

    @asynccontextmanager
    async def limit_request_velocity(self):
        """Limits the velocity of requests to the server."""

        # if the pool is full, wait
        while len(self.rr) >= self.pool_size:
            time_from_last_request = time.monotonic() - self.rr[0]
            time_to_wait = 1 / self.requests_per_second - time_from_last_request
            if time_to_wait > 0:
                await sleep(time_to_wait)
            else:
                break

        # register the request in the queue
        start_time = time.monotonic()
        self.rr.appendleft(start_time)

        # yield control
        try:
            yield

        # clean up the pool
        finally:
            trim_time = start_time - self.pool_size / self.requests_per_second
            while self.rr[-1] < trim_time:
                self.rr.pop()
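To summarize the change in this file: the RPS-based limit_request_velocity pool above is removed, and throttling is now driven by the server-reported operating time, with one LeakyBucketLimiter per REST method. A condensed sketch of that pattern, using the names from this diff (throttled_post itself is hypothetical; error handling, retries and the concurrency limit are omitted):

from fast_bitrix24.leaky_bucket import LeakyBucketLimiter
from fast_bitrix24.srh import (
    BITRIX_MAX_REQUEST_RUNNING_TIME,
    BITRIX_MEASUREMENT_PERIOD,
)

limiters: dict[str, LeakyBucketLimiter] = {}  # one bucket per REST method


async def throttled_post(session, webhook: str, method: str, params: dict) -> dict:
    method = method.strip().lower()
    if method not in limiters:
        limiters[method] = LeakyBucketLimiter(
            BITRIX_MAX_REQUEST_RUNNING_TIME, BITRIX_MEASUREMENT_PERIOD
        )

    async with limiters[method].acquire():  # waits if this method's budget is spent
        async with session.post(url=webhook + method, json=params) as response:
            json_ = await response.json(encoding="utf-8")
            # the server reports how long the request ran; feed it to the limiter
            limiters[method].register(json_["time"]["operating"])
            return json_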