Skip to content

Commit

Permalink
Second
Browse files Browse the repository at this point in the history
  • Loading branch information
frode-aarstad committed Oct 28, 2024
1 parent 2478adb commit 78f7087
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 181 deletions.
318 changes: 159 additions & 159 deletions src/ert/config/ert_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,164 @@ def site_config_location() -> str:
)


def create_forward_model_json(
context: Substitutions,
forward_model_steps: List[ForwardModelStep],
run_id: Optional[str],
iens: int = 0,
itr: int = 0,
user_config_file: Optional[str] = "",
env_vars: Optional[Dict[str, str]] = None,
skip_pre_experiment_validation: bool = False,
) -> Dict[str, Any]:
if env_vars is None:
env_vars = {}

class Substituter:
def __init__(self, fm_step):
fm_step_args = ",".join(
[f"{key}={value}" for key, value in fm_step.private_args.items()]
)
fm_step_description = f"{fm_step.name}({fm_step_args})"
self.substitution_context_hint = (
f"parsing forward model step `FORWARD_MODEL {fm_step_description}` - "
"reconstructed, with defines applied during parsing"
)
self.copy_private_args = Substitutions()
for key, val in fm_step.private_args.items():
self.copy_private_args[key] = context.substitute_real_iter(
val, iens, itr
)

@overload
def substitute(self, string: str) -> str: ...

@overload
def substitute(self, string: None) -> None: ...

def substitute(self, string):
if string is None:
return string
string = self.copy_private_args.substitute(
string, self.substitution_context_hint, 1, warn_max_iter=False
)
return context.substitute_real_iter(string, iens, itr)

def filter_env_dict(self, d):
result = {}
for key, value in d.items():
new_key = self.substitute(key)
new_value = self.substitute(value)
if new_value is None:
result[new_key] = None
elif not (new_value[0] == "<" and new_value[-1] == ">"):
# Remove values containing "<XXX>". These are expected to be
# replaced by substitute, but were not.
result[new_key] = new_value
else:
logger.warning(
f"Environment variable {new_key} skipped due to"
f" unmatched define {new_value}",
)
# Its expected that empty dicts be replaced with "null"
# in jobs.json
if not result:
return None
return result

def handle_default(fm_step: ForwardModelStep, arg: str) -> str:
return fm_step.default_mapping.get(arg, arg)

for fm_step in forward_model_steps:
for key, val in fm_step.private_args.items():
if key in context and key != val and context[key] != val:
logger.info(
f"Private arg '{key}':'{val}' chosen over"
f" global '{context[key]}' in forward model step {fm_step.name}"
)
config_file_path = Path(user_config_file) if user_config_file is not None else None
config_path = str(config_file_path.parent) if config_file_path else ""
config_file = str(config_file_path.name) if config_file_path else ""

job_list_errors = []
job_list: List[ForwardModelStepJSON] = []
for idx, fm_step in enumerate(forward_model_steps):
substituter = Substituter(fm_step)
fm_step_json = {
"name": substituter.substitute(fm_step.name),
"executable": substituter.substitute(fm_step.executable),
"target_file": substituter.substitute(fm_step.target_file),
"error_file": substituter.substitute(fm_step.error_file),
"start_file": substituter.substitute(fm_step.start_file),
"stdout": (
substituter.substitute(fm_step.stdout_file) + f".{idx}"
if fm_step.stdout_file
else None
),
"stderr": (
substituter.substitute(fm_step.stderr_file) + f".{idx}"
if fm_step.stderr_file
else None
),
"stdin": substituter.substitute(fm_step.stdin_file),
"argList": [
handle_default(fm_step, substituter.substitute(arg))
for arg in fm_step.arglist
],
"environment": substituter.filter_env_dict(fm_step.environment),
"exec_env": substituter.filter_env_dict(fm_step.exec_env),
"max_running_minutes": fm_step.max_running_minutes,
}

try:
if not skip_pre_experiment_validation:
fm_step_json = fm_step.validate_pre_realization_run(fm_step_json)
except ForwardModelStepValidationError as exc:
job_list_errors.append(
ErrorInfo(
message=f"Validation failed for "
f"forward model step {fm_step.name}: {exc!s}"
).set_context(fm_step.name)
)

job_list.append(fm_step_json)

if job_list_errors:
raise ConfigValidationError.from_collected(job_list_errors)

return {
"global_environment": env_vars,
"config_path": config_path,
"config_file": config_file,
"jobList": job_list,
"run_id": run_id,
"ert_pid": str(os.getpid()),
}


def forward_model_data_to_json(
substitutions: Substitutions,
forward_model_steps: List[ForwardModelStep],
env_vars: Dict[str, str],
user_config_file: Optional[str] = "",
run_id: Optional[str] = None,
iens: int = 0,
itr: int = 0,
context_env: Optional[Dict[str, str]] = None,
):
if context_env is None:
context_env = {}
return create_forward_model_json(
context=substitutions,
forward_model_steps=forward_model_steps,
user_config_file=user_config_file,
env_vars={**env_vars, **context_env},
run_id=run_id,
iens=iens,
itr=itr,
)


@dataclass
class ErtConfig:
DEFAULT_ENSPATH: ClassVar[str] = "storage"
Expand Down Expand Up @@ -615,7 +773,7 @@ def _create_list_of_forward_model_steps_to_run(
for fm_step in fm_steps:
if fm_step.name in cls.PREINSTALLED_FORWARD_MODEL_STEPS:
try:
substituted_json = cls._create_forward_model_json(
substituted_json = create_forward_model_json(
run_id=None,
context=substitutions,
forward_model_steps=[fm_step],
Expand Down Expand Up @@ -644,164 +802,6 @@ def _create_list_of_forward_model_steps_to_run(
def forward_model_step_name_list(self) -> List[str]:
return [j.name for j in self.forward_model_steps]


def forward_model_data_to_json(
self,
run_id: Optional[str] = None,
iens: int = 0,
itr: int = 0,
context_env: Optional[Dict[str, str]] = None,
):
if context_env is None:
context_env = {}
return self._create_forward_model_json(
context=self.substitutions,
forward_model_steps=self.forward_model_steps,
user_config_file=self.user_config_file,
env_vars={**self.env_vars, **context_env},
run_id=run_id,
iens=iens,
itr=itr,
)

@classmethod
def _create_forward_model_json(
cls,
context: Substitutions,
forward_model_steps: List[ForwardModelStep],
run_id: Optional[str],
iens: int = 0,
itr: int = 0,
user_config_file: Optional[str] = "",
env_vars: Optional[Dict[str, str]] = None,
skip_pre_experiment_validation: bool = False,
) -> Dict[str, Any]:
if env_vars is None:
env_vars = {}

class Substituter:
def __init__(self, fm_step):
fm_step_args = ",".join(
[f"{key}={value}" for key, value in fm_step.private_args.items()]
)
fm_step_description = f"{fm_step.name}({fm_step_args})"
self.substitution_context_hint = (
f"parsing forward model step `FORWARD_MODEL {fm_step_description}` - "
"reconstructed, with defines applied during parsing"
)
self.copy_private_args = Substitutions()
for key, val in fm_step.private_args.items():
self.copy_private_args[key] = context.substitute_real_iter(
val, iens, itr
)

@overload
def substitute(self, string: str) -> str: ...

@overload
def substitute(self, string: None) -> None: ...

def substitute(self, string):
if string is None:
return string
string = self.copy_private_args.substitute(
string, self.substitution_context_hint, 1, warn_max_iter=False
)
return context.substitute_real_iter(string, iens, itr)

def filter_env_dict(self, d):
result = {}
for key, value in d.items():
new_key = self.substitute(key)
new_value = self.substitute(value)
if new_value is None:
result[new_key] = None
elif not (new_value[0] == "<" and new_value[-1] == ">"):
# Remove values containing "<XXX>". These are expected to be
# replaced by substitute, but were not.
result[new_key] = new_value
else:
logger.warning(
f"Environment variable {new_key} skipped due to"
f" unmatched define {new_value}",
)
# Its expected that empty dicts be replaced with "null"
# in jobs.json
if not result:
return None
return result

def handle_default(fm_step: ForwardModelStep, arg: str) -> str:
return fm_step.default_mapping.get(arg, arg)

for fm_step in forward_model_steps:
for key, val in fm_step.private_args.items():
if key in context and key != val and context[key] != val:
logger.info(
f"Private arg '{key}':'{val}' chosen over"
f" global '{context[key]}' in forward model step {fm_step.name}"
)
config_file_path = (
Path(user_config_file) if user_config_file is not None else None
)
config_path = str(config_file_path.parent) if config_file_path else ""
config_file = str(config_file_path.name) if config_file_path else ""

job_list_errors = []
job_list: List[ForwardModelStepJSON] = []
for idx, fm_step in enumerate(forward_model_steps):
substituter = Substituter(fm_step)
fm_step_json = {
"name": substituter.substitute(fm_step.name),
"executable": substituter.substitute(fm_step.executable),
"target_file": substituter.substitute(fm_step.target_file),
"error_file": substituter.substitute(fm_step.error_file),
"start_file": substituter.substitute(fm_step.start_file),
"stdout": (
substituter.substitute(fm_step.stdout_file) + f".{idx}"
if fm_step.stdout_file
else None
),
"stderr": (
substituter.substitute(fm_step.stderr_file) + f".{idx}"
if fm_step.stderr_file
else None
),
"stdin": substituter.substitute(fm_step.stdin_file),
"argList": [
handle_default(fm_step, substituter.substitute(arg))
for arg in fm_step.arglist
],
"environment": substituter.filter_env_dict(fm_step.environment),
"exec_env": substituter.filter_env_dict(fm_step.exec_env),
"max_running_minutes": fm_step.max_running_minutes,
}

try:
if not skip_pre_experiment_validation:
fm_step_json = fm_step.validate_pre_realization_run(fm_step_json)
except ForwardModelStepValidationError as exc:
job_list_errors.append(
ErrorInfo(
message=f"Validation failed for "
f"forward model step {fm_step.name}: {exc!s}"
).set_context(fm_step.name)
)

job_list.append(fm_step_json)

if job_list_errors:
raise ConfigValidationError.from_collected(job_list_errors)

return {
"global_environment": env_vars,
"config_path": config_path,
"config_file": config_file,
"jobList": job_list,
"run_id": run_id,
"ert_pid": str(os.getpid()),
}

@classmethod
def _workflows_from_dict(
cls,
Expand Down
19 changes: 15 additions & 4 deletions src/ert/enkf_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import orjson
from numpy.random import SeedSequence

from ert.config.ert_config import forward_model_data_to_json
from ert.config.forward_model_step import ForwardModelStep
from ert.config.model_config import ModelConfig
from ert.substitutions import Substitutions

Expand Down Expand Up @@ -195,6 +197,9 @@ def sample_prior(
def create_run_path(
run_args: List[RunArg],
ensemble: Ensemble,
user_config_file: str,
env_vars: Dict[str, str],
forward_model_steps: List[ForwardModelStep],
substitutions: Substitutions,
templates: List[Tuple[str, str]],
model_config: ModelConfig,
Expand All @@ -204,7 +209,6 @@ def create_run_path(
if context_env is None:
context_env = {}
t = time.perf_counter()
substitutions = substitutions
runpaths.set_ert_ensemble(ensemble.name)
for run_arg in run_args:
run_path = Path(run_arg.runpath)
Expand Down Expand Up @@ -234,7 +238,6 @@ def create_run_path(
)
target.write_text(result)

model_config = model_config
_generate_parameter_files(
ensemble.experiment.parameter_configuration.values(),
model_config.gen_kw_export_name,
Expand All @@ -246,8 +249,16 @@ def create_run_path(

path = run_path / "jobs.json"
_backup_if_existing(path)
forward_model_output = ert_config.forward_model_data_to_json(
run_arg.run_id, run_arg.iens, ensemble.iteration, context_env

forward_model_output = forward_model_data_to_json(
substitutions=substitutions,
forward_model_steps=forward_model_steps,
user_config_file=user_config_file,
env_vars=env_vars,
run_id=run_arg.run_id,
iens=run_arg.iens,
itr=ensemble.iteration,
context_env=context_env,
)
with open(run_path / "jobs.json", mode="wb") as fptr:
fptr.write(
Expand Down
Loading

0 comments on commit 78f7087

Please sign in to comment.