diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 7e94f662..739fcd38 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,5 +1,5 @@ { - "name": "2023-fall-clinic-climate-cabinet-devcontainer", + "name": "2024-winter-clinic-climate-cabinet-devcontainer", "build": { "dockerfile": "../Dockerfile", "context": "..", diff --git a/Makefile b/Makefile index eb3ba0c3..c2553e2d 100644 --- a/Makefile +++ b/Makefile @@ -7,8 +7,8 @@ current_abs_path := $(subst Makefile,,$(mkfile_path)) # pipeline constants # PROJECT_NAME -project_image_name := "2023-fall-clinic-climate-cabinet" -project_container_name := "2023-fall-clinic-climate-cabinet-container" +project_image_name := "2024-winter-clinic-climate-cabinet" +project_container_name := "2024-winter-clinic-climate-cabinet-container" project_dir := "$(current_abs_path)" # environment variables @@ -29,3 +29,10 @@ run-notebooks: jupyter lab --port=8888 --ip='*' --NotebookApp.token='' --NotebookApp.password='' \ --no-browser --allow-root + +#running the linkage pipeline and creating the network graph +#still waiting on linkage_pipeline completion to get this into final shape + +run-linkage-and-network-pipeline: + docker build -t $(project_image_name) -f Dockerfile $(current_abs_path) + docker run -v $(current_abs_path):/project -t $(project_image_name) python utils/linkage_and_network_pipeline.py diff --git a/README.md b/README.md index 1340d12d..da6bba26 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# 2023-fall-clinic-climate-cabinet +# 2024-winter-clinic-climate-cabinet ## Data Science Clinic Project Goals @@ -34,28 +34,51 @@ If you prefer to develop inside a container with VS Code then do the following s 3. Click the blue or green rectangle in the bottom left of VS code (should say something like `><` or `>< WSL`). Options should appear in the top center of your screen. Select `Reopen in Container`. -### Project Pipeline +### Data Collection and Standardization Pipeline 1. Collect the data through **one** of the steps below a. Collect state's finance campaign data either from web scraping (AZ, MI, PA) or direct download (MN) OR - b. Go to the [Project's Google Drive]('https://drive.google.com/drive/u/2/folders/1HUbOU0KRZy85mep2SHMU48qUQ1ZOSNce') to download each state's data to their local repo following this format: repo_root / "data" / "raw" / / "file" + b. Go to the [Project's Google Drive]('https://drive.google.com/drive/u/2/folders/1HUbOU0KRZy85mep2SHMU48qUQ1ZOSNce') to download each state's data to their local repo following this format: repo_root / "data" / "raw" / state acronym / "file" 2. Open in development container which installs all necessary packages. 3. Run the project by running ```python utils/pipeline.py``` or ```python3 utils/pipeline.py``` run the processing pipeline that cleans, standardizes, and creates the individuals, organizations, and transactions concatenated into one comprehensive database. -5. running ```pipeline.py``` returns the tables to the output folder as csv files containing the complete individuals, organizations, and transactions DataFrames combining the AZ, MI, MN, and PA datasets. +5. Running ```pipeline.py``` returns the tables to the output folder as csv files containing the complete individuals, organizations, and transactions DataFrames combining the AZ, MI, MN, and PA datasets. 6. 
For future reference, the above pipeline also stores the information mapping each state-given id to our database id (generated via uuid) in a csv file named (state)IDMap.csv (example: ArizonaIDMap.csv) in the output folder
 
-## Team Members
+### Record Linkage and Network Pipeline
+1. Save the standardized tables "complete_individuals_table.csv", "complete_organizations_table.csv", and "complete_transactions_table.csv" (collected from the above pipeline or downloaded from the project's Google Drive) in the following format: repo_root / "output" / "file"
+2. Run the pipeline by calling ```make run-linkage-and-network-pipeline```. This pipeline performs conservative record linkage, attempts to classify entities as neutral, fossil fuels, or clean energy, converts the standardized tables into a NetworkX Graph, and shows an interactive network visual.
+3. The pipeline outputs the deduplicated tables saved as "cleaned_individuals_table.csv", "cleaned_organizations_table.csv", and "cleaned_transactions_table.csv". A mapping file, "deduplicated_UUIDs.csv", tracks the UUIDs designated as duplicates. The pipeline also outputs "Network Graph Node Data", which is the NetworkX Graph object converted into an adjacency list.
 
-Student Name: April Wang
-Student Email: yuzhouw@uchicago.edu
+## Repository Structure
+
+### utils
+Project Python code.
+
+### notebooks
+Contains short, clean notebooks to demonstrate analysis.
+
+### data
+
+Contains details of acquiring all raw data used in the repository. If data is small (<50MB) then it is okay to save it to the repo, making sure to clearly document how the data is obtained.
+
+If the data is larger than 50MB then you should not add it to the repo; instead, document how to get the data in the README.md file in the data directory.
+
+This [README.md file](/data/README.md) should be kept up to date.
+
+### output
+This folder is empty by default. The final outputs of the Makefile will be placed here, consisting of a NetworkX Graph object and a txt file containing graph metrics.
+
+
+
+## Team Members
 
 Student Name: Nicolas Posner
 Student Email: nrposner@uchicago.edu
 
-Student Name: Aïcha Camara
-Student Email: aichacamara@uchicago.edu
-
 Student Name: Alan Kagiri
 Student Email: alankagiri@uchicago.edu. 
Student Name: Adil Kassim Student Email: adilk@uchicago.edu + +Student Name: Nayna Pashilkar +Student Email: npashilkar@uchicago.edu diff --git a/notebooks/Test.ipynb b/notebooks/Test.ipynb deleted file mode 100644 index 5df942e1..00000000 --- a/notebooks/Test.ipynb +++ /dev/null @@ -1,39 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 2, - "metadata": {}, - "outputs": [], - "source": [ - "# Example Notebook file demonstrating how to use the file structure\n", - "from utils.preprocess_util_lib_example import save_random_dataframe\n", - "from pathlib import Path\n", - "\n", - "save_random_dataframe(Path(\"../output\"), Path(\"test.csv\"))" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.5" - }, - "orig_nbformat": 4 - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/output/README.md b/output/README.md index 932298fd..5c511d5e 100644 --- a/output/README.md +++ b/output/README.md @@ -1,2 +1,3 @@ # Output README --- +'deduplicated_UUIDs.csv' : Following record linkage work in the record_linkage pipeline, this file stores all the original uuids, and indicates the uuids to which the deduplicated uuids have been matched to. diff --git a/requirements.txt b/requirements.txt index 6658f0ea..2b595c8b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,11 @@ beautifulsoup4==4.11.1 numpy==1.25.0 Requests==2.31.0 setuptools==68.0.0 +textdistance==4.6.1 +usaddress==0.5.4 +nameparser==1.1.3 +names-dataset==3.1.0 +networkx~=3.1 +networkx~=3.1 +splink==3.9.12 +names-dataset==3.1.0 diff --git a/setup.py b/setup.py index 63ef672a..07404acd 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import find_packages, setup setup( - name="2023-fall-clinic-climate-cabinet", + name="2024-winter-clinic-climate-cabinet", version="0.1.0", packages=find_packages( include=[ diff --git a/utils/README.md b/utils/README.md index 3b9172ad..ee0cd84d 100644 --- a/utils/README.md +++ b/utils/README.md @@ -70,4 +70,12 @@ Util functions for MN EDA classify the donor entities in the expenditures. 3. The Contributors datasets have 4 kinds of recipient entities: lobbyists, candidates, committees, and nan. In order to fit the entries within the - schema, I code nan entries as 'Organization' \ No newline at end of file + schema, I code nan entries as 'Organization' + +#### classify.py +1. These functions take in the deduplicated and cleaned individuals and organizations +dataframes from the deduplication and linkage pipeline. +2. We classify based on substrings known to indicate clean energy or fossil fuels groups. 
+In particular, individuals are classified based on their employment by fossil fuels companies, +and organizations are classified by their names, prioritizing high profile corporations/PACs +and those which were found by a manual search of the largest donors/recipients in the dataset \ No newline at end of file diff --git a/utils/classify.py b/utils/classify.py new file mode 100644 index 00000000..5475894b --- /dev/null +++ b/utils/classify.py @@ -0,0 +1,107 @@ +import pandas as pd + +from utils.constants import c_org_names, f_companies, f_org_names + + +def classify_wrapper( + individuals_df: pd.DataFrame, organizations_df: pd.DataFrame +): + """Wrapper for classification in linkage pipeline + + Initialize the classify column in both dataframes and + call sub-functions classifying individuals and organizations + + Args: + individuals_df: cleaned and deduplicated dataframe of individuals + organizations_df: cleaned and deduplicated dataframe of organizations + + Returns: + individuals and organizations datfarames with a new + 'classification' column containing 'neutral', 'f', or 'c'. + 'neutral' status is the default for all entities, and those tagged + as 'neutral' are entities which we could not confidently identify as + either fossil fuel or clean energy organizations or affiliates. + Classification is very conservative, and we are very confident that + entities classified as one group or another are related to them. + + """ + + individuals_df["classification"] = "neutral" + organizations_df["classification"] = "neutral" + + classified_individuals = classify_individuals(individuals_df) + classified_orgs = classify_orgs(organizations_df) + + return classified_individuals, classified_orgs + + +def matcher(df: pd.DataFrame, substring: str, column: str, category: str): + """Applies a label to the classification column based on substrings + + We run through a given column containing strings in the dataframe. We + seek out rows containing substrings, and apply a certain label to + the classification column. We initialize using the 'neutral' label and + use the 'f' and 'c' labels to denote fossil fuel and clean energy + entities respectively. + + Args: + df: a pandas dataframe + substring: the string to search for + column: the column name in which to search + category: the category to assign the row, such as 'f' 'c' or 'neutral' + + Returns: + A pandas dataframe in which rows matching the substring conditions in + a certain column are marked with the appropriate category + """ + + bool_series = df[column].str.contains(substring, na=False) + + df.loc[bool_series, "classification"] = category + + return df + + +def classify_individuals(individuals_df: pd.DataFrame): + """Part of the classification pipeline + + We check if individuals work for a known fossil fuel company + and categorize them using the matcher() function. + + Args: + individuals_df: a dataframe containing deduplicated + standardized individuals data + + Returns: + an individuals dataframe updated with the fossil fuels category + """ + + for i in f_companies: + individuals_df = matcher(individuals_df, i, "company", "f") + + return individuals_df + + +def classify_orgs(organizations_df: pd.DataFrame): + """Part of the classification pipeline + + We apply the matcher function to the organizations dataframe + repeatedly, using a variety of substrings to identify fossil + fuel and clean energy companies. 
+ + Args: + organizations_df: a dataframe containing deduplicated + standardized organizations data + + Returns: + an organizations dataframe updated with the fossil fuels + and clean energy category + """ + + for i in f_org_names: + organizations_df = matcher(organizations_df, i, "name", "f") + + for i in c_org_names: + organizations_df = matcher(organizations_df, i, "name", "c") + + return organizations_df diff --git a/utils/constants.py b/utils/constants.py index b87d39d3..bc6d4cf6 100644 --- a/utils/constants.py +++ b/utils/constants.py @@ -3,6 +3,9 @@ """ from pathlib import Path +import splink.duckdb.comparison_library as cl +import splink.duckdb.comparison_template_library as ctl + BASE_FILEPATH = Path(__file__).resolve().parent.parent # returns the base_path to the directory @@ -605,3 +608,150 @@ " WV ", " WY ", ] + +# utils/linkage.py constants + +COMPANY_TYPES = { + "CORP": "CORPORATION", + "CO": "CORPORATION", + "LLC": "LIMITED LIABILITY COMPANY", + "PTNR": "PARTNERSHIP", + "LP": "LIMITED PARTNERSHIP", + "LLP": "LIMITED LIABILITY PARTNERSHIP", + "SOLE PROP": "SOLE PROPRIETORSHIP", + "SP": "SOLE PROPRIETORSHIP", + "NPO": "NONPROFIT ORGANIZATION", + "PC": "PROFESSIONAL CORPORATION", + "CO-OP": "COOPERATIVE", + "LTD": "LIMITED COMPANY", + "JSC": "JOINT STOCK COMPANY", + "HOLDCO": "HOLDING COMPANY", + "PLC": "PUBLIC LIMITED COMPANY", + "PVT LTD": "PRIVATE LIMITED COMPANY", + "INC": "INCORPORATED", + "ASSOC": "ASSOCIATION", + "FDN": "FOUNDATION", + "TR": "TRUST", + "SOC": "SOCIETY", + "CONSORT": "CONSORTIUM", + "SYND": "SYNDICATE", + "GRP": "GROUP", + "CORP SOLE": "CORPORATION SOLE", + "JV": "JOINT VENTURE", + "SUB": "SUBSIDIARY", + "FRANCHISE": "FRANCHISE", + "PA": "PROFESSIONAL ASSOCIATION", + "CIC": "COMMUNITY INTEREST COMPANY", + "PAC": "POLITICAL ACTION COMMITTEE", +} + + +individuals_settings = { + "link_type": "dedupe_only", + "blocking_rules_to_generate_predictions": [ + "l.first_name = r.first_name", + "l.last_name = r.last_name", + ], + "comparisons": [ + ctl.name_comparison("full_name"), + cl.exact_match("entity_type", term_frequency_adjustments=True), + cl.jaro_winkler_at_thresholds( + "state", [0.9, 0.8] + ), # threshold will catch typos and shortenings + cl.jaro_winkler_at_thresholds("party", [0.9, 0.8]), + cl.jaro_winkler_at_thresholds("company", [0.9, 0.8]), + ], + # DEFAULT + "retain_matching_columns": True, + "retain_intermediate_calculation_columns": True, + "max_iterations": 10, + "em_convergence": 0.01, +} + +individuals_blocking = [ + "l.first_name = r.first_name", + "l.last_name = r.last_name", +] + +organizations_settings = { + "link_type": "dedupe_only", + "blocking_rules_to_generate_predictions": [ + "l.name = r.name", + ], + "comparisons": [ + cl.exact_match("entity_type", term_frequency_adjustments=True), + cl.jaro_winkler_at_thresholds( + "state", [0.9, 0.8] + ), # threshold will catch typos and shortenings + # Add more comparisons as needed + ], + "retain_matching_columns": True, + "retain_intermediate_calculation_columns": True, + "max_iterations": 10, + "em_convergence": 0.01, +} + +organizations_blocking = ["l.name = r.name"] + +# individuals compnay f names +f_companies = [ + "exxon", + "chevron", + "southwest gas", + "petroleum", + "koch industries", + "koch companies", + "oil & gas", + "marathon oil", + "shell oil", +] + +# organizations f names +f_org_names = [ + "koch industries", + "koch pac", + "kochpac", + "southwest gas az", + "pinnacle west", + "americans for prosperity", + "energy transfer", +] + +# organizations c names +c_org_names = [ + 
"clean energy", + "vote solar action", + "renewable", + "pattern energy", + "beyond carbon", + "lcv victory", + "league of conservation", +] + +suffixes = [ + "sr", + "jr", + "i", + "ii", + "iii", + "iv", + "v", + "vi", + "vii", + "viii", + "ix", + "x", +] + +titles = [ + "mr", + "ms", + "mrs", + "miss", + "prof", + "dr", + "doctor", + "sir", + "madam", + "professor", +] diff --git a/utils/linkage.py b/utils/linkage.py index aa56307e..495cae3c 100644 --- a/utils/linkage.py +++ b/utils/linkage.py @@ -1,33 +1,105 @@ +import os.path +import re + +import numpy as np +import pandas as pd +import usaddress +from splink.duckdb.linker import DuckDBLinker + +from utils.constants import COMPANY_TYPES, repo_root, suffixes, titles + """ Module for performing record linkage on state campaign finance dataset """ -def calculate_string_similarity(string1: str, string2: str) -> float: - """Returns how similar two strings are on a scale of 0 to 1 +def get_address_line_1_from_full_address(address: str) -> str: + """Given a full address, return the first line of the formatted address + + Address line 1 usually includes street address or PO Box information. - The exact meaning of the metric is open, but the following must hold true: - 1. equivalent strings must return 1 - 2. strings with no similar characters must return 0 - 3. strings with higher intuitive similarity must return higher scores + Uses the usaddress libray which splits an address string into components, + and labels each component. + https://usaddress.readthedocs.io/en/latest/ Args: - string1: any string - string2: any string + address: raw string representing full address Returns: - similarity score + address_line_1 as a string Sample Usage: - >>> calculate_string_similarity("exact match", "exact match") - 1.0 - >>> calculate_string_similarity("aaaaaa", "bbbbbbbbbbb") - 0.0 - >>> similar_score = calculate_string_similarity("very similar", "vary similar") - >>> different_score = calculate_string_similarity("very similar", "very not close") - >>> similar_socre > different_score - True + >>> get_address_line_1_from_full_address('6727 W. Corrine Dr. Peoria,AZ 85381') + '6727 W. Corrine Dr.' + >>> get_address_line_1_from_full_address('P.O. Box 5456 Sun City West ,AZ 85375') + 'P.O. Box 5456' + >>> get_address_line_1_from_full_address('119 S 5th St Niles,MI 49120') + '119 S 5th St' + >>> get_address_line_1_from_full_address( + ... '1415 PARKER STREET APT 251 DETROIT MI 48214-0000' + ... ) + '1415 PARKER STREET' """ - pass + + address_tuples = usaddress.parse( + address + ) # takes a string address and put them into value, key pairs as tuples + line1_components = [] + for value, key in address_tuples: + if key == "PlaceName": + break + elif key in ( + "AddressNumber", + "StreetNamePreDirectional", + "StreetName", + "StreetNamePostType", + "USPSBoxType", + "USPSBoxID", + ): + line1_components.append(value) + line1 = " ".join(line1_components) + return line1 + + +def determine_comma_role(name: str) -> str: + """Given a string (someone's name), attempts to determine the role of the + comma in the name and where it ought to belong. 
+ + Some assumptions are made: + * If a suffix is included in the name and the name is not just the last + name(i.e "Doe, Jr), the format is + (last_name suffix, first and middle name) i.e Doe iv, Jane Elisabeth + + * If a comma is used anywhere else, it is in the format of + (last_name, first and middle name) i.e Doe, Jane Elisabeth + Args: + name: a string representing a name/names of individuals + Returns: + the name with or without a comma based on some conditions + + Sample Usage: + >>> determine_comma_role("Jane Doe, Jr") + 'Jane Doe, Jr' + >>> determine_comma_role("Doe, Jane Elisabeth") + ' Jane Elisabeth Doe' + >>> determine_comma_role("Jane Doe,") + 'Jane Doe' + >>> determine_comma_role("DOe, Jane") + ' Jane Doe' + """ + + name_parts = name.lower().split(",") + # if the comma is just in the end as a typo: + if len(name_parts[1]) == 0: + return name_parts[0].title() + # if just the suffix in the end, leave the name as it is + if name_parts[1].strip() in suffixes: + return name.title() + # at this point either it's just poor name placement, or the suffix is + # in the beginning of the name. Either way, the first part of the list is + # the true last name. + last_part = name_parts.pop(0) + first_part = " ".join(name_parts) + return first_part.title() + " " + last_part.title() def get_likely_name(first_name: str, last_name: str, full_name: str) -> str: @@ -48,55 +120,76 @@ def get_likely_name(first_name: str, last_name: str, full_name: str) -> str: Sample Usage: >>> get_likely_name("Jane", "Doe", "") - "Jane Doe" + 'Jane Doe' >>> get_likely_name("", "", "Jane Doe") - "Jane Doe" + 'Jane Doe' >>> get_likely_name("", "Doe, Jane", "") - "Jane Doe" + 'Jane Doe' >>> get_likely_name("Jane Doe", "Doe", "Jane Doe") - "Jane Doe" + 'Jane Doe' + >>> get_likely_name("Jane","","Doe, Sr") + 'Jane Doe, Sr' + >>> get_likely_name("Jane Elisabeth Doe, IV","Elisabeth","Doe, IV") + 'Jane Elisabeth Doe, Iv' + >>> get_likely_name("","","Jane Elisabeth Doe, IV") + 'Jane Elisabeth Doe, Iv' + >>> get_likely_name("Jane","","Doe, Jane, Elisabeth") + 'Jane Elisabeth Doe' """ - pass + # first, convert any NaNs to empty strings '' + first_name, last_name, full_name = [ + "" if x is np.NAN else x for x in [first_name, last_name, full_name] + ] + # second, ensure clean input by deleting spaces: + first_name, last_name, full_name = list( + map(lambda x: x.lower().strip(), [first_name, last_name, full_name]) + ) -def get_address_line_1_from_full_address(address: str) -> str: - """Given a full address, return the first line of the formatted address + # if data is clean: + if first_name + " " + last_name == full_name: + return full_name.title() - Address line 1 usually includes street address or PO Box information. + # remove titles or professions from the name + names = [first_name, last_name, full_name] - Args: - address: raw string representing full address - Returns: - address_line_1 + for i in range(len(names)): + # if there is a ',' deal with it accordingly + if "," in names[i]: + names[i] = determine_comma_role(names[i]) - Sample Usage: - >>> get_address_line_1_from_full_address("6727 W. Corrine Dr. Peoria,AZ 85381") - "6727 W. Corrine Dr." - >>> get_address_line_1_from_full_address("P.O. Box 5456 Sun City West ,AZ 85375") - "P.O. Box 5456" - >>> get_address_line_1_from_full_address("119 S 5th St Niles,MI 49120") - "119 S 5th St" - >>> get_address_line_1_from_full_address( - ... "1415 PARKER STREET APT 251 DETROIT MI 48214-0000" - ... 
) - "1415 PARKER STREET" - """ - pass + names[i] = names[i].replace(".", "").split(" ") + names[i] = [ + name_part for name_part in names[i] if name_part not in titles + ] + names[i] = " ".join(names[i]) + + # one last check to remove any pieces that might add extra whitespace + names = list(filter(lambda x: x != "", names)) + names = " ".join(names) + names = names.title().replace(" ", " ").split(" ") + final_name = [] + [final_name.append(x) for x in names if x not in final_name] + return " ".join(final_name).strip() def get_street_from_address_line_1(address_line_1: str) -> str: """Given an address line 1, return the street name + Uses the usaddress libray which splits an address string into components, + and labels each component. + https://usaddress.readthedocs.io/en/latest/ + Args: - address_line_1: either street information or PO box + address_line_1: either street information or PO box as a string Returns: - street name + street name as a string Raises: ValueError: if string is malformed and no street can be reasonably found. >>> get_street_from_address_line_1("5645 N. UBER ST") - "UBER ST" + 'UBER ST' >>> get_street_from_address_line_1("") Traceback (most recent call last): ... @@ -105,5 +198,291 @@ def get_street_from_address_line_1(address_line_1: str) -> str: Traceback (most recent call last): ... ValueError: address_line_1 is PO Box + >>> get_street_from_address_line_1("300 59 St.") + '59 St.' + >>> get_street_from_address_line_1("Uber St.") + 'Uber St.' + >>> get_street_from_address_line_1("3NW 59th St") + '59th St' + """ + if not address_line_1 or address_line_1.isspace(): + raise ValueError("address_line_1 must have whitespace") + + address_line_lower = address_line_1.lower() + + if "po box" in address_line_lower: + raise ValueError("address_line_1 is PO Box") + + string = [] + address = usaddress.parse(address_line_1) + for key, val in address: + if val in ["StreetName", "StreetNamePostType"]: + string.append(key) + + return " ".join(string) + + +def convert_duplicates_to_dict(df: pd.DataFrame) -> None: + """For each uuid, maps it to all other uuids for which it has been deemed a + match. + + Given a dataframe where the uuids of all rows deemed similar are stored in a + list and all but the first row of each paired uuid is dropped, this function + maps the matched uuids to a single uuid. + + Args: + A pandas df containing a column called 'duplicated', where each row is a + list of all uuids deemed a match. In each list, all uuids but the first + have their rows already dropped. + + Returns + None. However it outputs a file to the output directory, with 2 + columns. The first lists all the uuids in df, and is labeled + 'original_uuids.' The 2nd shows the uuids to which each entry is mapped + to, and is labeled 'mapped_uuids'. + """ + deduped_dict = {} + for i in range(len(df)): + deduped_uudis = df.iloc[i]["duplicated"] + for j in range(len(deduped_uudis)): + deduped_dict.update({deduped_uudis[j]: df.iloc[i]["id"]}) + + # now convert dictionary into a csv file + deduped_df = pd.DataFrame.from_dict(deduped_dict, "index") + deduped_df = deduped_df.reset_index().rename( + columns={"index": "original_uuids", 0: "mapped_uuid"} + ) + deduped_df.to_csv( + repo_root / "output" / "deduplicated_UUIDs.csv", + index=False, + mode="a", + header=not os.path.exists("../output/deduplicated_UUIDs.csv"), + ) + + +def deduplicate_perfect_matches(df: pd.DataFrame) -> pd.DataFrame: + """Return a dataframe with duplicated entries removed. 
+ + Given a dataframe, combines rows that have identical data beyond their + UUIDs, keeps the first UUID amond the similarly grouped UUIDs, and saves the + rest of the UUIDS to a file in the "output" directory linking them to the + first selected UUID. + + Args: + a pandas dataframe containing contribution data + Returns: + a deduplicated pandas dataframe containing contribution data + """ + # first remove all duplicate entries: + new_df = df.drop_duplicates() + + # find the duplicates along all columns but the id + new_df = ( + new_df.groupby(df.columns.difference(["id"]).tolist(), dropna=False)[ + "id" + ] + .agg(list) + .reset_index() + .rename(columns={"id": "duplicated"}) + ) + new_df.index = new_df["duplicated"].str[0].tolist() + + # convert the duplicated column into a dictionary that can will be + # an output by only feeding the entries with duplicates + new_df = new_df.reset_index().rename(columns={"index": "id"}) + convert_duplicates_to_dict(new_df[["id", "duplicated"]]) + new_df = new_df.drop(["duplicated"], axis=1) + return new_df + + +def cleaning_company_column(company_entry: str) -> str: """ - pass + Given a string, check if it contains a variation of self employed, + unemployed, or retired and return the standardized version. + + Args: + company: string of inputted company names + Returns: + standardized for retired, self employed, and unemployed, + or original string if no match or empty string + + Sample Usage: + >>> cleaning_company_column("Retireed") + 'Retired' + >>> cleaning_company_column("self") + 'Self Employed' + >>> cleaning_company_column("None") + 'Unemployed' + >>> cleaning_company_column("N/A") + 'Unemployed' + """ + + if not company_entry: + return company_entry + + company_edited = company_entry.lower() + + if company_edited == "n/a": + return "Unemployed" + + company_edited = re.sub(r"[^\w\s]", "", company_edited) + + if ( + company_edited == "retired" + or company_edited == "retiree" + or company_edited == "retire" + or "retiree" in company_edited + ): + return "Retired" + + elif ( + "self employe" in company_edited + or "freelance" in company_edited + or company_edited == "self" + or company_edited == "independent contractor" + ): + return "Self Employed" + elif ( + "unemploye" in company_edited + or company_edited == "none" + or company_edited == "not employed" + or company_edited == "nan" + ): + return "Unemployed" + + else: + return company_edited + + +def standardize_corp_names(company_name: str) -> str: + """Given an employer name, return the standardized version + + Args: + company_name: corporate name + Returns: + standardized company name + + Sample Usage: + >>> standardize_corp_names('MI BEER WINE WHOLESALERS ASSOC') + 'MI BEER WINE WHOLESALERS ASSOCIATION' + + >>> standardize_corp_names('MI COMMUNITY COLLEGE ASSOCIATION') + 'MI COMMUNITY COLLEGE ASSOCIATION' + + >>> standardize_corp_names('STEPHANIES CHANGEMAKER FUND') + 'STEPHANIES CHANGEMAKER FUND' + + """ + + company_name_split = company_name.upper().split(" ") + + for i in range(len(company_name_split)): + if company_name_split[i] in list(COMPANY_TYPES.keys()): + hold = company_name_split[i] + company_name_split[i] = COMPANY_TYPES[hold] + + new_company_name = " ".join(company_name_split) + return new_company_name + + +def get_address_number_from_address_line_1(address_line_1: str) -> str: + """Given an address line 1, return the building number or po box + + Uses the usaddress libray which splits an address string into components, + and labels each component. 
+ https://usaddress.readthedocs.io/en/latest/ + + Args: + address_line_1: either street information or PO box + Returns: + address or po box number + + Sample Usage: + >>> get_address_number_from_address_line_1('6727 W. Corrine Dr. Peoria,AZ 85381') + '6727' + >>> get_address_number_from_address_line_1('P.O. Box 5456 Sun City West ,AZ 85375') + '5456' + >>> get_address_number_from_address_line_1('119 S 5th St Niles,MI 49120') + '119' + >>> get_address_number_from_address_line_1( + ... '1415 PARKER STREET APT 251 DETROIT MI 48214-0000' + ... ) + '1415' + """ + + address_line_1_components = usaddress.parse(address_line_1) + + for i in range(len(address_line_1_components)): + if address_line_1_components[i][1] == "AddressNumber": + return address_line_1_components[i][0] + elif address_line_1_components[i][1] == "USPSBoxID": + return address_line_1_components[i][0] + raise ValueError("Cannot find Address Number") + + +def splink_dedupe( + df: pd.DataFrame, settings: dict, blocking: list +) -> pd.DataFrame: + """Given a dataframe and config settings, return a + deduplicated dataframe + + Configuration settings and blocking can be found in constants.py as + individuals_settings, indivduals_blocking, organizations_settings, + organizations_blocking + + Uses the splink library which employs probabilistic matching for + record linkage + https://moj-analytical-services.github.io/splink/index.html + + + Args: + df: dataframe + settings: configuration settings + (based on splink documentation and dataframe columns) + blocking: list of columns to block on for the table + (cuts dataframe into parts based on columns labeled blocks) + Returns: + deduplicated version of initial dataframe with column 'matching_id' + that holds list of matching unique_ids + + """ + linker = DuckDBLinker(df, settings) + linker.estimate_probability_two_random_records_match( + blocking, recall=0.6 + ) # default + linker.estimate_u_using_random_sampling(max_pairs=5e6) + + for i in blocking: + linker.estimate_parameters_using_expectation_maximisation(i) + + df_predict = linker.predict() + clusters = linker.cluster_pairwise_predictions_at_threshold( + df_predict, threshold_match_probability=0.7 + ) # default + clusters_df = clusters.as_pandas_dataframe() + + match_list_df = ( + clusters_df.groupby("cluster_id")["unique_id"].agg(list).reset_index() + ) # dataframe where cluster_id maps unique_id to initial instance of row + match_list_df.rename(columns={"unique_id": "duplicated"}, inplace=True) + + first_instance_df = clusters_df.drop_duplicates(subset="cluster_id") + col_names = np.append("cluster_id", df.columns) + first_instance_df = first_instance_df[col_names] + + deduped_df = pd.merge( + first_instance_df, + match_list_df[["cluster_id", "duplicated"]], + on="cluster_id", + how="left", + ) + deduped_df = deduped_df.rename(columns={"cluster_id": "unique_id"}) + + deduped_df["duplicated"] = deduped_df["duplicated"].apply( + lambda x: x if isinstance(x, list) else [x] + ) + convert_duplicates_to_dict(deduped_df) + + deduped_df = deduped_df.drop(columns=["duplicated"]) + + return deduped_df diff --git a/utils/linkage_and_network_pipeline.py b/utils/linkage_and_network_pipeline.py new file mode 100644 index 00000000..86e0ab62 --- /dev/null +++ b/utils/linkage_and_network_pipeline.py @@ -0,0 +1,222 @@ +import networkx as nx +import pandas as pd +from nameparser import HumanName + +from utils.classify import classify_wrapper +from utils.constants import ( + BASE_FILEPATH, + individuals_blocking, + individuals_settings, + 
organizations_blocking, + organizations_settings, +) +from utils.linkage import ( + cleaning_company_column, + deduplicate_perfect_matches, + get_address_line_1_from_full_address, + get_address_number_from_address_line_1, + get_likely_name, + get_street_from_address_line_1, + splink_dedupe, + standardize_corp_names, +) +from utils.network import ( + combine_datasets_for_network_graph, + construct_network_graph, + create_network_graph, +) + + +def preprocess_individuals(individuals: pd.DataFrame) -> pd.DataFrame: + """ + Given a dataframe of individual donors, preprocesses the data, + and return a cleaned dataframe. + + Args: + individuals: dataframe of individual contributions + + Returns: + cleaned dataframe of individuals + """ + if "Unnamed: 0" in individuals.columns: + individuals.drop(columns="Unnamed: 0", inplace=True) + + individuals = individuals.astype( + { + "first_name": "string", + "last_name": "string", + "full_name": "string", + "company": "string", + } + ) + + individuals["company"] = ( + individuals["company"] + .loc[individuals["company"].notnull()] + .apply(standardize_corp_names) + ) + individuals["company"] = ( + individuals["company"] + .loc[individuals["company"].notnull()] + .apply(cleaning_company_column) + ) + + # Address functions, assuming address column is named 'Address' + if "Address" in individuals.columns: + individuals["Address"] = individuals["Address"].astype(str)[ + individuals["Address"].notnull() + ] + individuals["Address Line 1"] = individuals["Address"].apply( + get_address_line_1_from_full_address + ) + individuals["Street Name"] = individuals["Address Line 1"].apply( + get_street_from_address_line_1 + ) + individuals["Address Number"] = individuals["Address Line 1"].apply( + get_address_number_from_address_line_1 + ) + + # Check if first name or last names are empty, if so, extract from full name column + individuals["full_name"] = individuals["full_name"].astype(str)[ + individuals["full_name"].notnull() + ] + if individuals["first_name"].isnull().any(): + first_name = individuals["full_name"].apply( + lambda x: HumanName(x).first if pd.notnull(x) else x + ) + individuals["first_name"] = first_name + + if individuals["last_name"].isnull().any(): + last_name = individuals["full_name"].apply( + lambda x: HumanName(x).last if pd.notnull(x) else x + ) + individuals["last_name"] = last_name + + individuals["full_name"] = individuals.apply( + lambda row: get_likely_name( + row["first_name"], row["last_name"], row["full_name"] + ), + axis=1, + ) + + # Ensure that columns with values are prioritized and appear first + # important for splink implementation + individuals["sort_priority"] = ( + ~individuals["first_name"].isna() + & ~individuals["last_name"].isna() + & ~individuals["company"].isna() + ) * 2 + (~individuals["party"].isna()) + + individuals = individuals.sort_values( + by="sort_priority", ascending=False + ).drop(columns=["sort_priority"]) + + individuals["unique_id"] = individuals["id"] + + return individuals + + +def preprocess_organizations(organizations: pd.DataFrame) -> pd.DataFrame: + """ + Given a dataframe of organization donors, preprocesses the data, + and return a cleaned dataframe. 
+ """ + if "Unnamed: 0" in organizations.columns: + organizations.drop(columns="Unnamed: 0", inplace=True) + + organizations["name"] = ( + organizations["name"] + .loc[organizations["name"].notnull()] + .apply(standardize_corp_names) + ) + + organizations["unique_id"] = organizations["id"] + + return organizations + + +def preprocess_transactions(transactions: pd.DataFrame) -> pd.DataFrame: + """ + Given a dataframe of transactions, preprocesses the data, + and return a cleaned dataframe. + + Args: + transactions: dataframe of transactions + + Returns: + cleaned dataframe of transactions + """ + if "Unnamed: 0" in transactions.columns: + transactions.drop(columns="Unnamed: 0", inplace=True) + + transactions["purpose"] = transactions["purpose"].str.upper() + + deduped = pd.read_csv(BASE_FILEPATH / "output" / "deduplicated_UUIDs.csv") + transactions[["donor_id", "recipient_id"]] = transactions[ + ["donor_id", "recipient_id"] + ].replace(deduped) + + return transactions + + +def main(): + organizations = pd.read_csv( + BASE_FILEPATH / "data" / "complete_organizations_table.csv" + ) + + individuals = pd.read_csv( + BASE_FILEPATH / "data" / "complete_individuals_table.csv" + ) + + transactions = pd.read_csv( + BASE_FILEPATH / "data" / "complete_transactions_table.csv" + ) + + individuals = preprocess_individuals(individuals) + organizations = preprocess_organizations(organizations) + + individuals, organizations = classify_wrapper(individuals, organizations) + + individuals = deduplicate_perfect_matches(individuals) + organizations = deduplicate_perfect_matches(organizations) + + organizations = splink_dedupe( + organizations, organizations_settings, organizations_blocking + ) + + individuals = splink_dedupe( + individuals, individuals_settings, individuals_blocking + ) + + transactions = preprocess_transactions(transactions) + + cleaned_individuals_output_path = ( + BASE_FILEPATH / "output" / "cleaned_individuals_table.csv" + ) + + cleaned_organizations_output_path = ( + BASE_FILEPATH / "output" / "cleaned_organizations_table.csv" + ) + + cleaned_transactions_output_path = ( + BASE_FILEPATH / "output" / "cleaned_transactions_table.csv" + ) + + individuals.to_csv(cleaned_individuals_output_path, index=False) + organizations.to_csv(cleaned_organizations_output_path, index=False) + transactions.to_csv(cleaned_transactions_output_path, index=False) + + aggreg_df = combine_datasets_for_network_graph( + [individuals, organizations, transactions] + ) + g = create_network_graph(aggreg_df) + g_output_path = BASE_FILEPATH / "output" / "g.gml" + nx.write_graphml(g, g_output_path) + + construct_network_graph( + 2018, 2023, [individuals, organizations, transactions] + ) + + +if __name__ == "__main__": + main() diff --git a/utils/network.py b/utils/network.py new file mode 100644 index 00000000..90f12a69 --- /dev/null +++ b/utils/network.py @@ -0,0 +1,229 @@ +import networkx as nx +import pandas as pd +import plotly.graph_objects as go + + +def name_identifier(uuid: str, dfs: list[pd.DataFrame]) -> str: + """Returns the name of the entity given the entity's uuid + + Args: + uuid: the uuid of the entity + List of dfs: dataframes that have a uuid column, and an 'name' or + 'full_name' column + Return: + The entity's name + """ + for df in dfs: + if "name" in df.columns: + name_in_org = df.loc[df["id"] == uuid] + if len(name_in_org) > 0: + return name_in_org.iloc[0]["name"] + + if "full_name" in df.columns: + name_in_ind = df.loc[df["id"] == uuid] + if len(name_in_ind) > 0: + return 
name_in_ind.iloc[0]["full_name"] + return None + + +def combine_datasets_for_network_graph(dfs: list[pd.DataFrame]) -> pd.DataFrame: + """Combines the 3 dataframes into a single dataframe to create a graph + + Given 3 dataframes, the func adds a 'recipient_name' column in the + transactions df, merges the dfs together to record transaction info between + entities, then concatenates the dfs into a final df of the merged + transactions and entity dfs. + + Args: + list of dataframes in the order: [inds_df, orgs_df, transactions_df] + Transactions dataframe with column: 'recipient_id' + Individuals dataframe with column: 'full_name' + Organizations dataframe with column: 'name' + + Returns + A merged dataframe with aggregate contribution amounts between entitites + """ + + inds_df, orgs_df, transactions_df = dfs + + # first update the transactions df to have a recipient name tied to id + transactions_df["recipient_name"] = transactions_df["recipient_id"].apply( + name_identifier, args=([orgs_df, inds_df],) + ) + + # next, merge the inds_df and orgs_df ids with the transactions_df donor_id + inds_trans_df = pd.merge( + inds_df, transactions_df, how="left", left_on="id", right_on="donor_id" + ) + inds_trans_df = inds_trans_df.dropna(subset=["amount"]) + orgs_trans_df = pd.merge( + orgs_df, transactions_df, how="left", left_on="id", right_on="donor_id" + ) + orgs_trans_df = orgs_trans_df.dropna(subset=["amount"]) + orgs_trans_df = orgs_trans_df.rename(columns={"name": "full_name"}) + + # concatenated the merged dfs + merged_df = pd.concat([orgs_trans_df, inds_trans_df]) + + # lastly, create the final dataframe with aggregated attributes + attribute_cols = merged_df.columns.difference( + ["donor_id", "recipient_id", "full_name", "recipient_name"] + ) + agg_functions = { + col: "sum" if col == "amount" else "first" for col in attribute_cols + } + aggreg_df = ( + merged_df.groupby( + ["donor_id", "recipient_id", "full_name", "recipient_name"] + ) + .agg(agg_functions) + .reset_index() + ) + aggreg_df = aggreg_df.drop(["id"], axis=1) + return aggreg_df + + +def create_network_graph(df: pd.DataFrame) -> nx.MultiDiGraph: + """Takes in a dataframe and generates a MultiDiGraph where the nodes are + entity names, and the rest of the dataframe columns make the node attributes + + Args: + df: a pandas dataframe with merged information from the inds, orgs, & + transactions dataframes + + Returns: + A Networkx MultiDiGraph with nodes and edges + """ + G = nx.MultiDiGraph() + edge_columns = [ + "office_sought", + "purpose", + "transaction_type", + "year", + "transaction_id", + "donor_office", + "amount", + ] + + for _, row in df.iterrows(): + # add node attributes based on the columns relevant to the entity + G.add_node( + row["full_name"], + **row[df.columns.difference(edge_columns)].dropna().to_dict(), + ) + # add the recipient as a node + G.add_node(row["recipient_name"], classification="neutral") + + # add the edge attributes between two nodes + edge_attributes = row[edge_columns].dropna().to_dict() + G.add_edge(row["full_name"], row["recipient_name"], **edge_attributes) + + return G + + +def plot_network_graph(G: nx.MultiDiGraph): + """Given a networkX Graph, creates a plotly visualization of the nodes and + edges + + Args: + A networkX MultiDiGraph with edges including the attribute 'amount' + + Returns: None. 
Creates a plotly graph + """ + edge_trace = go.Scatter( + x=(), + y=(), + line=dict(color="#888", width=1.5), + hoverinfo="text", + mode="lines+markers", + ) + hovertext = [] + pos = nx.spring_layout(G) + + for edge in G.edges(data=True): + source = edge[0] + target = edge[1] + hovertext.append(f"Amount: {edge[2]['amount']:.2f}") + # Adding coordinates of source and target nodes to edge_trace + edge_trace["x"] += ( + pos[source][0], + pos[target][0], + None, + ) # None creates a gap between line segments + edge_trace["y"] += (pos[source][1], pos[target][1], None) + + edge_trace["hovertext"] = hovertext + + # Define arrow symbol for edges + edge_trace["marker"] = dict( + symbol="arrow", color="#888", size=10, angleref="previous" + ) + + node_trace = go.Scatter( + x=[], + y=[], + text=[], + mode="markers", + hoverinfo="text", + marker=dict(showscale=True, colorscale="YlGnBu", size=10), + ) + node_trace["marker"]["color"] = [] + + for node in G.nodes(): + node_info = f"Name: {node}
" + for key, value in G.nodes[node].items(): + node_info += f"{key}: {value}
" + node_trace["text"] += tuple([node_info]) + # Get the classification value for the node + classification = G.nodes[node].get("classification", "neutral") + # Assign a color based on the classification value + if classification == "c": + color = "blue" + elif classification == "f": + color = "red" + else: + color = "green" # Default color for unknown classification + node_trace["marker"]["color"] += tuple([color]) + + # Add node positions to the trace + node_trace["x"] += tuple([pos[node][0]]) + node_trace["y"] += tuple([pos[node][1]]) + + # Define layout settings + layout = go.Layout( + title="Network Graph Indicating Campaign Contributions from 2018-2022", + titlefont=dict(size=16), + showlegend=True, + hovermode="closest", + margin=dict(b=20, l=5, r=5, t=40), + xaxis=dict(showgrid=True, zeroline=True, showticklabels=False), + yaxis=dict(showgrid=True, zeroline=True, showticklabels=False), + ) + + fig = go.Figure(data=[edge_trace, node_trace], layout=layout) + fig.show() + + +def construct_network_graph( + start_year: int, end_year: int, dfs: list[pd.DataFrame] +): + """Runs the network construction pipeline starting from 3 dataframes + + Args: + start_year & end_year: the range of the desired data + dfs: dataframes in the order: inds_df, orgs_df, transactions_df + + Returns: + """ + inds_df, orgs_df, transactions_df = dfs + transactions_df = transactions_df.loc[ + (transactions_df.year >= start_year) + & (transactions_df.year <= end_year) + ] + + aggreg_df = combine_datasets_for_network_graph( + [inds_df, orgs_df, transactions_df] + ) + G = create_network_graph(aggreg_df) + plot_network_graph(G) + nx.write_adjlist(G, "Network Graph Node Data") diff --git a/utils/pipeline.py b/utils/pipeline.py index 7a288fd4..e6b7a120 100644 --- a/utils/pipeline.py +++ b/utils/pipeline.py @@ -18,6 +18,7 @@ single_state_organizations_tables = [] single_state_transactions_tables = [] for state_cleaner in state_cleaners: + print("Cleaning...") ( individuals_table, organizations_table, diff --git a/utils/tests/test_classify.py b/utils/tests/test_classify.py new file mode 100644 index 00000000..b6bce883 --- /dev/null +++ b/utils/tests/test_classify.py @@ -0,0 +1,46 @@ +import numpy as np +import pandas as pd +import pytest + +from utils.classify import matcher + +d = { + "name": [ + "bob von rosevich", + "anantarya smith", + "bob j vonrosevich", + "missy elliot", + "mr johnson", + "quarantin directino", + "missy eliot", + "joseph johnson", + ], + "address": [ + "3 Blue Drive, Chicago", + "4 Blue Drive, Chicago", + "8 Fancy Way, Chicago", + "8 Fancy Way, Evanston", + "17 Regular Road, Chicago", + "42 Hollywood Boulevard, Chicago", + "8 Fancy Way, Evanston", + "17 Regular Road, Chicago", + ], +} + +test_df = pd.DataFrame(data=d) + +test_df["classification"] = "neutral" + + +@pytest.fixture +def matcher_scen_1(): + return test_df + + +def test_matcher_scen_1(matcher_scen_1): + matcher(matcher_scen_1, "Fancy", "address", "f") + res = test_df[test_df["classification"] == "f"]["name"].values + + assert np.all( + res == np.array(["bob j vonrosevich", "missy elliot", "missy eliot"]) + ) diff --git a/utils/tests/test_linkage.py b/utils/tests/test_linkage.py new file mode 100644 index 00000000..d96339d6 --- /dev/null +++ b/utils/tests/test_linkage.py @@ -0,0 +1,47 @@ +import pandas as pd +import pytest + +from utils.constants import BASE_FILEPATH +from utils.linkage import deduplicate_perfect_matches + +""" +Module for testing functions in linkage.py +""" + + +# Test for dedupe function +@pytest.fixture +def 
return_data(filename):
+    path = BASE_FILEPATH / "output" / filename
+    df = pd.read_csv(path, low_memory=False)
+    return df
+
+
+@pytest.fixture
+def call_dedup_func():
+    inds_sample = return_data("complete_individuals_table.csv")
+    orgs_sample = return_data("complete_organizations_table.csv")
+
+    assert not orgs_sample.empty
+    assert not inds_sample.empty
+
+    deduplicated_inds = deduplicate_perfect_matches(inds_sample)
+    deduplicated_orgs = deduplicate_perfect_matches(orgs_sample)
+
+    output_dedup_ids = return_data("deduplicated_UUIDs.csv")
+    # output_dedup_ids should contain all the ids kept in deduplicated_inds
+    # and deduplicated_orgs
+
+    return deduplicated_inds, deduplicated_orgs, output_dedup_ids
+
+
+def test_confirm_dedup_uuids(call_dedup_func):
+    inds, orgs, output = call_dedup_func
+
+    dedup_inds_id = set(inds.id.tolist())
+    dedup_orgs_id = set(orgs.id.tolist())
+    unique_ids = set(output.original_uuids.tolist())
+
+    assert dedup_inds_id.issubset(unique_ids)
+    assert dedup_orgs_id.issubset(unique_ids)
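For reference (not part of this diff), the tests above depend on the full CSVs already being present in `output/`. A minimal, self-contained sketch of the same `deduplicate_perfect_matches` check against a toy dataframe could look like the following; the column names are illustrative rather than the project schema, it assumes `utils.linkage` imports cleanly, and the function still appends a mapping to `output/deduplicated_UUIDs.csv` as a side effect, so the output directory must exist.

```python
import pandas as pd

from utils.linkage import deduplicate_perfect_matches


def test_deduplicate_perfect_matches_toy_data():
    # Two rows identical in everything except their id, plus one distinct row.
    df = pd.DataFrame(
        {
            "id": ["uuid-1", "uuid-2", "uuid-3"],
            "full_name": ["jane doe", "jane doe", "john smith"],
            "company": ["acme", "acme", "globex"],
        }
    )

    deduped = deduplicate_perfect_matches(df)

    # The duplicate pair collapses onto its first id; the distinct row survives.
    assert len(deduped) == 2
    assert set(deduped["id"]) == {"uuid-1", "uuid-3"}
```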