Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unstable: Set up logging in extractor based on application config #405

Merged
merged 2 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions cognite/extractorutils/unstable/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,21 +223,23 @@ class LogLevel(Enum):


class LogFileHandlerConfig(ConfigModel):
type: Literal["file"]
path: Path
level: LogLevel
retention: int = 7


class LogConsoleHandlerConfig(ConfigModel):
type: Literal["console"]
level: LogLevel


LogHandlerConfig = Union[LogFileHandlerConfig, LogConsoleHandlerConfig]
LogHandlerConfig = Annotated[LogFileHandlerConfig | LogConsoleHandlerConfig, Field(discriminator="type")]


# Mypy BS
def _log_handler_default() -> List[Union[LogFileHandlerConfig, LogConsoleHandlerConfig]]:
return [LogConsoleHandlerConfig(level=LogLevel.INFO)]
def _log_handler_default() -> List[LogHandlerConfig]:
return [LogConsoleHandlerConfig(type="console", level=LogLevel.INFO)]


class ExtractorConfig(ConfigModel):
Expand Down
56 changes: 55 additions & 1 deletion cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging
import logging.config
import time
from concurrent.futures import ThreadPoolExecutor
from contextvars import ContextVar, Token
from logging.handlers import TimedRotatingFileHandler
from multiprocessing import Queue
from threading import RLock, Thread
from traceback import format_exception
Expand All @@ -10,8 +13,14 @@
from humps import pascalize
from typing_extensions import Self, assert_never

from cognite.extractorutils._inner_util import _resolve_log_level
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, ExtractorConfig
from cognite.extractorutils.unstable.configuration.models import (
ConnectionConfig,
ExtractorConfig,
LogConsoleHandlerConfig,
LogFileHandlerConfig,
)
from cognite.extractorutils.unstable.core._dto import Error as DtoError
from cognite.extractorutils.unstable.core._dto import TaskUpdate
from cognite.extractorutils.unstable.core._messaging import RuntimeMessage
Expand Down Expand Up @@ -78,6 +87,49 @@ def __init__(self, config: FullConfig[ConfigType]) -> None:

self.__init_tasks__()

def _setup_logging(self) -> None:
min_level = min([_resolve_log_level(h.level.value) for h in self.application_config.log_handlers])
max_level = max([_resolve_log_level(h.level.value) for h in self.application_config.log_handlers])

root = logging.getLogger()
root.setLevel(min_level)

# The oathlib logs too much on debug level, including secrets
logging.getLogger("requests_oauthlib.oauth2_session").setLevel(max(max_level, logging.INFO))

fmt = logging.Formatter(
"%(asctime)s.%(msecs)03d UTC [%(levelname)-8s] %(process)d %(threadName)s - %(message)s",
"%Y-%m-%d %H:%M:%S",
)
# Set logging to UTC
fmt.converter = time.gmtime

# Remove any previous logging handlers
for handler in root.handlers:
root.removeHandler(handler)

# Define new handlers
for handler_config in self.application_config.log_handlers:
match handler_config:
case LogConsoleHandlerConfig() as console_handler:
sh = logging.StreamHandler()
sh.setFormatter(fmt)
sh.setLevel(_resolve_log_level(console_handler.level.value))

root.addHandler(sh)

case LogFileHandlerConfig() as file_handler:
fh = TimedRotatingFileHandler(
filename=file_handler.path,
when="midnight",
utc=True,
backupCount=file_handler.retention,
)
fh.setLevel(_resolve_log_level(file_handler.level.value))
fh.setFormatter(fmt)

root.addHandler(fh)

def __init_tasks__(self) -> None:
pass

Expand Down Expand Up @@ -238,6 +290,7 @@ def _report_extractor_info(self) -> None:
)

def start(self) -> None:
self._setup_logging()
self._report_extractor_info()
Thread(target=self._run_checkin, name="ExtractorCheckin", daemon=True).start()

Expand All @@ -258,6 +311,7 @@ def __exit__(
with self._checkin_lock:
self._checkin()

self.logger.info("Shutting down extractor")
return exc_val is None

def run(self) -> None:
Expand Down
Loading