Skip to content

Commit

Permalink
Check password early on backup restore
Browse files Browse the repository at this point in the history
Introduce a validate password method which only peaks into the archive
to validate the password before starting the actual restore process.
This makes sure that a wrong password returns an error even when
restoring the backup in background.
  • Loading branch information
agners committed Dec 31, 2024
1 parent 5783834 commit e48f467
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 16 deletions.
40 changes: 35 additions & 5 deletions supervisor/backups/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,13 @@ def new(
if not compressed:
self._data[ATTR_COMPRESSED] = False

def set_password(self, password: str) -> bool:
def set_password(self, password: str | None) -> None:
"""Set the password for an existing backup."""
if not password:
return False
self._init_password(password)
return True
if password:
self._init_password(password)
else:
self._key = None
self._aes = None

def _init_password(self, password: str) -> None:
"""Set password + init aes cipher."""
Expand Down Expand Up @@ -334,6 +335,35 @@ def _decrypt_data(self, data: str) -> str:
data = padder.update(decrypt.update(b64decode(data))) + padder.finalize()
return data.decode()

async def validate_password(self) -> bool:
"""Validate backup password"""

Check failure on line 339 in supervisor/backups/backup.py

View workflow job for this annotation

GitHub Actions / Check ruff

Ruff (D400)

supervisor/backups/backup.py:339:9: D400 First line should end with a period

Check failure on line 339 in supervisor/backups/backup.py

View workflow job for this annotation

GitHub Actions / Check ruff

Ruff (D415)

supervisor/backups/backup.py:339:9: D415 First line should end with a period, question mark, or exclamation point
def _validate_file() -> bool:

Check warning on line 340 in supervisor/backups/backup.py

View workflow job for this annotation

GitHub Actions / Check pylint

R1710: Either all return statements in a function should return an expression, or none of them should. (inconsistent-return-statements)
ending = f".tar{'.gz' if self.compressed else ''}"

with tarfile.open(self.tarfile, "r:") as backup:
test_tar_name = next((entry.name for entry in backup.getmembers() if entry.name.endswith(ending)), None)
if not test_tar_name:
_LOGGER.warning("Not tar file found to validate password with")
return True

test_tar_file = backup.extractfile(test_tar_name)
try:
with SecureTarFile(
ending, # Not used
gzip=self.compressed,
key=self._key,
mode="r",
fileobj=test_tar_file,
):
# If we can read the tar file, the password is correct
return True
except tarfile.ReadError:
_LOGGER.debug("Invalid password")
return False
except Exception: # noqa: BLE001

Check warning on line 363 in supervisor/backups/backup.py

View workflow job for this annotation

GitHub Actions / Check pylint

W0718: Catching too general exception Exception (broad-exception-caught)
_LOGGER.exception("Unexpected error validating password")
return await self.sys_run_in_executor(_validate_file)

async def load(self):
"""Read backup.json from tar file."""
if not self.tarfile.is_file():
Expand Down
20 changes: 12 additions & 8 deletions supervisor/backups/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,10 +687,12 @@ async def do_restore_full(
f"{backup.slug} is only a partial backup!", _LOGGER.error
)

if backup.protected and not backup.set_password(password):
raise BackupInvalidError(
f"Invalid password for backup {backup.slug}", _LOGGER.error
)
if backup.protected:
backup.set_password(password)
if not await backup.validate_password():
raise BackupInvalidError(
f"Invalid password for backup {backup.slug}", _LOGGER.error
)

if backup.supervisor_version > self.sys_supervisor.version:
raise BackupInvalidError(
Expand Down Expand Up @@ -755,10 +757,12 @@ async def do_restore_partial(
folder_list.remove(FOLDER_HOMEASSISTANT)
homeassistant = True

if backup.protected and not backup.set_password(password):
raise BackupInvalidError(
f"Invalid password for backup {backup.slug}", _LOGGER.error
)
if backup.protected:
backup.set_password(password)
if not await backup.validate_password():
raise BackupInvalidError(
f"Invalid password for backup {backup.slug}", _LOGGER.error
)

if backup.homeassistant is None and homeassistant:
raise BackupInvalidError(
Expand Down
2 changes: 1 addition & 1 deletion tests/api/test_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ async def test_restore_immediate_errors(

with (
patch.object(Backup, "protected", new=PropertyMock(return_value=True)),
patch.object(Backup, "set_password", return_value=False),
patch.object(Backup, "validate_password", return_value=False),
):
resp = await api_client.post(
f"/backups/{mock_partial_backup.slug}/restore/partial",
Expand Down
4 changes: 2 additions & 2 deletions tests/backups/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ async def test_fail_invalid_full_backup(

backup_instance = full_backup_mock.return_value
backup_instance.protected = True
backup_instance.set_password.return_value = False
backup_instance.validate_password.return_value = False

with pytest.raises(BackupInvalidError):
await manager.do_restore_full(backup_instance)
Expand Down Expand Up @@ -359,7 +359,7 @@ async def test_fail_invalid_partial_backup(

backup_instance = partial_backup_mock.return_value
backup_instance.protected = True
backup_instance.set_password.return_value = False
backup_instance.validate_password.return_value = False

with pytest.raises(BackupInvalidError):
await manager.do_restore_partial(backup_instance)

Check failure on line 365 in tests/backups/test_manager.py

View workflow job for this annotation

GitHub Actions / Run tests Python 3.12.8

test_fail_invalid_partial_backup supervisor.exceptions.JobException
Expand Down

0 comments on commit e48f467

Please sign in to comment.