Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Web Scraper Fixes #103

Merged
merged 3 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/webscraper/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
key: str = os.environ.get("NEXT_PUBLIC_SUPABASE_ANON_KEY")
supabase: Client = create_client(url, key)
supabase_table: str = (
"Projects_duplicate" # TODO: modify based on which table in supabase we want to edit
"Projects_test_julee" # TODO: modify based on which table in supabase we want to edit
)

geocode_api: str = os.environ.get("NEXT_PUBLIC_GEOCODIO_API_KEY")
Expand Down
3 changes: 3 additions & 0 deletions api/webscraper/database_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,7 @@
"permit_process",
"permit_application_number",
"last_updated",
"has_energy_storage",
"has_pumped_storage",
"storage_size",
]
159 changes: 126 additions & 33 deletions api/webscraper/ores_scraper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from collections import defaultdict
import requests
from bs4 import BeautifulSoup
import pandas as pd
from io import StringIO
from utils.scraper_utils import geocode_lat_long
from database_constants import initial_kdm
import re

# url = "https://dps.ny.gov/ores-permit-applications"
# page = requests.get(url)
Expand All @@ -28,31 +30,107 @@
"""


def parse_for_location(description):
# finds index in the description where the phrase "Town of..." appears
town_index = description.lower().find("town")
town_string = description[town_index:]
# splits town_string by the comma
town_split = town_string.split(",")
# town is the second to last word before the comma
town = town_split[-2].split(" ")[-1].strip()
# county is the last word when the location string is split by commas
county = town_split[-1].strip()

# removes the period from the end of county if it exists
index = county.find(".")
if index != -1:
while county.find(".", index + 1) != -1:
index = county.find(".", index + 1)
county = county[:index]

# capitalize first letter of each word in town/county name
if town:
town = " ".join([word.capitalize() for word in town.split(" ")])
if county:
county = " ".join([word.capitalize() for word in county.split(" ")])
return (town, county)
def parse_for_locations(description):
locations = [] # list of tuples (town, county)
towns = []

# Change description to lowercase
description = description.lower()

# Case 1: Multiple towns in the same county
if "towns" in description:
town_index = description.find("towns")
town_string = description[town_index:-1]
town_split = town_string.split(", ")
# Case 1a: 3+ towns
if len(town_split) > 3:
towns = re.findall(r"\b(?!towns|of|and)[a-z]+\b", town_string)
county_index = towns.index('county')
towns = towns[:county_index - 1]
# Case 1b: 2 towns
else:
towns = list(re.findall(r"towns of (.+?) and (.+?),", town_string)[0])
county = re.findall(r"\b([a-z .\-]+ county)\b", town_string)[0]
for town in towns:
locations.append((town.title(), county.title()))

# Case 2: Multiple towns in different counties
elif description.count("town") > 1:
town_index = description.find("town")
town_string = description[town_index:-1]
town_split = town_string.split(" and ")
county_to_town = defaultdict(list)
for potential_town in town_split:
town = re.findall(r"town of (.+?),", potential_town)
county = re.findall(r", (.+?) county", potential_town)[0] + " county"
county_to_town[county] += town
for county, towns in county_to_town.items():
for town in towns:
locations.append((town.title(), county.title()))

# Case 3: Single town
else:
town_index = description.find("town")
town_string = description[town_index:-1]
town_split = town_string.split(", ")
towns += re.findall(r"town of (.+?),", town_string)
if 'and' in towns[0]:
and_index = towns[0].index('and')
towns[0] = towns[0][:and_index - 1]
county = re.findall(r", (.+?) county", town_string)[0] + " county"
for town in towns:
locations.append((town.title(), county.title()))

return locations


def parse_for_project_size(description):
# Change description to lowercase
description = description.lower()

match = re.search(r"(\d+[.,]?\d*)[-\s]?megawatt", description)
project_size = match.group(1) if match else None

return project_size

# TODO: Fix this to account for all renewable energy types (only looks for solar and wind)
def parse_for_renewable_energy_technology(description):
# Change description to lowercase
description = description.lower()
if "solar" in description:
return "Solar PV"
if "wind" in description:
return "Land-Based Wind"
return None


def has_energy_storage(description):
# Change description to lowercase
description = description.lower()

return "energy storage" in description


def has_pumped_storage(description):
# Change description to lowercase
description = description.lower()

return "pumped storage" in description


def parse_for_storage_size(description):
# Change description to lowercase
description = description.lower()

if has_energy_storage(description) or has_pumped_storage(description):
including_index = description.index("including") # finds size based on the word "including"
temp = description[including_index:].split(", ")

# Search for the megawatt number in the lowercase text
match = re.search(r'(\d+)[-\s]?megawatt', temp[0])
storage_size = match.group(1) if match else None
return storage_size
return None

# ORES notice of intent
def filter_noi(data: list) -> list:
Expand All @@ -64,15 +142,20 @@ def filter_noi(data: list) -> list:
"""
filtered_list = []
for row in data:
town, county = parse_for_location(row["Description"])
locations = parse_for_locations(row["Description"])
project_dict = {
"permit_application_number": row.get("Permit Application Number", None),
"project_name": row.get("Project Name", None),
"town": town if town else None,
"county": county if county else None,
"town": list(set([town for town, county in locations])) if locations else None,
"county": list(set([county for town, county in locations])) if locations else None,
"latitude": None, # geocoding for lat/long is handled when inserting into database
"longitude": None,
"key_development_milestones": initial_kdm,
"size": parse_for_project_size(row["Description"]),
"has_energy_storage": has_energy_storage(row["Description"]),
"has_pumped_storage": has_pumped_storage(row["Description"]),
"storage_size": parse_for_storage_size(row["Description"]),
"renewable_energy_technology": parse_for_renewable_energy_technology(row["Description"])
}
filtered_list.append(project_dict)
return filtered_list
Expand All @@ -87,15 +170,20 @@ def filter_under_review(data: list) -> list:
"""
filtered_list = []
for row in data:
town, county = parse_for_location(row["Description"])
locations = parse_for_locations(row["Description"])
project_dict = {
"permit_application_number": row.get("Permit Application Number", None),
"project_name": row.get("Project Name", None),
"town": town if town else None,
"county": county if county else None,
"town": list(set([town for town, county in locations])) if locations else None,
"county": list(set([county for town, county in locations])) if locations else None,
"latitude": None, # geocoding for lat/long is handled when inserting into database
"longitude": None,
"key_development_milestones": initial_kdm, # updating kdm for projects under review is handled in database.py
"size": parse_for_project_size(row["Description"]),
"has_energy_storage": has_energy_storage(row["Description"]),
"has_pumped_storage": has_pumped_storage(row["Description"]),
"storage_size": parse_for_storage_size(row["Description"]),
"renewable_energy_technology": parse_for_renewable_energy_technology(row["Description"])
}
filtered_list.append(project_dict)
return filtered_list
Expand All @@ -110,15 +198,20 @@ def filter_permitted(data):
"""
filtered_list = []
for row in data:
town, county = parse_for_location(row["Description"])
locations = parse_for_locations(row["Description"])
project_dict = {
"permit_application_number": row.get("Permit Application Number", None),
"project_name": row.get("Project Name", None),
"town": town if town else None,
"county": county if county else None,
"town": list(set([town for town, county in locations])) if locations else None,
"county": list(set([county for town, county in locations])) if locations else None,
"latitude": None, # geocoding for lat/long is handled when inserting into database
"longitude": None,
"key_development_milestones": initial_kdm, # updating kdm for permitted projects is handled in database.py
"size": parse_for_project_size(row["Description"]),
"has_energy_storage": has_energy_storage(row["Description"]),
"has_pumped_storage": has_pumped_storage(row["Description"]),
"storage_size": parse_for_storage_size(row["Description"]),
"renewable_energy_technology": parse_for_renewable_energy_technology(row["Description"])
}
filtered_list.append(project_dict)
return filtered_list
Expand Down
Loading