#!/usr/bin/env python
# coding: utf-8
# # Overview
# The **objective** of this notebook is to present the workflow of conducting data assimilation on [PFLOTRAN](https://www.pflotran.org/) using [DART](https://www.image.ucar.edu/DAReS/DART/). Briefly, the procedures are as follows:
# - [x] [Configuration](#parameter): define directories, file locations, and other parameters
# - [x] [PFLOTRAN preparation](#pflotran_prepare): generate PFLOTRAN input files
# - [x] [PFLOTRAN model spin-up](#pflotran_spinup): conduct model spin-up
# - [x] [DART files preparation](#dart_prepare): add new DART quantities, prepare DART input namelists, prepare DART prior data, prepare observations in DART format, and check the ```model_mod``` interface
# - [x] [Generate all the executable files](#dart_executables): generate all the executables, convert observations to DART format, check the ```model_mod``` interface, and test the data assimilation engine
# - [x] [Run DART and PFLOTRAN](#run_dart_pflotran): run the shell script for integrating DART and PFLOTRAN model
#
# Here, we perform inverse modeling on a 1D thermal model for illustration. The model assimilates temperature observations to update its parameter (i.e., the flow flux).
# <a id='parameter'></a>
# # Step 1: Configuration
# In[ ]:
import os
import re
import sys
import time
import shutil
import pickle
import f90nml
import subprocess
import numpy as np
from math import floor, ceil
from datetime import datetime, timedelta
# ****************
# **Define the locations of MPI, PFLOTRAN, the application folder, and the DART-PFLOTRAN interface folder**
#
# It is suggested that <span style="background-color:lightgreen">mpi_exe</span> be set based on the MPI utility (e.g., mpirun) installed with PFLOTRAN.
#
# **Important:** You must make sure that <span style="background-color:lightgreen">mpi_exe</span> uses the same MPI implementation as the ```MPIFC``` and ```MPILD``` settings in ```mkmf.template```.
# In[ ]:
# MPI settings
mpi_exe_da = 'srun'
mpi_exe_pf = 'srun'
ncore_da = 100   # The number of MPI cores used by DART
ncore_pf = 6100  # The number of MPI cores used by PFLOTRAN
ngroup_pf = 100  # The number of processor groups used for stochastic runs in PFLOTRAN
# PFLOTRAN executable
pflotran_exe = '/global/project/projectdirs/m1800/pin/pflotran/pflotran-dev/src/pflotran/bin/pflotran-dev'
# Main directory names
temp_app_dir = "/global/project/projectdirs/m1800/peishi/300A_analysis/Richards_av_std1.5" # The template application folder
app_dir = "/global/cscratch1/sd/peishi89/300A-analysis/Richards_av_std1.5_per15days" # The application folder
dart_dir = "/global/homes/p/peishi89/DART/manhattan"
dart_pf_dir = "/global/homes/p/peishi89/DART/manhattan/models/pflotran" # The DART-PFLOTRAN utility folder
# configs = {}
configs = f90nml.namelist.Namelist()
configs["main_dir_cfg"] = {"app_dir": app_dir, "dart_dir": dart_dir, "dart_pf_dir": dart_pf_dir}
configs["exe_cfg"] = {"pflotran_exe": pflotran_exe, "mpi_exe_da": mpi_exe_da, "mpi_exe_pf": mpi_exe_pf,
"ncore_pf": ncore_pf, "ncore_da": ncore_da, "ngroup_pf": ngroup_pf}
# ****************
# **Generate the application directory if it does not exist**
# In[ ]:
# Create the application directory if it does not exist
if not os.path.isdir(app_dir):
    shutil.copytree(temp_app_dir, app_dir)
# ****************
# **Load all the required file paths/names from ```file_paths.nml```**
#
# ```file_paths.nml``` defines the relative locations of all the files (e.g., utility files, shell scripts, data files, etc.) used by this application
# In[ ]:
sys.path.append(dart_pf_dir)
from utils.read_filepaths_nml import read_filepaths_nml
dirs_cfg, files_cfg = read_filepaths_nml(app_dir=app_dir, dart_pf_dir=dart_pf_dir)
# ****************
# **Set file saving options**
# - keep_each_ens_file: whether each ensemble NetCDF file read and generated by DART is kept when the DA ends (bool)
# - save_immediate_mda_result: whether the results of each MDA iteration are kept when the DA ends (bool)
# In[ ]:
files_cfg["keep_each_ens_file"] = True # Whether each ensemble nc file is kept when the DA ends
files_cfg["save_immediate_mda_result"] = True # Indicate whether the results of each MDA iteration will be saved
config_file = files_cfg["config_file"]
# ****************
# **Clean up the folders under the app_dir**
# In[ ]:
pflotran_in_dir = dirs_cfg["pflotran_in_dir"]
pflotran_out_dir = dirs_cfg["pflotran_out_dir"]
dart_inout_dir = dirs_cfg["dart_data_dir"]
# subprocess.run("cd {}; rm *".format(pflotran_in_dir), shell=True, check=False)
subprocess.run("cd {}; rm -f *".format(pflotran_out_dir), shell=True, check=True)
subprocess.run("cd {}; ls | xargs rm".format(dart_inout_dir), shell=True, check=False)
# subprocess.run("cd {}; ls | xargs -r rm".format(dart_inout_dir), shell=True, check=True)
# ****************
# **Define PFLOTRAN-related and observation files**
# - pflotran_para_file: file storing multiple realizations of the parameters in HDF5, following the [cell-indexed dataset option](https://www.pflotran.org/documentation/user_guide/cards/process_model_cards/dataset_card.html?highlight=parameter#cell-indexed-datasets) (string)
# - material_id_file: the PFLOTRAN mesh file in HDF5 (string)
# - obs_nc_original_file: the original observation file in netCDF format (string)
# - obs_nc_file: the clipped observation file (only including the spatial and temporal datasets of interest) in netCDF format (string)
# - other PFLOTRAN input/output file name and format options
# In[ ]:
# PFLOTRAN parameter file and material id file
files_cfg["pflotran_para_file"] = os.path.join(pflotran_in_dir, "parameter_prior_av_std1.5.h5")
files_cfg["material_id_file"] = os.path.join(pflotran_in_dir, "AT-mesh-alluvium-rivershape.h5")
# Observation file
files_cfg["obs_nc_original_file"] = os.path.join(pflotran_in_dir, "SFA_300a_all_2017-2019.nc")
files_cfg["obs_nc_file"] = os.path.join(pflotran_in_dir, "SFA_300a_clipped.nc")
# PFLOTRAN output, from which the state prior can be read
# depending on the OUTPUT card, it can come from the observation file or from the snapshot file
files_cfg["pflotran_in_file"] = os.path.join(pflotran_in_dir, "AT-model-ensemble.in")
files_cfg["pflotran_out_prefix"] = "pflotran"
files_cfg["pflotran_out_file"] = os.path.join(pflotran_out_dir, "pflotranR[ENS].h5")
files_cfg["pflotran_restart_file"] = os.path.join(pflotran_out_dir, "pflotranR[ENS]-restart.h5")
files_cfg["pflotran_log_file"] = os.path.join(pflotran_out_dir, "pflotranR[ENS].out")
files_cfg["pflotran_obs_file"] = os.path.join(pflotran_out_dir, "pflotranR[ENS]-obs[ANY].tec")
files_cfg["use_obs_tecfile_for_prior"] = True
configs["other_dir_cfg"] = dirs_cfg
configs["file_cfg" ] = files_cfg
# ****************
# **Specify the following types of variables used in DART**
# - obs_set: the observation data to be assimilated (corresponding to the variable names in obs_nc_original_file) (list of string)
# - obs_pflotran_set: the corresponding observation variable name in PFLOTRAN (list of string)
# - para_set: the PFLOTRAN parameters to be analyzed (list of string)
# - para_homogeneous: whether the parameters are homogeneous or heterogeneous (bool)
# - para_material_id_set: the corresponding material IDs in PFLOTRAN (defined in both input deck and material_id_file) (list of integer)
# - para_hdf_dataset_name_set: the DATASET name in pflotran_para_file (list of string)
# - para_isotropic_set: whether the parameter is isotropic (list of bool)
# - para_anisotropic_ratio_set: the anisotropic ratio between horizontal and vertical directions, less than one (list of float)
# - para_resampled_set: the set of parameters whose prior would be sampled based on the posterior at the previous time step (list of string)
# - para_sample_method_set: the sampling approach (list of string)
# - the statistics of the parameters: (1) the value range (i.e., para_min_set, para_max_set); (2) the mean and std for randomly sampling (i.e., para_mean_set, para_std_set).
# - para_take_log_set: whether each parameter should be estimated based on its original value or its log value (list of bool)
# - use_default_para_initial: whether to use the default parameter ensemble as the initial before assimilation starts (bool)
# In[ ]:
# Observation data to be assimilated
# Note that the names should be identical to the variable names in the corresponding netCDF file
obs_set = ['WATER_LEVEL_SENSOR', 'NORMALIZED_SPC_SENSOR']
# The corresponding observation variable in PFLOTRAN
# Note that if it is WATER_LEVEL, it is converted from LIQUID_PRESSURE output from PFLOTRAN;
# if it is SPC, it is converted from the river tracer output from PFLOTRAN.
obs_pflotran_set = ['WATER_LEVEL', 'GROUNDWATER_TRACER']
# The PFLOTRAN parameters to be analyzed
# para_set = ['FLOW_FLUX','POROSITY','THERMAL_CONDUCTIVITY']
para_set = ['HANFORD_PERMEABILITY', 'ALLUVIUM_PERMEABILITY']
# para_set = ['PERMEABILITY']
para_homogeneous = False
if not para_homogeneous:
    para_material_id_set = [1, 5]                 # the corresponding material IDs in PFLOTRAN; if multiple, their sampling methods can differ
    para_hdf_dataset_name_set = ['perm', 'perm']  # the corresponding dataset names in the parameter prior h5 file
    para_isotropic_set = [False, True]            # whether each parameter is isotropic
    para_anisotropic_ratio_set = [1, 0.1]         # the anisotropic ratio between horizontal and vertical directions
else:
    para_material_id_set = [None, None]
    para_hdf_dataset_name_set = para_set
    para_isotropic_set = [False, False]
    para_anisotropic_ratio_set = [1, 1]
# Parameter sampling approach from posterior to prior
para_resampled_set = ['HANFORD_PERMEABILITY', 'ALLUVIUM_PERMEABILITY'] # The parameters to be resampled at each time step
# para_resampled_set = [''] # The parameters to be resampled at each time step
para_sample_method_set = ["SGS", "SGS"] # the parameter sampling method, including: SGS, US, SIS, rescale, normal, lognormal, truncated_normal, uniform
# The statistics of the parameters
# The index order follows para_set
para_min_set = [-12, -18] # The minimum values (-99999 means no lower bound limit)
para_max_set = [-4, -4] # The maximum values (99999 means no upper bound limit)
para_mean_set = [None, None] # The mean values
para_std_set = [None, None] # The standard deviation values
# Others
para_take_log_set = [True, True] # Indicate whether the parameters should be estimated based on its original value or log value
use_default_para_initial = True # Indicate whether to use the default parameter ensemble as the initial ensemble
configs["obspara_set_cfg"] = {"obs_set": obs_set, "obs_pflotran_set": obs_pflotran_set,
"para_set": para_set, "para_isotropic_set": para_isotropic_set,
"para_material_id_set": para_material_id_set, "para_hdf_dataset_name_set": para_hdf_dataset_name_set,
"para_take_log_set": para_take_log_set, "use_default_para_initial":use_default_para_initial,
# "para_dist_set": para_dist_set, "para_prior_rescaled_set": para_prior_rescaled_set,
"para_homogeneous": para_homogeneous, "para_anisotropic_ratio_set": para_anisotropic_ratio_set,
"para_min_set": para_min_set, "para_max_set": para_max_set,
"para_mean_set": para_mean_set, "para_std_set": para_std_set,
"para_sample_method_set": para_sample_method_set, "para_resampled_set": para_resampled_set}
# ****************
# **Specify the spatial domains of the observation data to be assimilated**
#
# The limits of x, y, and z are bounded by the minimum and maximum boundaries through (min, max). If a limit is not specified, -99999 and 99999 are assigned as the lower and upper bounds, respectively.
# In[ ]:
obs_space_xlimit = [-99999, 99999]
obs_space_ylimit = [-99999, 99999]
obs_space_zlimit = [-99999, 99999]
configs["obs_space_cfg"] = {"obs_space_xlimit": obs_space_xlimit, "obs_space_ylimit": obs_space_ylimit,
"obs_space_zlimit": obs_space_zlimit}
# ****************
# **Specify the data assimilation time window**
# - assim_window_fixed: whether the assimilation window is fixed (bool)
# - assim_window_days: the number of days for assimilation window if assim_window_fixed is true (integer)
# - assim_window_seconds: the number of remaining seconds for assimilation window if assim_window_fixed is true (integer)
# - assim_window_size: the assimilation window size (in fractional days) if assim_window_fixed is true (float)
# - assim_window_list: the list of assimilation windows if assim_window_fixed is false (list of float)
# - use_para_initial_at_nth_window: employ the initial parameter ensemble at the nth assimilation window; negative if unused (integer)
# In[ ]:
da_cfg = {}
# Assimilation time window time_step_days+time_step_seconds
# Assimilation window
da_cfg["assim_window_fixed"] = True # whether the assimilation window is fixed (day)
da_cfg["assim_window_days"] = 0 # assimilation time window/step (day)
da_cfg["assim_window_seconds"] = 15*24*3600 # assimilation time window/step (second)
# da_cfg["assim_window_seconds"] = 30*24*3600 # assimilation time window/step (second)
# da_cfg["assim_window_seconds"] = 24*3600 # assimilation time window/step (second)
da_cfg["assim_window_size"] = da_cfg["assim_window_days"]+float(da_cfg["assim_window_seconds"])/86400. # day
first_assim_window_size = da_cfg["assim_window_size"]
# da_cfg["assim_window_list"] = [2., (4000.*60+10)/86400.] # day
da_cfg["use_para_initial_at_nth_window"] = -1 # Use the initial parameter ensemble at the nth assimilation window (unused if negative)
# first_assim_window_size = da_cfg["assim_window_list"][0]
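# ****************
# **Example of a variable-window setup** (illustrative values): disable the fixed window and list the window lengths (in fractional days) explicitly, e.g.
# In[ ]:
# da_cfg["assim_window_fixed"] = False
# da_cfg["assim_window_list"] = [2., 4., 8.]  # three windows of increasing length
# first_assim_window_size = da_cfg["assim_window_list"][0]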
# ****************
# **Specify the temporal information**
# - spinup_length_days: the number of days for model spinup (integer)
# - spinup_length_seconds: the remaining seconds for model spinup (integer)
# - spinup_length: the overall spinup time in fractional days (float)
# - is_spinup_done: whether the model spinup is conducted (bool)
# - current_model_time: the current model time; the first one is the sum of spinup_length and half of the first assimilation window size, in fractional days (float)
# - model_time_list: the list of the model time (list of float)
# - model_start and assim_start: the start of assimilation time in calendar time (string)
# - first_obs_time_days: the time of the first observation to be assimilated in day (integer)
# - first_obs_time_seconds: the time of the first observation to be assimilated in second (integer)
# - first_obs_time: the time of the first observation to be assimilated in fractional day (float)
# - last_obs_time_days: the time of the last observation to be assimilated in day (integer)
# - last_obs_time_seconds: the time of the last observation to be assimilated in second (integer)
# - last_obs_time: the time of the last observation to be assimilated in fractional day (float)
# - exceeds_obs_time: whether the current model time exceeds the last observation time, indicating whether the assimilation is done (bool)
#
# **Note that** the model start time is counted after the spinup
# In[ ]:
time_cfg = {}
one_sec = 1./86400. # fraction of day
# Model spinup length
time_cfg["spinup_length_days"] = 360 # number of days for spinup
time_cfg["spinup_length_seconds"] = 0 # number of seconds for the remaining spinup
time_cfg["spinup_length"] = time_cfg["spinup_length_days"]+float(time_cfg["spinup_length_seconds"])/86400. # spinup time (day)
time_cfg["is_spinup_done"] = True # whether spinup is conducted
# Model start time
time_cfg["current_model_time"] = time_cfg["spinup_length"] + (first_assim_window_size + one_sec)/2 # model start time should be the middle of the first assimilation window (after spinup)
time_cfg["model_time_list"] = [time_cfg["current_model_time"]] # the list of model time
# Map between observation assimilation start time and model start time
obs_start = datetime(2017,1,1,0,0,0) + timedelta(days=time_cfg["spinup_length_days"], seconds=time_cfg["spinup_length_seconds"])
# assim_start = obs_start + timedelta(days=time_cfg["spinup_length"]) # assimilation time should be after the model spinup
assim_start = obs_start # assimilation time should be after the model spinup
time_cfg["model_start"] = obs_start.strftime("%Y-%m-%d %H:%M:%S")
time_cfg["assim_start"] = assim_start.strftime("%Y-%m-%d %H:%M:%S")
# The first and last times of the observations to be assimilated
time_cfg["first_obs_time_days"] = time_cfg["spinup_length_days"]
time_cfg["first_obs_time_seconds"] = time_cfg["spinup_length_seconds"]
time_cfg["first_obs_time_size"] = time_cfg["first_obs_time_days"]+float(time_cfg["first_obs_time_seconds"])/86400. # day
if da_cfg["assim_window_fixed"]:
time_cfg["last_obs_time_days"] = time_cfg["first_obs_time_days"]
time_cfg["last_obs_time_seconds"] = time_cfg["first_obs_time_seconds"] + 2*da_cfg["assim_window_seconds"]
# time_cfg["last_obs_time_seconds"] = time_cfg["first_obs_time_seconds"] + 3600*24*31
else:
total_time = np.sum(da_cfg["assim_window_list"])
time_cfg["last_obs_time_days"] = time_cfg["first_obs_time_days"] + floor(total_time)
time_cfg["last_obs_time_seconds"] = time_cfg["first_obs_time_seconds"] + int((total_time-floor(total_time)*86400))
time_cfg["last_obs_time_size"] = time_cfg["last_obs_time_days"]+float(time_cfg["last_obs_time_seconds"])/86400. # day
# Whether the model time exceeds the last observation
time_cfg["exceeds_obs_time"] = time_cfg["current_model_time"] >= time_cfg["last_obs_time_size"]
# Save them to configs
configs["time_cfg"] = time_cfg
# ****************
# **Config the data assimilation**
# - nens: number of ensemble members (integer)
# - obs_error: the error of all observations in obs_set (list of float)
# - obs_error_type: the type of each observation error (i.e., 'relative' or 'absolute') (list of string)
# - assim_start_days: the start time of the current assimilation window in day (integer)
# - assim_start_seconds: the remaining seconds of the start time of the current assimilation window (integer)
# - assim_end_days: the end time of the current assimilation window in day (integer)
# - assim_end_seconds: the remaining seconds of the end time of the current assimilation window (integer)
# - ntimestep: the number of time steps or assimilation windows (integer)
# - obs_ens_posterior_from_model: whether the observation posterior should be recomputed from the model output based on the parameter posterior (bool)
#
# In[ ]:
# Observation error, number of ensembles
da_cfg["nens"] = 100 # number of ensembles
da_cfg["obs_error"] = [0.001, 0.05] # the observation error (with the order corresponding to the obs_set)
da_cfg["obs_error_type"] = ["relative", "relative"] # the type of observation error (i.e., relative and absolute)
# The start and end time of the current assimilation time window
da_cfg["assim_start_days"] = int(floor(time_cfg["current_model_time"] - first_assim_window_size/2))
da_cfg["assim_start_seconds"] = int((time_cfg["current_model_time"] - first_assim_window_size/2 - da_cfg["assim_start_days"])*86400+1)
da_cfg["assim_end_days"] = int(floor(time_cfg["current_model_time"] + first_assim_window_size/2))
da_cfg["assim_end_seconds"] = int((time_cfg["current_model_time"] + first_assim_window_size/2 - da_cfg["assim_end_days"])*86400)
# Compute the number of time steps
# print(np.ceil((time_cfg["last_obs_time_size"] - time_cfg["first_obs_time_size"]) / da_cfg["assim_window_size"]))
if da_cfg["assim_window_fixed"]:
da_cfg["ntimestep"] = ceil((time_cfg["last_obs_time_size"] - time_cfg["first_obs_time_size"]) / da_cfg["assim_window_size"])
else:
da_cfg["ntimestep"] = len(da_cfg["assim_window_list"])
# Decide whether the observation posterior should be recomputed from the model output based on the parameter posterior
da_cfg["obs_ens_posterior_from_model"] = True
# TODO: remove this once this issue is fixed in PFLOTRAN
# Check that the number of cores used for running PFLOTRAN is divisible by the number of realizations
if ncore_pf % da_cfg["nens"] != 0:
    raise Exception("The number of cores {} used for running PFLOTRAN must be divisible by the number of realizations {}".format(ncore_pf, da_cfg["nens"]))
# ****************
# **Config the MDA settings**
# - enks_mda_alpha: the list of MDA coefficients, the sum of whose inverses should equal one (list of float)
# - enks_mda_iteration_step: the current iteration step, starting from one (integer)
# - enks_mda_total_iterations: the total number of iterations (integer)
# In[ ]:
# The inflation settings used in EnKS-MDA (the alpha value)
da_cfg["enks_mda_alpha"] = [4., 4., 4., 4.] # Note that the summation of the inverse of alpha should be one
# da_cfg["enks_mda_alpha"] = [3., 3., 3.] # Note that the summation of the inverse of alpha should be one
# da_cfg["enks_mda_alpha"] = [2., 2.] # Note that the summation of the inverse of alpha should be one
# da_cfg["enks_mda_alpha"] = [1.] # Note that the summation of the inverse of alpha should be one
da_cfg["enks_mda_iteration_step"] = 1 # the ith iteration (1 for the first iteration)
da_cfg["enks_mda_total_iterations"] = len(da_cfg["enks_mda_alpha"]) # Note that the summation of the inverse of alpha should be one
# Check whether the sum of the inverse of enks_mda_alpha is one
alpha_inv_sum = sum([1./alpha for alpha in da_cfg["enks_mda_alpha"]])
if abs(alpha_inv_sum - 1) > 1e-8:
    raise Exception("The sum of the inverses of enks_mda_alpha should be one!")
# Save them to configs
configs["da_cfg"] = da_cfg
# ****************
# **Save all the configurations to the configuration namelist file**
# In[ ]:
configs.write(config_file, force=True)
# <a id='pflotran_prepare'></a>
# # Step 2: PFLOTRAN preparation
# *Here, we use Kewei's 1D thermal model as an example for generating the PFLOTRAN input deck and ```parameter.h5```.*
#
# In this section, the following procedures are performed:
# - generate PFLOTRAN input deck file ```PFLOTRAN.in```
# - generate the parameter file in HDF5, ```parameter_prior.h5```, used by the PFLOTRAN input deck file
#
# **Note that**
# - ```PFLOTRAN.in``` for each DA scenario should be prepared by users.
# In[ ]:
prep_pflotran_inputdeck, pflotran_in = files_cfg["prep_pflotran_inputdeck_file"], files_cfg["pflotran_in_file"]
prep_pflotran_parameterprior = files_cfg["prep_pflotran_para_file"]
# ****************
# **Generate the ensembles of PFLOTRAN prior**
#
# **Run code**
# - Run: ```prepare_pflotran_parameterprior.py```
# In[ ]:
# get_ipython().run_cell_magic('script', 'bash -s "$prep_pflotran_parameterprior" "$config_file"', 'python $1 $2')
print("\n")
print("------------------------------------------------------------")
print("Prepare PFLOTRAN ensemble parameter values...")
subprocess.run("python {} {}".format(prep_pflotran_parameterprior, config_file), shell=True, check=True)
# ****************
# **Generate PFLOTRAN input deck file**
#
# **Run code**
# - Run: ```prepare_pflotran_inputdeck.py```
# In[ ]:
# print("\n")
# print("------------------------------------------------------------")
# print("Prepare PFLOTRAN input deck...")
# subprocess.run("python {} {}".format(prep_pflotran_inputdeck, config_file), shell=True, check=True)
# <a id='pflotran_spinup'></a>
# # Step 3: PFLOTRAN model spin-up
# Take in the ```pflotran.in``` and ```parameter.h5``` files and conduct the model spin-up by running the ```pflotran.sh``` file. ```pflotran.sh``` is a simple shell script executing ensemble simulations of PFLOTRAN with MPI.
#
# **Run the code**
# - Run: ```run_pflotran.py```
#
# In[ ]:
# pflotran_sh, pflotran_out_dir = files_cfg["pflotran_sh_file"], dirs_cfg["pflotran_out_dir"]
run_pflotran, pflotran_out_dir = files_cfg["run_pflotran_file"], dirs_cfg["pflotran_out_dir"]
# In[ ]:
print("\n")
print("------------------------------------------------------------")
print("Model spinup...")
if time_cfg['spinup_length'] != 0 and not configs["time_cfg"]["is_spinup_done"]:
    subprocess.run("python {} {}".format(run_pflotran, config_file), shell=True, check=True)
else:
    print("Model spinup is already done or not required.")
# ****************
# **Once the model spinup finishes, modify the corresponding configuration entry**
# In[ ]:
configs["time_cfg"]["is_spinup_done"] = True
configs.write(config_file, force=True)
# <a id='dart_prepare'></a>
# # Step 4: DART files preparation
# In this section, the following procedures are performed:
# - [TO BE REVISED]: generate the template for DART generic variable quantity files (i.e., ```DEFAULT_obs_kind_mod.F90``` and ```obs_def_pflotran_mod.f90```);
# - generate the DART input namelists;
# - generate DART prior NetCDF data ```prior_ensemble_[ENS].nc``` from PFLOTRAN's parameter and outputs;
# - generate DART posterior NetCDF files (*sharing the same variable names and dimensions as the prior NetCDF files but without the data values*);
# - convert the observation file to DART observation format;
# - check ```model_mod.F90``` based on current setting by using the ```check_model_mod``` provided by DART.
# <a id='dart_generic_prepare'></a>
# ## Generate the templates for DART generic variable quantity files
# - Run: ```list2dartqty.py``` to sequentially generate/modify
# - a mapping between PFLOTRAN variables and DART generic quantities in ```obs_def_pflotran_mod.F90```
# - the default DART generic quantity definition file ```DEFAULT_obs_kind_mod.F90```
#
# In[ ]:
to_dartqty, obs_type_file = files_cfg["to_dartqty_file"], files_cfg["obs_type_file"]
# In[ ]:
# get_ipython().run_cell_magic('script', 'bash -s "$to_dartqty" "$config_file"', 'python $1 $2 $3 $4')
print("\n")
print("------------------------------------------------------------")
print("Add PFLOTRAN variables to DEFAULT_obs_kind_mod.F90 if necessary...")
subprocess.run("python {} {}".format(to_dartqty, config_file), shell=True, check=True)
# ## Generate DART input namelists in ```input.nml```
#
# The ```input.nml``` file is generated based on a template ```input.nml.template``` by modifying the following namelist entries:
#
# ```input.nml.template``` $\rightarrow$ ```input.nml```
#
# **Namelists from DART**
# - [obs_kind_nml](https://www.image.ucar.edu/DAReS/DART/Manhattan/assimilation_code/modules/observations/obs_kind_mod.html#Namelist): namelist for controlling which observation types are to be assimilated
# - [preprocess_nml](https://www.image.ucar.edu/DAReS/Codes/DART/manhattan/assimilation_code/programs/preprocess/preprocess): namelist of the DART-supplied preprocessor program, which creates the observation kind and observation definition modules from a set of specially formatted Fortran 90 files
# - and others (see the DART documentation for details)
#
# **Self-defined namelists**
# - smoother_nml: a self-defined namelist of the main module for driving ensemble data assimilation, modified from the DART [filter_nml](https://www.image.ucar.edu/DAReS/DART/Manhattan/assimilation_code/modules/assimilation/filter_mod.html#Namelist)
# - model_nml: a self-defined namelist for providing the basic information in the model
# - template_file: the template prior NetCDF file for ```model_mod.F90``` to digest the spatial information of the model (string)
# - nvar_para: the number of parameter variables (integer)
# - para_var_names: the original variable names for model parameters (list of string)
# - para_var_qtynames: the corresponding DART variable quantities for model parameters (list of string)
# - nvar_state: the number of state variables (integer)
# - state_var_names: the original variable names for model states (list of string)
# - state_var_qtynames: the corresponding DART variable quantities for model states (list of string)
# - debug: whether the debug mode is turned on (bool)
# - max_time_diff_seconds: the maximum time difference allowed for finding the closest ensemble for observations in second (integer)
#
# - convertnc_nml: a self-defined namelist for providing the NetCDF observation file name and the DART observation file name used in ```convert_nc.f90```
# - netcdf_file: the location of the NetCDF file containing the observation data (string)
# - out_file: the location of the DART observation file (string)
#
# **Note that**
# - There are more namelists and other entries in ```input.nml.template```. Users can edit the Python dictionary ```inputnml``` below to include their modifications (see the sketch after the assembled namelists).
# - Users can also include more namelists provided by DART by modifying ```inputnml```.
# ***************
# **Assemble all the namelists in input.nml**
# In[ ]:
# Parameters for different namelists in input.nml
smoother_nml = {"input_state_file_list":files_cfg["dart_input_list_file"],
"output_state_file_list":files_cfg["dart_output_list_file"],
"ens_size":da_cfg["nens"],
"num_output_state_members":da_cfg["nens"],
"obs_sequence_in_name":files_cfg["obs_dart_file"],
"first_obs_days": da_cfg["assim_start_days"],
"first_obs_seconds": da_cfg["assim_start_seconds"],
"last_obs_days": da_cfg["assim_end_days"],
"last_obs_seconds": da_cfg["assim_end_seconds"],
"output_mean": False,
"output_sd": False}
obs_kind_nml = {"assimilate_these_obs_types":obs_set}
assim_tools_nml = {"filter_kind":2, "sort_obs_inc": True}
model_nml = {"nvar_state":len(obs_pflotran_set),
"state_var_names":obs_pflotran_set,
"state_var_qtynames":['QTY_PFLOTRAN_'+v for v in obs_pflotran_set],
"nvar_para":len(para_set),
"para_var_names":para_set,
"para_var_qtynames":['QTY_PFLOTRAN_'+v for v in para_set],
"max_time_diff_seconds": 10,
"debug": False,
"template_file":files_cfg["dart_prior_template_file"]}
preprocess_nml = {"input_files":files_cfg["obs_type_file"],
"input_obs_kind_mod_file":files_cfg["def_obs_kind_file"]}
convertnc_nml = {"netcdf_file": files_cfg["obs_nc_file"],
"out_file": files_cfg["obs_dart_file"],
"obs_start_day": da_cfg["assim_start_days"],
"obs_start_second": da_cfg["assim_start_seconds"],
"obs_end_day": da_cfg["assim_end_days"],
"obs_end_second":da_cfg["assim_end_seconds"],
"inflation_alpha":da_cfg["enks_mda_alpha"][da_cfg["enks_mda_iteration_step"]-1]}
modelmodcheck_nml = {"input_state_files": files_cfg["dart_prior_template_file"]}
inputnml = {"smoother_nml":smoother_nml,
"obs_kind_nml":obs_kind_nml,
"assim_tools_nml":assim_tools_nml,
"model_nml":model_nml,
"preprocess_nml":preprocess_nml,
"convert_nc_nml":convertnc_nml,
"model_mod_check_nml":modelmodcheck_nml}
configs["inputnml_cfg"] = inputnml
# Save the configurations
configs.write(config_file, force=True)
# with open(config_pickle, 'wb') as f:
# pickle.dump(configs, f)
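# ***************
# **Extending the namelists** (an illustrative sketch): as noted above, additional entries can be overridden by editing the dictionary and re-saving the configuration, e.g.
# In[ ]:
# inputnml["assim_tools_nml"]["sort_obs_inc"] = False
# configs["inputnml_cfg"] = inputnml
# configs.write(config_file, force=True)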
# ***************
# **Run the code**
# - Run: ```prepare_inputnml.py```
# In[ ]:
prep_inputnml = files_cfg["prep_inputnml_file"]
# In[ ]:
# get_ipython().run_cell_magic('script', 'bash -s "$prep_inputnml" "$config_file"', 'python $1 $2')
print("\n")
print("------------------------------------------------------------")
print("Generate input.nml file...")
subprocess.run("python {} {}".format(prep_inputnml, config_file), shell=True, check=True)
# <a id='observationconvertion'></a>
# ## Prepare the observation conversion to DART observation format
# In this section, we prepare the process of converting the observation data to the DART observation format. We first convert the raw observation data into NetCDF format. Then, a Fortran script is prepared for the conversion from NetCDF to DART format. The structure of the NetCDF file for recording observations is as follows.
#
# | NetCDF dimensions | NetCDF variables |
# |:-----------------:|:-----------------------------------:|
# | time: ntime | time: shape(time) |
# | location: nloc | location: shape(location) |
# | | physical variable: shape(time,nloc) |
#
# **Note that**
# - if the time calendar follows *gregorian*, the time unit should be entered as ```seconds since YYYY-MM-DD HH:MM:SS```. Otherwise, set the time calendar to *None* and the time unit to ```second``` (make sure to convert your measurement times to seconds).
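# ***************
# **A minimal sketch of the observation NetCDF layout** (it assumes the ```netCDF4``` package; the file and variable names are illustrative)
# In[ ]:
# from netCDF4 import Dataset
# with Dataset("obs_example.nc", "w") as nc:
#     nc.createDimension("time", 24)
#     nc.createDimension("location", 3)
#     t = nc.createVariable("time", "f8", ("time",))
#     t.units = "seconds since 2017-01-01 00:00:00"
#     t.calendar = "gregorian"
#     loc = nc.createVariable("location", "f8", ("location",))
#     temp = nc.createVariable("TEMPERATURE", "f8", ("time", "location"))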
# ***************
# **Convert the raw csv temperature observations to NetCDF file**
# - Run: ```csv2nc.py```
#
# **Note that**
# - Here, we illustrate the conversion from csv as an example. However, the conversion to NetCDF format depends on the raw data format and can be adapted accordingly.
#
# In[ ]:
csv_to_nc, obs_nc_original = files_cfg["csv_to_nc_file"], files_cfg["obs_nc_original_file"]
if os.path.isfile(obs_nc_original):
    print("\n")
    print("------------------------------------------------------------")
    print("Observation file is found.")
else:
    print("\n")
    print("------------------------------------------------------------")
    raise Exception("Converting the raw observation data to a NetCDF file is required...")
    # print("Convert raw observation data to NetCDF file...")
    # subprocess.run("python {} {}".format(csv_to_nc, config_file), shell=True, check=True)
# ***************
# **Clip the NetCDF file based on the defined spatial and temporal domains**
#
# The NetCDF file generated in the previous step is further processed by selecting the data observed within the defined spatial and temporal domains
# - Run: ```clip_obs_nc.py```
# In[ ]:
print("\n")
print("------------------------------------------------------------")
print("Clip the NetCDF file based on the defined spatial and temporal domains...")
clip_obs_nc, obs_nc = files_cfg["clip_obs_nc_file"], files_cfg["obs_nc_file"]
subprocess.run("python {} {}".format(clip_obs_nc, config_file), shell=True, check=True)
# ***************
# **Prepare the ```convert_nc.f90``` based on the list of observation variables**
# - Run: ```prepare_convert_nc.py```
# - Code input arguments:
# - obs_nc: filename for the observation NetCDF file
# In[ ]:
prep_convert_nc, convert_nc_file = files_cfg["prep_convert_nc_file"], files_cfg["convert_nc_file"]
# In[ ]:
# get_ipython().run_cell_magic('script', 'bash -s "$prep_convert_nc" "$config_file"', 'python $1 $2')
print("\n")
print("------------------------------------------------------------")
print("Prepare the convert_nc.f90 based on the list of observation variables to be assimilated...")
subprocess.run("python {} {}".format(prep_convert_nc, config_file), shell=True, check=True)
# <a id='dart_executables'></a>
# # Step 5: Generate all the executable files
# Now, we compile all the executables from ```mkmf_*```. The following executables are generated here:
# - ```preprocess```: for preprocessing the [DART generic variable quantity files prepared earlier](#dart_generic_prepare)
# - ```convert_nc```: for [converting the observations from NetCDF to DART format](#observationconvertion)
# - ```model_mod_check```: for checking ```model_mod.F90``` interface file
# - ```smoother```: for conducting the DART data assimilation
# ## Generate the executables
# - Run: ```quickbuild.csh```
# - Code input arguments:
# - app_work_dir: location of the application work folder
# In[ ]:
dart_work_dir, app_work_dir = dirs_cfg["dart_work_dir"], dirs_cfg["app_work_dir"]
quickbuild = files_cfg["quickbuild_csh"]
# In[ ]:
print("\n")
print("------------------------------------------------------------")
print("Generate all the executables...")
# subprocess.run("cd {}; csh {} {} -mpi".format(dart_work_dir, quickbuild, app_work_dir), shell=True, check=True)
# subprocess.run("cd {}; csh {} {}".format(dart_work_dir, quickbuild, app_work_dir), shell=True, check=True)
# ## Check ```model_mod.F90``` interface file
# - Run: ```model_mod_check```
# In[ ]:
model_mod_check = files_cfg["model_mod_check_exe"]
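# In[ ]:
# A sketch of running the check (the exact invocation may differ; it assumes
# model_mod_check reads input.nml from the work directory):
# subprocess.run("cd {}; {}".format(dirs_cfg["dart_work_dir"], model_mod_check), shell=True, check=True)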
# <a id='run_dart_pflotran'></a>
# # Step 6: Run DART and PFLOTRAN
# In this section, run the shell script to couple DART and PFLOTRAN.
# In[ ]:
dart_work_dir = dirs_cfg["dart_work_dir"]
run_DART_PFLOTRAN = files_cfg["run_da_csh"]
concatenate_output = files_cfg["concatenate_dart_output_file"]
inputnml_file = files_cfg["input_nml_file"]
start_time = time.time()
# In[ ]:
print("\n")
print("------------------------------------------------------------")
print("Assimilation starts here...")
subprocess.run("cd {}; csh {} {} {}".format(dart_work_dir, run_DART_PFLOTRAN, inputnml_file, config_file), shell=True, check=True)
# In[ ]:
print("\n")
print("------------------------------------------------------------")
print("Concatenate the prior and posterior at all times ...")
subprocess.run("python {} {}".format(concatenate_output, config_file), shell=True, check=True)
# In[ ]:
end_time = time.time()
print("The total time usage of running DART and PFLOTRAN is %.3f (second): " % (end_time-start_time))
exit()
# # Submit the job to Cori
# **Note that** if you want to submit the job, do not run the cells above; instead, (1) configure all the above settings, and (2) run the cells below.
# In[8]:
notebook_file = 'workflow.ipynb'
script_file = 'workflow.py'
sbatch_file = 'sbatch_da.sh'
# ***************
# **Convert the jupyter notebook to python script**
# In[ ]:
get_ipython().system(' jupyter nbconvert --to script $notebook_file')
# ***************
# **Generate the sbatch file**
# In[ ]:
# Write the job script content to the sbatch file
sbatch_content = """#!/bin/bash
#SBATCH -A m1800
#SBATCH -q regular
#SBATCH -N 96
#SBATCH -t 48:00:00
#SBATCH -L SCRATCH
#SBATCH -C haswell
#SBATCH -J DTPF_300A
#SBATCH --mail-type=begin,end,fail
#SBATCH [email protected]
python workflow_300A_richards_wl-regular.py
wait
"""
with open(sbatch_file, "w") as f:
    f.write(sbatch_content)
# ***************
# **Submit the sbatch file**
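# In[ ]:
# A minimal sketch of the submission (assumes a standard Slurm environment where
# sbatch is available on the login node):
get_ipython().system(' sbatch $sbatch_file')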