Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Пауза при получении ошибки #127

Merged
merged 2 commits into from
Jan 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions fast_bitrix24/bitrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,12 @@ def slow(self, max_concurrent_requests: int = 1):

mcr_max_backup, self.srh.mcr_max = \
self.srh.mcr_max, max_concurrent_requests
self.srh.mcr_cur_limit = min(self.srh.mcr_max, self.srh.mcr_cur_limit)

yield True

self.srh.mcr_max = mcr_max_backup
self.srh.mcr_cur_limit = min(self.srh.mcr_max, self.srh.mcr_cur_limit)


class BitrixAsync:
Expand Down Expand Up @@ -329,7 +331,9 @@ async def slow(self, max_concurrent_requests: int = 1):

mcr_max_backup, self.srh.mcr_max = \
self.srh.mcr_max, max_concurrent_requests
self.srh.mcr_cur_limit = min(self.srh.mcr_max, self.srh.mcr_cur_limit)

yield True

self.srh.mcr_max = mcr_max_backup
self.srh.mcr_cur_limit = min(self.srh.mcr_max, self.srh.mcr_cur_limit)
3 changes: 2 additions & 1 deletion fast_bitrix24/mult_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def top_up_tasks(self):
'''Добавляем в self.tasks столько задач, сколько свободных слотов для
запросов есть сейчас в self.srh.'''

to_add = max(self.srh.mcr_cur_limit - self.srh.concurrent_requests, 0)
to_add = max(
int(self.srh.mcr_cur_limit) - self.srh.concurrent_requests, 0)
for _ in range(to_add):
try:
self.tasks.add(next(self.task_iterator))
Expand Down
41 changes: 31 additions & 10 deletions fast_bitrix24/srh.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@
BITRIX_POOL_SIZE = 50
BITRIX_RPS = 2.0
BITRIX_MAX_BATCH_SIZE = 50
BITRIX_MAX_CONCURRENT_REQUESTS = 20
BITRIX_MAX_CONCURRENT_REQUESTS = 100

MAX_RETRIES = 3
INITIAL_TIMEOUT = 1
BACKOFF = 3
MAX_RETRIES = 10

RESTORE_CONNECTIONS_FACTOR = 1.3 # скорость восстановления количества запросов
DECREASE_CONNECTIONS_FACTOR = 3 # скорость уменьшения количества запросов
INITIAL_TIMEOUT = 0.5 # начальный таймаут в секундах
BACKOFF_FACTOR = 2 # основа расчета таймаута
# количество ошибок, до достижения котрого таймауты не делаются
NUM_FAILURES_NO_TIMEOUT = 3


class ServerError(Exception):
Expand Down Expand Up @@ -51,11 +56,19 @@ def __init__(self, webhook, verbose):
# rr - requests register - список отправленных запросов к серверу
self.rr = deque()

# лимит количества одновременных запросов,
# установленный конструктором или пользователем
self.mcr_max = BITRIX_MAX_CONCURRENT_REQUESTS

# временный лимит количества одновременных запросов,
# установленный через autothrottling
self.mcr_cur_limit = BITRIX_MAX_CONCURRENT_REQUESTS

self.concurrent_requests = 0
self.request_complete = Event()

# если положительное - количество последовательных удачных запросов
# если отрицательное - количество последовательно полученных ошибок
self.successive_results = 0

def _standardize_webhook(self, webhook):
Expand Down Expand Up @@ -141,7 +154,7 @@ def failure(self):
async def acquire(self):
'''Ожидает, пока не станет безопасно делать запрос к серверу.'''

self.autothrottle()
await self.autothrottle()

async with self.limit_concurrent_requests():
# если пул заполнен, ждать
Expand All @@ -167,15 +180,23 @@ async def acquire(self):
while self.rr and self.rr[len(self.rr) - 1] < trim_time:
self.rr.pop()

def autothrottle(self):
'''Если было несколько неудач, уменьшай скорость и количество
async def autothrottle(self):
'''Если было несколько неудач, делаем таймаут и уменьшаем скорость и количество
одновременных запросов, и наоборот.'''

if self.successive_results > 0:
self.mcr_cur_limit = max(2, min(int(self.mcr_cur_limit * 1.5),
self.mcr_max))

self.mcr_cur_limit = min(
self.mcr_cur_limit * RESTORE_CONNECTIONS_FACTOR, self.mcr_max)

elif self.successive_results < 0:
self.mcr_cur_limit = max(int(self.mcr_cur_limit // 1.5), 1)

self.mcr_cur_limit = max(
self.mcr_cur_limit / DECREASE_CONNECTIONS_FACTOR, 1)

if self.successive_results < NUM_FAILURES_NO_TIMEOUT:
power = -self.successive_results - NUM_FAILURES_NO_TIMEOUT - 1
await sleep(INITIAL_TIMEOUT * BACKOFF_FACTOR ** power)

@asynccontextmanager
async def limit_concurrent_requests(self):
Expand Down