Skip to content

Commit

Permalink
fix: ensure log_offset is properly printing instead of None in log (#13)
Browse files Browse the repository at this point in the history
* Code changes to retry the failed request on http 429

* Changed the retry logic to try until succeed

* Improved error message

* Change severity from error to warning

* Dropped duplicate logging

* fix: ensure log_offset is properly printing instead of None in log

* chore: bump version to 2.2.3

* Fix: Convert offset naive datetime to timezone-aware datetime (#15)

---------

Co-authored-by: Seyed Faziludeen <[email protected]>
Co-authored-by: Sudharsan-VS <[email protected]>
  • Loading branch information
3 people authored Nov 6, 2024
1 parent 9403090 commit ced0e09
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 10 deletions.
2 changes: 1 addition & 1 deletion duologsync/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__title__ = "duologsync"
__description__ = "Ingest + sync logs from the Duo Admin API into another system."
__url__ = "https://github.com/duosecurity/duo_log_sync"
__version__ = "2.2.2"
__version__ = "2.2.3"
__author__ = "Duo Security, Inc."
__license__ = "MIT"
__email__ = "[email protected]"
7 changes: 5 additions & 2 deletions duologsync/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
Definition of the Config class
"""

from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from http import HTTPStatus

import yaml
from cerberus import Validator, schema_registry # type: ignore
Expand Down Expand Up @@ -44,6 +45,8 @@ class Config:
PROXY_SERVER_DEFAULT = ''
PROXY_PORT_DEFAULT = 0

GRACEFUL_RETRY_STATUS_CODES = (HTTPStatus.TOO_MANY_REQUESTS.value,)

# To understand these schema definitions better, compare side-by-side to
# the template_config.yml file

Expand Down Expand Up @@ -374,7 +377,7 @@ def create_config(cls, config_filepath):
else:
# Calculate offset as a timestamp and rewrite its value in config
offset = config.get('dls_settings').get('api').get('offset')
offset = datetime.utcnow() - timedelta(days=offset)
offset = datetime.now(timezone.utc) - timedelta(days=offset)
config['dls_settings']['api']['offset'] = int(offset.timestamp())
return config

Expand Down
42 changes: 36 additions & 6 deletions duologsync/producer/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async def produce(self):
# shutdown every second
await restless_sleep(Config.get_api_timeout())
Program.log(
f"{self.log_type} producer: fetching logs from offset {self.log_offset}",
f"{self.log_type} producer: fetching logs from offset {self.log_offset or self.mintime}",
logging.INFO,
)
api_result = await self.call_log_api()
Expand All @@ -80,10 +80,7 @@ async def produce(self):

# duo_client throws a RuntimeError if the ikey or skey is invalid
except RuntimeError as runtime_error:
shutdown_reason = f"{self.log_type} producer: [{runtime_error}]"
Program.log(
"DuoLogSync: check that the duoclient ikey and skey in the config file are correct"
)
shutdown_reason = self.handle_runtime_error_gracefully(runtime_error)

# Shutdown hath been noticed and thus shutdown shall begin
except ProgramShutdownError:
Expand All @@ -96,6 +93,39 @@ async def produce(self):
await self.log_queue.put([])
Program.log(f"{self.log_type} producer: shutting down", logging.INFO)

def handle_runtime_error_gracefully(self, runtime_error: RuntimeError):
"""
Handle a runtime error gracefully by checking if the error is eligible for a retry.
If it is, log the error and return None to indicate that the producer should retry.
If it is not, log the error and return a string to indicate that the producer should shut down.
"""
if self.eligible_for_retry(runtime_error):
error_data = getattr(runtime_error, "data", None)
error_code = error_data['code'] if error_data else None
Program.log(
f"{self.log_type} producer: retrying due to error: {runtime_error} error_code: {error_code}",
logging.WARNING,
)

return None

return f"{self.log_type} producer: [{runtime_error}]"

@staticmethod
def eligible_for_retry(runtime_error: RuntimeError):
"""
Check if the runtime error is eligible for a retry based on the status code.
See the Config.GRACEFUL_RETRY_STATUS_CODES for the list of status codes that is eligible for a retry.
"""
http_error_code = getattr(runtime_error, "status", None)

if http_error_code:
for http_status_code in Config.GRACEFUL_RETRY_STATUS_CODES:
if http_error_code == http_status_code:
return True

return False

async def add_logs_to_queue(self, logs):
"""
If logs is not none, add them to this Writer's queue
Expand Down Expand Up @@ -223,7 +253,7 @@ def get_log_offset(log, current_log_offset=None, log_type=None):
next_timestamp = int(next_timestamp_to_poll_from) + 1
return f"{next_timestamp},{log_id}"
else:
if log.get("metadata", {}).get("next_offset", None) is not None:
if (log.get("metadata", {}) or {}).get("next_offset", None) is not None:
next_offset = log.get("metadata", {}).get("next_offset", None)
return next_offset

Expand Down
2 changes: 1 addition & 1 deletion duologsync/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def initiate_shutdown(cls, reason):
@param reason Explanation of why a shutdown was requested
"""

cls.log(f"DuoLogSync: Shutting down due to [{reason}]", logging.WARNING)
cls.log(f"DuoLogSync: Shutting down due to [{reason}]", logging.ERROR)
cls._running = False

@classmethod
Expand Down

0 comments on commit ced0e09

Please sign in to comment.