Skip to content

Commit

Permalink
Use pattern matching for config schema
Browse files Browse the repository at this point in the history
  • Loading branch information
eivindjahren committed Nov 15, 2023
1 parent 655357e commit 0da3eeb
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 111 deletions.
181 changes: 90 additions & 91 deletions src/ert/config/parsing/config_schema_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,97 +80,96 @@ def token_to_value_with_context(
if not len(self.type_map) > index:
return ContextString(str(token), token, keyword)
val_type = self.type_map[index]
if val_type is None:
return ContextString(str(token), token, keyword)
if val_type == SchemaItemType.BOOL:
if token.lower() == "true":
return ContextBool(True, token, keyword)
elif token.lower() == "false":
return ContextBool(False, token, keyword)
else:
raise ConfigValidationError.with_context(
f"{self.kw!r} must have a boolean value as argument {index + 1!r}",
token,
)
if val_type == SchemaItemType.INT:
try:
return ContextInt(int(token), token, keyword)
except ValueError as e:
raise ConfigValidationError.with_context(
f"{self.kw!r} must have an integer value as argument {index + 1!r}",
token,
) from e
if val_type == SchemaItemType.FLOAT:
try:
return ContextFloat(float(token), token, keyword)
except ValueError as e:
raise ConfigValidationError.with_context(
f"{self.kw!r} must have a number as argument {index + 1!r}", token
) from e

path: Optional[str] = str(token)
if val_type in [
SchemaItemType.PATH,
SchemaItemType.EXISTING_PATH,
]:
if not os.path.isabs(token):
path = os.path.normpath(
os.path.join(os.path.dirname(token.filename), token)
)
if val_type == SchemaItemType.EXISTING_PATH and not os.path.exists(
str(path)
):
err = f'Cannot find file or directory "{token.value}". '
if path != token:
err += f"The configured value was {path!r} "
raise ConfigValidationError.with_context(err, token)

assert isinstance(path, str)
return ContextString(path, token, keyword)
if val_type == SchemaItemType.EXECUTABLE:
absolute_path: Optional[str]
if not os.path.isabs(token):
# Try relative
absolute_path = os.path.abspath(os.path.join(cwd, token))
else:
absolute_path = token
if not os.path.exists(absolute_path):
absolute_path = shutil.which(token)

if absolute_path is None:
raise ConfigValidationError.with_context(
f"Could not find executable {token.value!r}", token
)

if os.path.isdir(absolute_path):
raise ConfigValidationError.with_context(
f"Expected executable file, but {token.value!r} is a directory.",
token,
)

if not os.access(absolute_path, os.X_OK):
context = (
f"{token.value!r} which was resolved to {absolute_path!r}"
if token.value != absolute_path
else f"{token.value!r}"
)
raise ConfigValidationError.with_context(
f"File not executable: {context}", token
)
return ContextString(absolute_path, token, keyword)
if isinstance(val_type, SchemaItemType):
return ContextString(str(token), token, keyword)
if isinstance(val_type, EnumType):
try:
return val_type(str(token))
except ValueError as err:
raise ConfigValidationError.with_context(
(
f"{self.kw!r} argument {index + 1!r} must be one of"
f" {[v.value for v in val_type]!r} was {token.value!r}" # type: ignore
),
token,
) from None
match val_type:
case None:
return ContextString(str(token), token, keyword)
case SchemaItemType.BOOL:
if token.lower() == "true":
return ContextBool(True, token, keyword)
elif token.lower() == "false":
return ContextBool(False, token, keyword)
else:
raise ConfigValidationError.with_context(
f"{self.kw!r} must have a boolean value as argument {index + 1!r}",
token,
)
case SchemaItemType.INT:
try:
return ContextInt(int(token), token, keyword)
except ValueError as e:
raise ConfigValidationError.with_context(
f"{self.kw!r} must have an integer value as argument {index + 1!r}",
token,
) from e
case SchemaItemType.FLOAT:
try:
return ContextFloat(float(token), token, keyword)
except ValueError as e:
raise ConfigValidationError.with_context(
f"{self.kw!r} must have a number as argument {index + 1!r}",
token,
) from e

case SchemaItemType.PATH | SchemaItemType.EXISTING_PATH:
path: Optional[str] = str(token)
if not os.path.isabs(token):
path = os.path.normpath(
os.path.join(os.path.dirname(token.filename), token)
)
if val_type == SchemaItemType.EXISTING_PATH and not os.path.exists(
str(path)
):
err = f'Cannot find file or directory "{token.value}". '
if path != token:
err += f"The configured value was {path!r} "
raise ConfigValidationError.with_context(err, token)

assert isinstance(path, str)
return ContextString(path, token, keyword)
case SchemaItemType.EXECUTABLE:
absolute_path: Optional[str]
if not os.path.isabs(token):
# Try relative
absolute_path = os.path.abspath(os.path.join(cwd, token))
else:
absolute_path = token
if not os.path.exists(absolute_path):
absolute_path = shutil.which(token)

if absolute_path is None:
raise ConfigValidationError.with_context(
f"Could not find executable {token.value!r}", token
)

if os.path.isdir(absolute_path):
raise ConfigValidationError.with_context(
f"Expected executable file, but {token.value!r} is a directory.",
token,
)

if not os.access(absolute_path, os.X_OK):
context = (
f"{token.value!r} which was resolved to {absolute_path!r}"
if token.value != absolute_path
else f"{token.value!r}"
)
raise ConfigValidationError.with_context(
f"File not executable: {context}", token
)
return ContextString(absolute_path, token, keyword)
case SchemaItemType():
return ContextString(str(token), token, keyword)
case EnumType():
try:
return val_type(str(token))
except ValueError as err:
raise ConfigValidationError.with_context(
(
f"{self.kw!r} argument {index + 1!r} must be one of"
f" {[v.value for v in val_type]!r} was {token.value!r}" # type: ignore
),
token,
) from None
raise ValueError(f"Unknown schema item {val_type}")

def apply_constraints(
Expand Down
39 changes: 19 additions & 20 deletions src/ert/config/parsing/schema_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,33 +69,32 @@ def search_for_deprecated_keyword_usages(
) -> None:
detected_deprecations = []

def push_deprecation(info: DeprecationInfo, line: List[ContextString]):
def push_deprecation(info: DeprecationInfo, line: Sequence[ContextString]):
if info.check is None or (callable(info.check) and info.check(line)):
detected_deprecations.append((info, line))

for kw, v in config_dict.items():
schema_info = self.get(kw)
if schema_info is not None and schema_info.deprecation_info is not None:
if v is None:
# Edge case: Happens if
# a keyword is specified in the schema and takes N args
# and is also specified as deprecated,
# and is specified in the config with 0 arguments
# which parses to None for the args
continue
match v:
case None:
# Edge case: Happens if
# a keyword is specified in the schema and takes N args
# and is also specified as deprecated,
# and is specified in the config with 0 arguments
# which parses to None for the args
continue

if isinstance(v, ContextString):
push_deprecation(
schema_info.deprecation_info,
ContextList.with_values(token=v.keyword_token, values=[v]),
)
elif isinstance(v, list) and (
len(v) == 0 or isinstance(v[0], ContextString)
):
push_deprecation(schema_info.deprecation_info, v)
elif isinstance(v[0], list):
for arglist in v:
push_deprecation(schema_info.deprecation_info, arglist)
case ContextString():
push_deprecation(
schema_info.deprecation_info,
ContextList.with_values(token=v.keyword_token, values=[v]),
)
case [ContextString(), *_]:
push_deprecation(schema_info.deprecation_info, v)
case [list(), *_]:
for arglist in v:
push_deprecation(schema_info.deprecation_info, arglist)
if detected_deprecations:
for deprecation, line in detected_deprecations:
ConfigWarning.ert_formatted_warn(
Expand Down

0 comments on commit 0da3eeb

Please sign in to comment.