Skip to content

Commit

Permalink
Merge pull request #127 from leshchenko1979:leshchenko1979/issue126
Browse files Browse the repository at this point in the history
bugfix
  • Loading branch information
leshchenko1979 authored Jan 10, 2021
2 parents 6e7d338 + 1e504ce commit 7782a11
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 11 deletions.
4 changes: 4 additions & 0 deletions fast_bitrix24/bitrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,12 @@ def slow(self, max_concurrent_requests: int = 1):

mcr_max_backup, self.srh.mcr_max = \
self.srh.mcr_max, max_concurrent_requests
self.srh.mcr_cur_limit = min(self.srh.mcr_max, self.srh.mcr_cur_limit)

yield True

self.srh.mcr_max = mcr_max_backup
self.srh.mcr_cur_limit = min(self.srh.mcr_max, self.srh.mcr_cur_limit)


class BitrixAsync:
Expand Down Expand Up @@ -329,7 +331,9 @@ async def slow(self, max_concurrent_requests: int = 1):

mcr_max_backup, self.srh.mcr_max = \
self.srh.mcr_max, max_concurrent_requests
self.srh.mcr_cur_limit = min(self.srh.mcr_max, self.srh.mcr_cur_limit)

yield True

self.srh.mcr_max = mcr_max_backup
self.srh.mcr_cur_limit = min(self.srh.mcr_max, self.srh.mcr_cur_limit)
3 changes: 2 additions & 1 deletion fast_bitrix24/mult_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def top_up_tasks(self):
'''Добавляем в self.tasks столько задач, сколько свободных слотов для
запросов есть сейчас в self.srh.'''

to_add = max(self.srh.mcr_cur_limit - self.srh.concurrent_requests, 0)
to_add = max(
int(self.srh.mcr_cur_limit) - self.srh.concurrent_requests, 0)
for _ in range(to_add):
try:
self.tasks.add(next(self.task_iterator))
Expand Down
41 changes: 31 additions & 10 deletions fast_bitrix24/srh.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@
BITRIX_POOL_SIZE = 50
BITRIX_RPS = 2.0
BITRIX_MAX_BATCH_SIZE = 50
BITRIX_MAX_CONCURRENT_REQUESTS = 20
BITRIX_MAX_CONCURRENT_REQUESTS = 100

MAX_RETRIES = 3
INITIAL_TIMEOUT = 1
BACKOFF = 3
MAX_RETRIES = 10

RESTORE_CONNECTIONS_FACTOR = 1.3 # скорость восстановления количества запросов
DECREASE_CONNECTIONS_FACTOR = 3 # скорость уменьшения количества запросов
INITIAL_TIMEOUT = 0.5 # начальный таймаут в секундах
BACKOFF_FACTOR = 2 # основа расчета таймаута
# количество ошибок, до достижения котрого таймауты не делаются
NUM_FAILURES_NO_TIMEOUT = 3


class ServerError(Exception):
Expand Down Expand Up @@ -51,11 +56,19 @@ def __init__(self, webhook, verbose):
# rr - requests register - список отправленных запросов к серверу
self.rr = deque()

# лимит количества одновременных запросов,
# установленный конструктором или пользователем
self.mcr_max = BITRIX_MAX_CONCURRENT_REQUESTS

# временный лимит количества одновременных запросов,
# установленный через autothrottling
self.mcr_cur_limit = BITRIX_MAX_CONCURRENT_REQUESTS

self.concurrent_requests = 0
self.request_complete = Event()

# если положительное - количество последовательных удачных запросов
# если отрицательное - количество последовательно полученных ошибок
self.successive_results = 0

def _standardize_webhook(self, webhook):
Expand Down Expand Up @@ -141,7 +154,7 @@ def failure(self):
async def acquire(self):
'''Ожидает, пока не станет безопасно делать запрос к серверу.'''

self.autothrottle()
await self.autothrottle()

async with self.limit_concurrent_requests():
# если пул заполнен, ждать
Expand All @@ -167,15 +180,23 @@ async def acquire(self):
while self.rr and self.rr[len(self.rr) - 1] < trim_time:
self.rr.pop()

def autothrottle(self):
'''Если было несколько неудач, уменьшай скорость и количество
async def autothrottle(self):
'''Если было несколько неудач, делаем таймаут и уменьшаем скорость и количество
одновременных запросов, и наоборот.'''

if self.successive_results > 0:
self.mcr_cur_limit = max(2, min(int(self.mcr_cur_limit * 1.5),
self.mcr_max))

self.mcr_cur_limit = min(
self.mcr_cur_limit * RESTORE_CONNECTIONS_FACTOR, self.mcr_max)

elif self.successive_results < 0:
self.mcr_cur_limit = max(int(self.mcr_cur_limit // 1.5), 1)

self.mcr_cur_limit = max(
self.mcr_cur_limit / DECREASE_CONNECTIONS_FACTOR, 1)

if self.successive_results < NUM_FAILURES_NO_TIMEOUT:
power = -self.successive_results - NUM_FAILURES_NO_TIMEOUT - 1
await sleep(INITIAL_TIMEOUT * BACKOFF_FACTOR ** power)

@asynccontextmanager
async def limit_concurrent_requests(self):
Expand Down

0 comments on commit 7782a11

Please sign in to comment.