diff --git a/HISTORY.rst b/HISTORY.rst index 4ee357a..de27499 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,17 @@ History ======= +v0.18.0 (2023-03-22) + +* Add support for python3.11. +* Make sure to pip install cython before installing the requirements file when using python3.11. +* Update ddtrace==1.9.4. +* Update dominodatalab==1.2.3. +* Update pandas==1.5.3. +* Update python-json-logger==2.0.7. +* Update wheel==0.40.0. + + v0.17.36 (2023-03-13) * Raise error instead of printing in file_ingestion function xlsx_to_tsv. diff --git a/Makefile b/Makefile index 015d9cb..26c8acf 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,7 @@ clean: install: . env/bin/activate; \ + pip install cython==0.29.33; \ pip install -r aioradio/requirements.txt setup: diff --git a/README.md b/README.md index cb84824..d788246 100644 --- a/README.md +++ b/README.md @@ -116,7 +116,7 @@ asyncio.get_event_loop().run_until_complete(main()) ## INSTALLING FOR DIRECT DEVELOPMENT OF AIORADIO -Install [python 3.10.X](https://www.python.org/downloads/) +Install [python 3.11.X](https://www.python.org/downloads/) Make sure you've installed [ODBC drivers](https://docs.microsoft.com/en-us/sql/connect/python/pyodbc/step-1-configure-development-environment-for-pyodbc-python-development), required for using the python package pyodbc. @@ -124,12 +124,13 @@ Clone aioradio locally and navigate to the root directory Install and activate python VirtualEnv ```bash -python3.10 -m venv env +python3.11 -m venv env source env/bin/activate ``` Install python modules included in requirements.txt ```bash +pip install cython pip install -r aioradio/requirements.txt ``` diff --git a/aioradio/file_ingestion.py b/aioradio/file_ingestion.py index dde3300..f2be6ef 100644 --- a/aioradio/file_ingestion.py +++ b/aioradio/file_ingestion.py @@ -24,10 +24,7 @@ import time import zipfile from asyncio import sleep -from collections import defaultdict -from dataclasses import dataclass -from dataclasses import field as dc_field -from datetime import datetime, timedelta, timezone, tzinfo +from datetime import datetime, timezone, tzinfo from pathlib import Path from tempfile import NamedTemporaryFile, TemporaryDirectory from types import coroutine @@ -35,7 +32,6 @@ import cchardet as chardet import mandrill -import numpy as np from openpyxl import load_workbook from smb.base import SharedFile from smb.smb_structs import OperationFailure @@ -48,1204 +44,6 @@ DIRECTORY = Path(__file__).parent.absolute() LOG = logging.getLogger('file_ingestion') -@dataclass -class EFIParse: - """EnrollmentFileIngestion parse class.""" - - filename: str - fice_enrolled_logic: set = dc_field(default_factory=set) - entry_year_filter: dict = dc_field(default_factory=dict) - - def __post_init__(self): - if not self.fice_enrolled_logic: - self.fice_enrolled_logic = { - "001100", - "001397", - "001507", - "001526", - "002120", - "002122", - "002180", - "002760", - "002778", - "002795", - "002907", - "003301", - "003450", - "003505", - "003535", - "003688", - "003709" - } - - if not self.entry_year_filter: - self.entry_year_filter = { - "start": "2022", - "end": "2026" - } - - now = datetime.now() - self.filed_date_min_max = { - "BirthDate": (now - timedelta(days=80 * 365), now - timedelta(days=10 * 365)), - "SrcDate": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "Inquired": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "Applied": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "Completed": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "Admitted": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "Confirmed": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "Enrolled": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "Canceled": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "Dropped": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "Graduated": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "ProspectDate": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "FAFSASubmitted": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "CustomDate1": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "CustomDate2": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "CustomDate3": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "CustomDate4": (now - timedelta(days=50 * 365), now + timedelta(days=365)), - "CustomDate5": (now - timedelta(days=50 * 365), now + timedelta(days=365)) - } - - self.filed_year_min_max = { - "EntryYear": ((now - timedelta(days=50 * 365)).year, (now + timedelta(days=10 * 365)).year), - "HSGradYear": ((now - timedelta(days=50 * 365)).year, (now + timedelta(days=10 * 365)).year) - } - - self.field_to_max_widths = { - "UniqueID": 50, - "StudentID": 50, - "LastName": 64, - "FirstName": 64, - "Gender": 1, - "GPA": 20, - "Address1": 150, - "Address2": 150, - "City": 50, - "StateCode": 50, - "ZipCode": 20, - "BirthDate": 10, - "EntryTerm": 14, - "EntryYear": 4, - "HSGradYear": 4, - "SrcCode": 256, - "SrcDate": 10, - "Inquired": 10, - "Applied": 10, - "Completed": 10, - "Admitted": 10, - "Confirmed": 10, - "Enrolled": 10, - "Canceled": 10, - "Dropped": 10, - "Graduated": 10, - "AcademicProgram": 256, - "StudentAthlete": 50, - "CampusLocation": 50, - "Email": 75, - "Ethnicity": 50, - "FirstGenFlag": 1, - "EFC": 20, - "HSCode": 6, - "ACTScore": 2, - "SATScore": 4, - "ProspectCode": 256, - "ProspectDate": 10, - "FAFSASubmitted": 10, - "ApplicationPlan": 30, - "AdmitCode": 30, - "College": 30, - "AdmittedProgram": 256, - "HonorsProgram": 5, - "StudentType": 20, - "International": 5, - "CountryOfOrigin": 30, - "StudentStatus": 20, - "Territory": 30, - "EngagementScore": 10, - "CellPhoneNumber": 10, - "TextMessageOptIn": 5, - "CustomFilter1": 20, - "CustomFilter2": 20, - "CustomFilter3": 20, - "CustomFilter4": 20, - "CustomFilter5": 20, - "CustomDate1": 10, - "CustomDate2": 10, - "CustomDate3": 10, - "CustomDate4": 10, - "CustomDate5": 10 - } - - self.gender_map = { - "MALE/MAN": "M", - "MALE": "M", - "MAN": "M", - "M": "M", - "FEMALE/WOMAN": "F", - "FEMALE": "F", - "WOMAN": "F", - "F": "F" - } - - self.grades_map = { - "A+": "4.0", - "A": "4.0", - "A-": "3.667", - "B+": "3.333", - "B": "3.0", - "B-": "2.667", - "C+": "2.333", - "C": "2.0", - "C-": "1.667", - "D+": "1.333", - "D": "1.0", - "D-": "0.667", - "F+": "0.333", - "F": "0.0", - "F-": "0.0" - } - - self.date_formats = [ - "%m/%d/%y", - "%m/%d/%Y", - "%-m/%-d/%Y", - "%-m/%-d/%y", - "%Y-%m-%d", - "%Y%m%d", - "%d-%b-%Y", - "%m-%d-%y", - "%m-%d-%Y", - "%b %d, %Y", - "%Y-%m-%dT%H:%M:%SZ", - "%d-%b-%y", - "%Y-%m-%dT%H:%M:%S", - "%-d-%b-%y", - "%-d-%b-%Y", - "%Y/%m/%d" - ] - - self.student_athlete_map = { - "0": "N", - "1": "Y", - "CHRL": "Y", - "NO": "N", - "WLAX": "Y", - "YES": "Y", - "TRUE": "Y", - "MWR": "Y", - "MTR": "Y", - "VB": "Y", - "N": "N", - "BASEB": "Y", - "MSWIM": "Y", - "WWR": "Y", - "Y": "Y", - "FALSE": "N", - "FB": "Y" - } - - self.season_year_map = { - "FA19": "2019", - "FA20": "2020", - "FA21": "2021", - "FA22": "2022", - "FA23": "2023", - "FA24": "2024", - "FA25": "2025", - "FA26": "2026", - "FA27": "2027", - "19FA": "2019", - "20FA": "2020", - "21FA": "2021", - "22FA": "2022", - "23FA": "2023", - "24FA": "2024", - "25FA": "2025", - "26FA": "2026", - "27FA": "2027" - } - - self.seasons_map = { - "SPRING": "SPRING", - "SUMMER": "SUMMER", - "FALL": "FALL", - "WINTER": "WINTER", - "FA": "FALL" - } - - self.state_to_statecode = { - "ALABAMA": "AL", - "ALASKA": "AK", - "AMERICAN SAMOA": "AS", - "ARIZONA": "AZ", - "ARKANSAS": "AR", - "CALIFORNIA": "CA", - "COLORADO": "CO", - "CONNECTICUT": "CT", - "DELAWARE": "DE", - "DISTRICT OF COLUMBIA": "DC", - "FEDERATED STATES OF MICRONESIA": "FM", - "FLORIDA": "FL", - "GEORGIA": "GA", - "GUAM": "GU", - "HAWAII": "HI", - "IDAHO": "ID", - "ILLINOIS": "IL", - "INDIANA": "IN", - "IOWA": "IA", - "KANSAS": "KS", - "KENTUCKY": "KY", - "LOUISIANA": "LA", - "MAINE": "ME", - "MARSHALL ISLANDS": "MH", - "MARYLAND": "MD", - "MASSACHUSETTS": "MA", - "MICHIGAN": "MI", - "MINNESOTA": "MN", - "MISSISSIPPI": "MS", - "MISSOURI": "MO", - "MONTANA": "MT", - "NEBRASKA": "NE", - "NEVADA": "NV", - "NEW HAMPSHIRE": "NH", - "NEW JERSEY": "NJ", - "NEW MEXICO": "NM", - "NEW YORK": "NY", - "NORTH CAROLINA": "NC", - "NORTH DAKOTA": "ND", - "NORTHERN MARIANA ISLANDS": "MP", - "OHIO": "OH", - "OKLAHOMA": "OK", - "OREGON": "OR", - "PALAU": "PW", - "PENNSYLVANIA": "PA", - "PUERTO RICO": "PR", - "RHODE ISLAND": "RI", - "SOUTH CAROLINA": "SC", - "SOUTH DAKOTA": "SD", - "TENNESSEE": "TN", - "TEXAS": "TX", - "U.S. ARMED FORCES - AMERICAS": "AA", - "U.S. ARMED FORCES - EUROPE": "AE", - "U.S. ARMED FORCES - PACIFIC": "AP", - "UTAH": "UT", - "VERMONT": "VT", - "VIRGIN ISLANDS": "VI", - "VIRGINIA": "VA", - "WASHINGTON": "WA", - "WEST VIRGINIA": "WV", - "WISCONSIN": "WI", - "WYOMING": "WY" - } - - self.cache = { - 'year': {}, - 'sort_date': {}, - 'date': {}, - 'bad_date': set() - } - - self.year_formats = [ - '%Y', - '%y' - ] - - self.apt_to_compiled = { - "apt": re.compile(re.escape("apt"), re.IGNORECASE), - "avenue": re.compile(re.escape("avenue"), re.IGNORECASE), - "ave": re.compile(re.escape("ave"), re.IGNORECASE), - "blvd": re.compile(re.escape("blvd"), re.IGNORECASE), - "circle": re.compile(re.escape("circle"), re.IGNORECASE), - "cir": re.compile(re.escape("cir"), re.IGNORECASE), - "court": re.compile(re.escape("court"), re.IGNORECASE), - "drive": re.compile(re.escape("drive"), re.IGNORECASE), - "lane": re.compile(re.escape("lane"), re.IGNORECASE), - "parkway": re.compile(re.escape("parkway"), re.IGNORECASE), - "place": re.compile(re.escape("place"), re.IGNORECASE), - "road": re.compile(re.escape("road"), re.IGNORECASE), - "street": re.compile(re.escape("street"), re.IGNORECASE), - "way": re.compile(re.escape("way"), re.IGNORECASE) - } - - self.addr_suffix_list = [ - "ct", - "dr", - "pl", - "rd", - "st" - ] - - self.addr_unit_to_compiled = { - " unit ": re.compile(re.escape(" unit "), re.IGNORECASE), - " bldg ": re.compile(re.escape(" bldg "), re.IGNORECASE), - " ste ": re.compile(re.escape(" ste "), re.IGNORECASE), - " # ": re.compile(re.escape(" # "), re.IGNORECASE), - " #": re.compile(re.escape(" #"), re.IGNORECASE) - } - - #### Used by EFI exclusively #### - self.non_prospect_row_idxs = set() - - self.enrollment_funnel_fields = { - 'Inquired', - 'Applied', - 'Completed', - 'Admitted', - 'Confirmed', - 'Enrolled', - 'Canceled', - 'Dropped', - 'Graduated' - } - - self.non_prospect_fields = self.enrollment_funnel_fields - {'Dropped', 'Graduated'} - - self.season_year_map = defaultdict(str, self.season_year_map) - - self.filtered = { - 'entryyear': 0, - 'prospects': 0 - } - - self.generic_bool_map = { - 'YES': 'Y', - 'NO': 'N', - 'Y': 'Y', - 'N': 'N', - 'TRUE': 'Y', - 'FALSE': 'N', - '1': 'Y', - '0': 'N' - } - - def check_width(self, value: str, field: str, row_idx: int, truncate_backward: bool=False) -> str: - """Check field value and truncate if it is longer than expected. - - Args: - value (str): Value - field (str): Column header field value - row_idx (int): Row index - truncate_backward (bool, optional): Truncate from back of value - - Returns: - str: [description] - """ - - if len(value) > self.field_to_max_widths[field]: - if truncate_backward: - new_value = value[-self.field_to_max_widths[field]:].lstrip() - else: - new_value = value[:self.field_to_max_widths[field]].rstrip() - LOG.warning(f"[{self.filename}] [row:{row_idx}] [{field}] - '{value}' " - f"exceeds max width of {self.field_to_max_widths[field]}. Trimming value to {new_value}") - value = new_value - - return value - - def check_name(self, value: str, field: str, row_idx: int) -> str: - """Check FirstName | LastName logic. - - Args: - value (str): Name value - field (str): Column header field value - row_idx (int): Row number in file - - Returns: - str: Name value - """ - - if value != '': - value = value.replace('"', '') - value = self.check_width(value, field, row_idx) - - return value - - def check_phone_number(self, value: str, field: str, row_idx: int) -> str: - """Check Phone Number logic. - - Args: - value (str): Phone number value - field (str): Column header field value - row_idx (int): Row number in file - - Returns: - str: Phone number value - """ - - if value != '': - value = value.replace(' ', '') - value = "".join(filter(str.isdigit, value)) - value = self.check_width(value, field, row_idx, truncate_backward=True) - - return value - - def check_gender(self, value: str) -> str: - """Check Gender logic. - - Args: - value (str): Gender value - - Returns: - str: Gender value - """ - - if value != '': - value = self.gender_map.get(value.upper(), '') - - return value - - def check_gpa(self, value: str, field: str, row_idx: int) -> str: - """Check GPA logic. - - Args: - value (str): GPA value - field (str): Column header field value - row_idx (int): Row number in file - - Returns: - str: GPA value - """ - - if value != '': - try: - value = '' if not (0 <= float(value) <= 200) else self.check_width(value, field, row_idx) - except ValueError: - value = self.grades_map.get(value.upper(), '') - - return value - - def check_statecode(self, value: str, field: str, row_idx: int) -> str: - """Check StateCode logic. - - Args: - value (str): StateCode value - field (str): Column header field value - row_idx (int): Row number in file - - Returns: - str: StateCode value - """ - - if value != '': - value = self.state_to_statecode.get(value.upper(), value) - value = self.check_width(value, field, row_idx) - - return value - - def check_date(self, value: str, field: str, row_idx: int) -> str: - """Check date conforms to expected date within time range. - - Args: - value (str): Date value - field (str): Column header field value - row_idx (int): Row number in file - - Returns: - str: Date value - """ - - if value != '': - - index = value.find(' ') - if index != -1: - value = value[:index] - - if value in self.cache['date']: - value = self.cache['date'][value] - elif value in self.cache['bad_date']: - value = '' - else: - for idx, pattern in enumerate(self.date_formats): - try: - val = datetime.strptime(value, pattern) - if idx != 0: - self.date_formats[0], self.date_formats[idx] = self.date_formats[idx], self.date_formats[0] - if field in self.filed_date_min_max: - # we have date field with defined min/max range. - dmin = self.filed_date_min_max[field][0] - dmax = self.filed_date_min_max[field][1] - if dmin <= val <= dmax: - val = val.strftime('%Y/%m/%d') - self.cache['date'][value] = val - self.cache['sort_date'][val] = f"{val[5:7]}/{val[8:10]}/{val[:4]}" - value = val - else: - LOG.warning(f"[{self.filename}] [row:{row_idx}] [{field}] - {val.date()}" - f" not between range of {dmin.date()} to {dmax.date()}") - value = '' - break - except ValueError: - pass - else: - self.cache['bad_date'].add(value) - value = '' - - return value - - def check_year(self, value: str, field: str, row_idx: int) -> str: - """Check year conforms to expected year within time range. - - Args: - value (str): Year value - field (str): Column header field value - row_idx (int): Row number in file - - Returns: - str: Year value - """ - - if value != '': - if value in self.cache['year']: - value = self.cache['year'][value] - else: - for idx, pattern in enumerate(self.year_formats): - try: - val = datetime.strptime(value, pattern).year - if idx != 0: - self.year_formats[0], self.year_formats[idx] = self.year_formats[idx], self.year_formats[0] - if field in self.filed_year_min_max: - # we have year field with defined min/max range. - ymin = self.filed_year_min_max[field][0] - ymax = self.filed_year_min_max[field][1] - if ymin <= val <= ymax: - val = str(val) - self.cache['year'][value] = val - value = val - else: - LOG.warning(f"[{self.filename}] [row:{row_idx}] [{field}] - {val} not between range of {ymin} to {ymax}") - self.cache['year'][value] = '' - value = '' - break - except ValueError: - pass - else: - if field != 'EntryYear': - self.cache['year'][value] = '' - value = '' - - return value - - def check_srccode(self, value: str, field: str, row_idx: int) -> str: - """Check SrcCode logic. - - Args: - value (str): SrcCode value - field (str): Column header field value - row_idx (int): Row number in file - - Returns: - str: SrcCode value - """ - - return self.check_width(value, field, row_idx) if value != '' else 'NRCCUA Unknown' - - def check_athlete(self, value: str) -> str: - """Check StudentAthlete logic. - - Args: - value (str): Athlete value - rows (list[int]): List of row indicies - - Returns: - str: Athlete value - """ - - if value != '': - value = self.student_athlete_map.get(value.upper(), 'Y') - - return value - - def check_email(self, value: str, field: str, row_idx: int) -> str: - """Check Email logic. - - Args: - value (str): Email value - field (str): Column header field value - row_idx (int): Row number in file - - Returns: - str: Email value - """ - - if value != '': - value = self.check_width(value, field, row_idx) if '@' in value else '' - - return value - - def check_generic(self, value: str, field: str, row_idx: int) -> str: - """Check generic column logic. - - Args: - value (str): Generic value - field (str): Column header field value - row_idx (int): Row number in file - - Returns: - str: Generic value - """ - - if value != '': - value = self.check_width(value, field, row_idx) - - return value - - def check_address1(self, value: str, field: str, row_idx: int) -> str: - """Check Address1 logic. - - Args: - value (str): Address value - field (str): Column header field value - row_idx (int): Row number in file - - Returns: - str: Address value - """ - - if value != '': - value = self.check_no_spaces_address(value) - value = self.check_address(value) - value = self.check_width(value, field, row_idx) - - return value - - def check_address2(self, value: str, field: str, row_idx: int) -> str: - """Check Address2 logic. - - Args: - value (str): Address value - field (str): Column header field value - row_idx (int): Row number in file - - Returns: - str: Address value - """ - - if value != '': - value = self.check_address(f" {value}") - value = self.check_width(value, field, row_idx) - - return value - - def check_no_spaces_address(self, value: str) -> str: - """Check and adjust address values that have no spaces by detecting - where spaces should exist and applying. - - Args: - value (str): Address value - - Returns: - str: Address value - """ - - if ' ' not in value: - value = value.rstrip('.') - for idx, character in enumerate(value): - if not character.isdigit(): - if idx != 0: - # add space after all starting digits detected - value = f"{value[:idx]} {value[idx:]}" - break - - value_lower = value.lower() - for item, compiled in self.apt_to_compiled.items(): - if item in value_lower: - # add space before item - value = compiled.sub(f" {item}", value) - break - - for item in self.addr_suffix_list: - if value_lower.endswith(item): - # add space before item - value = f"{value[:len(value) - 2]} {value[len(value) - 2:]}" - break - - return value - - def check_address(self, value: str) -> str: - """Normalize address unit if necessary. - - Args: - value (str): Address value - - Returns: - str: Address value - """ - - value_lower = value.lower() - for item, compiled in self.addr_unit_to_compiled.items(): - if item in value_lower: - value = compiled.sub(" apt ", value) - break - - return value.lstrip() - - def apply_fice_enrolled_logic(self, fice: str, confirmed: str, enrolled: str, canceled: str, dropped: str) -> str: - """Apply FICE enrolled logic if defined in the config. - - Args: - fice (str): Institution unique identifier - confirmed (str): Confirmed enrollment funnel value - enrolled (str): Enrolled enrollment funnel value - canceled (str): Canceled enrollment funnel value - dropped (str): Dropped enrollment funnel value - - Returns: - str: Enrolled enrollment funnel value - """ - - if fice in self.fice_enrolled_logic: - if confirmed != '' and enrolled == '' and canceled == '' and dropped == '': - enrolled = confirmed - - return enrolled - - def check_entry_fields(self, entryyear, entryterm) -> tuple: - """Check entryyear and entryterm fields and try to detect their values. - Some colleges put the year and term in the same fields. - - Args: - entryyear ([type]): Student's college entry year - entryterm ([type]): Student's college entry term - - Returns: - tuple: Skip record True/False, entryyear value, entryterm value - """ - - skip_record = False - values = defaultdict(str) - if entryyear != '': - values['year'] = entryyear.replace('/', '') - entryyear = '' - - if entryterm != '': - values['term'] = entryterm.replace('/', '') - entryterm = '' - - # search for season & year within entryyear and entryterm if available - if len(values) > 0: - concat_str = f"{values['year']} {values['term']}".upper() - for key, value in self.seasons_map.items(): - if key in concat_str: - entryterm = value - break - - words = [] - for word in concat_str.split(): - word_length = len(word) - if word_length >= 4: - words.append(word[:4]) - if word[:4] != word[word_length - 4:]: - words.append(word[word_length - 4:]) - - for word in words: - if self.entry_year_filter['start'] <= self.season_year_map[word] <= self.entry_year_filter['end']: - entryyear = self.season_year_map[word] - break - if self.entry_year_filter['start'] <= word <= self.entry_year_filter['end']: - entryyear = word - break - - if entryyear == '': - self.filtered['entryyear'] += 1 - skip_record = True - - return skip_record, entryyear, entryterm - - def check_for_prospects(self, row: dict[str, Any]) -> bool: - """Check if a record is a prospect, in which case we can skip and leave - out of output. - - Args: - row (dict[str, Any]): A single record that is a dict of column names to values - - Returns: - bool: Skip record since it is a prospect - """ - - skip_record = True - for field in self.non_prospect_fields: - # Check if we have a date in one of the enrollment funnel fields - if row[field]: - skip_record = False - break - else: - self.filtered['prospects'] += 1 - - return skip_record - - - ############################################################################################### - ############################### New EL3 field parsing functions ############################### - ############################################################################################### - # - # ETHNICITY - # FIRSTGENFLAG - # EFC - # HSCODE - # ACTSCORE - # SATSCORE - # PROSPECTCODE - # PROSPECTDATE - # FAFSASUBMITTED - # APPLICATIONPLAN - # ADMITCODE - # COLLEGE - # ADMITTEDPROGRAM - # HONORSPROGRAM - # STUDENTTYPE - # INTERNATIONAL - # COUNTRYOFORIGIN - # STUDENTSTATUS - # TERRITORY - # ENGAGEMENTSCORE - # CUSTOMFILTER1, ..., CUSTOMFILTER5 - # CUSTOMDATE1, ..., CUSTOMDATE5 - # - # Many of these fields are parsed using the functions check_generic or check_date - # else they use a function below. - - def check_generic_boolean(self, value: str) -> str: - """Check generic boolean value. - - Args: - value (str): Generic Boolean value - - Returns: - str: Generic Boolean value - """ - - if value != '': - value = self.generic_bool_map.get(value.upper(), '') - - return value - - @staticmethod - def check_act_score(value: str) -> str: - """Check ACT Score logic. - - Args: - value (str): ACT score - field (str): Column header field value - - Returns: - str: ACT score - """ - - if value != '': - try: - integer = int(value) - value = str(integer) if (1 <= integer <= 36) else '' - except ValueError: - value = '' - - return value - - @staticmethod - def check_sat_score(value: str) -> str: - """Check SAT Score logic. - - Args: - value (str): SAT score - field (str): Column header field value - - Returns: - str: SAT score - """ - - if value != '': - try: - integer = int(value) - value = str(integer) if (400 <= integer <= 1600) else '' - except ValueError: - value = '' - - return value - - @staticmethod - def check_hscode(value: str) -> str: - """Check HSCODE logic. - - Args: - value (str): HSCODE value - field (str): Column header field value - - Returns: - str: HSCODE value - """ - - if value != '' and len(value) == 6: - try: - _ = int(value) - except ValueError: - value = '' - else: - value = '' - - return value - - ############################################################################################### - ################################### Used by EFI exclusively ################################### - ############################################################################################### - - def check_generic_boolean_efi(self, records: list[str]): - """Check generic boolean logic. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_generic_boolean(records[idx]) - - def check_act_score_efi(self, records: list[str]): - """Check ACT score logic. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_act_score(records[idx]) - - def check_sat_score_efi(self, records: list[str]): - """Check SAT score logic. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_sat_score(records[idx]) - - def check_hscode_efi(self, records: list[str]): - """Check HSCode logic. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_hscode(records[idx]) - - def check_year_efi(self, records: list[str], field: str, row_idx: int): - """Check year conforms to expected year within time range. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - past (datetime): Past datetime threshold --> Deprecated, remove - future (datetime): Future datetime threshold --> Deprecated, remove - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_year(records[idx], field, row_idx + idx) - - def check_date_efi(self, records: list[str], field: str, row_idx: int): - """Check date conforms to expected date within time range. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - past (datetime): Past datetime threshold --> Deprecated, remove - future (datetime): Future datetime threshold --> Deprecated, remove - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_date(records[idx], field, row_idx + idx) - if field in self.non_prospect_fields and records[idx]: - self.non_prospect_row_idxs.add(idx) - - def check_name_efi(self, records: list[str], field: str, row_idx: int): - """Check FirstName | LastName logic. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_name(records[idx], field, row_idx + idx) - - def check_phone_number_efi(self, records: list[str], field: str, row_idx: int): - """Check Phone Number logic. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_phone_number(records[idx], field, row_idx + idx) - - def check_gender_efi(self, records: list[str]): - """Check Gender logic. - - Args: - records (list[str]): List of a specific columns values - """ - - for idx in range(len(records)): - records[idx] = self.check_gender(records[idx]) - - def check_gpa_efi(self, records: list[str], field: str, row_idx: int): - """Check GPA logic. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_gpa(records[idx], field, row_idx + idx) - - def check_statecode_efi(self, records: list[str], field: str, row_idx: int): - """Check StateCode logic. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_statecode(records[idx], field, row_idx + idx) - - def check_srccode_efi(self, records: list[str], field: str, row_idx: int): - """Check SrcCode logic. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_srccode(records[idx], field, row_idx + idx) - - def check_athlete_efi(self, records: list[str]): - """Check StudentAthlete logic. - - Args: - records (list[str]): List of a specific columns values - rows (list[int]): List of row indicies - """ - - for idx in range(len(records)): - records[idx] = self.check_athlete(records[idx]) - - def check_email_efi(self, records: list[str], field: str, row_idx: int): - """Check Email logic. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_email(records[idx], field, row_idx + idx) - - def check_generic_efi(self, records: list[str], field: str, row_idx: int): - """Check generic column logic. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_generic(records[idx], field, row_idx + idx) - - def check_address1_efi(self, records: list[str], field: str, row_idx: int): - """Check Address1 logic. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_address1(records[idx], field, row_idx + idx) - - def check_address2_efi(self, records: list[str], field: str, row_idx: int): - """Check Address2 logic. - - Args: - records (list[str]): List of a specific columns values - field (str): Column header field value - row_idx (int): Row number in file - """ - - for idx in range(len(records)): - records[idx] = self.check_address2(records[idx], field, row_idx + idx) - - def check_for_prospects_efi(self, records: list[list[str]]): - """Check and remove any records identified as prospects. - - Args: - records (list[list[str]]): All records defined as a list of lists - """ - - remove_indices = [i for i in range(len(records[0])) if i not in self.non_prospect_row_idxs] - if remove_indices: - self.filtered['prospects'] = len(remove_indices) - for col_idx, record in enumerate(records): - records[col_idx] = np.delete(record, remove_indices).tolist() - - def check_entry_fields_efi(self, records: list[list[str]], header_to_index: dict[str, int]): - """Check entryyear and entryterm fields and try to detect their values. - Some colleges put the year and term in the same fields. - - Args: - records (list[list[str]]): All records defined as a list of lists - header_to_index (dict[str, int]): Column name to index in records - """ - - entryyear = records[header_to_index['EntryYear']] - entryterm = records[header_to_index['EntryTerm']] - remove_indices = [] - for idx in range(len(records[0])): - - skip_record, entryyear[idx], entryterm[idx] = self.check_entry_fields(entryyear[idx], entryterm[idx]) - if skip_record: - remove_indices.append(idx) - - if remove_indices: - for col_idx, record in enumerate(records): - records[col_idx] = np.delete(record, remove_indices).tolist() - - def apply_fice_enrolled_logic_efi(self, records: list[list[str]], fice: str, header_to_index: dict[str, int]): - """Apply FICE enrolled logic if defined in the config. - - Args: - records (list[list[str]]): All records defined as a list of lists - fice (str): Institution unique identifier - header_to_index (dict[str, int]): Column name to index in records - """ - - if fice in self.fice_enrolled_logic: - confirmed = records[header_to_index['Confirmed']] - enrolled = records[header_to_index['Enrolled']] - canceled = records[header_to_index['Canceled']] - dropped = records[header_to_index['Dropped']] - for idx in range(len(records[0])): - enrolled[idx] = self.apply_fice_enrolled_logic(fice, confirmed[idx], enrolled[idx], canceled[idx], dropped[idx]) - def async_wrapper(func: coroutine) -> Any: """Decorator to run functions using async. Found this handy to use with DAG diff --git a/aioradio/requirements.txt b/aioradio/requirements.txt index 8a9f73c..3fbfd5c 100644 --- a/aioradio/requirements.txt +++ b/aioradio/requirements.txt @@ -4,8 +4,8 @@ backoff==2.2.1 boto3==1.24.59 botocore==1.27.59 cchardet==2.1.7 -ddtrace==1.3.6 -dominodatalab==1.2.2 +ddtrace==1.9.4 +dominodatalab==1.2.3 fakeredis==1.10.1 flask==2.1.2 flask-cors==3.0.10 @@ -14,17 +14,17 @@ mandrill==1.0.60 moto==3.1.18 openpyxl==3.0.10 orjson==3.8.5 -pandas==1.4.4 -pre-commit==2.21.0 +pandas==1.5.3 +pre-commit==3.2.0 psycopg2-binary==2.9.5 -pylint==2.16.1 +pylint==2.17.0 pyodbc==4.0.35 pysmb==1.2.9.1 -pytest==7.2.1 -pytest-asyncio==0.20.3 +pytest==7.2.2 +pytest-asyncio==0.21.0 pytest-cov==4.0.0 -python-json-logger==2.0.4 +python-json-logger==2.0.7 redis==3.5.3 twine==4.0.2 werkzeug==2.1.2 -wheel==0.38.4 +wheel==0.40.0 diff --git a/aioradio/tests/file_ingestion_test.py b/aioradio/tests/file_ingestion_test.py index e379b38..17e41fb 100644 --- a/aioradio/tests/file_ingestion_test.py +++ b/aioradio/tests/file_ingestion_test.py @@ -11,7 +11,7 @@ import pytest -from aioradio.file_ingestion import (EFIParse, async_db_wrapper, async_wrapper, +from aioradio.file_ingestion import (async_db_wrapper, async_wrapper, delete_ftp_file, establish_ftp_connection, get_current_datetime_from_timestamp, list_ftp_objects, @@ -213,7 +213,7 @@ async def test_async_db_wrapper(user): 'database': 'DataStage', 'secret': 'efi/sandbox/all', 'secret_json_key': 'mssql', - 'region': 'us-east-1', + 'region': 'us-east-2', 'rollback': True, 'trusted_connection': 'no', 'application_intent': 'ReadOnly', @@ -227,14 +227,3 @@ async def func(**kwargs): print(f"Connection name: {name}\tConnection object: {conn}") await func() - - -def test_check_phone_number(): - """Test check_phone_number in EFIParse.""" - - efi = EFIParse('') - number = efi.check_phone_number('+1 (512) 573-5819', 'CellPhoneNumber', 0) - assert number == '5125735819' - - number = efi.check_phone_number('215â€"863-79', 'CellPhoneNumber', 0) - assert number == '21586379' diff --git a/aioradio/utils.py b/aioradio/utils.py index b60a8b1..ccbf72e 100644 --- a/aioradio/utils.py +++ b/aioradio/utils.py @@ -3,7 +3,8 @@ # pylint: disable=ungrouped-imports # pylint: disable=wrong-import-position -from asyncio import coroutine, create_task, sleep, to_thread +from asyncio import create_task, sleep, to_thread +from types import coroutine from typing import Any, Dict, List, Tuple diff --git a/setup.py b/setup.py index 6f1728f..1d45e10 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ long_description = fileobj.read() setup(name='aioradio', - version='0.17.36', + version='0.18.0', description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more', long_description=long_description, long_description_content_type="text/markdown", @@ -20,12 +20,13 @@ 'aioradio/aws', ], install_requires=[ + 'cython>=0.29.33', 'aioboto3==10.4.0', 'aiojobs>=1.0.0', 'backoff>=2.1.2', 'botocore==1.27.59', 'boto3==1.24.59', - 'cchardet>=2.1.7', + #'cchardet>=2.1.7', 'ddtrace>=0.60.1', 'dominodatalab>=1.1.1', 'fakeredis>=1.7.1',