Skip to content

Commit

Permalink
maint: Update in support of python-o365 MSAL
Browse files Browse the repository at this point in the history
  • Loading branch information
RogerSelwyn committed Jan 8, 2025
1 parent bd18964 commit aa59f4d
Show file tree
Hide file tree
Showing 22 changed files with 525 additions and 163 deletions.
76 changes: 56 additions & 20 deletions custom_components/ms365_todo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
CONF_CLIENT_SECRET,
CONF_ENTITY_NAME,
CONF_SHARED_MAILBOX,
TOKEN_DELETED,
TOKEN_ERROR,
TOKEN_EXPIRED,
TOKEN_FILE_MISSING,
)
from .helpers.config_entry import MS365ConfigEntry, MS365Data
from .integration import setup_integration
from .integration.const_integration import DOMAIN, PLATFORMS
from .integration.permissions_integration import Permissions
from .integration.setup_integration import async_do_setup

_LOGGER = logging.getLogger(__name__)

Expand All @@ -33,20 +36,24 @@ async def async_setup_entry(hass: HomeAssistant, entry: MS365ConfigEntry):

_LOGGER.debug("Permissions setup")
perms = Permissions(hass, entry.data)
permissions, failed_permissions = await perms.async_check_authorizations() # pylint: disable=unused-variable
if permissions is True:
account, is_authenticated, auth_error = await hass.async_add_executor_job( # pylint: disable=unused-variable
perms.try_authentication, credentials, main_resource
if perms.check_token_exists():
error, account, is_authenticated = await hass.async_add_executor_job(
perms.try_authentication, credentials, main_resource, entity_name
)
if not error:
error = await perms.async_check_authorizations()
else:
is_authenticated = False
account = None
error = TOKEN_FILE_MISSING

if is_authenticated and permissions is True:
if not error:
_LOGGER.debug("Do setup")
check_token = await _async_check_token(hass, account, entity_name)
if check_token:
coordinator, sensors, platforms = await async_do_setup(hass, entry, account)
coordinator, sensors, platforms = await setup_integration.async_do_setup(
hass, entry, account
)
entry.runtime_data = MS365Data(
perms, account, is_authenticated, coordinator, sensors, entry.options
)
Expand All @@ -57,10 +64,10 @@ async def async_setup_entry(hass: HomeAssistant, entry: MS365ConfigEntry):
ir.async_create_issue(
hass,
DOMAIN,
permissions,
error,
is_fixable=False,
severity=ir.IssueSeverity.ERROR,
translation_key=permissions,
translation_key=error,
translation_placeholders={
"domain": DOMAIN,
CONF_ENTITY_NAME: entry.data.get(CONF_ENTITY_NAME),
Expand All @@ -69,6 +76,43 @@ async def async_setup_entry(hass: HomeAssistant, entry: MS365ConfigEntry):
return False


async def async_migrate_entry(
hass: HomeAssistant, config_entry: MS365ConfigEntry
) -> bool:
"""Migrate old entry."""
_LOGGER.debug(
"Migrating configuration from version %s.%s",
config_entry.version,
config_entry.minor_version,
)

# if config_entry.version > 2:
# # This shouldn't happen since we are at v2
# return False

if config_entry.version == 1:
# Delete the token file ready for re-auth
new_data = {**config_entry.data}

perms = Permissions(hass, config_entry.data)
await hass.async_add_executor_job(perms.delete_token)
_LOGGER.warning(
TOKEN_DELETED,
perms.token_filename,
)
hass.config_entries.async_update_entry(
config_entry, data=new_data, minor_version=0, version=2
)

_LOGGER.debug(
"Migration to configuration version %s.%s successful",
config_entry.version,
config_entry.minor_version,
)

return True


async def async_unload_entry(hass: HomeAssistant, entry: MS365ConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
Expand All @@ -91,19 +135,11 @@ async def async_remove_entry(hass: HomeAssistant, entry: MS365ConfigEntry) -> No

async def _async_check_token(hass, account, entity_name):
try:
await hass.async_add_executor_job(account.get_current_user)
await hass.async_add_executor_job(account.get_current_user_data)
return True
except InvalidClientError as err:
if "client secret" in err.description and "expired" in err.description:
_LOGGER.warning(
(
"Client Secret expired for account: %s. "
+ "Create new Client Secret in Entra ID App Registration."
),
entity_name,
)
_LOGGER.warning(TOKEN_EXPIRED, entity_name)
else:
_LOGGER.warning(
"Token error for account: %s. Error - %s", entity_name, err.description
)
_LOGGER.warning(TOKEN_ERROR, entity_name, err.description)
return False
114 changes: 63 additions & 51 deletions custom_components/ms365_todo/classes/permissions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Generic Permissions processes."""

import json
import logging
import os
from copy import deepcopy
Expand All @@ -11,11 +10,15 @@
CONF_ENTITY_NAME,
CONST_UTC_TIMEZONE,
MS365_STORAGE_TOKEN,
PERM_OFFLINE_ACCESS,
TOKEN_ERROR_CORRUPT,
TOKEN_ERROR_LEGACY,
TOKEN_ERROR_MISSING,
TOKEN_ERROR_PERMISSIONS,
TOKEN_FILE_CORRUPTED,
TOKEN_FILE_MISSING,
TOKEN_FILE_OUTDATED,
TOKEN_FILE_PERMISSIONS,
TOKEN_FILENAME,
TOKEN_INVALID,
)
from ..helpers.filemgmt import build_config_file_path
from ..integration.const_integration import DOMAIN
Expand All @@ -33,6 +36,7 @@ def __init__(self, hass, config):

self._requested_permissions = []
self._permissions = []
self.failed_permissions = []
self.token_filename = self.build_token_filename()
self.token_path = build_config_file_path(self._hass, MS365_STORAGE_TOKEN)
_LOGGER.debug("Setup token")
Expand All @@ -50,47 +54,59 @@ def permissions(self):
"""Return the permission set."""
return self._permissions

def try_authentication(self, credentials, main_resource):
def try_authentication(self, credentials, main_resource, entity_name):
"""Try authenticating to O365."""
_LOGGER.debug("Setup account")
account = Account(
credentials,
token_backend=self.token_backend,
timezone=CONST_UTC_TIMEZONE,
main_resource=main_resource,
)
try:
return account, account.is_authenticated, False
account = Account(
credentials,
token_backend=self.token_backend,
timezone=CONST_UTC_TIMEZONE,
main_resource=main_resource,
)

except json.decoder.JSONDecodeError as err:
_LOGGER.error("Error authenticating - JSONDecodeError - %s", err)
return account, False, err
return False, account, account.is_authenticated
except ValueError as err:
if TOKEN_INVALID in str(err):
_LOGGER.warning(
TOKEN_ERROR_LEGACY,
DOMAIN,
entity_name,
err,
)
return TOKEN_FILE_OUTDATED, None, False

_LOGGER.warning(
TOKEN_ERROR_CORRUPT,
DOMAIN,
entity_name,
err,
)
return TOKEN_FILE_CORRUPTED, None, False

async def async_check_authorizations(self):
"""Report on permissions status."""
self._permissions = await self._hass.async_add_executor_job(
error, self._permissions = await self._hass.async_add_executor_job(
self._get_permissions
)

if self._permissions in [TOKEN_FILE_CORRUPTED, TOKEN_FILE_MISSING]:
return self._permissions, None
failed_permissions = []
if error in [TOKEN_FILE_CORRUPTED]:
return error
self.failed_permissions = []
for permission in self.requested_permissions:
if permission == PERM_OFFLINE_ACCESS:
continue
if not self.validate_authorization(permission):
failed_permissions.append(permission)
self.failed_permissions.append(permission)

if failed_permissions:
if self.failed_permissions:
_LOGGER.warning(
"Minimum required permissions: '%s'. Not available in token '%s' for account '%s'.",
", ".join(failed_permissions),
TOKEN_ERROR_PERMISSIONS,
", ".join(self.failed_permissions),
self.token_filename,
self._config[CONF_ENTITY_NAME],
)
return TOKEN_FILE_PERMISSIONS, failed_permissions
return TOKEN_FILE_PERMISSIONS

return True, None
return False

def validate_authorization(self, permission):
"""Validate higher permissions."""
Expand All @@ -111,7 +127,7 @@ def validate_authorization(self, permission):
sharedpermission = f"{deepcopy(permission)}.Shared"
return self._check_higher_permissions(sharedpermission)
# If Presence Resource then permissions can have a constraint of .All
# which includes base as well. e.g. Presencedar.Read is also enabled by Presence.Read.All
# which includes base as well. e.g. Presence.Read is also enabled by Presence.Read.All
if not constraint and resource in ["Presence"]:
allpermission = f"{deepcopy(permission)}.All"
return self._check_higher_permissions(allpermission)
Expand All @@ -120,17 +136,14 @@ def validate_authorization(self, permission):

def _check_higher_permissions(self, permission):
operation = permission.split(".")[1]
# If Operation is Send there are no alternatives
# If Operation is ReadBasic then Read or ReadWrite will also work
# If Operation is Read then ReadWrite will also work
if operation == "Send":
newops = ["Send"]
elif operation == "ReadBasic":
newops = ["ReadBasic", "Read", "ReadWrite"]
newops = [operation]
if operation == "ReadBasic":
newops = newops + ["Read", "ReadWrite"]
elif operation == "Read":
newops = ["Read", "ReadWrite"]
else:
newops = []
newops = newops + ["ReadWrite"]

for newop in newops:
newperm = deepcopy(permission).replace(operation, newop)
if newperm in self.permissions:
Expand All @@ -144,30 +157,29 @@ def build_token_filename(self):

def _get_permissions(self):
"""Get the permissions from the token file."""
full_token_path = os.path.join(self.token_path, self.token_filename)
if not os.path.exists(full_token_path) or not os.path.isfile(full_token_path):
_LOGGER.warning("Could not locate token at %s", full_token_path)
return TOKEN_FILE_MISSING
try:
with open(full_token_path, "r", encoding="UTF-8") as file_handle:
raw = file_handle.read()
permissions = json.loads(raw)["scope"]
except json.decoder.JSONDecodeError as err:

scopes = self.token_backend.get_token_scopes()
if scopes is None:
_LOGGER.warning(
(
"Token corrupted for integration %s, unique identifier %s, "
+ "please re-configure and re-authenticate - %s"
),
TOKEN_ERROR_CORRUPT,
DOMAIN,
self._config[CONF_ENTITY_NAME],
err,
"No permissions",
)
return TOKEN_FILE_CORRUPTED
return TOKEN_FILE_CORRUPTED, None

return permissions
return False, scopes

def delete_token(self):
"""Delete the token."""
full_token_path = os.path.join(self.token_path, self.token_filename)
if os.path.exists(full_token_path):
os.remove(full_token_path)

def check_token_exists(self):
"""Check if token file exists.."""
full_token_path = os.path.join(self.token_path, self.token_filename)
if not os.path.exists(full_token_path) or not os.path.isfile(full_token_path):
_LOGGER.warning(TOKEN_ERROR_MISSING, full_token_path)
return False
return True
Loading

0 comments on commit aa59f4d

Please sign in to comment.