Skip to content

Commit

Permalink
fix: add request count with eviction for not retriable requests to fi…
Browse files Browse the repository at this point in the history
…x memory problem (#92)

Co-authored-by: Aldo Gonzalez <[email protected]>
  • Loading branch information
maxi297 and aldogonzalez8 authored Dec 2, 2024
1 parent 65ed26f commit f95652b
Show file tree
Hide file tree
Showing 4 changed files with 600 additions and 470 deletions.
15 changes: 15 additions & 0 deletions airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def __str__(self) -> str:
class HttpClient:
_DEFAULT_MAX_RETRY: int = 5
_DEFAULT_MAX_TIME: int = 60 * 10
_ACTIONS_TO_RETRY_ON = {ResponseAction.RETRY, ResponseAction.RATE_LIMITED}

def __init__(
self,
Expand Down Expand Up @@ -359,6 +360,17 @@ def _get_response_body(self, response: requests.Response) -> Optional[JsonType]:
except Exception:
return "The Content of the Response couldn't be decoded."

def _evict_key(self, prepared_request: requests.PreparedRequest) -> None:
"""
Addresses high memory consumption when enabling concurrency in https://github.com/airbytehq/oncall/issues/6821.
The `_request_attempt_count` attribute keeps growing as multiple requests are made using the same `http_client`.
To mitigate this issue, we evict keys for completed requests once we confirm that no further retries are needed.
This helps manage memory usage more efficiently while maintaining the necessary logic for retry attempts.
"""
if prepared_request in self._request_attempt_count:
del self._request_attempt_count[prepared_request]

def _handle_error_resolution(
self,
response: Optional[requests.Response],
Expand All @@ -367,6 +379,9 @@ def _handle_error_resolution(
error_resolution: ErrorResolution,
exit_on_rate_limit: Optional[bool] = False,
) -> None:
if error_resolution.response_action not in self._ACTIONS_TO_RETRY_ON:
self._evict_key(request)

# Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
if error_resolution.response_action == ResponseAction.RATE_LIMITED:
# TODO: Update to handle with message repository when concurrent message repository is ready
Expand Down
Loading

0 comments on commit f95652b

Please sign in to comment.