Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify Socrata API script and allow multiple assets to be updated #745

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions .github/workflows/socrata_upload.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,21 @@ on:
workflow_dispatch:
inputs:
socrata_asset:
type: choice
description: Target Socrata asset
options:
- Appeals
- Assessed Values
- Parcel Addresses
- Parcel Proximity
- Parcel Sales
- Parcel Status
- Permits
- Property Tax-Exempt Parcels
- Parcel Universe (Historic)
- Parcel Universe (Current Year)
- Residential Condominium Unit Characteristics
- Single and Multi-Family Improvement Characteristics
default: Parcel Universe (Current Year)
type: string
description: >
Comma-separated Socrata assets to update. Options are:
Appeals |
Assessed Values |
Parcel Addresses |
Parcel Proximity |
Parcel Sales |
Parcel Status |
Permits |
Property Tax-Exempt Parcels |
Parcel Universe (Historic) |
Parcel Universe (Current Year) |
Residential Condominium Unit Characteristics |
Single and Multi-Family Improvement Characteristics
Comment on lines +9 to +21
Copy link
Member Author

@wrridgeway wrridgeway Feb 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is gross, but I can't think of a better way to inform users of their choices since multiple choice isn't an input type and the github actions drop down menu doesn't seem to respect yaml line break formatting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion, non-blocking] The output is a bit hard to read in the context of the rendered form:

image

I wonder if we need to provide the universe of available options? We could instead instruct the caller on where to find the options:

Suggested change
Comma-separated Socrata assets to update. Options are:
Appeals |
Assessed Values |
Parcel Addresses |
Parcel Proximity |
Parcel Sales |
Parcel Status |
Permits |
Property Tax-Exempt Parcels |
Parcel Universe (Historic) |
Parcel Universe (Current Year) |
Residential Condominium Unit Characteristics |
Single and Multi-Family Improvement Characteristics
Comma-separated list of names of Socrata assets to update.
Asset names are the same as the asset title in Socrata, minus
the "Assessor - " prefix.

required: true
overwrite:
# True for overwrite, False for update
Expand Down
4 changes: 2 additions & 2 deletions dbt/models/open_data/exposures.yml
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ exposures:
# Disabled until the asset is public
test_row_count: false
year_field: assessment_year
asset_id: 6yjf-dfxs
asset_id: tqd2-4zjs
primary_key:
- pin
- permit_number
Expand All @@ -199,7 +199,7 @@ exposures:
name: Data Department
meta:
test_row_count: true
asset_id: vgzx-68gb
asset_id: 3sp4-s4qg
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to be reverted before merging.

primary_key:
- pin
- year
Expand Down
158 changes: 66 additions & 92 deletions socrata/socrata_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,23 @@
).cursor(unload=True)


def parse_assets(assets):
"""
Make sure the asset environmental variable is formatted correctly.
"""

assets = str(assets).replace(" ", "").split(",")

return assets


def parse_years(years):
"""
Make sure the years environmental variable is formatted correctly.
"""

if years == "":
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to add a bit more error handling. Probably won't ever be needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Praise] I like the defensiveness!

years = None
if years is not None:
years = str(years).replace(" ", "").split(",")

Expand Down Expand Up @@ -88,6 +100,9 @@ def get_asset_info(socrata_asset):
Simple helper function to retrieve asset-specific information from dbt.
"""

if not os.path.isdir("./dbt"):
Copy link
Member Author

@wrridgeway wrridgeway Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only helpful locally, unnecessary for github deployment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion, non-blocking] I think this is knowledge that it would make sense to preserve for the future:

Suggested change
if not os.path.isdir("./dbt"):
# When running locally, we will probably be inside the socrata/ dir, so
# switch back out to find the dbt/ dir
if not os.path.isdir("./dbt"):

os.chdir("..")

os.chdir("./dbt")

DBT = dbtRunner()
Expand Down Expand Up @@ -128,8 +143,7 @@ def get_asset_info(socrata_asset):

def build_query(athena_asset, asset_id, years=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion, non-blocking] Since this now returns a dict rather than a query, how about we rename it to better hint at its return type? Something like build_query_dict?

"""
Build an Athena compatible SQL query. Function will append a year
conditional if `years` is non-empty. Many of the CCAO's open data assets are
Build a dictionary of Athena compatible SQL queries and their associated years. Many of the CCAO's open data assets are
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the meat of what's changing here.

too large to pass to Socrata without chunking.
"""

Expand Down Expand Up @@ -182,16 +196,18 @@ def build_query(athena_asset, asset_id, years=None):

query = f"SELECT {', '.join(columns['column'])} FROM {athena_asset}"

# Build a dictionary with queries for each year requested, or no years
if not years:
query = query
query = {None: query}

elif years is not None:
query += " WHERE year = %(year)s"
else:
query = [query + " WHERE year = '" + year + "'" for year in years]
query = dict([(k, v) for k, v in zip(years, query)])
Comment on lines +201 to +207
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion, non-blocking] A couple things I find slightly confusing about this block:

  1. We're mutating the query variable continuously, which makes it unclear what each transformation step does and could potentially make it confusing to step through the code using logging or a breakpoint
  2. I admire the clever list comprehensions! But I think the combination of the first list comprehension and the second using zip() is a bit hard to get your head around quickly. I would prefer slightly more verbose code for the sake of readability

Here's how I would suggest we resolve both nits:

if not years:
    query_dict = {None: query}
else:
    query_dict = {year: f"{query} WHERE year = '{year}'" for year in years}

return query_dict

Also, keep in mind that templating user input into SQL strings like this is a common cause of SQL injection vulnerabilities. I think it's OK in this case, since we don't allow external code authors to run workflows, but I would still recommend we test an attack input for year like 2004'; select 1; to make sure we're properly escaping user input in our queries. Let me know if you want help with this and we can pair on it together.


return query


def upload(method, asset_id, sql_query, overwrite, count, year=None):
def upload(asset_id, sql_query, overwrite):
"""
Function to perform the upload to Socrata. `puts` or `posts` depending on
user's choice to overwrite existing data.
Expand All @@ -202,52 +218,39 @@ def upload(method, asset_id, sql_query, overwrite, count, year=None):

url = "https://datacatalog.cookcountyil.gov/resource/" + asset_id + ".json"

print_message = "Overwriting" if overwrite else "Updating"

if not year:
query_conditionals = {}
print_message = print_message + " all years for asset " + asset_id
else:
query_conditionals = {"year": year}
print_message = (
print_message + " year: " + year + " for asset " + asset_id
)
# Raise URL status if it's bad
session.get(url=url, headers={"X-App-Token": app_token}).raise_for_status()

# We grab the data before uploading it so we can make sure timestamps are
# properly formatted
input_data = cursor.execute(sql_query, query_conditionals).as_pandas()
date_columns = input_data.select_dtypes(include="datetime").columns
for i in date_columns:
input_data[i] = input_data[i].fillna("").dt.strftime("%Y-%m-%dT%X")

# Raise URL status if it's bad
session.get(
(
"https://datacatalog.cookcountyil.gov/resource/"
+ asset_id
+ ".json?$limit=1"
),
headers={"X-App-Token": app_token},
).raise_for_status()
for year, query in sql_query.items():
print_message = "Overwriting" if overwrite else "Updating"

session.get(url=url, headers={"X-App-Token": app_token}).raise_for_status()
if not year:
print_message = print_message + " all years for asset " + asset_id
else:
print_message = (
print_message + " year: " + year + " for asset " + asset_id
)

for i in range(0, input_data.shape[0], 10000):
print(print_message)
print(f"Rows {i + 1}-{i + 10000}")
if count > 0:
method = "post"
response = getattr(session, method)(
url=url,
data=input_data.iloc[i : i + 10000].to_json(orient="records"),
headers={"X-App-Token": app_token},
)
count += 1
print(response.content)
input_data = cursor.execute(query).as_pandas()
date_columns = input_data.select_dtypes(include="datetime").columns
for i in date_columns:
input_data[i] = input_data[i].fillna("").dt.strftime("%Y-%m-%dT%X")

for i in range(0, input_data.shape[0], 10000):
print(print_message)
print(f"Rows {i + 1}-{i + 10000}")
method = "post" if not overwrite else "put"
response = getattr(session, method)(
url=url,
data=input_data.iloc[i : i + 10000].to_json(orient="records"),
headers={"X-App-Token": app_token},
)
overwrite = False
print(response.content)

# Return the updated count so that if this function is called in a loop
# the updated count persists.
return count
overwrite = False
Comment on lines +252 to +255
Copy link
Member Author

@wrridgeway wrridgeway Feb 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensuring overwrite can never be true more than once per asset.



def socrata_upload(socrata_asset, overwrite=False, years=None):
Expand All @@ -264,54 +267,25 @@ def socrata_upload(socrata_asset, overwrite=False, years=None):

years_list = parse_years_list(years=years, athena_asset=athena_asset)

tic = time.perf_counter()
count = 0

if not years_list:
sql_query = build_query(
athena_asset=athena_asset,
asset_id=asset_id,
)

upload_args = {
"asset_id": asset_id,
"sql_query": sql_query,
"overwrite": overwrite,
"count": count,
}

if overwrite:
upload("put", **upload_args)
else:
upload("post", **upload_args)

else:
sql_query = build_query(
athena_asset=athena_asset,
asset_id=asset_id,
years=years,
)

for year in years_list:
upload_args = {
"asset_id": asset_id,
"sql_query": sql_query,
"overwrite": overwrite,
"count": count,
"year": year,
}
# Perform the upload and update the counter
if count == 0 and overwrite:
count = upload("put", **upload_args)
else:
count = upload("post", **upload_args)
sql_query = build_query(
athena_asset=athena_asset,
asset_id=asset_id,
years=years_list,
)

tic = time.perf_counter()
upload(asset_id, sql_query, overwrite)
toc = time.perf_counter()

print(f"Total upload in {toc - tic:0.4f} seconds")


socrata_upload(
socrata_asset=os.getenv("SOCRATA_ASSET"),
overwrite=check_overwrite(os.getenv("OVERWRITE")),
years=parse_years(os.getenv("YEARS")),
)
# Retrieve asset(s)
all_assets = parse_assets(os.getenv("SOCRATA_ASSET"))

for asset in all_assets:
socrata_upload(
socrata_asset=asset,
overwrite=check_overwrite(os.getenv("OVERWRITE")),
years=parse_years(os.getenv("YEARS")),
)