From 444bff555c8134f7c8debb4f52d9af3220c097eb Mon Sep 17 00:00:00 2001 From: Mathias Lohne Date: Mon, 2 Dec 2024 10:01:54 +0100 Subject: [PATCH] unstable: Set up logging in extractor based on application config (#405) --- .../unstable/configuration/models.py | 8 ++- cognite/extractorutils/unstable/core/base.py | 56 ++++++++++++++++++- 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/cognite/extractorutils/unstable/configuration/models.py b/cognite/extractorutils/unstable/configuration/models.py index 022b826..8791ebd 100644 --- a/cognite/extractorutils/unstable/configuration/models.py +++ b/cognite/extractorutils/unstable/configuration/models.py @@ -223,21 +223,23 @@ class LogLevel(Enum): class LogFileHandlerConfig(ConfigModel): + type: Literal["file"] path: Path level: LogLevel retention: int = 7 class LogConsoleHandlerConfig(ConfigModel): + type: Literal["console"] level: LogLevel -LogHandlerConfig = Union[LogFileHandlerConfig, LogConsoleHandlerConfig] +LogHandlerConfig = Annotated[LogFileHandlerConfig | LogConsoleHandlerConfig, Field(discriminator="type")] # Mypy BS -def _log_handler_default() -> List[Union[LogFileHandlerConfig, LogConsoleHandlerConfig]]: - return [LogConsoleHandlerConfig(level=LogLevel.INFO)] +def _log_handler_default() -> List[LogHandlerConfig]: + return [LogConsoleHandlerConfig(type="console", level=LogLevel.INFO)] class ExtractorConfig(ConfigModel): diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index f1969c5..1d71d29 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -1,6 +1,9 @@ import logging +import logging.config +import time from concurrent.futures import ThreadPoolExecutor from contextvars import ContextVar, Token +from logging.handlers import TimedRotatingFileHandler from multiprocessing import Queue from threading import RLock, Thread from traceback import format_exception @@ -10,8 +13,14 @@ from humps import pascalize from typing_extensions import Self, assert_never +from cognite.extractorutils._inner_util import _resolve_log_level from cognite.extractorutils.threading import CancellationToken -from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, ExtractorConfig +from cognite.extractorutils.unstable.configuration.models import ( + ConnectionConfig, + ExtractorConfig, + LogConsoleHandlerConfig, + LogFileHandlerConfig, +) from cognite.extractorutils.unstable.core._dto import Error as DtoError from cognite.extractorutils.unstable.core._dto import TaskUpdate from cognite.extractorutils.unstable.core._messaging import RuntimeMessage @@ -78,6 +87,49 @@ def __init__(self, config: FullConfig[ConfigType]) -> None: self.__init_tasks__() + def _setup_logging(self) -> None: + min_level = min([_resolve_log_level(h.level.value) for h in self.application_config.log_handlers]) + max_level = max([_resolve_log_level(h.level.value) for h in self.application_config.log_handlers]) + + root = logging.getLogger() + root.setLevel(min_level) + + # The oathlib logs too much on debug level, including secrets + logging.getLogger("requests_oauthlib.oauth2_session").setLevel(max(max_level, logging.INFO)) + + fmt = logging.Formatter( + "%(asctime)s.%(msecs)03d UTC [%(levelname)-8s] %(process)d %(threadName)s - %(message)s", + "%Y-%m-%d %H:%M:%S", + ) + # Set logging to UTC + fmt.converter = time.gmtime + + # Remove any previous logging handlers + for handler in root.handlers: + root.removeHandler(handler) + + # Define new handlers + for handler_config in self.application_config.log_handlers: + match handler_config: + case LogConsoleHandlerConfig() as console_handler: + sh = logging.StreamHandler() + sh.setFormatter(fmt) + sh.setLevel(_resolve_log_level(console_handler.level.value)) + + root.addHandler(sh) + + case LogFileHandlerConfig() as file_handler: + fh = TimedRotatingFileHandler( + filename=file_handler.path, + when="midnight", + utc=True, + backupCount=file_handler.retention, + ) + fh.setLevel(_resolve_log_level(file_handler.level.value)) + fh.setFormatter(fmt) + + root.addHandler(fh) + def __init_tasks__(self) -> None: pass @@ -238,6 +290,7 @@ def _report_extractor_info(self) -> None: ) def start(self) -> None: + self._setup_logging() self._report_extractor_info() Thread(target=self._run_checkin, name="ExtractorCheckin", daemon=True).start() @@ -258,6 +311,7 @@ def __exit__( with self._checkin_lock: self._checkin() + self.logger.info("Shutting down extractor") return exc_val is None def run(self) -> None: