Skip to content

Commit

Permalink
Add config models for hips and pipetask.
Browse files Browse the repository at this point in the history
Add config model for fake status operations.
Update fake status operations to use config.
Parametrize pipetask commands.
Parametrize HiPS commands.
  • Loading branch information
tcjennings committed Jan 7, 2025
1 parent fbc6362 commit 8742820
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 32 deletions.
9 changes: 9 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ services:
DB__ECHO: true
DB__URL: postgresql://cm-service@postgresql:5432/cm-service
DB__PASSWORD: INSECURE-PASSWORD
DB__TABLE_SCHEMA: public
networks:
- cmservice
depends_on:
Expand All @@ -36,6 +37,9 @@ services:
context: .
dockerfile: docker/Dockerfile
target: cmservice
env_file:
- path: .env
required: false
environment: *cmenv
ports:
- "8080:8080"
Expand All @@ -53,7 +57,12 @@ services:
context: .
dockerfile: docker/Dockerfile
target: cmworker
env_file:
- path: .env
required: false
environment: *cmenv
volumes:
- "./prod_area:/prod_area"
networks:
- cmservice
depends_on:
Expand Down
4 changes: 3 additions & 1 deletion src/lsst/cmservice/common/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import yaml

from ..config import config
from .enums import StatusEnum
from .errors import CMBashSubmitError

Expand Down Expand Up @@ -60,9 +61,10 @@ def run_bash_job(
fake_status: StatusEnum | None,
If set, don't actually submit the job
"""
fake_status = fake_status or config.mock_status
if fake_status is not None:
with open(stamp_url, "w", encoding="utf-8") as fstamp:
fields = dict(status="reviewable")
fields = dict(status=StatusEnum.reviewable.name)
yaml.dump(fields, fstamp)
return
try:
Expand Down
2 changes: 2 additions & 0 deletions src/lsst/cmservice/common/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def submit_htcondor_job(
If set, don't actually submit the job
"""
fake_status = fake_status or config.mock_status
if fake_status is not None:
return
try:
Expand Down Expand Up @@ -121,6 +122,7 @@ def check_htcondor_job(
status: StatusEnum
HTCondor job status
"""
fake_status = fake_status or config.mock_status
if fake_status is not None:
return StatusEnum.reviewable if fake_status.value >= StatusEnum.reviewable.value else fake_status
try:
Expand Down
1 change: 1 addition & 0 deletions src/lsst/cmservice/common/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def submit_slurm_job(
job_id : str
Slurm job id
"""
fake_status = fake_status or config.mock_status
if fake_status is not None:
return "fake_job"
try:
Expand Down
55 changes: 53 additions & 2 deletions src/lsst/cmservice/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pydantic import BaseModel, Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

from .common.enums import ScriptMethodEnum
from .common.enums import ScriptMethodEnum, StatusEnum

__all__ = ["Configuration", "config"]

Expand All @@ -15,13 +15,30 @@ class BpsConfiguration(BaseModel):
"""Configuration settings for bps client operations.
Set via BPS__FIELD environment variables.
FIXME: rename LsstConfiguration?
"""

bps_bin: str = Field(
description="Name of a bps client binary",
default="bps",
)

pipetask_bin: str = Field(
description="Name of a pipetask client binary",
default="pipetask",
)

resource_usage_bin: str = Field(
description="Name of a resource usage gathering binary",
default="build-gather-resource-usage-qg",
)

n_jobs: int = Field(
description="Parallelization factor for jobs (-j N)",
default=16,
)


class ButlerConfiguration(BaseModel):
"""Configuration settings for butler client operations.
Expand All @@ -35,11 +52,28 @@ class ButlerConfiguration(BaseModel):
)

mock: bool = Field(
description="Whether to mock out Butler calls. Equivalent to setting `fake_status`",
description="Whether to mock out Butler calls.",
default=False,
)


class HipsConfiguration(BaseModel):
"""Configuration settings for HiPS operations.
Set via HIPS__FIELD environment variables.
"""

high_res_bin: str = Field(
description="Name of a high resolution QG builder bin",
default="build-high-resolution-hips-qg",
)

uri: str = Field(
description="URI for HiPS maps destination",
default="s3://rubin-hips",
)


class HTCondorConfiguration(BaseModel):
"""Configuration settings for htcondor client operations.
Expand Down Expand Up @@ -251,6 +285,7 @@ class Configuration(BaseSettings):
butler: ButlerConfiguration = ButlerConfiguration()
daemon: DaemonConfiguration = DaemonConfiguration()
db: DatabaseConfiguration = DatabaseConfiguration()
hips: HipsConfiguration = HipsConfiguration()
htcondor: HTCondorConfiguration = HTCondorConfiguration()
logging: LoggingConfiguration = LoggingConfiguration()
slurm: SlurmConfiguration = SlurmConfiguration()
Expand All @@ -261,6 +296,22 @@ class Configuration(BaseSettings):
default=ScriptMethodEnum.htcondor,
)

mock_status: StatusEnum | None = Field(
description="A fake status to return from all operations",
default=None,
)

@field_validator("mock_status", mode="before")
@classmethod
def validate_mock_status_by_name(cls, value: str | StatusEnum) -> StatusEnum | None:
if isinstance(value, StatusEnum) or value is None:
return value
try:
return StatusEnum[value]
except KeyError:
warn(f"Invalid mock status ({value}) provided to config, using default.")
return None

@field_validator("script_handler", mode="before")
@classmethod
def validate_script_method_by_name(cls, value: str | ScriptMethodEnum) -> ScriptMethodEnum:
Expand Down
3 changes: 2 additions & 1 deletion src/lsst/cmservice/handlers/element_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from ..common.enums import LevelEnum, StatusEnum
from ..common.errors import CMYamlParseError, test_type_and_raise
from ..config import config
from ..db.campaign import Campaign
from ..db.element import ElementMixin
from ..db.handler import Handler
Expand Down Expand Up @@ -394,7 +395,7 @@ async def check(
return (changed, status)

status = await self._post_check(session, element, **kwargs)
fake_status = kwargs.get("fake_status", None)
fake_status = kwargs.get("fake_status", config.mock_status)
if fake_status:
status = fake_status
await element.update_values(session, status=status)
Expand Down
16 changes: 9 additions & 7 deletions src/lsst/cmservice/handlers/elements.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from ..common.enums import StatusEnum
from ..common.errors import CMMissingScriptInputError, test_type_and_raise
from ..config import config
from ..db.campaign import Campaign
from ..db.element import ElementMixin
from ..db.group import Group
Expand Down Expand Up @@ -107,7 +108,7 @@ async def review_script(
**kwargs: Any,
) -> StatusEnum:
jobs = await parent.get_jobs(session, remaining_only=not kwargs.get("force_check", False))
fake_status = kwargs.get("fake_status", None)
fake_status = kwargs.get("fake_status", config.mock_status)
for job_ in jobs:
job_status = job_.status if fake_status is None else fake_status
if job_status.value < StatusEnum.accepted.value:
Expand Down Expand Up @@ -194,6 +195,8 @@ class SplitByQuery(Splitter):
to get the total number values of a particular field
and then constructing queries to split those values
into a number of groups.
FIXME calling sync butler query from async function
"""

@classmethod
Expand All @@ -214,17 +217,17 @@ async def split(
split_dataset = kwargs["split_dataset"]
split_min_groups = kwargs.get("split_min_groups", 1)
split_max_group_size = kwargs.get("split_max_group_size", 100000000)
fake_status = kwargs.get("fake_status", None)
if not fake_status: # pragma: no cover
mock_butler: bool = kwargs.get("fake_status", config.butler.mock)
if mock_butler:
sorted_field_values = np.arange(10)
else:
butler = Butler.from_config(
butler_repo,
collections=[input_coll, campaign_input_coll],
without_datastore=True,
)
itr = butler.registry.queryDataIds([split_field], datasets=split_dataset).subset(unique=True)
sorted_field_values = np.sort(np.array([x_[split_field] for x_ in itr]))
else:
sorted_field_values = np.arange(10)
n_matched = sorted_field_values.size

step_size = min(split_max_group_size, int(n_matched / split_min_groups))
Expand Down Expand Up @@ -274,14 +277,13 @@ async def _do_prepare(
if spec_block_name is None: # pragma: no cover
raise CMMissingScriptInputError(f"child_config for {script.fullname} does not contain spec_block")
spec_block_name = spec_aliases.get(spec_block_name, spec_block_name)
fake_status = kwargs.get("fake_status")

split_method = child_config.pop("split_method", "no_split")
splitter = SPLIT_CLASSES[split_method]

i = 0

group_gen = splitter.split(session, script, parent, fake_status=fake_status, **child_config)
group_gen = splitter.split(session, script, parent, **child_config)

async for group_dict_ in group_gen:
_new_group = await Group.create_row(
Expand Down
4 changes: 3 additions & 1 deletion src/lsst/cmservice/handlers/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ..common.enums import StatusEnum
from ..common.errors import CMMissingFullnameError, CMYamlParseError
from ..common.utils import update_include_dict
from ..config import config
from ..db.campaign import Campaign
from ..db.job import Job
from ..db.pipetask_error import PipetaskError
Expand Down Expand Up @@ -435,6 +436,7 @@ async def load_manifest_report(
Associated Job
"""
job = await Job.get_row_by_fullname(session, job_name)
fake_status = fake_status or config.mock_status
if fake_status is not None:
return job

Expand Down Expand Up @@ -570,7 +572,7 @@ def status_from_bps_report(
The status to set for the bps_report script
"""
if wms_run_report is None:
return fake_status
return fake_status or config.mock_status

the_state = wms_run_report.state
# We treat RUNNING as running from the CM point of view,
Expand Down
12 changes: 8 additions & 4 deletions src/lsst/cmservice/handlers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ async def _check_slurm_job(
parent: ElementMixin,
fake_status: StatusEnum | None = None,
) -> StatusEnum:
fake_status = fake_status or config.mock_status
slurm_status = await ScriptHandler._check_slurm_job(
self,
session,
Expand Down Expand Up @@ -226,6 +227,7 @@ async def _check_htcondor_job(
parent: ElementMixin,
fake_status: StatusEnum | None = None,
) -> StatusEnum:
fake_status = fake_status or config.mock_status
htcondor_status = await ScriptHandler._check_htcondor_job(
self,
session,
Expand Down Expand Up @@ -358,15 +360,15 @@ async def _load_wms_reports(
status: StatusEnum | None
Status of requested job
"""
fake_status = kwargs.get("fake_status", None)
fake_status = kwargs.get("fake_status", config.mock_status)

try:
wms_svc = self._get_wms_svc(config={})
except ImportError as msg:
if not fake_status: # pragma: no cover
raise msg
try:
if fake_status or wms_workflow_id is None:
if fake_status is not None or wms_workflow_id is None:
wms_run_report = None
else: # pragma: no cover
wms_run_report = wms_svc.report(wms_workflow_id=wms_workflow_id.strip())[0][0]
Expand Down Expand Up @@ -492,7 +494,9 @@ async def _write_script(
# Strip leading/trailing spaces just in case
prepend = "\n".join([line.strip() for line in prepend.splitlines()])

command = f"pipetask report --full-output-filename {report_url} {butler_repo} {graph_url}"
command = (
f"{config.bps.pipetask_bin} report --full-output-filename {report_url} {butler_repo} {graph_url}"
)
write_bash_script(script_url, command, prepend=prepend)

return StatusEnum.prepared
Expand Down Expand Up @@ -525,7 +529,7 @@ async def _do_check(
parent: ElementMixin,
**kwargs: Any,
) -> StatusEnum:
fake_status = kwargs.get("fake_status", None)
fake_status = kwargs.get("fake_status", config.mock_status)
status = await self._load_pipetask_report(session, parent, script.stamp_url, fake_status=fake_status) # type: ignore
status = status if fake_status is None else fake_status
await script.update_values(session, status=status)
Expand Down
7 changes: 4 additions & 3 deletions src/lsst/cmservice/handlers/script_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ async def review_script( # pylint: disable=unused-argument
status : StatusEnum
The status of the processing
"""
fake_status = kwargs.get("fake_status", None)
fake_status = kwargs.get("fake_status", config.mock_status)
return script.status if fake_status is None else fake_status

async def reset_script(
Expand Down Expand Up @@ -343,6 +343,7 @@ async def _check_stamp_file( # pylint: disable=unused-argument
status : StatusEnum
The status of the processing
"""
fake_status = fake_status or config.mock_status
default_status = script.status if fake_status is None else fake_status
status = check_stamp_file(stamp_file, default_status)
await script.update_values(session, status=status)
Expand Down Expand Up @@ -497,7 +498,7 @@ async def check(
parent: ElementMixin,
**kwargs: Any,
) -> StatusEnum:
fake_status = kwargs.get("fake_status", None)
fake_status = kwargs.get("fake_status", config.mock_status)

script_method = self.default_method if script.method == ScriptMethodEnum.default else script.method

Expand All @@ -521,7 +522,7 @@ async def check(
diagnostic_message = "Fake failure"
else: # pragma: no cover
diagnostic_message = await get_diagnostic_message(script.log_url)
_new_error = await ScriptError.create_row(
_ = await ScriptError.create_row(
session,
script_id=script.id,
source=ErrorSourceEnum.local_script,
Expand Down
Loading

0 comments on commit 8742820

Please sign in to comment.