Merge pull request #3305 from HHS/OPS-3278/load-projects
Ops 3278/load projects
johndeange authored Jan 16, 2025
2 parents bffe70a + 8a44d81 commit 8b09657
Showing 9 changed files with 692 additions and 0 deletions.
18 changes: 18 additions & 0 deletions backend/data_tools/scripts/load_projects.sh
@@ -0,0 +1,18 @@
#!/bin/sh
set -eo pipefail

export PYTHONPATH=.:$PYTHONPATH

ENV=$1
INPUT_CSV=$2

echo "Activating virtual environment..."
. .venv/bin/activate

echo "ENV is $ENV"
echo "INPUT_CSV is $INPUT_CSV"

echo "Running script..."
python data_tools/src/load_projects/main.py \
  --env "${ENV}" \
  --input-csv "${INPUT_CSV}"
Empty file.
79 changes: 79 additions & 0 deletions backend/data_tools/src/load_projects/main.py
@@ -0,0 +1,79 @@
import os
import sys
import time

import click
from data_tools.src.azure_utils.utils import get_csv
from data_tools.src.common.db import init_db_from_config, setup_triggers
from data_tools.src.common.utils import get_config, get_or_create_sys_user
from data_tools.src.load_projects.utils import transform
from loguru import logger
from sqlalchemy import select, text
from sqlalchemy.orm import scoped_session, sessionmaker

from models import Portfolio

# Set the timezone to UTC
os.environ["TZ"] = "UTC"
time.tzset()

# logger configuration
format = (
    "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
    "<level>{level: <8}</level> | "
    "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
    "<level>{message}</level>"
)
logger.add(sys.stdout, format=format, level="INFO")
logger.add(sys.stderr, format=format, level="INFO")


@click.command()
@click.option("--env", help="The environment to use.")
@click.option("--input-csv", help="The path to the CSV input file.")
def main(
    env: str,
    input_csv: str,
):
    """
    Main entrypoint for the script.
    """
    logger.debug(f"Environment: {env}")
    logger.debug(f"Input CSV: {input_csv}")

    logger.info("Starting the ETL process.")

    script_config = get_config(env)
    db_engine, db_metadata_obj = init_db_from_config(script_config)

    if db_engine is None:
        logger.error("Failed to initialize the database engine.")
        sys.exit(1)

    with db_engine.connect() as conn:
        conn.execute(text("SELECT 1"))
        logger.info("Successfully connected to the database.")

    csv_f = get_csv(input_csv, script_config)

    logger.info(f"Loaded CSV file from {input_csv}.")

    Session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=db_engine))

    with Session() as session:
        sys_user = get_or_create_sys_user(session)
        logger.info(f"Retrieved system user {sys_user}")

        setup_triggers(session, sys_user)

        try:
            transform(csv_f, session, sys_user)
        except RuntimeError as re:
            logger.error(f"Error transforming data: {re}")
            sys.exit(1)

    logger.info("Finished the ETL process.")


if __name__ == "__main__":
    main()
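As a side note (not part of this PR), the Click entrypoint can be smoke-tested without a database by invoking its help text. A minimal sketch, assuming a Unix-like environment and the backend package importable (e.g. run from backend/ with PYTHONPATH=.):

# Minimal sketch, not part of this commit: exercise the CLI without a database.
from click.testing import CliRunner

from data_tools.src.load_projects.main import main

runner = CliRunner()
result = runner.invoke(main, ["--help"])  # --help returns before any DB work
assert result.exit_code == 0
assert "--input-csv" in result.output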
161 changes: 161 additions & 0 deletions backend/data_tools/src/load_projects/utils.py
@@ -0,0 +1,161 @@
from csv import DictReader
from dataclasses import dataclass, field
from datetime import date
from typing import List, Optional

from loguru import logger
from sqlalchemy import and_, select
from sqlalchemy.orm import Session

from models import AdministrativeAndSupportProject, Project, ProjectType, ResearchProject, User


@dataclass
class ProjectData:
    """
    Dataclass to represent a Project data row.
    """

    PROJECT_TITLE: str
    PROJECT_TYPE: str
    SYS_PROJECT_ID: Optional[int] = field(default=None)
    PROJECT_SHORT_TITLE: Optional[str] = field(default=None)
    PROJECT_DESCRIPTION: Optional[str] = field(default=None)

    def __post_init__(self):
        if not self.PROJECT_TITLE or not self.PROJECT_TYPE:
            raise ValueError("Project title and type are required.")

        self.SYS_PROJECT_ID = int(self.SYS_PROJECT_ID) if self.SYS_PROJECT_ID else None
        self.PROJECT_TITLE = str(self.PROJECT_TITLE)
        self.PROJECT_TYPE = str(self.PROJECT_TYPE)
        self.PROJECT_SHORT_TITLE = str(self.PROJECT_SHORT_TITLE) if self.PROJECT_SHORT_TITLE else None
        self.PROJECT_DESCRIPTION = str(self.PROJECT_DESCRIPTION) if self.PROJECT_DESCRIPTION else None


def create_project_data(data: dict) -> ProjectData:
    """
    Convert a dictionary to a ProjectData dataclass instance.
    :param data: The dictionary to convert.
    :return: A ProjectData dataclass instance.
    """
    return ProjectData(**data)
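
As an aside (not in the diff), a row dict such as DictReader produces maps onto the dataclass like this; the values below are invented for illustration and assume backend/ is on PYTHONPATH:

# Illustration only; the row values are invented, not taken from the fixture file.
from data_tools.src.load_projects.utils import create_project_data

row = {
    "SYS_PROJECT_ID": "42",
    "PROJECT_TITLE": "Example Project",
    "PROJECT_TYPE": "RESEARCH",
    "PROJECT_SHORT_TITLE": "EX",
    "PROJECT_DESCRIPTION": "A placeholder description.",
}
project = create_project_data(row)
assert project.SYS_PROJECT_ID == 42  # __post_init__ coerces the id to an int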


def validate_data(data: ProjectData) -> bool:
    """
    Validate the data in a ProjectData instance.
    :param data: The ProjectData instance to validate.
    :return: True if the data is valid, False otherwise.
    """
    return all(
        [
            data.PROJECT_TITLE is not None,
            data.PROJECT_TYPE is not None,
        ]
    )


def validate_all(data: List[ProjectData]) -> bool:
    """
    Validate a list of ProjectData instances.
    :param data: The list of ProjectData instances to validate.
    :return: True if every instance is valid, False otherwise.
    """
    return sum(1 for d in data if validate_data(d)) == len(data)


def create_models(data: ProjectData, sys_user: User, session: Session) -> None:
    """
    Create and persist the Project model.
    :param data: The ProjectData instance to convert.
    :param sys_user: The system user to use.
    :param session: The database session to use.
    :return: None
    """
    logger.debug(f"Creating models for {data}")

    try:
        if data.PROJECT_TYPE == "RESEARCH":
            project = ResearchProject(
                id=data.SYS_PROJECT_ID,
                project_type=ProjectType.RESEARCH,
                title=data.PROJECT_TITLE,
                short_title=data.PROJECT_SHORT_TITLE,
                description=data.PROJECT_DESCRIPTION,
                created_by=sys_user.id,
            )
        else:
            project = AdministrativeAndSupportProject(
                id=data.SYS_PROJECT_ID,
                project_type=ProjectType.ADMINISTRATIVE_AND_SUPPORT,
                title=data.PROJECT_TITLE,
                short_title=data.PROJECT_SHORT_TITLE,
                description=data.PROJECT_DESCRIPTION,
                created_by=sys_user.id,
            )
        session.merge(project)
        session.commit()
    except Exception as e:
        logger.error(f"Error creating models for {data}")
        raise e


def create_all_models(data: List[ProjectData], sys_user: User, session: Session) -> None:
    """
    Create and persist Project models for a list of ProjectData instances.
    :param data: The list of ProjectData instances to convert.
    :param sys_user: The system user to use.
    :param session: The database session to use.
    :return: None
    """
    for d in data:
        create_models(d, sys_user, session)


def create_all_project_data(data: List[dict]) -> List[ProjectData]:
    """
    Convert a list of dictionaries to a list of ProjectData instances.
    :param data: The list of dictionaries to convert.
    :return: A list of ProjectData instances.
    """
    return [create_project_data(d) for d in data]


def transform(data: DictReader, session: Session, sys_user: User) -> None:
    """
    Transform the data from the CSV file and persist the models to the database.
    :param data: The data from the CSV file.
    :param session: The database session to use.
    :param sys_user: The system user to use.
    :return: None
    """
    if not data or not session or not sys_user:
        logger.error("No data to process. Exiting.")
        raise RuntimeError("No data to process.")

    project_data = create_all_project_data(list(data))
    logger.info(f"Created {len(project_data)} Project data instances.")

    if not validate_all(project_data):
        logger.error("Validation failed. Exiting.")
        raise RuntimeError("Validation failed.")

    logger.info("Data validation passed.")

    create_all_models(project_data, sys_user, session)
    logger.info("Finished loading models.")
4 changes: 4 additions & 0 deletions backend/data_tools/test_csv/projects_invalid.tsv
@@ -0,0 +1,4 @@
SYS_PROJECT_ID PROJECT_TITLE PROJECT_TYPE PROJECT_SHORT_TITLE PROJECT_DESCRIPTION
1 Human Services Interoperability Support "" HSS This contract will conduct interoperability activities to facilitate the exchange of information within, between, and from states and tribal organizations by facilitating lower-burden, interoperable data reporting and exchange to other state agencies and to ACF. The contract will focus on developing content that facilitates streamlined, interoperable reporting to ACF. The contract will also conduct research and evaluation activities with states and tribal organizations to assess the effectiveness of providing these interoperability artifacts for these organizations to use. The ability to share data and develop interoperable data systems is important for effective operation and oversight of these programs. This contract is designed to address these requirements and deliver needed and practical tools to accelerate implementation of data sharing and interoperable initiatives.
2 "" RESEARCH YDD OPRE launched the Youth Demonstration Development Project (YDD) in 2009 to systematically review the current field of research on youth development and successful transition to adulthood. The primary objective of YDD, conducted for OPRE by Mathematica Policy Research and its partner, Chapin Hall Center for Children, was to develop a conceptual framework that could be applied to existing or new ACF programs to improve the well-being of at-risk youth and increase their ability to become self-sufficient adults and avoid long-term reliance on public assistance. These frameworks will inform ACF’s decisions about possible demonstrations and evaluations of innovative approaches to improving outcomes among youth at risk of not achieving self-sufficiency.
3 Annual Performance Plans and Reports RESEARCH APP The Administration for Children and Families (ACF), within the U.S. Department of Health and Human Services, is responsible for federal programs that address the needs of vulnerable children and families throughout our society, including Native Americans, individuals with developmental disabilities, and refugees.
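
For context (not part of the commit), this fixture is "invalid" because row 1 has an empty PROJECT_TYPE and row 2 an empty PROJECT_TITLE, which trip the required-field check in ProjectData. A quick sketch, assuming a plain tab-delimited DictReader with standard csv quoting (the real pipeline reads files through get_csv()) and backend/ as the working directory:

# Sketch only, not part of this commit: show the invalid rows being rejected
# by ProjectData.__post_init__.
import csv

from data_tools.src.load_projects.utils import create_project_data

with open("data_tools/test_csv/projects_invalid.tsv") as f:
    for row in csv.DictReader(f, delimiter="\t"):
        try:
            create_project_data(row)
        except ValueError as err:
            print(f"rejected SYS_PROJECT_ID={row['SYS_PROJECT_ID']}: {err}")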