Commit

Merge pull request #28 from dsi-clinic/preprocess
linkage and network pipeline
averyschoen authored Mar 6, 2024
2 parents 24ef142 + 48470c2 commit 3fbf913
Showing 5 changed files with 236 additions and 6 deletions.
5 changes: 3 additions & 2 deletions Makefile
@@ -33,5 +33,6 @@ run-notebooks:
#running the linkage pipeline and creating the network graph
#still waiting on linkage_pipeline completion to get this into final shape

output network_graph: all_individuals.csv all_organizations.csv all_transactions.csv
python linkage_pipeline.py
run-linkage-and-network-pipeline:
	docker build -t $(project_image_name) -f Dockerfile $(current_abs_path)
	docker run -v $(current_abs_path):/project -t $(project_image_name) python utils/linkage_and_network_pipeline.py
5 changes: 3 additions & 2 deletions README.md
@@ -45,7 +45,7 @@ If you prefer to develop inside a container with VS Code then do the following steps

### Record Linkage and Network Pipeline
1. Save the standardized tables "complete_individuals_table.csv", "complete_organizations_table.csv", and "complete_transactions_table.csv" (collected from the above pipeline or data from the project's Google Drive) in the following format: repo_root / "output" / "file"
2. **UPDATE:** Run the pipeline by calling ```make run-linkage-pipeline```. This pipeline will perform conservative record linkage, attempt to classify entities as neutral, fossil fuels, or clean energy, convert the standardized tables into a NetworkX Graph, and show an interactive network visual.
2. **UPDATE:** Run the pipeline by calling ```make run-linkage-and-network-pipeline```. This pipeline will perform conservative record linkage, attempt to classify entities as neutral, fossil fuels, or clean energy, convert the standardized tables into a NetworkX Graph, and show an interactive network visualization.
3. The pipeline will output the deduplicated tables saved as "cleaned_individuals_table.csv", "cleaned_organizations_table.csv", and "cleaned_transactions_table.csv". A mapping file, "deduplicated_UUIDs.csv", tracks the UUIDs designated as duplicates. The pipeline will also output "Network Graph Node Data", which is the NetworkX Graph object converted into an adjacency list; a short loading sketch follows below.
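
As a quick sanity check after a run, the outputs can be loaded back. This is a minimal sketch, not part of the pipeline; it assumes the repository root is the working directory and refers to the `g.gml` graph file the pipeline writes to `output/`:

```python
# Minimal sketch: load the pipeline outputs for inspection.
# Assumes the repository root is the current working directory.
import networkx as nx
import pandas as pd

individuals = pd.read_csv("output/cleaned_individuals_table.csv")
transactions = pd.read_csv("output/cleaned_transactions_table.csv")
graph = nx.read_gml("output/g.gml")  # graph object written by the pipeline

print(individuals.shape, transactions.shape, graph.number_of_nodes())
```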

## Repository Structure
@@ -65,7 +65,8 @@ If the data is larger than 50MB then you should not add it to the repo and instead
This [README.md file](/data/README.md) should be kept up to date.

### output
Should contain work product generated by the analysis. Keep in mind that results should (generally) be excluded from the git repository.
This folder is empty by default. The final outputs of the Makefile will be placed here, consisting of a NetworkX Graph object and a txt file containing graph metrics.



## Team Member
5 changes: 4 additions & 1 deletion requirements.txt
@@ -19,6 +19,9 @@ Requests==2.31.0
setuptools==68.0.0
textdistance==4.6.1
usaddress==0.5.4
nameparser==1.1.3
names-dataset==3.1.0
networkx~=3.1
splink==3.9.12
5 changes: 4 additions & 1 deletion utils/linkage.py
@@ -472,12 +472,15 @@ def splink_dedupe(

deduped_df = pd.merge(
first_instance_df,
match_list_df[["cluster_id"]],
match_list_df[["cluster_id", "duplicated"]],
on="cluster_id",
how="left",
)
deduped_df = deduped_df.rename(columns={"cluster_id": "unique_id"})

deduped_df["duplicated"] = deduped_df["duplicated"].apply(
lambda x: x if isinstance(x, list) else [x]
)
convert_duplicates_to_dict(deduped_df)

deduped_df = deduped_df.drop(columns=["duplicated"])
222 changes: 222 additions & 0 deletions utils/linkage_and_network_pipeline.py
@@ -0,0 +1,222 @@
import networkx as nx
import pandas as pd
from nameparser import HumanName

from utils.classify import classify_wrapper
from utils.constants import (
BASE_FILEPATH,
individuals_blocking,
individuals_settings,
organizations_blocking,
organizations_settings,
)
from utils.linkage import (
cleaning_company_column,
deduplicate_perfect_matches,
get_address_line_1_from_full_address,
get_address_number_from_address_line_1,
get_likely_name,
get_street_from_address_line_1,
splink_dedupe,
standardize_corp_names,
)
from utils.network import (
combine_datasets_for_network_graph,
construct_network_graph,
create_network_graph,
)


def preprocess_individuals(individuals: pd.DataFrame) -> pd.DataFrame:
"""
Given a dataframe of individual donors, preprocesses the data,
and return a cleaned dataframe.
Args:
individuals: dataframe of individual contributions
Returns:
cleaned dataframe of individuals
"""
if "Unnamed: 0" in individuals.columns:
individuals.drop(columns="Unnamed: 0", inplace=True)

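    # Cast name and company fields to pandas string dtype before cleaning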
individuals = individuals.astype(
{
"first_name": "string",
"last_name": "string",
"full_name": "string",
"company": "string",
}
)

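    # Standardize and clean company names, leaving null entries untouched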
individuals["company"] = (
individuals["company"]
.loc[individuals["company"].notnull()]
.apply(standardize_corp_names)
)
individuals["company"] = (
individuals["company"]
.loc[individuals["company"].notnull()]
.apply(cleaning_company_column)
)

# Address functions, assuming address column is named 'Address'
if "Address" in individuals.columns:
individuals["Address"] = individuals["Address"].astype(str)[
individuals["Address"].notnull()
]
individuals["Address Line 1"] = individuals["Address"].apply(
get_address_line_1_from_full_address
)
individuals["Street Name"] = individuals["Address Line 1"].apply(
get_street_from_address_line_1
)
individuals["Address Number"] = individuals["Address Line 1"].apply(
get_address_number_from_address_line_1
)

    # If first or last name is missing, extract it from the full name column
individuals["full_name"] = individuals["full_name"].astype(str)[
individuals["full_name"].notnull()
]
if individuals["first_name"].isnull().any():
first_name = individuals["full_name"].apply(
lambda x: HumanName(x).first if pd.notnull(x) else x
)
individuals["first_name"] = first_name

if individuals["last_name"].isnull().any():
last_name = individuals["full_name"].apply(
lambda x: HumanName(x).last if pd.notnull(x) else x
)
individuals["last_name"] = last_name

individuals["full_name"] = individuals.apply(
lambda row: get_likely_name(
row["first_name"], row["last_name"], row["full_name"]
),
axis=1,
)

    # Sort rows so that records with complete name, company, and party
    # values appear first; this ordering matters for the splink implementation
individuals["sort_priority"] = (
~individuals["first_name"].isna()
& ~individuals["last_name"].isna()
& ~individuals["company"].isna()
) * 2 + (~individuals["party"].isna())

individuals = individuals.sort_values(
by="sort_priority", ascending=False
).drop(columns=["sort_priority"])

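    # splink's default settings expect a "unique_id" column; reuse the existing id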
individuals["unique_id"] = individuals["id"]

return individuals


def preprocess_organizations(organizations: pd.DataFrame) -> pd.DataFrame:
"""
Given a dataframe of organization donors, preprocesses the data,
and return a cleaned dataframe.
"""
if "Unnamed: 0" in organizations.columns:
organizations.drop(columns="Unnamed: 0", inplace=True)

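    # Standardize organization names, leaving null entries untouched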
organizations["name"] = (
organizations["name"]
.loc[organizations["name"].notnull()]
.apply(standardize_corp_names)
)

organizations["unique_id"] = organizations["id"]

return organizations


def preprocess_transactions(transactions: pd.DataFrame) -> pd.DataFrame:
"""
Given a dataframe of transactions, preprocesses the data,
and return a cleaned dataframe.
Args:
transactions: dataframe of transactions
Returns:
cleaned dataframe of transactions
"""
if "Unnamed: 0" in transactions.columns:
transactions.drop(columns="Unnamed: 0", inplace=True)

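    # Normalize the free-text purpose field to upper case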
transactions["purpose"] = transactions["purpose"].str.upper()

    # Map duplicate UUIDs to canonical IDs. DataFrame.replace needs a
    # dict-like mapping, not a DataFrame; this assumes the CSV's first
    # column holds the canonical UUID and the second the duplicate.
    deduped = pd.read_csv(BASE_FILEPATH / "output" / "deduplicated_UUIDs.csv")
    uuid_map = dict(zip(deduped.iloc[:, 1], deduped.iloc[:, 0]))
    transactions[["donor_id", "recipient_id"]] = transactions[
        ["donor_id", "recipient_id"]
    ].replace(uuid_map)

return transactions


def main():
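    """Run the complete linkage and network pipeline."""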
organizations = pd.read_csv(
BASE_FILEPATH / "data" / "complete_organizations_table.csv"
)

individuals = pd.read_csv(
BASE_FILEPATH / "data" / "complete_individuals_table.csv"
)

transactions = pd.read_csv(
BASE_FILEPATH / "data" / "complete_transactions_table.csv"
)

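    # Clean both donor tables before classification and deduplication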
individuals = preprocess_individuals(individuals)
organizations = preprocess_organizations(organizations)

individuals, organizations = classify_wrapper(individuals, organizations)

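    # Drop exact duplicate records before probabilistic matching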
individuals = deduplicate_perfect_matches(individuals)
organizations = deduplicate_perfect_matches(organizations)

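    # Probabilistic deduplication with splink, using the settings and
    # blocking rules imported from utils.constants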
organizations = splink_dedupe(
organizations, organizations_settings, organizations_blocking
)

individuals = splink_dedupe(
individuals, individuals_settings, individuals_blocking
)

transactions = preprocess_transactions(transactions)

cleaned_individuals_output_path = (
BASE_FILEPATH / "output" / "cleaned_individuals_table.csv"
)

cleaned_organizations_output_path = (
BASE_FILEPATH / "output" / "cleaned_organizations_table.csv"
)

cleaned_transactions_output_path = (
BASE_FILEPATH / "output" / "cleaned_transactions_table.csv"
)

individuals.to_csv(cleaned_individuals_output_path, index=False)
organizations.to_csv(cleaned_organizations_output_path, index=False)
transactions.to_csv(cleaned_transactions_output_path, index=False)

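    # Combine the cleaned tables and assemble the network graph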
aggreg_df = combine_datasets_for_network_graph(
[individuals, organizations, transactions]
)
g = create_network_graph(aggreg_df)
    # Write the graph in GML format, matching the .gml file extension
    g_output_path = BASE_FILEPATH / "output" / "g.gml"
    nx.write_gml(g, g_output_path)

construct_network_graph(
2018, 2023, [individuals, organizations, transactions]
)


if __name__ == "__main__":
main()
