-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprepare_stationdata.py
221 lines (182 loc) · 9.87 KB
/
prepare_stationdata.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#!/usr/bin/env python3
# -------------------------------------------------------------------
# Downloader for the EUPP Hacky Benchmark Station Dataset
#
# Also works on the European Weather Cloud for data which is not
# publicly available (Switzerland).
#
# Developed on Python version 3.10.4
#
# Authors: Thorsten Simon and Reto Stauffer
# Date: 2022-09-16
# -------------------------------------------------------------------
import sys
import os
import re
import glob
import pickle
import fsspec
import argparse
from zipfile import ZipFile
import xarray as xr
import pandas as pd
import numpy as np
from functions import *
import logging as log
log.basicConfig(level = log.INFO)
def main(args):
    """Prepare per-station CSV files and bundle them into one zip archive.

    Loads forecast/reforecast and observation data for one country and one
    parameter, writes one CSV file per station and forecast step into the
    output directory (``args.prefix``), and finally zips all CSVs into a
    single archive, removing the individual CSVs on success.

    Params
    ------
    args : argparse.Namespace or dict
        Parsed arguments, object as returned by parse_args().
        Must contain 'country' (str), 'param' (str), 'nocache' (bool),
        and 'prefix' (str). If it is a dictionary, it will be converted
        into argparse.Namespace internally.

    Return
    ------
    None. Saves a bunch of files into the output directory in the best case.

    Raises
    ------
    TypeError, ValueError
        If 'args' is of the wrong type or misses a required option.
    """
    if isinstance(args, dict):
        args = argparse.Namespace(**args)
    if not isinstance(args, argparse.Namespace):
        raise TypeError("argument 'args' must be argparse.Namespace")
    for k in ["country", "param", "nocache", "prefix"]:
        # Bug fix: the original constructed the ValueError but never raised
        # it, so a missing option went undetected.
        if k not in args:
            raise ValueError(f"option '{k}' not in object 'args'")
    # Bug fixes: the original used `assert ..., TypeError(...)` which raises
    # AssertionError (and is skipped entirely under `python -O`); the message
    # for the 'prefix' check also wrongly referred to 'country'.
    if not isinstance(args.prefix, str):
        raise TypeError("args.prefix must be str")
    if not isinstance(args.country, str):
        raise TypeError("args.country must be str")
    if not isinstance(args.param, str):
        raise TypeError("args.param must be str")
    if not isinstance(args.nocache, bool):
        raise TypeError("args.nocache must be bool")

    # ---------------------------------------------------------------
    # Prevent the script from running again if the final zip file exists
    # ---------------------------------------------------------------
    final_zip = os.path.join(args.prefix, f"{args.prefix}_{args.param}_{args.country}.zip")
    if os.path.isfile(final_zip):
        print(f"Final file {final_zip} exists; do not continue (return None)")
        return None

    # ---------------------------------------------------------------
    # Make sure the output directory exists (exist_ok avoids the race
    # between the original isdir() check and makedirs() call)
    # ---------------------------------------------------------------
    os.makedirs(args.prefix, exist_ok = True)

    # ---------------------------------------------------------------
    # Looping over forecasts and reforecasts
    # ---------------------------------------------------------------
    for reforecast in [True, False]:
        # Loading data (uses cache file if existing)
        [fcs, obs] = get_data(args.country, args.param, reforecast, do_cache = not args.nocache)

        # Fetching station meta if needed
        ftype = "reforecasts" if reforecast else "forecasts"
        station_meta_csv = os.path.join(
            args.prefix,
            f"{args.prefix}_{args.param}_{args.country}_stationdata_{ftype}.csv")
        if args.nocache or not os.path.isfile(station_meta_csv):
            log.info("Extracting station meta data")
            station_meta = get_station_meta(fcs, obs)
            station_meta.to_csv(station_meta_csv, index = False)
            del station_meta  # Not used anymore in this script

        # Time check: 'step' must be timedelta64[ns] relative to the
        # forecast reference time (checked via the xarray attribute).
        if obs.get("step").dtype != np.dtype("<m8[ns]"):
            raise TypeError("dimension 'step' not of dtype '<m8[ns]'")
        if obs.get("step").attrs["long_name"] != "time since forecast_reference_time":
            raise ValueError("dimension 'step' not 'time since forecast_reference_time'")

        # ---------------------------------------------------------------
        # Looping over all stations and lead times/steps
        # ---------------------------------------------------------------
        for station_id in obs.get("station_id").values:
            station_id = int(station_id)
            for step in obs.get("step").values:
                # Convert forecast step (timedelta64 in nanoseconds) to hours
                step_hours = int(step / 1e9 / 3600)
                log.info(f"Processing data for station {station_id:5d} {step_hours:+4d}h ahead; {reforecast=}.")

                # Define output file name and subset args
                csvfile = get_csv_filename(args, station_id, step_hours, reforecast = reforecast)
                # Subsetting station/step for data processing
                subset = {"station_id": station_id, "step": step}

                # Skip early if the output file exists already. (Moved ahead
                # of the observation subsetting, which the original performed
                # even when the result was then discarded by this check.)
                if not args.nocache and os.path.isfile(csvfile):
                    continue

                # Prepare observation data
                obs_subset = obs.loc[subset]
                df_obs = obs_subset.rename({args.param: f"{args.param}_obs"}).to_dataframe()[[f"{args.param}_obs"]]

                # Prepare forecast data
                if "surface" in fcs.coords:
                    subset["surface"] = 0.0
                fcs_subset = fcs[["time", "step", args.param]].loc[subset]
                df_fcs = fcs_subset[[args.param]].to_dataframe()[[args.param]].unstack("number")
                # drop multi-index on columns; rename columns (one per member)
                df_fcs.columns = [f"{args.param}_{x:02d}" for x in df_fcs.columns.droplevel()]

                # Update date; only has an effect if we have reforecasts
                log.info("Modify time index")
                nyears = None if not "year" in fcs.coords else len(fcs.coords["year"])
                df_obs = modify_date(df_obs, step, nyears)
                df_fcs = modify_date(df_fcs, step, nyears)

                # Calculate ensemble mean and standard deviation (including control run)
                tmp_mean = df_fcs.mean(axis = 1).to_frame("ens_mean")
                tmp_std = df_fcs.std(axis = 1).to_frame("ens_sd")

                # Extract valid time, append julian day (0-based; 0 = January 1st)
                yday = [int(x.strftime("%j")) - 1 for x in df_fcs.index]
                yday = pd.DataFrame({"yday": yday}, index = df_fcs.index)

                # Combine valid time, observation, ensemble mean and standard
                # deviation as well as the individual forecasts
                if df_fcs.shape[0] != df_obs.shape[0]:
                    raise Exception("number of rows of df_fcs and df_obs differ")
                if df_fcs.shape[0] != yday.shape[0]:
                    raise Exception("number of rows of df_fcs and yday differ")
                data = pd.concat([yday, df_obs, tmp_mean, tmp_std, df_fcs], axis = 1)
                data.to_csv(csvfile)
                del tmp_mean, tmp_std, yday, csvfile
                del subset, data, df_fcs, df_obs

    # ---------------------------------------------------------------
    # All stations processed; zip the CSV files
    # ---------------------------------------------------------------
    log.info(f"All stations processed for {args.country}, {args.param}, create zip file")
    keepwd = os.getcwd()
    os.chdir(os.path.dirname(final_zip))
    try:
        # Find files to be zipped (raw string fixes the '\.' escape warning)
        pattern = re.compile(rf"{args.prefix}_{args.param}_{args.country}_.*\.csv$")
        files = sorted(f for f in glob.glob("*") if pattern.match(f))
        zipname = os.path.basename(final_zip)
        try:
            with ZipFile(zipname, "w") as fid:
                for f in files:
                    fid.write(f)  # Store in zip
        except Exception:
            # Zipping failed: remove the partial archive, keep the CSVs.
            # Bug fixes vs. original: narrowed the bare `except:`; use the
            # basename (we already chdir'ed into the prefix directory, so the
            # original relative `final_zip` path pointed at the wrong file).
            log.error("Problems with zipping do not delete files")
            if os.path.isfile(zipname):
                os.remove(zipname)
        else:
            # Bug fix: this was a `finally:` block in the original, which
            # deleted the source CSVs even when zipping had just failed.
            log.info("Zip file created, delete source files")
            for f in files:
                os.remove(f)
    finally:
        # Always restore the working directory, even on error.
        os.chdir(keepwd)
# -------------------------------------------------------------------
# Main part of the Script
# -------------------------------------------------------------------
if __name__ == "__main__":
    # ---------------------------------------------------------------
    # Parsing console arguments
    # ---------------------------------------------------------------
    # sys.argv[0] is already a str; the original wrapped it in a
    # redundant f-string.
    parser = argparse.ArgumentParser(sys.argv[0])
    parser.add_argument("-c", "--country",
            choices = ["germany", "france", "netherlands", "switzerland", "austria"],
            type = str.lower, default = "germany",
            help = "Name of the country to be processed.")
    parser.add_argument("-p", "--param", type = str.lower, default = "t2m",
            help = "Name of the parameter to be processed.")
    parser.add_argument("--prefix", type = str, default = "euppens",
            help = "Used as name of the output directory for the results as well as prefix for all files created by this script.")
    # action="store_true" already defaults to False; the explicit
    # `default = False` in the original was redundant.
    parser.add_argument("-n", "--nocache", action = "store_true",
            help = "Disables auto-caching zarr file content (stored as pickle files). Defaults to 'False' (will do caching). Also forces all files to be recreated.")
    args = parser.parse_args()
    # NOTE: the original also raised if `args.country` was falsy, but
    # --country has a non-empty default and restricted choices, so that
    # branch was unreachable dead code and has been removed.

    # Start downloading
    main(args)