[FEATURE] Creating a new command to export database data to CSV. (#4572)
* [FEATURE] Creating a new command to export database data to CSV.

Issue #4561

* Updating queries, bucket

* Renaming queries
gsa-jrothacker authored Jan 3, 2025
1 parent 6b637d7 commit af35498
Showing 4 changed files with 202 additions and 0 deletions.
55 changes: 55 additions & 0 deletions .github/workflows/export-data-to-csv.yml
@@ -0,0 +1,55 @@
---
name: Export Data to CSV
on:
  # schedule:
  #   Monthly, on the 5th, at 8am UTC (3am EST)
  #   - cron: '0 8 5 * *'
  workflow_dispatch:
    inputs:
      environment:
        required: true
        type: choice
        description: The environment the workflow should run on.
        options:
          - dev
          - staging
          - preview
          - production

jobs:
  scheduled-data-export:
    if: ${{ github.event_name == 'schedule' }}
    strategy:
      matrix:
        environments: ["production"] # For now, just do the scheduled job on production to save space
    name: Run data export on ${{ matrix.environments }}
    runs-on: ubuntu-latest
    environment: ${{ matrix.environments }}
    env:
      space: ${{ matrix.environments }}
    steps:
      - name: Run Command
        uses: cloud-gov/cg-cli-tools@main
        with:
          cf_username: ${{ secrets.CF_USERNAME }}
          cf_password: ${{ secrets.CF_PASSWORD }}
          cf_org: gsa-tts-oros-fac
          cf_space: ${{ env.space }}
          command: cf run-task gsa-fac -k 2G -m 2G --name export_data_to_csv --command "python manage.py export_data"

  dispatch-data-export:
    if: ${{ github.event.inputs.environment != '' }}
    name: Run data export on ${{ inputs.environment }}
    runs-on: ubuntu-latest
    environment: ${{ inputs.environment }}
    env:
      space: ${{ inputs.environment }}
    steps:
      - name: Run Command
        uses: cloud-gov/cg-cli-tools@main
        with:
          cf_username: ${{ secrets.CF_USERNAME }}
          cf_password: ${{ secrets.CF_PASSWORD }}
          cf_org: gsa-tts-oros-fac
          cf_space: ${{ env.space }}
          command: cf run-task gsa-fac -k 2G -m 2G --name export_data_to_csv --command "python manage.py export_data"
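
For reference, the dispatch-data-export job can also be started from a terminal rather than the web UI. A minimal sketch using the GitHub CLI (the staging value is illustrative; any of the four environment options works):

gh workflow run export-data-to-csv.yml -f environment=staging

Once the workflow has queued the Cloud Foundry task, running cf tasks gsa-fac while targeting the matching space shows whether export_data_to_csv is still running.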
8 changes: 8 additions & 0 deletions backend/requirements.txt
@@ -1193,6 +1193,14 @@ six==1.16.0 \
    # orderedmultidict
    # pyjwkest
    # python-dateutil
sling==1.3.4 \
    --hash=sha256:70541da99b48313a0cfd5e61c52d984e4d50113b7945308697d32cd50a6bb183
    # via -r ./requirements/requirements.in
sling-linux-arm64==1.3.4 \
    --hash=sha256:08977df437742b415232e8c09c2f7f43181a3b764bef601fa6ebd369f6777783
sling-linux-amd64==1.3.4 \
    --hash=sha256:d5f3b71fb191e8b9663f88890c464d84ab4c36df3a5fd6148a51cdd2ea9a5099
    # via sling
sqlalchemy==2.0.36 \
    --hash=sha256:03e08af7a5f9386a43919eda9de33ffda16b44eb11f3b313e6822243770e9763 \
    --hash=sha256:0572f4bd6f94752167adfd7c1bed84f4b240ee6203a95e05d1e208d488d0d436 \
1 change: 1 addition & 0 deletions backend/requirements/requirements.in
@@ -36,6 +36,7 @@ python-slugify
pyyaml
requests>=2.32.3
sqlalchemy
sling
sqlparse>=0.5.0
types-python-dateutil==2.9.0.20240821
uritemplate
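
Because requirements.txt is hash-pinned, the sling entries above were presumably produced by recompiling this file rather than by hand-editing. A sketch of that step, assuming the project uses pip-tools (the "# via -r ./requirements/requirements.in" comment in the compiled output points that way):

cd backend
pip-compile --generate-hashes ./requirements/requirements.in -o requirements.txt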
138 changes: 138 additions & 0 deletions backend/support/management/commands/export_data.py
@@ -0,0 +1,138 @@
import os
import logging

from config import settings
from datetime import datetime
from django.core.management.base import BaseCommand, CommandError
from sling import Replication, ReplicationStream

from support.decorators import newrelic_timing_metric
from dissemination.summary_reports import restricted_model_names

logger = logging.getLogger(__name__)

S3_CONNECTION = f"""{{
    "type": "s3",
    "bucket": "{settings.AWS_PRIVATE_STORAGE_BUCKET_NAME}",
    "access_key_id": "{settings.AWS_PRIVATE_ACCESS_KEY_ID}",
    "secret_access_key": "{settings.AWS_PRIVATE_SECRET_ACCESS_KEY}",
    "endpoint": "{settings.AWS_S3_ENDPOINT_URL}"
}}
"""
DB_URL = os.environ.get("DATABASE_URL")
# Local and test environments connect to Postgres without TLS.
FAC_DB_URL = (
    f"{DB_URL}?sslmode=disable" if settings.ENVIRONMENT in ["LOCAL", "TEST"] else DB_URL
)
DEFAULT_OPTIONS = {
    "target_options": {
        "format": "csv",
        "compression": "gzip",
        "file_max_rows": 0,  # 0 = unlimited rows per file
    }
}


class StreamGenerator:
    # Tables holding restricted data only export rows whose report is public.
    EXCLUDE_NONPUBLIC_QUERY = (
        "select * from {table_name} where report_id in ("
        " select dg.report_id from public.dissemination_general dg"
        " where dg.audit_year = '{audit_year}' and dg.is_public = 'true' )"
    )

    UNRESTRICTED_QUERY = (
        "select * from {table_name} where report_id in ("
        " select dg.report_id from public.dissemination_general dg"
        " where dg.audit_year = '{audit_year}')"
    )

    def __init__(self, table_name, friendly_name, query_override=None):
        self.table_name = table_name
        self.friendly_name = friendly_name

        restricted_tables = [
            "dissemination_" + model for model in restricted_model_names
        ]
        default_query = (
            self.EXCLUDE_NONPUBLIC_QUERY
            if table_name in restricted_tables
            else self.UNRESTRICTED_QUERY
        )
        self.query = query_override or default_query

    def generate_stream(self, audit_year):
        return (
            f"{self.table_name}.{audit_year}",
            ReplicationStream(
                object=f"bulk_export/{{MM}}/{audit_year}_{self.friendly_name}.csv",
                sql=self.query.format(
                    table_name=self.table_name, audit_year=audit_year
                ),
                mode="full-refresh",
                target_options={"format": "csv"},
            ),
        )


STREAM_GENERATORS = [
    StreamGenerator(
        friendly_name="General",
        table_name="dissemination_general",
        query_override="select * from dissemination_general where audit_year = '{audit_year}'",
    ),
    StreamGenerator(
        friendly_name="AdditionalEIN", table_name="dissemination_additionalein"
    ),
    StreamGenerator(
        friendly_name="AdditionalUEI", table_name="dissemination_additionaluei"
    ),
    StreamGenerator(
        friendly_name="CorrectiveActionPlans", table_name="dissemination_captext"
    ),
    StreamGenerator(
        friendly_name="FederalAward", table_name="dissemination_federalaward"
    ),
    StreamGenerator(friendly_name="Finding", table_name="dissemination_finding"),
    StreamGenerator(
        friendly_name="FindingText", table_name="dissemination_findingtext"
    ),
    StreamGenerator(friendly_name="Note", table_name="dissemination_note"),
    StreamGenerator(
        friendly_name="PassThrough", table_name="dissemination_passthrough"
    ),
    StreamGenerator(
        friendly_name="SecondaryAuditor", table_name="dissemination_secondaryauditor"
    ),
]


@newrelic_timing_metric("data_export")
def _run_data_export():
    logger.info("Begin exporting data")
    # Rather than hardcoding 2016, we may want to export only the past X years.
    # Only data that exists gets exported, so the +2 is just in case some data
    # is entered early for a future year.
    years = range(2016, datetime.today().year + 2)
    streams = {}
    for stream_generator in STREAM_GENERATORS:
        for year in years:
            streams.update([stream_generator.generate_stream(year)])

    replication = Replication(
        source="FAC_DB",
        target="BULK_DATA_EXPORT",
        streams=streams,
        defaults=DEFAULT_OPTIONS,
        env=dict(FAC_DB=FAC_DB_URL, BULK_DATA_EXPORT=S3_CONNECTION),
        debug=settings.DEBUG,
    )
    logger.info(f"Exporting {len(streams)} streams")
    replication.run()
    logger.info("Successfully exported data")


class Command(BaseCommand):
    def handle(self, *args, **kwargs):
        try:
            _run_data_export()
        except Exception as ex:
            logger.error("An error occurred while exporting data", exc_info=ex)
            raise CommandError("Error while exporting data")
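
As a quick sanity check of what these generators build, here is a minimal sketch, run inside a Django-configured shell (python manage.py shell). The year 2023 is illustrative, and reading the object field back off the stream assumes sling's Python API exposes it as an attribute:

from support.management.commands.export_data import StreamGenerator

key, stream = StreamGenerator(
    table_name="dissemination_finding", friendly_name="Finding"
).generate_stream(2023)
print(key)            # dissemination_finding.2023
print(stream.object)  # bulk_export/{MM}/2023_Finding.csv ({MM} is a sling runtime variable, the month)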
