-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patharm_climatology.py
139 lines (119 loc) · 6.53 KB
/
arm_climatology.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
ARM Climatologies
-----------------
Process for reading in ARM datastreams, applying QC flags and Data Quality
Reports (DQRs), and producing monthly/yearly averages in CSV files.
Author: Adam Theisen
"""
import act
print(act.__file__)
import glob
import numpy as np
from datetime import datetime
import pandas as pd
import dask
from act.utils.data_utils import DatastreamParserARM
def process_data(site, ds, y, variable, averaging):
    """
    Compute QC-filtered averages (or sums) of one variable for one year.

    Reads every file matching ``./data/<ds>/<ds>.<y>*``, adds DQR
    information to the variable's QC with ``act.qc.arm.add_dqr_to_qc``,
    masks the dataset wherever ``qc_<variable>`` is not 0 and resamples
    it to the requested interval. Variables whose name contains
    ``precip`` are summed (accumulation); all others are averaged.

    Parameters
    ----------
    site : str
        Site code. Not used inside this function; kept so existing
        callers keep working.
    ds : str
        Datastream name, used to build the input file pattern.
    y : str
        Year to process; it is both the file-name prefix and the filter
        applied to the resampled time bins.
    variable : str
        Name of the variable to aggregate.
    averaging : str
        Resample frequency: 'Y' (also 'YE', 'YS') for yearly values or
        'M' (also 'ME', 'MS') for monthly values.

    Returns
    -------
    list of list of str or None
        One ``[time label, value, sample count]`` row per time bin that
        falls in year ``y``. ``None`` when ``y`` is the current year or
        when no input files are found.

    Raises
    ------
    ValueError
        If ``averaging`` is not one of the supported frequencies.
    """
    # The current calendar year is never processed.
    if int(y) == int(datetime.now().year):
        return None

    yearly = ('Y', 'YE', 'YS')
    monthly = ('M', 'ME', 'MS')
    if averaging not in yearly + monthly:
        # Previously this surfaced later as an UnboundLocalError on `time`.
        raise ValueError(
            "Unsupported averaging interval " + repr(averaging)
            + "; expected one of " + ', '.join(yearly + monthly)
        )

    # NOTE(review): the driver loop discovers years under /data/archive/<site>/
    # while this reads from ./data/ -- confirm both point at the same files.
    files = sorted(glob.glob('./data/' + ds + '/' + ds + '.' + y + '*'))
    if not files:
        # Skip this year instead of letting the reader fail and abort the
        # whole dask batch.
        print('No files found for ' + ds + ' ' + y + ', skipping')
        return None

    #raw = act.io.arm.read_arm_netcdf(files, compat='override', coords='minimal')
    raw = act.io.arm.read_arm_netcdf(files, coords='minimal')
    try:
        if variable == 'temp_mean':
            # DQR D160215.4 is deliberately excluded for temp_mean.
            obj = act.qc.arm.add_dqr_to_qc(raw, variable=variable, exclude=['D160215.4'])
        else:
            obj = act.qc.arm.add_dqr_to_qc(raw, variable=variable)

        #if 'ecor' in ds:
        #    r = [129, 265]
        #    obj = obj.where((obj['wind_dir'].values < r[1]) & (obj['wind_dir'].values > r[0]))

        # Best effort: datastreams without a companion QC variable are
        # processed unfiltered. Only the missing-variable case is tolerated.
        try:
            qc_flags = obj['qc_' + variable]
        except KeyError:
            qc_flags = None
        if qc_flags is not None:
            obj = obj.where(qc_flags == 0)

        # For 1 min precip rates
        #data = obj[variable].values / 60.
        #obj[variable].values = data

        # Produce specified averages; the count is taken before `obj` is
        # replaced by the aggregate.
        count = obj.resample(time=averaging, skipna=True).count()
        if 'precip' in variable:
            obj = obj.resample(time=averaging, skipna=True).sum()  # For precipitation accumulation
        else:
            obj = obj.resample(time=averaging, skipna=True).mean()

        # Pull the arrays out once instead of re-evaluating them per bin.
        times = pd.to_datetime(obj['time'].values)
        values = obj[variable].values
        counts = count[variable].values

        target_year = int(y)
        data = []
        for stamp, value, n_samples in zip(times, values, counts):
            if stamp.year != target_year:
                continue
            if averaging in yearly:
                label = str(stamp.year) + '-01-01T00:00:00.000000000'
            else:
                label = (str(stamp.year) + '-' + str(stamp.month).zfill(2)
                         + '-01T00:00:00.000000000')
            data.append([label, str(value), str(n_samples)])
        return data
    finally:
        # Close the dataset that actually owns the open files; `obj` has been
        # rebound to derived datasets by this point.
        raw.close()
# Datastreams to process. Each key is a datastream name and maps to:
#   'variables' - the variable names to aggregate
#   'averaging' - the xarray resample intervals to produce (M=Month, Y=Year)
# Uncomment an entry below to include that datastream in the run.
ds_dict = {
    #'sgpmetE1.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE3.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE4.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE5.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE6.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE7.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE8.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE9.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE11.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE15.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE20.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE24.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE25.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE27.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE31.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE32.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE33.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE34.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE35.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE36.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE37.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE38.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE39.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE40.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'sgpmetE41.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'nsa60noaacrnX1.b1': {'variables': ['temperature', 'precipitation'], 'averaging': ['Y', 'M']},
    #'nsametC1.b1': {'variables': ['temp_mean', 'rh_mean'], 'averaging': ['Y', 'M']},
    #'nsamawsC1.b1': {'variables': ['atmospheric_temperature', 'atmospheric_relative_humidity'], 'averaging': ['Y', 'M']},
    #'sgpmetE13.b1': {'variables': ['temp_mean', 'rh_mean', 'tbrg_precip_total'], 'averaging': ['Y', 'M']},
    'sgpmawsC1.b1': dict(
        variables=['atmospheric_temperature', 'atmospheric_relative_humidity'],
        averaging=['Y', 'M'],
    ),
    #'sgp30ecorE14.b1': {'variables': ['h', 'lv_e', 'k', 'fc'], 'averaging': ['M']},
    #'sgp30ecorE14.b1': {'variables': ['lv_e', 'k', 'fc'], 'averaging': ['M']},
    #'nsatsiskycoverC1.b1': {'variables': ['percent_opaque', 'percent_thin'], 'averaging': ['Y', 'M']},
}
# Driver: for every configured datastream, find the years that have data,
# process each year as a dask task and write one CSV per
# (datastream, variable, averaging interval) under ./results/.
for ds in ds_dict:
    # The first three characters of the datastream name are used as the site.
    site = ds[0:3]
    # Update this path to where your data are
    #files = glob.glob('./data/' + ds + '/' + ds + '.*')
    files = glob.glob('/data/archive/' + site + '/' + ds + '/' + ds + '.*')
    files.sort()
    # Assumes the third-from-last dot-separated field of each file name
    # starts with the four-digit year -- TODO confirm against the archive.
    years = [name.split('.')[-3][0:4] for name in files]
    years = np.unique(years)
    for averaging in ds_dict[ds]['averaging']:
        # Open a file to write the results out to and process each year
        for variable in ds_dict[ds]['variables']:
            print('Processing: ' + ' '.join([ds, variable, averaging]))
            task = [
                dask.delayed(process_data)(site, ds, y, variable, averaging)
                for y in years
            ]
            #data = process_data(site, ds, y, variable, averaging)
            #results.append(data)
            # Compute before opening the output so a failed run does not
            # leave behind a truncated CSV.
            results = dask.compute(*task)
            # The context manager guarantees the CSV is flushed and closed
            # before the next one is opened.
            with open('./results/' + ds + '_' + variable + '_' + averaging + '.csv', 'w') as f:
                for r in results:
                    # None marks a year that process_data skipped.
                    if r is None:
                        continue
                    # One loop covers empty, single-row and multi-row
                    # results; an empty list used to raise IndexError.
                    for row in r:
                        f.write(','.join(row) + '\n')