-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Simplify Socrata API script and allow multiple assets to be updated #745
base: master
Are you sure you want to change the base?
Changes from all commits
a5b2b9d
92c0cb9
cfd81b4
0cc785b
f490403
0d7dbfd
08a90c4
484df21
fc9aeb5
165174d
968554d
f98b885
6c1ec09
114553f
ff45db0
57e03ce
dcf5056
9794582
f08596a
885accd
7129128
453977f
f13421a
a3c0833
c1af276
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,34 +4,35 @@ on: | |
workflow_dispatch: | ||
inputs: | ||
socrata_asset: | ||
type: choice | ||
description: Target Socrata asset | ||
options: | ||
- Appeals | ||
- Assessed Values | ||
- Parcel Addresses | ||
- Parcel Proximity | ||
- Parcel Sales | ||
- Parcel Status | ||
- Permits | ||
- Property Tax-Exempt Parcels | ||
- Parcel Universe (Historic) | ||
- Parcel Universe (Current Year) | ||
- Residential Condominium Unit Characteristics | ||
- Single and Multi-Family Improvement Characteristics | ||
default: Parcel Universe (Current Year) | ||
type: string | ||
description: > | ||
Comma-separated Socrata assets to update. Options are: | ||
Appeals | | ||
Assessed Values | | ||
Parcel Addresses | | ||
Parcel Proximity | | ||
Parcel Sales | | ||
Parcel Status | | ||
Permits | | ||
Property Tax-Exempt Parcels | | ||
Parcel Universe (Historic) | | ||
Parcel Universe (Current Year) | | ||
Residential Condominium Unit Characteristics | | ||
Single and Multi-Family Improvement Characteristics | ||
required: true | ||
years: | ||
# Comma separated list of years | ||
type: string | ||
description: > | ||
Years to update or overwrite (comma-separated). Leaving this field | ||
blank will upload an entire asset in one piece. Enter 'all' to upload | ||
all years individually. | ||
required: false | ||
overwrite: | ||
# True for overwrite, False for update | ||
type: boolean | ||
description: Overwrite socrata asset | ||
required: true | ||
years: | ||
# Comma separated list of years | ||
type: string | ||
description: Years to update or overwrite (comma-separated) | ||
default: 'all' | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Removed the default and explained the user's choices. |
||
required: false | ||
|
||
|
||
env: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,11 +31,24 @@ | |
).cursor(unload=True) | ||
|
||
|
||
def parse_years(years): | ||
def parse_assets(assets): | ||
""" | ||
Make sure the asset environment variable is formatted correctly. | ||
""" | ||
|
||
assets = str(assets).split(",") | ||
assets = [i.strip() for i in assets] | ||
|
||
return assets | ||
|
||
|
||
def parse_years(years=None): | ||
""" | ||
Make sure the years environment variable is formatted correctly. | ||
""" | ||
|
||
if years == "": | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just want to add a bit more error handling. Probably won't ever be needed. |
||
years = None | ||
if years is not None: | ||
years = str(years).replace(" ", "").split(",") | ||
|
||
|
@@ -88,6 +101,9 @@ def get_asset_info(socrata_asset): | |
Simple helper function to retrieve asset-specific information from dbt. | ||
""" | ||
|
||
if not os.path.isdir("./dbt"): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Only helpful locally; unnecessary for GitHub deployment. |
||
os.chdir("..") | ||
|
||
os.chdir("./dbt") | ||
|
||
DBT = dbtRunner() | ||
|
@@ -128,9 +144,9 @@ def get_asset_info(socrata_asset): | |
|
||
def build_query(athena_asset, asset_id, years=None): | ||
""" | ||
Build an Athena compatible SQL query. Function will append a year | ||
conditional if `years` is non-empty. Many of the CCAO's open data assets are | ||
too large to pass to Socrata without chunking. | ||
Build a dictionary of Athena compatible SQL queries and their associated | ||
years. Many of the CCAO's open data assets are too large to pass to Socrata | ||
without chunking. | ||
""" | ||
|
||
# Retrieve column names and types from Athena | ||
|
@@ -177,21 +193,23 @@ def build_query(athena_asset, asset_id, years=None): | |
# Limit pull to columns present in open data asset | ||
columns = columns[columns["column"].isin(asset_columns)] | ||
|
||
print("The following columns will be updated:") | ||
print(f"The following columns will be updated for {athena_asset}:") | ||
print(columns) | ||
|
||
query = f"SELECT {', '.join(columns['column'])} FROM {athena_asset}" | ||
|
||
# Build a dictionary with queries for each year requested, or no years | ||
if not years: | ||
query = query | ||
query = {None: query} | ||
|
||
elif years is not None: | ||
query += " WHERE year = %(year)s" | ||
else: | ||
query = [query + " WHERE year = '" + year + "'" for year in years] | ||
query = dict([(k, v) for k, v in zip(years, query)]) | ||
|
||
return query | ||
|
||
|
||
def upload(method, asset_id, sql_query, overwrite, count, year=None): | ||
def upload(asset_id, sql_query, overwrite): | ||
""" | ||
Function to perform the upload to Socrata. `puts` or `posts` depending on | ||
user's choice to overwrite existing data. | ||
|
@@ -202,52 +220,39 @@ def upload(method, asset_id, sql_query, overwrite, count, year=None): | |
|
||
url = "https://datacatalog.cookcountyil.gov/resource/" + asset_id + ".json" | ||
|
||
print_message = "Overwriting" if overwrite else "Updating" | ||
|
||
if not year: | ||
query_conditionals = {} | ||
print_message = print_message + " all years for asset " + asset_id | ||
else: | ||
query_conditionals = {"year": year} | ||
print_message = ( | ||
print_message + " year: " + year + " for asset " + asset_id | ||
) | ||
# Raise URL status if it's bad | ||
session.get(url=url, headers={"X-App-Token": app_token}).raise_for_status() | ||
|
||
# We grab the data before uploading it so we can make sure timestamps are | ||
# properly formatted | ||
input_data = cursor.execute(sql_query, query_conditionals).as_pandas() | ||
date_columns = input_data.select_dtypes(include="datetime").columns | ||
for i in date_columns: | ||
input_data[i] = input_data[i].fillna("").dt.strftime("%Y-%m-%dT%X") | ||
for year, query in sql_query.items(): | ||
print_message = "Overwriting" if overwrite else "Updating" | ||
|
||
# Raise URL status if it's bad | ||
session.get( | ||
( | ||
"https://datacatalog.cookcountyil.gov/resource/" | ||
+ asset_id | ||
+ ".json?$limit=1" | ||
), | ||
headers={"X-App-Token": app_token}, | ||
).raise_for_status() | ||
|
||
session.get(url=url, headers={"X-App-Token": app_token}).raise_for_status() | ||
Comment on lines
-224
to
-233
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We were checking for a bad URL status twice. |
||
if not year: | ||
print_message = print_message + " all years for asset " + asset_id | ||
else: | ||
print_message = ( | ||
print_message + " year: " + year + " for asset " + asset_id | ||
) | ||
|
||
for i in range(0, input_data.shape[0], 10000): | ||
print(print_message) | ||
print(f"Rows {i + 1}-{i + 10000}") | ||
if count > 0: | ||
method = "post" | ||
response = getattr(session, method)( | ||
url=url, | ||
data=input_data.iloc[i : i + 10000].to_json(orient="records"), | ||
headers={"X-App-Token": app_token}, | ||
) | ||
count += 1 | ||
print(response.content) | ||
input_data = cursor.execute(query).as_pandas() | ||
date_columns = input_data.select_dtypes(include="datetime").columns | ||
for i in date_columns: | ||
input_data[i] = input_data[i].fillna("").dt.strftime("%Y-%m-%dT%X") | ||
|
||
for i in range(0, input_data.shape[0], 10000): | ||
print(print_message) | ||
print(f"Rows {i + 1}-{i + 10000}") | ||
method = "post" if not overwrite else "put" | ||
response = getattr(session, method)( | ||
url=url, | ||
data=input_data.iloc[i : i + 10000].to_json(orient="records"), | ||
headers={"X-App-Token": app_token}, | ||
) | ||
overwrite = False | ||
print(response.content) | ||
|
||
# Return the updated count so that if this function is called in a loop | ||
# the updated count persists. | ||
return count | ||
overwrite = False | ||
Comment on lines
+252
to
+255
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ensuring overwrite can never be `True` after the first upload request has been sent. |
||
|
||
|
||
def socrata_upload(socrata_asset, overwrite=False, years=None): | ||
|
@@ -264,54 +269,26 @@ def socrata_upload(socrata_asset, overwrite=False, years=None): | |
|
||
years_list = parse_years_list(years=years, athena_asset=athena_asset) | ||
|
||
tic = time.perf_counter() | ||
count = 0 | ||
|
||
if not years_list: | ||
sql_query = build_query( | ||
athena_asset=athena_asset, | ||
asset_id=asset_id, | ||
) | ||
|
||
upload_args = { | ||
"asset_id": asset_id, | ||
"sql_query": sql_query, | ||
"overwrite": overwrite, | ||
"count": count, | ||
} | ||
sql_query = build_query( | ||
athena_asset=athena_asset, | ||
asset_id=asset_id, | ||
years=years_list, | ||
) | ||
|
||
if overwrite: | ||
upload("put", **upload_args) | ||
else: | ||
upload("post", **upload_args) | ||
tic = time.perf_counter() | ||
upload(asset_id, sql_query, overwrite) | ||
toc = time.perf_counter() | ||
|
||
else: | ||
sql_query = build_query( | ||
athena_asset=athena_asset, | ||
asset_id=asset_id, | ||
years=years, | ||
) | ||
print(f"Total upload in {toc - tic:0.4f} seconds") | ||
|
||
for year in years_list: | ||
upload_args = { | ||
"asset_id": asset_id, | ||
"sql_query": sql_query, | ||
"overwrite": overwrite, | ||
"count": count, | ||
"year": year, | ||
} | ||
# Perform the upload and update the counter | ||
if count == 0 and overwrite: | ||
count = upload("put", **upload_args) | ||
else: | ||
count = upload("post", **upload_args) | ||
|
||
toc = time.perf_counter() | ||
print(f"Total upload in {toc - tic:0.4f} seconds") | ||
# Retrieve asset(s) | ||
all_assets = parse_assets(os.getenv("SOCRATA_ASSET")) | ||
|
||
|
||
socrata_upload( | ||
socrata_asset=os.getenv("SOCRATA_ASSET"), | ||
overwrite=check_overwrite(os.getenv("OVERWRITE")), | ||
years=parse_years(os.getenv("YEARS")), | ||
) | ||
for asset in all_assets: | ||
socrata_upload( | ||
socrata_asset=asset, | ||
overwrite=check_overwrite(os.getenv("OVERWRITE")), | ||
years=parse_years(os.getenv("YEARS")), | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is gross, but I can't think of a better way to inform users of their choices, since multiple choice isn't an input type and the GitHub Actions drop-down menu doesn't seem to respect YAML line-break formatting.