Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify Socrata API script and allow multiple assets to be updated #745

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbt/models/open_data/exposures.yml
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ exposures:
name: Data Department
meta:
test_row_count: true
asset_id: vgzx-68gb
asset_id: 3sp4-s4qg
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to be reverted before merging.

primary_key:
- pin
- year
Expand Down
134 changes: 47 additions & 87 deletions socrata/socrata_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def parse_years(years):
Make sure the years environmental variable is formatted correctly.
"""

if years == "":
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to add a bit more error handling. Probably won't ever be needed.

years = None
if years is not None:
years = str(years).replace(" ", "").split(",")

Expand Down Expand Up @@ -88,6 +90,9 @@ def get_asset_info(socrata_asset):
Simple helper function to retrieve asset-specific information from dbt.
"""

if not os.path.isdir("./dbt"):
Copy link
Member Author

@wrridgeway wrridgeway Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only helpful locally, unnecessary for github deployment.

os.chdir("..")

os.chdir("./dbt")

DBT = dbtRunner()
Expand Down Expand Up @@ -128,8 +133,7 @@ def get_asset_info(socrata_asset):

def build_query(athena_asset, asset_id, years=None):
"""
Build an Athena compatible SQL query. Function will append a year
conditional if `years` is non-empty. Many of the CCAO's open data assets are
Build a dictionary of Athena compatible SQL queries and their associated years. Many of the CCAO's open data assets are
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the meat of what's changing here.

too large to pass to Socrata without chunking.
"""

Expand Down Expand Up @@ -182,16 +186,18 @@ def build_query(athena_asset, asset_id, years=None):

query = f"SELECT {', '.join(columns['column'])} FROM {athena_asset}"

# Build a dictionary with queries for each year requested, or no years
if not years:
query = query
query = {None: query}

elif years is not None:
query += " WHERE year = %(year)s"
else:
query = [query + " WHERE year = '" + year + "'" for year in years]
query = dict([(k, v) for k, v in zip(years, query)])

return query


def upload(method, asset_id, sql_query, overwrite, count, year=None):
def upload(asset_id, sql_query, overwrite):
"""
Function to perform the upload to Socrata. `puts` or `posts` depending on
user's choice to overwrite existing data.
Expand All @@ -202,52 +208,39 @@ def upload(method, asset_id, sql_query, overwrite, count, year=None):

url = "https://datacatalog.cookcountyil.gov/resource/" + asset_id + ".json"

print_message = "Overwriting" if overwrite else "Updating"

if not year:
query_conditionals = {}
print_message = print_message + " all years for asset " + asset_id
else:
query_conditionals = {"year": year}
print_message = (
print_message + " year: " + year + " for asset " + asset_id
)
# Raise URL status if it's bad
session.get(url=url, headers={"X-App-Token": app_token}).raise_for_status()

# We grab the data before uploading it so we can make sure timestamps are
# properly formatted
input_data = cursor.execute(sql_query, query_conditionals).as_pandas()
date_columns = input_data.select_dtypes(include="datetime").columns
for i in date_columns:
input_data[i] = input_data[i].fillna("").dt.strftime("%Y-%m-%dT%X")
for year, query in sql_query.items():
print_message = "Overwriting" if overwrite else "Updating"

# Raise URL status if it's bad
session.get(
(
"https://datacatalog.cookcountyil.gov/resource/"
+ asset_id
+ ".json?$limit=1"
),
headers={"X-App-Token": app_token},
).raise_for_status()

session.get(url=url, headers={"X-App-Token": app_token}).raise_for_status()
Comment on lines -224 to -233
Copy link
Member Author

@wrridgeway wrridgeway Feb 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were checking for a bad URL status twice.

if not year:
print_message = print_message + " all years for asset " + asset_id
else:
print_message = (
print_message + " year: " + year + " for asset " + asset_id
)

for i in range(0, input_data.shape[0], 10000):
print(print_message)
print(f"Rows {i + 1}-{i + 10000}")
if count > 0:
method = "post"
response = getattr(session, method)(
url=url,
data=input_data.iloc[i : i + 10000].to_json(orient="records"),
headers={"X-App-Token": app_token},
)
count += 1
print(response.content)
input_data = cursor.execute(query).as_pandas()
date_columns = input_data.select_dtypes(include="datetime").columns
for i in date_columns:
input_data[i] = input_data[i].fillna("").dt.strftime("%Y-%m-%dT%X")

for i in range(0, input_data.shape[0], 10000):
print(print_message)
print(f"Rows {i + 1}-{i + 10000}")
method = "post" if not overwrite else "put"
response = getattr(session, method)(
url=url,
data=input_data.iloc[i : i + 10000].to_json(orient="records"),
headers={"X-App-Token": app_token},
)
overwrite = False
print(response.content)

# Return the updated count so that if this function is called in a loop
# the updated count persists.
return count
overwrite = False
Comment on lines +252 to +255
Copy link
Member Author

@wrridgeway wrridgeway Feb 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensuring overwrite can never be true more than once per asset.



def socrata_upload(socrata_asset, overwrite=False, years=None):
Expand All @@ -264,49 +257,16 @@ def socrata_upload(socrata_asset, overwrite=False, years=None):

years_list = parse_years_list(years=years, athena_asset=athena_asset)

tic = time.perf_counter()
count = 0

if not years_list:
sql_query = build_query(
athena_asset=athena_asset,
asset_id=asset_id,
)

upload_args = {
"asset_id": asset_id,
"sql_query": sql_query,
"overwrite": overwrite,
"count": count,
}

if overwrite:
upload("put", **upload_args)
else:
upload("post", **upload_args)

else:
sql_query = build_query(
athena_asset=athena_asset,
asset_id=asset_id,
years=years,
)

for year in years_list:
upload_args = {
"asset_id": asset_id,
"sql_query": sql_query,
"overwrite": overwrite,
"count": count,
"year": year,
}
# Perform the upload and update the counter
if count == 0 and overwrite:
count = upload("put", **upload_args)
else:
count = upload("post", **upload_args)
sql_query = build_query(
athena_asset=athena_asset,
asset_id=asset_id,
years=years_list,
)

tic = time.perf_counter()
upload(asset_id, sql_query, overwrite)
toc = time.perf_counter()

print(f"Total upload in {toc - tic:0.4f} seconds")


Expand Down