Skip to content

Commit

Permalink
redo type support and adjust logging
Browse files Browse the repository at this point in the history
  • Loading branch information
helylle committed Aug 30, 2024
1 parent 0b891fe commit 5e20c1f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 36 deletions.
31 changes: 6 additions & 25 deletions src/eduid/workers/job_runner/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import logging
from datetime import datetime, tzinfo
from typing import Any, Optional, Union
from typing import Any, NewType, Optional, Union

from pydantic import BaseModel, ConfigDict, Field, RootModel, field_validator, model_validator
from pydantic import BaseModel, ConfigDict, field_validator, model_validator

from eduid.common.clients.gnap_client.base import GNAPClientAuthData
from eduid.common.config.base import AmConfigMixin, LoggingConfigMixin, MsgConfigMixin, RootConfig, StatsConfigMixin
Expand Down Expand Up @@ -41,28 +41,9 @@ def at_least_one_datetime_value(cls, data: Any) -> Any:
return data


class JobConfig(RootModel):
"""
Configuration for a single job.
"""

root: dict[str, JobCronConfig]

def __iter__(self):
return iter(self.root)

def __getitem__(self, key):
return self.root[key]


class JobsRootConfig(RootModel):
root: dict[str, JobConfig]

def __iter__(self):
return iter(self.root)

def __getitem__(self, key):
return self.root[key]
EnvironmentOrWorkerName = NewType("EnvironmentOrWorkerName", str)
JobName = NewType("JobName", str)
JobsConfig = NewType("JobsConfig", dict[EnvironmentOrWorkerName, dict[JobName, JobCronConfig]])


class JobRunnerConfig(RootConfig, LoggingConfigMixin, StatsConfigMixin, MsgConfigMixin, AmConfigMixin):
Expand All @@ -74,7 +55,7 @@ class JobRunnerConfig(RootConfig, LoggingConfigMixin, StatsConfigMixin, MsgConfi
log_format: str = "{asctime} | {levelname:7} | {hostname} | {name:35} | {module:10} | {message}"
mongo_uri: str = ""
status_cache_seconds: int = 10
jobs: Optional[JobsRootConfig] = None
jobs: Optional[JobsConfig] = None
gnap_auth_data: GNAPClientAuthData

@field_validator("application_root")
Expand Down
31 changes: 20 additions & 11 deletions src/eduid/workers/job_runner/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from eduid.common.config.exceptions import BadConfiguration
from eduid.workers.job_runner.config import EnvironmentOrWorkerName, JobCronConfig
from eduid.workers.job_runner.context import Context
from eduid.workers.job_runner.jobs.skv import check_skv_users, gather_skv_users

Expand All @@ -12,31 +13,39 @@ def schedule_jobs(self, context: Context):
Schedule all jobs configured for host or environment
"""

environment = context.config.environment
environment = EnvironmentOrWorkerName(context.config.environment)
worker_name = EnvironmentOrWorkerName(context.worker_name)

if context.config.jobs is None:
context.logger.info(f"No jobs configured for {context.worker_name} running {environment}")
context.logger.info("No jobs configured in config")
return

jobs_config = context.config.jobs.model_dump()
jobs_config = context.config.jobs
context.logger.debug(f"jobs_config: {jobs_config}")

jobs: dict = {}

# Gather jobs for current environment and worker in a dictionary
if environment in jobs_config:
context.logger.info(f"Setting up jobs for environment {environment}")
context.logger.info(f"Setting up jobs {jobs_config[environment]}")
context.logger.debug(f"Setting up jobs for environment {environment}")
context.logger.debug(f"Setting up jobs {jobs_config[environment]}")
jobs.update(jobs_config[environment])

if context.worker_name in jobs_config:
context.logger.info(f"Setting up jobs for worker {context.worker_name}")
context.logger.info(f"Setting up jobs {jobs_config[context.worker_name]}")
jobs.update(jobs_config[context.worker_name])
if worker_name in jobs_config:
context.logger.debug(f"Setting up jobs for worker {worker_name}")
context.logger.debug(f"Setting up jobs {jobs_config[worker_name]}")
jobs.update(jobs_config[worker_name])

context.logger.info(f"Setting up jobs {jobs} for {context.worker_name} running {environment}")
if len(jobs) == 0:
context.logger.info(f"No jobs configured for {worker_name} running {environment}")
return

context.logger.info(f"Setting up jobs {jobs} for {worker_name} running {environment}")

# Add all configured jobs to the scheduler
for job in jobs:
params = jobs[job]
cron_settings: JobCronConfig = jobs[job]
params = cron_settings.model_dump()
context.logger.info(f"Setting up job {job} with parameters {params}")

match job:
Expand Down

0 comments on commit 5e20c1f

Please sign in to comment.