Merge pull request #67 from sul-dlss-labs/doi-sunet-contribs
Add new tasks create_doi_sunet and contribs
lwrubel authored Jul 1, 2024
2 parents 1b8fb16 + 9a02e82 commit 75c3720
Showing 7 changed files with 342 additions and 105 deletions.
53 changes: 31 additions & 22 deletions rialto_airflow/dags/harvest.py
@@ -1,14 +1,15 @@
 import datetime
+import pickle
 from pathlib import Path
 
-from airflow.models import Variable
 from airflow.decorators import dag, task
+from airflow.models import Variable
 
-from rialto_airflow.utils import create_snapshot_dir, rialto_authors_file
-from rialto_airflow.harvest import dimensions, openalex, merge_pubs
+from rialto_airflow.harvest import dimensions, merge_pubs, openalex
+from rialto_airflow.harvest.doi_sunet import create_doi_sunet_pickle
 from rialto_airflow.harvest.sul_pub import sul_pub_csv
-from rialto_airflow.harvest.doi_set import create_doi_set
-
+from rialto_airflow.harvest.contribs import create_contribs
+from rialto_airflow.utils import create_snapshot_dir, rialto_authors_file
 
 data_dir = Variable.get("data_dir")
 sul_pub_host = Variable.get("sul_pub_host")
@@ -70,12 +71,22 @@ def sul_pub_harvest(snapshot_dir):
         return str(csv_file)
 
     @task()
-    def doi_set(dimensions, openalex, sul_pub):
+    def create_doi_sunet(dimensions, openalex, sul_pub, authors, snapshot_dir):
+        """
+        Extract a mapping of DOI -> [SUNET] from the dimensions doi-orcid dict,
+        openalex doi-orcid dict, SUL-Pub publications, and authors data.
+        """
+        pickle_file = Path(snapshot_dir) / "doi-sunet.pickle"
+        create_doi_sunet_pickle(dimensions, openalex, sul_pub, authors, pickle_file)
+
+        return str(pickle_file)
+
+    @task()
+    def doi_set(doi_sunet_pickle):
         """
-        Extract a unique list of DOIs from the dimensions doi-orcid dict,
-        the openalex doi-orcid dict, and the SUL-Pub publications.
+        Use the DOI -> [SUNET] pickle to return a list of all DOIs.
         """
-        return create_doi_set(dimensions, openalex, sul_pub)
+        return list(pickle.load(open(doi_sunet_pickle, "rb")).keys())
 
     @task()
     def dimensions_harvest_pubs(dois, snapshot_dir):
@@ -105,18 +116,14 @@ def merge_publications(sul_pub, openalex_pubs, dimensions_pubs, snapshot_dir):
         return str(output)
 
     @task()
-    def join_authors(pubs, authors_csv):
-        """
-        Add the Stanford organizational data to the publications.
-        """
-        return True
-
-    @task()
-    def pubs_to_contribs(pubs):
+    def pubs_to_contribs(pubs, doi_sunet_pickle, authors_csv, snapshot_dir):
         """
         Get contributions from publications.
         """
-        return True
+        output = Path(snapshot_dir) / "contributions.parquet"
+        create_contribs(pubs, doi_sunet_pickle, authors_csv, output)
+
+        return str(output)
 
     @task()
     def publish(dataset):
@@ -135,17 +142,19 @@ def publish(dataset):
 
     openalex_dois = openalex_harvest_dois(authors_csv, snapshot_dir)
 
-    dois = doi_set(dimensions_dois, openalex_dois, sul_pub)
+    doi_sunet = create_doi_sunet(
+        dimensions_dois, openalex_dois, sul_pub, authors_csv, snapshot_dir
+    )
+
+    dois = doi_set(doi_sunet)
 
     dimensions_pubs = dimensions_harvest_pubs(dois, snapshot_dir)
 
     openalex_pubs = openalex_harvest_pubs(dois, snapshot_dir)
 
     pubs = merge_publications(sul_pub, openalex_pubs, dimensions_pubs, snapshot_dir)
 
-    pubs_authors = join_authors(pubs, authors_csv)
-
-    contribs = pubs_to_contribs(pubs_authors)
+    contribs = pubs_to_contribs(pubs, doi_sunet, authors_csv, snapshot_dir)
 
     publish(contribs)
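To see the net effect of the harvest.py changes outside of Airflow, here is a minimal sketch with toy data (the snapshot path and DOI values are hypothetical): create_doi_sunet writes the DOI -> [SUNET] mapping as a pickle, and the doi_set task now reduces to reading that pickle's keys.

import pickle
from pathlib import Path

snapshot_dir = Path("/tmp/snapshot")  # hypothetical snapshot directory
snapshot_dir.mkdir(parents=True, exist_ok=True)

# toy stand-in for what create_doi_sunet_pickle produces
doi_sunet = {"10.0000/abc": ["user1"], "10.0000/123": ["user1", "user2"]}
pickle_file = snapshot_dir / "doi-sunet.pickle"
with open(pickle_file, "wb") as handle:
    pickle.dump(doi_sunet, handle, protocol=pickle.HIGHEST_PROTOCOL)

# the reworked doi_set task is just the keys of that mapping
dois = list(pickle.load(open(pickle_file, "rb")).keys())
assert sorted(dois) == ["10.0000/123", "10.0000/abc"]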
20 changes: 20 additions & 0 deletions rialto_airflow/harvest/contribs.py
@@ -0,0 +1,20 @@
import pickle

import polars as pl


def create_contribs(pubs_parquet, doi_sunet_pickle, authors, contribs_path):
    pubs = pl.read_parquet(pubs_parquet)
    authors = pl.read_csv(authors)

    # load the doi -> [sunetid] mapping and turn it into a two-column DataFrame
    doi_sunet = pickle.load(open(doi_sunet_pickle, "rb"))
    doi_sunet = pl.DataFrame({"doi": doi_sunet.keys(), "sunetid": doi_sunet.values()})

    # attach the sunetid list to each publication by doi
    pubs = pubs.join(doi_sunet, on="doi")

    # fan out to one row per (publication, author) contribution
    contribs = pubs.explode("sunetid")

    # bring in the author columns from the authors csv
    contribs = contribs.join(authors, on="sunetid")

    contribs.write_parquet(contribs_path)

    return contribs_path
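For a concrete picture of what create_contribs does, a minimal sketch of the join-and-explode step with toy frames (the DOI, title, and names are hypothetical): a DOI mapped to two SUNETs fans out into one contribution row per author.

import polars as pl

pubs = pl.DataFrame({"doi": ["10.0000/999"], "title": ["Perfect prose"]})
doi_sunet = pl.DataFrame({"doi": ["10.0000/999"], "sunetid": [["user1", "user2"]]})

# join attaches the sunetid list column; explode fans it out to one row per author
contribs = pubs.join(doi_sunet, on="doi").explode("sunetid")
assert contribs.shape == (2, 3)  # rows for user1 and user2; columns doi, title, sunetid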
32 changes: 0 additions & 32 deletions rialto_airflow/harvest/doi_set.py

This file was deleted.

108 changes: 108 additions & 0 deletions rialto_airflow/harvest/doi_sunet.py
@@ -0,0 +1,108 @@
import ast
import logging
import pickle
from collections import defaultdict

import pandas as pd


def create_doi_sunet_pickle(
    dimensions: str, openalex: str, sul_pub_csv: str, authors_csv: str, output_path
) -> None:
    """
    Get DOIs from each source and determine their SUNETID(s) using the authors
    csv file. Write the resulting mapping as a pickle to the output_path.
    """
    # use the authors csv to generate two dictionaries for looking up the sunet
    # based on an orcid or a cap_profile_id
    orcid_sunet, cap_profile_sunet = get_author_maps(authors_csv)

    # the dimensions and openalex pickle files map doi -> [orcid], so use the
    # orcid_sunet mapping to turn that into doi -> [sunet]
    dimensions_map = doi_sunetids(dimensions, orcid_sunet)
    openalex_map = doi_sunetids(openalex, orcid_sunet)

    # the sulpub csv has doi and authorship columns, the latter of which contains
    # the cap_profile_id, so the cap_profile_sunet mapping can be used to return
    # a mapping of doi -> [sunet]
    sulpub_map = sulpub_doi_sunetids(sul_pub_csv, cap_profile_sunet)

    doi_sunet = combine_maps(dimensions_map, openalex_map, sulpub_map)

    with open(output_path, "wb") as handle:
        pickle.dump(doi_sunet, handle, protocol=pickle.HIGHEST_PROTOCOL)

    logging.info(f"Found {len(doi_sunet)} DOIs")


def doi_sunetids(pickle_file: str, orcid_sunet: dict) -> dict:
    """
    Convert a mapping of doi -> [orcid] to a mapping of doi -> [sunet].
    """
    doi_orcids = pickle.load(open(pickle_file, "rb"))

    mapping = {}
    for doi, orcids in doi_orcids.items():
        mapping[doi] = [orcid_sunet[orcid] for orcid in orcids]

    return mapping


def sulpub_doi_sunetids(sul_pub_csv, cap_profile_sunet):
    # create a dataframe for sul_pubs which has a column for cap_profile_id
    # extracted from the authorship column
    df = pd.read_csv(sul_pub_csv, usecols=["doi", "authorship"])
    df = df[df["doi"].notna()]

    def extract_cap_ids(authors):
        # authorship is a string containing a Python literal; parse it with
        # ast.literal_eval rather than eval, which would execute arbitrary code
        return [
            a["cap_profile_id"]
            for a in ast.literal_eval(authors)
            if a["status"] == "approved"
        ]

    df["cap_profile_id"] = df["authorship"].apply(extract_cap_ids)

    # one row per (doi, cap_profile_id) pair
    df = df.explode("cap_profile_id")

    # create a column for sunet using the cap_profile_sunet dictionary
    df["sunet"] = df["cap_profile_id"].apply(lambda cap_id: cap_profile_sunet[cap_id])

    return df.groupby("doi")["sunet"].apply(list).to_dict()


def get_author_maps(authors):
    """
    Read the authors csv and return two dictionary mappings: orcid -> sunet,
    cap_profile_id -> sunet.
    """
    df = pd.read_csv(authors, usecols=["sunetid", "orcidid", "cap_profile_id"])
    df["orcidid"] = df["orcidid"].apply(orcid_id)

    # orcid -> sunet
    orcid = pd.Series(df["sunetid"].values, index=df["orcidid"]).to_dict()

    # cap_profile_id -> sunet
    cap_profile_id = pd.Series(
        df["sunetid"].values, index=df["cap_profile_id"]
    ).to_dict()

    return orcid, cap_profile_id


def combine_maps(m1, m2, m3):
    m = defaultdict(set)

    # fold values from dictionary d2 into dictionary d1
    def combine(d1, d2):
        for doi, sunets in d2.items():
            for sunet in sunets:
                d1[doi].add(sunet)

    combine(m, m1)
    combine(m, m2)
    combine(m, m3)

    # return the mapping with the sets turned into lists
    return {k: list(v) for k, v in m.items()}


def orcid_id(orcid):
    # normalize a full orcid url to the bare identifier
    if pd.isna(orcid):
        return None
    else:
        return orcid.replace("https://orcid.org/", "")
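To make combine_maps's merge semantics concrete, a small worked example (the DOIs and SUNETs are hypothetical): overlapping DOIs get the union of their SUNETs, with duplicates removed by the intermediate sets.

from rialto_airflow.harvest.doi_sunet import combine_maps

m1 = {"10.0000/abc": ["user1"]}
m2 = {"10.0000/abc": ["user1", "user2"]}
m3 = {"10.0000/xyz": ["user3"]}

merged = combine_maps(m1, m2, m3)
# e.g. {"10.0000/abc": ["user1", "user2"], "10.0000/xyz": ["user3"]}
# (order within each list is unspecified, since sets are used internally)
assert sorted(merged["10.0000/abc"]) == ["user1", "user2"]
assert merged["10.0000/xyz"] == ["user3"]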
76 changes: 76 additions & 0 deletions test/harvest/test_contribs.py
@@ -0,0 +1,76 @@
import pickle

import polars as pl
import pytest

from rialto_airflow.harvest.contribs import create_contribs


@pytest.fixture
def pubs_parquet(tmp_path):
    fixture_path = tmp_path / "pubs.parquet"
    df = pl.DataFrame(
        {
            "doi": [
                "0000/abc",
                "0000/123",
                "0000/999",
            ],
            "title": ["Exquisite article", "Fantabulous research", "Perfect prose"],
        }
    )
    df.write_parquet(fixture_path)
    return str(fixture_path)


@pytest.fixture
def doi_sunet(tmp_path):
    fixture_path = tmp_path / "doi-sunet.pickle"
    m = {"0000/abc": ["user1"], "0000/123": ["user2"], "0000/999": ["user1", "user2"]}
    pickle.dump(m, open(str(fixture_path), "wb"))
    return str(fixture_path)


@pytest.fixture
def authors(tmp_path):
    fixture_path = tmp_path / "users.csv"
    df = pl.DataFrame(
        {"sunetid": ["user1", "user2"], "first_name": ["Mary", "Frederico"]}
    )
    df.write_csv(fixture_path)
    return str(fixture_path)


def test_create_contribs(pubs_parquet, doi_sunet, authors, tmp_path):
    contribs_parquet = tmp_path / "contribs.parquet"
    create_contribs(pubs_parquet, doi_sunet, authors, contribs_parquet)

    df = pl.read_parquet(contribs_parquet)
    assert set(df.columns) == set(
        ["doi", "sunetid", "title", "first_name"]
    ), "columns are correct"

    # first publication got joined to authors
    assert len(df.filter(pl.col("doi") == "0000/abc")) == 1
    row = df.filter(pl.col("doi") == "0000/abc").row(0, named=True)
    assert row["sunetid"] == "user1"
    assert row["first_name"] == "Mary"
    assert row["title"] == "Exquisite article"

    # second publication got joined to authors
    assert len(df.filter(pl.col("doi") == "0000/123")) == 1
    row = df.filter(pl.col("doi") == "0000/123").row(0, named=True)
    assert row["sunetid"] == "user2"
    assert row["first_name"] == "Frederico"
    assert row["title"] == "Fantabulous research"

    # third publication was broken out into two rows since the doi_sunet pickle
    # file indicates it was authored by two people
    rows = df.filter(pl.col("doi") == "0000/999").sort("sunetid")
    assert len(rows) == 2
    assert rows["sunetid"][0] == "user1"
    assert rows["first_name"][0] == "Mary"
    assert rows["title"][0] == "Perfect prose"

    assert rows["sunetid"][1] == "user2"
    assert rows["first_name"][1] == "Frederico"
    assert rows["title"][1] == "Perfect prose"
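Assuming a standard pytest setup for the repository, this test can be run on its own with: pytest test/harvest/test_contribs.py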
51 changes: 0 additions & 51 deletions test/harvest/test_doi_set.py

This file was deleted.

