Skip to content

Commit

Permalink
establish templates and basic functionality for database interactions
Browse files Browse the repository at this point in the history
  • Loading branch information
bgunnar5 committed Feb 11, 2025
1 parent 05bb300 commit b9e28b0
Show file tree
Hide file tree
Showing 4 changed files with 371 additions and 0 deletions.
Empty file added merlin/db_scripts/__init__.py
Empty file.
59 changes: 59 additions & 0 deletions merlin/db_scripts/db_interaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""
"""
from merlin.db_scripts.db_study import DatabaseStudy


class MerlinDatabase:
    """
    Entry point for retrieving and removing studies stored in the database.

    Every method below is still a placeholder that does nothing and returns `None`.

    Return annotations are written as strings so that they are not evaluated
    when the class is defined; `typing.List` is not imported in this module,
    so the builtin `list` is used for the collection annotation instead.

    TODO I think we should make this the default way to interact with backends to abstract it a bit
    - Can have abstract ResultsBackend class
    - Can have RedisBackend, SQLAlchemyBackend, etc. classes to extend ResultsBackend
    - Instead of using CONFIG.results_backend in the init for this class we could instead take in
      an instance of the ResultsBackend class
    """

    def __init__(self):
        """Create the database interface. There is no state to set up yet."""

    def get_study(self, id: int) -> "DatabaseStudy":  # TODO not sure if id is an int or str
        """
        Given an ID, get the associated study from the database.

        Not yet implemented; currently returns `None`.

        Args:
            id: The ID of the study to retrieve.

        Returns:
            A [`DatabaseStudy`][merlin.db_scripts.db_study.DatabaseStudy] object representing
            the study that was queried.
        """

    def get_all_studies(self) -> "list[DatabaseStudy]":
        """
        Get every study that's currently in the database.

        Not yet implemented; currently returns `None`.

        Returns:
            A list of [`DatabaseStudy`][merlin.db_scripts.db_study.DatabaseStudy] objects.
        """

    def remove_study(self, id: int):  # TODO not sure if id is an int or str
        """
        Given an ID, remove the associated study from the database. As a consequence
        of this action, any study runs associated with this study will also be removed.

        Not yet implemented; currently does nothing.

        Args:
            id: The ID of the study to remove.

        TODO do we want to remove runs? Should they be able to exist by themselves?
        """

    def remove_all_studies(self):
        """
        Remove every study in the database.

        Not yet implemented; currently does nothing.

        TODO is this essentially clearing the db? What other info will be in the db?
        """
196 changes: 196 additions & 0 deletions merlin/db_scripts/db_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
"""
"""
from dataclasses import dataclass, field, asdict
from typing import Dict, List
import uuid

from merlin.backends.results_backend import ResultsBackend


@dataclass
class RunInfo:
    """
    A dataclass to store all of the information for a run.

    Attributes:
        id: The unique ID for the run. A fresh UUID4 string is generated when omitted.
        study_id: The unique ID of the study this run is associated with.
        workspace: The path to the output workspace.
        queues: The task queues used for this run.
        parent: The ID of the parent run (if any).
        child: The ID of the child run (if any).
        run_complete: Whether the run is complete.
        parameters: The parameters used in this run.
        samples: The samples used in this run.
        additional_data: For any extra data not explicitly defined.
    """

    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # A default is required here: a dataclass field without a default may not
    # follow a field that has one (`id` above), or class creation raises TypeError.
    study_id: str = None
    workspace: str = None
    queues: List[str] = field(default_factory=list)
    parent: str = None
    child: str = None
    run_complete: bool = False
    parameters: Dict = field(default_factory=dict)  # TODO NOT YET IMPLEMENTED
    samples: Dict = field(default_factory=dict)  # TODO NOT YET IMPLEMENTED
    additional_data: Dict = field(default_factory=dict)

    def to_dict(self) -> Dict:
        """
        Convert the run data to a dictionary for storage in the database.

        Returns:
            A dictionary with one entry per dataclass field.
        """
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> "RunInfo":
        """
        Create a `RunInfo` instance from a dictionary.

        Missing keys fall back to defaults. Note that a missing "queues" key
        yields `["merlin"]` here, whereas the constructor default is an empty list.

        Args:
            data: A dictionary keyed by the field names of this dataclass.

        Returns:
            A `RunInfo` populated from `data`.
        """
        return cls(
            # Generate a fresh ID rather than storing None when no ID is supplied.
            id=data.get("id") or str(uuid.uuid4()),
            study_id=data.get("study_id"),
            workspace=data.get("workspace"),
            queues=data.get("queues", ["merlin"]),
            parent=data.get("parent", None),
            child=data.get("child", None),
            run_complete=data.get("run_complete", False),
            parameters=data.get("parameters", {}),
            samples=data.get("samples", {}),
            additional_data=data.get("additional_data", {}),
        )


class DatabaseRun:
    """
    A run paired with the backend it is stored in.

    Wraps a `RunInfo` record and a `ResultsBackend`, exposing read accessors for
    the record's fields plus `save`, `load` and `delete` operations that go
    through the backend's `set`, `get` and `delete` methods.

    Attributes:
        run_info: The `RunInfo` record describing this run.
        backend: The backend object used to persist the run.
    """

    def __init__(self, run_info: RunInfo, backend: ResultsBackend):
        """
        Args:
            run_info: The record describing this run.
            backend: The backend object used to persist the run.
        """
        self.run_info: RunInfo = run_info
        self.backend: ResultsBackend = backend

    @property
    def run_complete(self) -> bool:
        """
        An attribute representing whether this run is complete.

        A "complete" run is a run that has executed all steps.

        Returns:
            True if the run is complete. False, otherwise.
        """
        return self.run_info.run_complete

    @run_complete.setter
    def run_complete(self, value: bool):
        """
        Update the run's completion status and immediately save the run.
        """
        self.run_info.run_complete = value
        self.save()

    def get_id(self) -> str:
        """
        Get the ID for this run.

        Returns:
            The ID for this run.
        """
        return self.run_info.id

    def get_study_id(self) -> str:
        """
        Get the ID for the study associated with this run.

        Returns:
            The ID for the study associated with this run.
        """
        return self.run_info.study_id

    def get_workspace(self) -> str:
        """
        Get the path to the output workspace for this run.

        Returns:
            A string representing the output workspace for this run.
        """
        return self.run_info.workspace

    def get_queues(self) -> List[str]:
        """
        Get the task queues that were used for this run.

        Returns:
            A list of strings representing the queues that were used for this run.
        """
        return self.run_info.queues

    def get_parent(self) -> str:
        """
        Get the ID of the run that launched this run (if any).

        This will only be set for iterative workflows with greater than 1 iteration.

        Returns:
            The ID of the run that launched this run.
        """
        return self.run_info.parent

    def get_child(self) -> str:
        """
        Get the ID of the run that was launched by this run (if any).

        This will only be set for iterative workflows with greater than 1 iteration.

        Returns:
            The ID of the run that was launched by this run.
        """
        return self.run_info.child

    def save(self):
        """
        Save the current state of this run to the database.

        The run is stored under its ID in dictionary form, because `load`
        rebuilds the run with `RunInfo.from_dict` and therefore needs a mapping.
        """
        # TODO flush out logic in backend class to set this (might require more work here);
        # the key format is undecided, e.g. "run:<id>" or "<study_name>:<id>".
        # NOTE(review): assumes `backend.get` hands back what `backend.set` stored - confirm.
        self.backend.set(self.get_id(), self.run_info.to_dict())

    @classmethod
    def load(cls, run_id: str, backend: ResultsBackend) -> "DatabaseRun":
        """
        Load a run from the database.

        Args:
            run_id: The ID of the run to load.
            backend: A [`ResultsBackend`][merlin.backends.results_backend.ResultsBackend] object.

        Returns:
            A `DatabaseRun` instance.

        Raises:
            ValueError: If the backend returns nothing (or an empty value) for `run_id`.
        """
        # TODO
        # This is a classmethod, so there is no instance: use the `backend` argument.
        run_data = backend.get(run_id)
        if not run_data:
            raise ValueError(f"Run with ID {run_id} not found in the database.")

        run_info = RunInfo.from_dict(run_data)
        return cls(run_info, backend)

    @classmethod
    def delete(cls, run_id: str, backend: ResultsBackend):
        """
        Delete a run from the database.

        Args:
            run_id: The ID of the run to delete.
            backend: A [`ResultsBackend`][merlin.backends.results_backend.ResultsBackend] object.
        """
        # TODO make sure this deletes everything for the run
        # This is a classmethod, so there is no instance: use the `backend` argument.
        backend.delete(run_id)

116 changes: 116 additions & 0 deletions merlin/db_scripts/db_study.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""
"""
from dataclasses import dataclass, field, asdict
from typing import Dict, List
import uuid

from merlin.backends.results_backend import ResultsBackend
from merlin.db_scripts.db_run import DatabaseRun, RunInfo

@dataclass
class StudyInfo:
    """
    A dataclass to store all of the information for a study.

    Attributes:
        id: The unique ID for the study. A fresh UUID4 string is generated when omitted.
        name: The name of the study.
        runs: A list of IDs of the runs associated with this study.
    """

    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = None
    runs: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict:
        """
        Convert the study data to a dictionary for storage in the database.

        Returns:
            A dictionary with the keys "id", "name" and "runs".
        """
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> "StudyInfo":
        """
        Create a `StudyInfo` instance from a dictionary.

        Args:
            data: A dictionary that may contain the keys "id", "name" and "runs".

        Returns:
            A `StudyInfo` populated from `data`; missing keys fall back to defaults.
        """
        return cls(
            # Generate a fresh ID rather than storing None when no ID is supplied.
            id=data.get("id") or str(uuid.uuid4()),
            name=data.get("name"),
            runs=data.get("runs", []),
        )


class DatabaseStudy:
    """
    A study paired with the backend it is stored in.

    Wraps a `StudyInfo` record and a `ResultsBackend`, and manages the runs
    whose IDs are listed in `study_info.runs`.

    Attributes:
        study_info: The `StudyInfo` record describing this study.
        backend: The backend object used to load and delete runs.
    """

    def __init__(self, study_info: StudyInfo, backend: ResultsBackend):
        """
        Args:
            study_info: The record describing this study.
            backend: The backend object used to load and delete runs.
        """
        self.study_info = study_info
        self.backend = backend

    def get_id(self) -> str:
        """
        Get the ID for this study.

        Returns:
            The ID for this study.
        """
        return self.study_info.id

    def get_name(self) -> str:
        """
        Get the name associated with this study.

        Returns:
            The name for this study.
        """
        return self.study_info.name

    def create_run(self) -> DatabaseRun:
        """
        Create a run for this study. This will create a [`DatabaseRun`][merlin.db_scripts.db_run.DatabaseRun]
        object and link it to this study.

        The new run is saved through the backend and its ID is appended to this
        study's list of runs.

        Returns:
            The newly created `DatabaseRun`; its ID is available via `get_id()`.
        """
        new_run = DatabaseRun(RunInfo(study_id=self.get_id()), self.backend)
        new_run.save()
        self.study_info.runs.append(new_run.get_id())
        self.save()
        return new_run

    def get_run(self, id: str) -> DatabaseRun:
        """
        Given an ID, get the associated run from the database.

        Args:
            id: The ID of the run to retrieve.

        Returns:
            A [`DatabaseRun`][merlin.db_scripts.db_run.DatabaseRun] object representing
            the run that was queried.
        """
        return DatabaseRun.load(id, self.backend)

    def get_all_runs(self) -> List[DatabaseRun]:
        """
        Get every run associated with this study.

        Returns:
            A list of [`DatabaseRun`][merlin.db_scripts.db_run.DatabaseRun] objects.
        """
        return [self.get_run(run_id) for run_id in self.study_info.runs]

    def remove_run(self, id: str):
        """
        Given an ID, remove the associated run from the database.

        Args:
            id: The ID of the run to remove.

        Raises:
            ValueError: If `id` is not one of this study's runs.
        """
        # Check membership first so nothing is deleted from the backend for a
        # run that does not belong to this study.
        if id not in self.study_info.runs:
            raise ValueError(f"Run with ID {id} is not associated with this study.")
        DatabaseRun.delete(id, self.backend)
        self.study_info.runs.remove(id)

    def remove_all_runs(self):
        """
        Remove every run associated with this study.
        """
        # Iterate over a copy: remove_run mutates study_info.runs, and removing
        # from a list while iterating over it skips elements.
        for run_id in list(self.study_info.runs):
            self.remove_run(run_id)

    def save(self):
        """
        Save the current state of this study to the database.

        TODO not yet implemented; currently does nothing.
        """

0 comments on commit b9e28b0

Please sign in to comment.