Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow nested dictionary to be optional in addon config #4607

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions supervisor/addons/addon.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,11 +641,7 @@ def test_update_schema(self) -> bool:
)

# create voluptuous
new_schema = vol.Schema(
vol.All(
dict, AddonOptions(self.coresys, new_raw_schema, self.name, self.slug)
)
)
new_schema = vol.Schema(vol.All(dict, AddonOptions(self, new_raw_schema)))

# validate
try:
Expand Down
2 changes: 1 addition & 1 deletion supervisor/addons/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ def schema(self) -> AddonOptions:
if isinstance(raw_schema, bool):
raw_schema = {}

return AddonOptions(self.coresys, raw_schema, self.name, self.slug)
return AddonOptions(self, raw_schema)

@property
def schema_ui(self) -> list[dict[any, any]] | None:
Expand Down
85 changes: 56 additions & 29 deletions supervisor/addons/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
import logging
from pathlib import Path
import re
from typing import Any
from typing import TYPE_CHECKING, Any

import voluptuous as vol

from ..const import ATTR_OPTIONS
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HardwareNotFound
from ..hardware.const import UdevSubsystem
from ..hardware.data import Device
from ..validate import network_port

if TYPE_CHECKING:
from .model import AddonModel

_LOGGER: logging.Logger = logging.getLogger(__name__)

_STR = "str"
Expand Down Expand Up @@ -58,16 +62,13 @@
class AddonOptions(CoreSysAttributes):
"""Validate Add-ons Options."""

def __init__(
self, coresys: CoreSys, raw_schema: dict[str, Any], name: str, slug: str
):
def __init__(self, addon: "AddonModel", raw_schema: dict[str, Any]):
"""Validate schema."""
self.coresys: CoreSys = coresys
self.addon: "AddonModel" = addon
self.coresys: CoreSys = addon.coresys
self.raw_schema: dict[str, Any] = raw_schema
self.devices: set[Device] = set()
self.pwned: set[str] = set()
self._name = name
self._slug = slug

@property
def validate(self) -> vol.Schema:
Expand All @@ -85,8 +86,8 @@ def __call__(self, struct):
_LOGGER.warning(
"Option '%s' does not exist in the schema for %s (%s)",
key,
self._name,
self._slug,
self.addon.name,
self.addon.slug,
)
continue

Expand All @@ -103,8 +104,15 @@ def __call__(self, struct):
options[key] = self._single_validate(typ, value, key)
except (IndexError, KeyError):
raise vol.Invalid(
f"Type error for option '{key}' in {self._name} ({self._slug})"
f"Type error for option '{key}' in {self.addon.name} "
f"({self.addon.slug})"
) from None
except vol.Invalid as err:
if key not in err.msg:
raise vol.Invalid(
f"Invalid value for option '{key}': {err}"
) from None
raise err

self._check_missing_options(self.raw_schema, options, "root")
return options
Expand All @@ -115,7 +123,8 @@ def _single_validate(self, typ: str, value: Any, key: str):
# if required argument
if value is None:
raise vol.Invalid(
f"Missing required option '{key}' in {self._name} ({self._slug})"
f"Missing required option '{key}' in {self.addon.name} "
f"({self.addon.slug})"
) from None

# Lookup secret
Expand All @@ -124,15 +133,16 @@ def _single_validate(self, typ: str, value: Any, key: str):
value = self.sys_homeassistant.secrets.get(secret)
if value is None:
raise vol.Invalid(
f"Unknown secret '{secret}' in {self._name} ({self._slug})"
f"Unknown secret '{secret}' in {self.addon.name} "
f"({self.addon.slug})"
) from None

# parse extend data from type
match = RE_SCHEMA_ELEMENT.match(typ)

if not match:
raise vol.Invalid(
f"Unknown type '{typ}' in {self._name} ({self._slug})"
f"Unknown type '{typ}' in {self.addon.name} ({self.addon.slug})"
) from None

# prepare range
Expand All @@ -146,28 +156,29 @@ def _single_validate(self, typ: str, value: Any, key: str):
if typ.startswith(_PASSWORD) and value:
self.pwned.add(hashlib.sha1(str(value).encode()).hexdigest())
return vol.All(str(value), vol.Range(**range_args))(value)
elif typ.startswith(_INT):
if typ.startswith(_INT):
return vol.All(vol.Coerce(int), vol.Range(**range_args))(value)
elif typ.startswith(_FLOAT):
if typ.startswith(_FLOAT):
return vol.All(vol.Coerce(float), vol.Range(**range_args))(value)
elif typ.startswith(_BOOL):
if typ.startswith(_BOOL):
return vol.Boolean()(value)
elif typ.startswith(_EMAIL):
if typ.startswith(_EMAIL):
return vol.Email()(value)
elif typ.startswith(_URL):
if typ.startswith(_URL):
return vol.Url()(value)
elif typ.startswith(_PORT):
if typ.startswith(_PORT):
return network_port(value)
elif typ.startswith(_MATCH):
if typ.startswith(_MATCH):
return vol.Match(match.group("match"))(str(value))
elif typ.startswith(_LIST):
if typ.startswith(_LIST):
return vol.In(match.group("list").split("|"))(str(value))
elif typ.startswith(_DEVICE):
if typ.startswith(_DEVICE):
try:
device = self.sys_hardware.get_by_path(Path(value))
except HardwareNotFound:
raise vol.Invalid(
f"Device '{value}' does not exist in {self._name} ({self._slug})"
f"Device '{value}' does not exist in {self.addon.name} "
f"({self.addon.slug})"
) from None

# Have filter
Expand All @@ -176,15 +187,17 @@ def _single_validate(self, typ: str, value: Any, key: str):
device_filter = _create_device_filter(str_filter)
if device not in self.sys_hardware.filter_devices(**device_filter):
raise vol.Invalid(
f"Device '{value}' don't match the filter {str_filter}! in {self._name} ({self._slug})"
f"Device '{value}' don't match the filter {str_filter}! "
f"in {self.addon.name} ({self.addon.slug})"
)

# Device valid
self.devices.add(device)
return str(device.path)

raise vol.Invalid(
f"Fatal error for option '{key}' with type '{typ}' in {self._name} ({self._slug})"
f"Fatal error for option '{key}' with type '{typ}' in {self.addon.name} "
f"({self.addon.slug})"
) from None

def _nested_validate_list(self, typ: Any, data_list: list[Any], key: str):
Expand All @@ -194,7 +207,8 @@ def _nested_validate_list(self, typ: Any, data_list: list[Any], key: str):
# Make sure it is a list
if not isinstance(data_list, list):
raise vol.Invalid(
f"Invalid list for option '{key}' in {self._name} ({self._slug})"
f"Invalid list for option '{key}' in {self.addon.name} "
f"({self.addon.slug})"
) from None

# Process list
Expand All @@ -217,15 +231,19 @@ def _nested_validate_dict(
# Make sure it is a dict
if not isinstance(data_dict, dict):
raise vol.Invalid(
f"Invalid dict for option '{key}' in {self._name} ({self._slug})"
f"Invalid dict for option '{key}' in {self.addon.name} "
f"({self.addon.slug})"
) from None

# Process dict
for c_key, c_value in data_dict.items():
# Ignore unknown options / remove from list
if c_key not in typ:
_LOGGER.warning(
"Unknown option '%s' for %s (%s)", c_key, self._name, self._slug
"Unknown option '%s' for %s (%s)",
c_key,
self.addon.name,
self.addon.slug,
)
continue

Expand All @@ -252,11 +270,20 @@ def _check_missing_options(
if isinstance(miss_schema, list) and len(miss_schema) > 0:
miss_schema = miss_schema[0]

# If its a dict that's missing and the dict is missing from add-on's
# options dict, then it's optional
if (
isinstance(miss_schema, dict)
and miss_opt not in self.addon.data[ATTR_OPTIONS]
):
miss_schema = "dict?"

if isinstance(miss_schema, str) and miss_schema.endswith("?"):
continue

raise vol.Invalid(
f"Missing option '{miss_opt}' in {root} in {self._name} ({self._slug})"
f"Missing option '{miss_opt}' in {root} in {self.addon.name} "
f"({self.addon.slug})"
) from None


Expand Down
Loading