-
Notifications
You must be signed in to change notification settings - Fork 0
/
createIngestSourceMeta.py
117 lines (94 loc) · 4.35 KB
/
createIngestSourceMeta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python
# coding: utf-8
# Import python modules
import argparse, psycopg2, sys, os
import pandas as pd
from psycopg2.extensions import AsIs
from loguru import logger
# This function takes a gauge location type (COASTAL, TIDAL or RIVERS), uses it to query the drf_gauge_station table,
# and returns a Pandas DataFrame of station id(s) and station names.
def getStationID(locationType):
    """Return the stations of one location type from the drf_gauge_station table.

    Connects to the apsviz_gauges database and selects station_id and
    station_name for every row whose location_type equals ``locationType``,
    ordered by station_name.

    Args:
        locationType: Value compared against the location_type column.

    Returns:
        A Pandas DataFrame with the columns ``station_id`` and
        ``station_name``, or ``None`` when connecting or querying failed
        (the error is printed in that case).
    """
    conn = None
    cur = None
    try:
        # Create connection to database and get cursor
        # NOTE(review): connection parameters, including the password, are hard-coded here.
        conn = psycopg2.connect("dbname='apsviz_gauges' user='apsviz_gauges' host='localhost' port='5432' password='apsviz_gauges'")
        cur = conn.cursor()

        # Set environment
        cur.execute("""SET CLIENT_ENCODING TO UTF8""")
        cur.execute("""SET STANDARD_CONFORMING_STRINGS TO ON""")
        cur.execute("""BEGIN""")

        # Run query; the location type is passed as a bound parameter
        cur.execute("""SELECT station_id, station_name FROM drf_gauge_station
WHERE location_type = %(location_type)s
ORDER BY station_name""",
                    {'location_type': locationType})

        # Convert query output to Pandas dataframe and return it
        return pd.DataFrame(cur.fetchall(), columns=['station_id', 'station_name'])
    # If exception print error
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        return None
    finally:
        # Close cursor and database connection on every path, so a failed
        # query no longer leaves them open.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()
# This function takes as input a directory path and an outputFile name, and uses them to write a source metadata file
# containing the station_id(s) that are extracted from the drf_gauge_station table in the apsviz_gauges database.
def addMeta(outputDir, outputFile):
    """Write the source metadata CSV for the stations selected by outputFile.

    ``outputFile`` is split on '_'. Field 0 is the source ('adcirc',
    'contrails' or 'noaa') and field 2 is the location type passed to
    getStationID. For 'adcirc', fields 3 and 4 are lower-cased and joined
    with '_' to form data_source. The result has the columns station_id,
    data_source, source_name and source_archive, and is written to
    ``outputDir + 'source_' + outputFile``.

    Args:
        outputDir: Directory prefix for the CSV. It is concatenated
            directly, so it must already end with a path separator.
        outputFile: Underscore-separated file name described above.

    Raises:
        ValueError: If outputFile has too few '_'-separated fields.
        RuntimeError: If getStationID returned None (database error).
        SystemExit: With 'Incorrect source' if the source is not recognized.
    """
    # Split the file name once instead of re-splitting for every field
    fields = outputFile.split('_')
    if len(fields) < 3:
        raise ValueError("outputFile '" + outputFile + "' must contain at least 3 '_'-separated fields")

    # Get source name and location type from outputFile
    source = fields[0]
    locationType = fields[2]

    # Check if source is ADCIRC, contrails or noaa, and work out the metadata values
    # before querying the database, so a bad file name fails fast.
    if source == 'adcirc':
        if len(fields) < 5:
            raise ValueError("adcirc outputFile '" + outputFile + "' must contain at least 5 '_'-separated fields")
        data_source = fields[3].lower() + '_' + fields[4].lower()
        source_name = source
        source_archive = 'renci'
    elif source == 'contrails':
        data_source = locationType.lower() + '_gauge'
        source_name = 'ncem'
        source_archive = source
    elif source == 'noaa':
        data_source = 'tidal_gauge'
        source_name = source
        source_archive = source
    else:
        # If source is incorrect print message and exit
        sys.exit('Incorrect source')

    # Query the database for the stations of this location type
    df = getStationID(locationType)
    if df is None:
        # getStationID prints the database error and returns None; stop with a
        # clear message instead of failing on the column assignment below.
        raise RuntimeError("Could not get stations for location type '" + locationType + "'")

    # Add data_source, source_name, and source_archive to dataframe
    df['data_source'] = data_source
    df['source_name'] = source_name
    df['source_archive'] = source_archive

    # Keep only the output columns, in order; this also drops station_name
    df = df.reindex(columns=['station_id', 'data_source', 'source_name', 'source_archive'])

    # Write dataframe to csv file
    df.to_csv(outputDir+'source_'+outputFile, index=False)
# Main program function takes args as input, which contains the outputDir, and outputFile values.
@logger.catch
def main(args):
    """Set up file logging and run addMeta for the parsed arguments.

    Args:
        args: Parsed command-line arguments providing ``outputDir`` and
            ``outputFile``.
    """
    # Swap the existing log handlers for a DEBUG-level file handler. The log
    # directory comes from LOG_PATH, falling back to 'logs' next to this script.
    logger.remove()
    fallback_dir = os.path.join(os.path.dirname(__file__), 'logs')
    log_dir = os.getenv('LOG_PATH', fallback_dir)
    logger.add(log_dir + '/createIngestSourceMeta.log', level='DEBUG')

    # Pull the two values out of args
    output_dir, output_file = args.outputDir, args.outputFile

    logger.info('Start processing source data for file ' + output_file + '.')

    # Build and write the source metadata file
    addMeta(output_dir, output_file)

    logger.info('Finished processing source data for file ' + output_file + '.')
# Run the main function, which takes outputDir and outputFile as input.
if __name__ == "__main__":
    # This is executed when run from the command line.
    parser = argparse.ArgumentParser()

    # Both arguments are required. Without them the values would be None and
    # the run would fail later, when main() builds its log message.
    parser.add_argument("--outputDir", action="store", dest="outputDir", required=True,
                        help="Directory prefix the source metadata file is written to")
    parser.add_argument("--outputFile", action="store", dest="outputFile", required=True,
                        help="Underscore-separated file name used to derive the source metadata")

    # Parse input arguments
    args = parser.parse_args()

    # Run main
    main(args)