Skip to content

Commit

Permalink
F14: add color extraction semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
stkrizh committed Jul 8, 2020
1 parent 9b56320 commit a9fcaae
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 8 deletions.
4 changes: 4 additions & 0 deletions colorific/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def init(setup_workers: bool = True) -> web.Application:
if config.colorific.image_indexing:
application.cleanup_ctx.append(setup_image_indexing)

application["color_extraction_semaphore"] = asyncio.Semaphore(
config.rate_limit.color_extraction_concurrency
)

return application


Expand Down
3 changes: 2 additions & 1 deletion colorific/rate_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ async def ensure(self, request: Request) -> None:
await redis.expire(key, self.time_interval)

if value > self.limit:
raise RateLimitExceeded(self.error)
ttl = int(await redis.ttl(key))
raise RateLimitExceeded(self.error, ttl)
1 change: 1 addition & 0 deletions colorific/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class UnsplashConfig:

@dataclass(frozen=True)
class RateLimitConfig:
color_extraction_concurrency: int
color_extraction_ip_time_interval: int
color_extraction_ip_limit: int
image_search_ip_time_interval: int
Expand Down
22 changes: 16 additions & 6 deletions colorific/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@


LOG = logging.getLogger(__file__)

SERVICE_UNAVAILABLE_ERROR = (
"We're sorry, the service is temporarily unavailable due to many requests. "
"Please try again a bit later."
)

if TYPE_CHECKING:
_base = View
Expand Down Expand Up @@ -61,10 +64,12 @@ async def ensure_rate_limit(self) -> None:
if self.rate_limit is not None:
try:
await self.rate_limit.ensure(self.request)
except RateLimitExceeded:
except RateLimitExceeded as error:
error_msg, ttl = error.args
raise HTTPTooManyRequests(
text=json.dumps({"error": self.rate_limit.error}),
text=json.dumps({"error": error_msg}),
content_type="application/json",
headers={"Retry-After": str(ttl)},
)


Expand All @@ -89,10 +94,15 @@ async def options(self) -> Response:
async def put(self) -> Response:
await self.ensure_rate_limit()

if self.request.content_type == "application/json":
return await self.handle_json_request()
semaphore = self.request.app["color_extraction_semaphore"]
if semaphore.locked():
return json_response({"error": SERVICE_UNAVAILABLE_ERROR}, status=503)

async with semaphore:
if self.request.content_type == "application/json":
return await self.handle_json_request()

return await self.handle_binary_request()
return await self.handle_binary_request()

async def handle_json_request(self) -> Response:
schema = UploadURLRequestSchema()
Expand Down
2 changes: 2 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ default:
pool_exec_size: 2

rate_limit:
color_extraction_concurrency: 20
color_extraction_ip_time_interval: 300
color_extraction_ip_limit: 10
image_search_ip_time_interval: 60
Expand Down Expand Up @@ -61,6 +62,7 @@ test:
pool_exec_size: 1

rate_limit:
color_extraction_concurrency: 3
color_extraction_ip_time_interval: 2
color_extraction_ip_limit: 3
image_search_ip_time_interval: 2
Expand Down
57 changes: 56 additions & 1 deletion tests/test_rate_limit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from collections import Counter
from itertools import cycle, islice
from itertools import count, cycle, islice
from unittest.mock import AsyncMock, patch

from aiohttp.web import json_response
Expand Down Expand Up @@ -121,3 +121,58 @@ def side_effect(*args, **kwargs):
assert statuses[200] == limit_1 + limit_2

assert len(await redis.keys("*")) == 2


async def test_color_extraction_concurrency(client, redis):
concurrency = config.rate_limit.color_extraction_concurrency
ips = map(str, count(start=1))

async def side_effect(*args, **kwargs):
await asyncio.sleep(0.5)
return json_response({"fake": "OK"})

with patch.object(
ColorExtractionView, "handle_json_request", new_callable=AsyncMock
) as mock:
mock.side_effect = side_effect
coros = [
client.put("/image", json={"url": VALID_URL}, headers={"X-Real-IP": ip})
for ip in islice(ips, concurrency + 2)
]

responses = await asyncio.gather(*coros)
statuses = Counter(resp.status for resp in responses)
assert statuses[200] == concurrency
assert statuses[503] == 2

assert len(await redis.keys("*")) == concurrency + 2


async def test_retry_after_header(client, redis):
limit_1 = config.rate_limit.color_extraction_ip_limit
limit_2 = config.rate_limit.image_search_ip_limit
ip = "77.77.77.77"

def side_effect(*args, **kwargs):
return json_response({"fake": "OK"})

with patch.object(
ColorExtractionView, "handle_json_request", new_callable=AsyncMock
) as mock:
mock.side_effect = side_effect

coros = [
client.put("/image", json={"url": VALID_URL}, headers={"X-Real-IP": ip})
for _ in range(limit_1 + 2)
]
coros += [
client.get("/images?color=ffeeee", headers={"X-Real-IP": ip})
for _ in range(limit_2 + 1)
]
responses = await asyncio.gather(*coros)
failed_responses = [resp for resp in responses if resp.status == 429]

assert len(failed_responses) == 3
assert all(
resp.headers.get("Retry-After", "").isdigit() for resp in failed_responses
)

0 comments on commit a9fcaae

Please sign in to comment.