-
Notifications
You must be signed in to change notification settings - Fork 0
/
createIngestStationMeta.py
300 lines (246 loc) · 12.3 KB
/
createIngestStationMeta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
#!/usr/bin/env python
# coding: utf-8
# Import Python modules
import argparse, glob, sys, os, psycopg2, us, pdb
import pandas as pd
from psycopg2.extensions import AsIs
from geopy.geocoders import Nominatim
from pyzipcode import ZipCodeDatabase
from loguru import logger
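# This function takes a longitude and latitude value as input, and returns the point geometry (SRID 4326) that PostGIS builds
# for that location, exactly as returned by the query (no ST_AsText conversion is applied).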
def getGeometry(lon, lat):
    """Build a point geometry (SRID 4326) for a longitude/latitude pair.

    The point is constructed by the database itself with
    ST_SetSRID(ST_MakePoint(lon, lat), 4326), so a reachable apsviz_gauges
    database is required.

    Args:
        lon: Longitude of the point; any value accepted by float().
        lat: Latitude of the point; any value accepted by float().

    Returns:
        The first column of the first row returned by the query, or None when
        anything goes wrong (the error is printed in that case).
    """
    conn = None
    try:
        # Create connection to database and get cursor
        conn = psycopg2.connect("dbname='apsviz_gauges' user='apsviz_gauges' host='localhost' port='5432' password='apsviz_gauges'")
        cur = conn.cursor()

        # Set environment
        cur.execute("""SET CLIENT_ENCODING TO UTF8""")
        cur.execute("""SET STANDARD_CONFORMING_STRINGS TO ON""")
        cur.execute("""BEGIN""")

        # Run query. The coordinates are bound as numeric parameters rather than
        # being spliced verbatim into the SQL text, so a malformed value fails in
        # float() instead of reaching the server as raw SQL.
        cur.execute("""SELECT ST_SetSRID(ST_MakePoint(%(longitude)s, %(latitude)s), 4326)""",
                    {'longitude': float(lon), 'latitude': float(lat)})

        # Fetch rows and return the first column of the first row
        rows = cur.fetchall()
        cur.close()
        return rows[0][0]

    # If exception print error; callers receive None
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        return None

    # Always release the connection, including when the query fails
    finally:
        if conn is not None:
            conn.close()
# This function takes no input, and returns a DataFrame that contains a list of NOAA stations that it extracted from the noaa_stations
# table. The data in the noaa_stations table was obtained from NOAA's api.tidesandcurrents.noaa.gov API.
def getNOAAStations():
    """Read every station from the noaa_stations table.

    Returns:
        A DataFrame with the columns station_name, lat, lon and name, ordered
        by station_name, or None when the connection or query fails (the error
        is printed in that case).
    """
    conn = None
    try:
        # Create connection to database and get cursor
        conn = psycopg2.connect("dbname='apsviz_gauges' user='apsviz_gauges' host='localhost' port='5432' password='apsviz_gauges'")
        cur = conn.cursor()

        # Set environment
        cur.execute("""SET CLIENT_ENCODING TO UTF8""")
        cur.execute("""SET STANDARD_CONFORMING_STRINGS TO ON""")
        cur.execute("""BEGIN""")

        # Run query
        cur.execute("""SELECT station_name, lat, lon, name FROM noaa_stations
                       ORDER BY station_name""")

        # Convert query output to Pandas DataFrame
        df = pd.DataFrame(cur.fetchall(), columns=['station_name', 'lat', 'lon', 'name'])
        cur.close()

        # Return DataFrame
        return df

    # If exception print error; callers receive None
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        return None

    # Always release the connection, including when the query fails
    finally:
        if conn is not None:
            conn.close()
# This function takes a gauge location type ('coastal' or 'rivers') as input, and returns a DataFrame that contains a list of NCEM stations
# that are extracted from the dbo_gages_all table. Any other location type exits the program. The dbo_gages_all table contains data from
# an Excel file (dbo_GAGES_ALL.xlsx) that was obtained from Tom Langan at NCEM.
def getNCEMStations(locationType):
    """Read the non-NOAA stations of one location type from dbo_gages_all.

    Only rows whose owner is not 'NOAA' and whose latitude is not NULL are
    returned, ordered by site_id.

    Args:
        locationType: 'coastal' selects rows with is_coastal = 1, 'rivers'
            selects rows with is_coastal = 0.

    Returns:
        A DataFrame with the columns site_id, latitude, longitude, owner, name
        and county, or None when the connection or query fails (the error is
        printed in that case).

    Raises:
        SystemExit: If locationType is neither 'coastal' nor 'rivers'.
    """
    # Validate before touching the database so a bad argument never opens
    # (and then abandons) a connection.
    if locationType == 'coastal':
        isCoastal = 1
    elif locationType == 'rivers':
        isCoastal = 0
    else:
        sys.exit('Incorrect station type')

    conn = None
    try:
        # Create connection to database and get cursor
        conn = psycopg2.connect("dbname='apsviz_gauges' user='apsviz_gauges' host='localhost' port='5432' password='apsviz_gauges'")
        cur = conn.cursor()

        # Set environment
        cur.execute("""SET CLIENT_ENCODING TO UTF8""")
        cur.execute("""SET STANDARD_CONFORMING_STRINGS TO ON""")
        cur.execute("""BEGIN""")

        # Run query; the two location types differ only in the is_coastal flag
        cur.execute("""SELECT site_id, latitude, longitude, owner, name, county FROM dbo_gages_all
                       WHERE is_coastal = %(is_coastal)s AND owner != 'NOAA' AND latitude IS NOT NULL
                       ORDER BY site_id""",
                    {'is_coastal': isCoastal})

        # Convert query output to Pandas DataFrame
        df = pd.DataFrame(cur.fetchall(), columns=['site_id', 'latitude', 'longitude', 'owner', 'name', 'county'])
        cur.close()

        # Return DataFrame
        return df

    # If exception print error; callers receive None
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        return None

    # Always release the connection, including when the query fails
    finally:
        if conn is not None:
            conn.close()
# This function queries the original NOAA station table (noaa_stations), using the getNOAAStations function,
# extracts station information, and returns a DataFrame. It uses the information from the table along with
# Nominatim and ZipCodeDatabase to generate an address from latitude and longitude values.
def _usStateAndCounty(address, zcdb):
    """Return the (state, county) pair for a Nominatim address located in the US.

    The state is resolved in three steps, stopping at the first that works:
    the zip code database, the us module's lookup of the state name, and
    finally two hard-coded places (Lajas, Mayagüez) that map to 'pr'.
    When nothing matches the state is a blank string.
    """
    countyname = address.get('county', '').replace('County', '').strip()

    # First choice: look the state up from the (5-digit part of the) postcode.
    postcode = address.get('postcode', '').strip().split('-')[0]
    try:
        return zcdb[postcode].state.lower(), countyname
    except Exception:
        # Missing or unknown postcode: fall through to the state-name lookup.
        pass

    # Second choice: resolve the state name with the us module, which returns
    # None when it cannot match the name.
    stateinfo = us.states.lookup(address.get('state', '').strip())
    if stateinfo is not None:
        return stateinfo.abbr.lower(), countyname

    # Last resort: addresses that carry no usable state but are known by
    # county or city name; both are tagged 'pr'.
    if countyname == 'Lajas':
        return 'pr', countyname
    city = address.get('city', '').strip()
    if city == 'Mayagüez':
        return 'pr', city

    # Nothing matched: print the address for manual inspection and leave the
    # state blank.
    # NOTE(review): the nesting of these two diagnostics could not be recovered
    # from the flattened source; they are emitted where no state was resolved.
    print(stateinfo)
    print(address)
    return '', countyname


def addNOAAMeta(locationType):
    """Build the station metadata table for the NOAA stations.

    Stations are read with getNOAAStations, reverse geocoded with Nominatim
    to obtain country, state and county, and given a geometry with
    getGeometry.

    Args:
        locationType: Value stored unchanged in the location_type column.

    Returns:
        A DataFrame with the columns station_name, lat, lon, tz, gauge_owner,
        location_name, location_type, country, state, county and geom.
    """
    # Create instance of Nominatim, and ZipCodeDatabase
    geolocator = Nominatim(user_agent="geoapiExercises")
    zcdb = ZipCodeDatabase()

    # Get station data from original NOAA station table
    df = getNOAAStations()

    # Define lists for input into new columns
    country = []
    state = []
    county = []
    geom = []

    # Loop through DataFrame
    for _, row in df.iterrows():
        # Get longitude and latitude values
        lon = row['lon']
        lat = row['lat']

        # Get location address using latitude and longitude values. reverse()
        # returns None when nothing is found; treat that as an empty address
        # so the station still gets a row with blank country/state/county.
        location = geolocator.reverse(str(lat)+","+str(lon))
        address = location.raw.get('address', {}) if location is not None else {}

        # Extract country_code from address
        country_code = address.get('country_code', '').strip()
        country.append(country_code)

        # If the address is in the US resolve state and county through the
        # helper; otherwise keep the state only when it is at most two
        # characters long, and take the county from the address.
        if country_code == 'us':
            stateabbr, countyname = _usStateAndCounty(address, zcdb)
        else:
            statename = address.get('state', '').strip()
            stateabbr = '' if len(statename) > 2 else statename
            countyname = address.get('county', '').replace('County', '').strip()
        state.append(stateabbr)
        county.append(countyname)

        # Append geometry to geom variable
        geom.append(getGeometry(lon, lat))

    # Add meta to DataFrame
    df['gauge_owner'] = 'NOAA/NOS'
    df['location_type'] = locationType
    df['tz'] = 'gmt'
    df['country'] = country
    df['state'] = state
    df['county'] = county
    df['geom'] = geom
    df.columns = df.columns.str.lower()
    df = df.rename(columns={'name': 'location_name'})

    # Reorder columns in DataFrame
    newColsOrder = ["station_name", "lat", "lon", "tz", "gauge_owner", "location_name",
                    "location_type", "country", "state", "county", "geom"]
    df = df.reindex(columns=newColsOrder)

    # Return DataFrame
    return df
# This function queries the original NCEM station table (dbo_gages_all), using the getNCEMStations function,
# extracts station information, and returns a DataFrame.
def addNCEMMeta(locationType):
    """Build the station metadata table for the NCEM stations.

    Stations are read with getNCEMStations (called with the lower-cased
    location type), renamed to the common column names, tagged with constant
    time zone, country and state values, and given a geometry with
    getGeometry.

    Args:
        locationType: Location type of the stations; stored unchanged in the
            location_type column.

    Returns:
        A DataFrame with the columns station_name, lat, lon, tz, gauge_owner,
        location_name, location_type, country, state, county and geom.
    """
    # Query the original NCEM station meta data
    stations = getNCEMStations(locationType.lower())

    # Map the NCEM column names onto the common station column names
    renamed = {
        'latitude': 'lat',
        'longitude': 'lon',
        'site_id': 'station_name',
        'name': 'location_name',
        'owner': 'gauge_owner',
    }
    stations = stations.rename(columns=renamed)

    # Convert all column names to lower case
    stations.columns = stations.columns.str.lower()

    # Constant values shared by every NCEM station
    stations['tz'] = 'gmt'
    stations['location_type'] = locationType
    stations['country'] = 'us'
    stations['state'] = 'nc'

    # Put the columns in their final order and renumber the rows
    ordered = ["station_name", "lat", "lon", "tz", "gauge_owner",
               "location_name", "location_type", "country", "state", "county"]
    stations = stations.reindex(columns=ordered)
    stations.reset_index(drop=True, inplace=True)

    # One geometry per station, computed from its lon and lat values
    stations['geom'] = [getGeometry(record['lon'], record['lat'])
                        for _, record in stations.iterrows()]

    return stations
# Main program function takes args as input, which contains the outputDir, and outputFile values.
@logger.catch
def main(args):
    """Create the station metadata CSV file for one dataset.

    The dataset and location type are taken from the output file name: the
    text before the first underscore is the dataset ('noaa' or 'contrails')
    and the third underscore-separated field is the location type. The result
    is written to <outputDir>/geom_<outputFile>.

    Args:
        args: Parsed command-line arguments providing outputDir and
            outputFile.

    Raises:
        SystemExit: With status 1 when the dataset is neither 'noaa' nor
            'contrails'.
    """
    # Add logger
    logger.remove()
    log_path = os.getenv('LOG_PATH', os.path.join(os.path.dirname(__file__), 'logs'))
    logger.add(log_path+'/createIngestStationMeta.log', level='DEBUG')

    # Extract args variables
    outputDir = args.outputDir
    outputFile = args.outputFile

    # Get dataset and location type
    nameParts = outputFile.split('_')
    dataset = nameParts[0]
    locationType = nameParts[2]

    # Check if dataset is noaa, contrails
    if dataset == 'noaa':
        # If dataset is noaa run the addNOAAMeta function and write output to csv file
        logger.info('Start processing NOAA stations.')
        df = addNOAAMeta(locationType)
        # os.path.join gives the same path as plain concatenation when
        # outputDir ends with a separator, and a correct one when it does not.
        df.to_csv(os.path.join(outputDir, 'geom_'+outputFile), index=False)
        logger.info('Finnished processing NOAA stations.')
    elif dataset == 'contrails':
        # If dataset is contrails run the addNCEMMeta function and write output to csv file
        logger.info('Start processing Contrails stations.')
        df = addNCEMMeta(locationType)
        df.to_csv(os.path.join(outputDir, 'geom_'+outputFile), index=False)
        logger.info('Finished processing Contrails stations.')
    else:
        # If dataset is not noaa or contrails exit program with error message.
        # Registering an exiting ERROR sink alone never fired because no error
        # was logged, so log the problem and exit explicitly. SystemExit is
        # not an Exception subclass, so it is not swallowed by logger.catch.
        logger.error('Unknown dataset "{}" in output file name "{}"; expected noaa or contrails.',
                     dataset, outputFile)
        sys.exit(1)
# Run the main function, which takes outputDir and outputFile as input.
if __name__ == "__main__":
    # This is executed when run from the command line
    parser = argparse.ArgumentParser(
        description="Create the station metadata CSV file for the noaa or contrails dataset.")

    # Both options are needed by main, so let argparse reject a missing one
    # with a usage message instead of failing later inside main.
    parser.add_argument("--outputDir", action="store", dest="outputDir", required=True,
                        help="directory the geom_<outputFile> CSV is written to, "
                             "including the trailing path separator")
    parser.add_argument("--outputFile", action="store", dest="outputFile", required=True,
                        help="output file name; its first underscore-separated field is the "
                             "dataset (noaa or contrails) and its third is the location type")

    # Parse arguments
    args = parser.parse_args()

    # Run main
    main(args)