Merge pull request #1463 from sul-dlss/t1439-backstage-select
Creates backstage selection dag
jgreben authored Dec 3, 2024
2 parents 732dea3 + 8c1a4f5 commit 641a08d
Showing 8 changed files with 191 additions and 46 deletions.
97 changes: 97 additions & 0 deletions libsys_airflow/dags/data_exports/backstage_selections.py
@@ -0,0 +1,97 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.timetables.interval import CronDataIntervalTimetable

from libsys_airflow.plugins.data_exports.instance_ids import (
    choose_fetch_folio_ids,
    fetch_record_ids,
    save_ids_to_fs,
)

from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances

devs_to_email_addr = Variable.get("EMAIL_DEVS")

default_args = {
    "owner": "libsys",
    "depends_on_past": False,
    "email": [devs_to_email_addr],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}


with DAG(
    "select_backstage_records",
    default_args=default_args,
    schedule=CronDataIntervalTimetable(
        cron=Variable.get("select_backstage", "30 22 * * FRI"),
        timezone="America/Los_Angeles",
    ),
    start_date=datetime(2024, 11, 18),
    catchup=False,
    tags=["data export", "backstage"],
    params={
        "from_date": Param(
            f"{(datetime.now() - timedelta(1)).strftime('%Y-%m-%d')}",
            format="date",
            type="string",
            description="The earliest date to select record IDs from FOLIO.",
        ),
        "to_date": Param(
            f"{(datetime.now()).strftime('%Y-%m-%d')}",
            format="date",
            type="string",
            description="The latest date to select record IDs from FOLIO.",
        ),
        "fetch_folio_record_ids": Param(True, type="boolean"),
        "saved_record_ids_kind": Param(None, type=["null", "string"]),
    },
    render_template_as_native_obj=True,
) as dag:
    check_record_ids = BranchPythonOperator(
        task_id="check_record_ids",
        python_callable=choose_fetch_folio_ids,
        op_kwargs={"fetch_folio_record_ids": "{{ params.fetch_folio_record_ids }}"},
    )

    fetch_folio_record_ids = PythonOperator(
        task_id="fetch_record_ids_from_folio",
        python_callable=fetch_record_ids,
        op_kwargs={"record_kind": ["new"]},
    )

    save_ids_to_file = PythonOperator(
        task_id="save_ids_to_file",
        python_callable=save_ids_to_fs,
        trigger_rule="none_failed_min_one_success",
        op_kwargs={
            "vendor": "backstage",
            "record_id_kind": "{{ params.saved_record_ids_kind }}",
        },
    )

    fetch_marc_records = PythonOperator(
        task_id="fetch_marc_records_from_folio",
        python_callable=marc_for_instances,
        op_kwargs={
            "instance_files": "{{ ti.xcom_pull('save_ids_to_file') }}",
        },
    )

    finish_processing_marc = EmptyOperator(
        task_id="finish_marc",
    )


check_record_ids >> [fetch_folio_record_ids, save_ids_to_file]
fetch_folio_record_ids >> save_ids_to_file >> fetch_marc_records
save_ids_to_file >> fetch_marc_records >> finish_processing_marc
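
Two pieces of this fan-out are easy to miss. First, choose_fetch_folio_ids (imported from the plugin, not shown in this diff) is the BranchPythonOperator callable that decides whether to fetch IDs from FOLIO or go straight to saving previously supplied IDs. A minimal sketch of what such a branch callable looks like, with the exact logic an assumption:

# Hypothetical sketch -- the real choose_fetch_folio_ids lives in
# libsys_airflow/plugins/data_exports/instance_ids.py. A branch
# callable returns the task_id to follow; the other path is skipped.
def choose_fetch_folio_ids(**kwargs) -> str:
    raw = kwargs.get("fetch_folio_record_ids", True)
    # With render_template_as_native_obj=True the templated param may
    # arrive as a native bool; otherwise it is the string "True"/"False".
    fetch = raw if isinstance(raw, bool) else str(raw).lower() == "true"
    return "fetch_record_ids_from_folio" if fetch else "save_ids_to_file"

Second, save_ids_to_file sets trigger_rule="none_failed_min_one_success": a skipped branch normally propagates skips downstream, so this rule lets the task run as long as no upstream failed and at least one upstream succeeded, whether or not the fetch branch was taken.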
1 change: 1 addition & 0 deletions libsys_airflow/dags/data_exports/default_schedules.json
@@ -1,4 +1,5 @@
{
  "select_backstage": "30 22 * * FRI",
  "select_gobi": "30 22 * * TUE",
  "transmit_gobi": "30 1 * * WED",
  "select_google": "0 0 * * *",
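
For reference, the new select_backstage default of "30 22 * * FRI" fires at 22:30 every Friday, which the DAG above interprets in America/Los_Angeles.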
1 change: 1 addition & 0 deletions libsys_airflow/plugins/data_exports/apps/vendors.json
@@ -1,5 +1,6 @@
{
  "vendors": [
    "backstage",
    "gobi",
    "google",
    "hathi",
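
Adding backstage here presumably makes the vendor selectable in the data-exports app alongside the existing vendors; how this file is consumed is not shown in the diff.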
5 changes: 3 additions & 2 deletions libsys_airflow/plugins/data_exports/instance_ids.py
@@ -26,9 +26,10 @@ def fetch_record_ids(**kwargs) -> dict:
     context = get_current_context()
     params = context.get("params", {})  # type: ignore
     airflow = kwargs.get("airflow", "/opt/airflow/libsys_airflow")
+    record_kind = kwargs.get("record_kind", ["new", "updates", "deletes"])
     results = {"new": [], "updates": [], "deletes": []}  # type: dict
 
-    for kind in ["new", "updates", "deletes"]:
+    for kind in record_kind:
         sql_list = sql_files(params=params, airflow=airflow, kind=kind)
 
         for idx, sqlfile in enumerate(sql_list):
@@ -85,7 +86,7 @@ def save_ids_to_fs(**kwargs) -> list[Union[str, None]]:
         for file in data_path.glob("*.csv"):
             ids_path.append(str(file))
     else:
-        for kind in ["new", "updates", "deletes"]:
+        for kind in data.keys():
             ids = save_ids(airflow=airflow, data=data[kind], kind=kind, vendor=vendor)
             if ids:
                 ids_path.append(ids)
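
The new record_kind kwarg lets callers narrow the fetch; the backstage DAG passes ["new"], so only that bucket is populated. A minimal sketch of the resulting shape (the ID value is a placeholder, not real data):

# Hypothetical illustration -- real IDs come from the SQL files run
# against FOLIO. With record_kind=["new"], only the "new" key is filled.
results = {"new": [], "updates": [], "deletes": []}
for kind in ["new"]:  # record_kind from the DAG's op_kwargs
    results[kind].append(["placeholder-instance-uuid"])
# results == {"new": [["placeholder-instance-uuid"]], "updates": [], "deletes": []}

Since results always carries all three keys, the matching change in save_ids_to_fs (iterating data.keys()) still visits every kind, and the existing if ids: guard drops any kind for which save_ids returns nothing.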
17 changes: 17 additions & 0 deletions libsys_airflow/plugins/data_exports/marc/exporter.py
@@ -50,6 +50,15 @@ def check_915(self, fields915: list) -> bool:
                reject = True
        return reject

    def check_915_authority(self, fields915: list) -> bool:
        reject = False
        for field in fields915:
            if any("NO EXPORT" in sf for sf in field.get_subfields("a")) and any(
                "AUTHORITY VENDOR" in sf for sf in field.get_subfields("b")
            ):
                reject = True
        return reject

    def exclude_marc_by_vendor(self, marc_record: marcRecord, vendor: str):
        """
        Filters MARC record by Vendor
@@ -72,6 +81,14 @@ def exclude_marc_by_vendor(self, marc_record: marcRecord, vendor: str):
                    ]
                )

            case "backstage":
                exclude = any(
                    [
                        self.check_590(marc_record.get_fields("590")),
                        self.check_915_authority(marc_record.get_fields("915")),
                    ]
                )

        return exclude

    def retrieve_marc_for_instances(
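
One behavior worth noting: check_915_authority only rejects when both markers sit on the same 915 field; NO EXPORT in one 915 and AUTHORITY VENDOR in another does not trigger the exclusion. A small self-contained illustration (the rejects helper re-implements the loop from the diff rather than importing Exporter, which needs a FOLIO client):

import pymarc

def rejects(fields915: list) -> bool:
    # Behaviorally mirrors check_915_authority above: both subfield
    # matches must occur within a single 915 field.
    for field in fields915:
        if any("NO EXPORT" in sf for sf in field.get_subfields("a")) and any(
            "AUTHORITY VENDOR" in sf for sf in field.get_subfields("b")
        ):
            return True
    return False

same_field = pymarc.Field(
    tag="915",
    indicators=['1', '0'],  # type: ignore
    subfields=[
        pymarc.Subfield(code='a', value='NO EXPORT'),
        pymarc.Subfield(code='b', value='AUTHORITY VENDOR'),
    ],
)
only_a = pymarc.Field(
    tag="915",
    indicators=['1', '0'],  # type: ignore
    subfields=[pymarc.Subfield(code='a', value='NO EXPORT')],
)
only_b = pymarc.Field(
    tag="915",
    indicators=['1', '0'],  # type: ignore
    subfields=[pymarc.Subfield(code='b', value='AUTHORITY VENDOR')],
)

assert rejects([same_field])
assert not rejects([only_a, only_b])  # markers split across two fields: record is exported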
12 changes: 6 additions & 6 deletions libsys_airflow/plugins/data_exports/marc/transforms.py
@@ -81,12 +81,12 @@ def leader_for_deletes(marc_file: str, full_dump: bool):


 def clean_and_serialize_marc_files(marc_file_list: dict):
-    for kind in ['new', 'updates', 'deletes']:
-        for file in marc_file_list[kind]:
-            marc_clean_serialize(file, False)
-        logger.info(
-            f"Removed MARC fields and serialized records for '{kind}' files: {marc_file_list[kind]}"
-        )
+    for kind, file_list in marc_file_list.items():
+        for filepath in file_list:
+            marc_clean_serialize(filepath, False)
+            logger.info(
+                f"Removed MARC fields and serialized records for '{kind}' files: {filepath}"
+            )


def marc_clean_serialize(marc_file: str, full_dump: bool):
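
The rewritten loop is driven by whatever kinds the input dict actually contains, and the log line now fires once per serialized file instead of once per kind with the whole list. A sketch of the expected call (the path is a placeholder):

from libsys_airflow.plugins.data_exports.marc.transforms import (
    clean_and_serialize_marc_files,
)

# Hypothetical input shape -- in the DAGs this dict comes from the
# upstream MARC-fetch step; the path below is a placeholder.
marc_file_list = {
    "new": ["/opt/airflow/data-export-files/backstage/marc-files/new/sample.mrc"],
    "updates": [],
    "deletes": [],
}
clean_and_serialize_marc_files(marc_file_list)
# Serializes and logs the one "new" file; the empty kinds are no-ops.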
18 changes: 18 additions & 0 deletions tests/data_exports/test_marc_exports.py
@@ -244,6 +244,24 @@ def test_marc_for_instances(mocker, tmp_path, mock_folio_client):
    subfields=[pymarc.Subfield(code='a', value='NO EXPORT')],
)

field_915_authority = pymarc.Field(
    tag="915",
    indicators=['1', '0'],  # type: ignore
    subfields=[
        pymarc.Subfield(code='a', value='NO EXPORT'),
        pymarc.Subfield(code='b', value='AUTHORITY VENDOR'),
    ],
)


def test_exclude_marc_by_vendor_backstage(mocker):
    mocker.patch('libsys_airflow.plugins.data_exports.marc.exporter.folio_client')
    exporter = Exporter()
    marc_record = pymarc.Record()
    marc_record.add_field(field_590, field_915_authority)

    assert exporter.exclude_marc_by_vendor(marc_record, 'backstage')


def test_exclude_marc_by_vendor_gobi(mocker):
    mocker.patch('libsys_airflow.plugins.data_exports.marc.exporter.folio_client')
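
The new case runs on its own with pytest tests/data_exports/test_marc_exports.py -k test_exclude_marc_by_vendor_backstage.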