Skip to content

Commit

Permalink
maint: Re-organise final sensors
Browse files Browse the repository at this point in the history
  • Loading branch information
RogerSelwyn committed Oct 30, 2023
1 parent 19b116f commit c09b5f5
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 249 deletions.
211 changes: 110 additions & 101 deletions custom_components/o365/classes/mailsensor.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""O365 mail sensors."""
import datetime
from operator import itemgetter

from homeassistant.components.sensor import SensorEntity

from O365 import mailbox # pylint: disable=no-name-in-module

from ..const import (
ATTR_ATTRIBUTES,
ATTR_AUTOREPLIESSETTINGS,
ATTR_DATA,
ATTR_END,
ATTR_EXTERNAL_AUDIENCE,
ATTR_EXTERNALREPLY,
Expand All @@ -21,133 +22,57 @@
CONF_IMPORTANCE,
CONF_IS_UNREAD,
CONF_MAIL_FROM,
CONF_MAX_ITEMS,
CONF_SUBJECT_CONTAINS,
CONF_SUBJECT_IS,
DATETIME_FORMAT,
PERM_MAILBOX_SETTINGS,
PERM_MINIMUM_MAILBOX_SETTINGS,
SENSOR_AUTO_REPLY,
SENSOR_MAIL,
SENSOR_EMAIL,
)
from ..utils.utils import clean_html
from ..utils.utils import clean_html, get_email_attributes
from .sensorentity import O365Sensor


class O365MailSensor(O365Sensor):
"""O365 generic Mail Sensor class."""

def __init__(
self, coordinator, config, sensor_conf, mail_folder, name, entity_id, unique_id
):
def __init__(self, coordinator, config, sensor_conf, name, entity_id, unique_id):
"""Initialise the O365 Sensor."""
super().__init__(coordinator, config, name, entity_id, SENSOR_MAIL, unique_id)
self.mail_folder = mail_folder
super().__init__(coordinator, config, name, entity_id, SENSOR_EMAIL, unique_id)
self.download_attachments = sensor_conf.get(CONF_DOWNLOAD_ATTACHMENTS)
self.html_body = sensor_conf.get(CONF_HTML_BODY)
self.max_items = sensor_conf.get(CONF_MAX_ITEMS, 5)
self.query = self.mail_folder.new_query()
self.query = self.query.select(
"sender",
"from",
"subject",
"body",
"receivedDateTime",
"toRecipients",
"ccRecipients",
"has_attachments",
"importance",
"is_read",
)
if self.download_attachments:
self.query = self.query.select(
"attachments",
)
self._config = config
self._state = None
self._extra_attributes = None

@property
def icon(self):
"""Entity icon."""
return "mdi:microsoft-outlook"

@property
def state(self):
"""Sensor state."""
return self._state

@property
def extra_state_attributes(self):
"""Device state attributes."""
return self.coordinator.data[self.entity_key][ATTR_ATTRIBUTES]


class O365QuerySensor(O365MailSensor, SensorEntity):
"""O365 Query sensor processing."""

def __init__(
self, coordinator, config, sensor_conf, mail_folder, name, entity_id, unique_id
):
"""Initialise the O365 Query."""
super().__init__(
coordinator, config, sensor_conf, mail_folder, name, entity_id, unique_id
)

self.query.order_by("receivedDateTime", ascending=False)

self._build_query(sensor_conf)

def _build_query(self, sensor_conf):
body_contains = sensor_conf.get(CONF_BODY_CONTAINS)
subject_contains = sensor_conf.get(CONF_SUBJECT_CONTAINS)
subject_is = sensor_conf.get(CONF_SUBJECT_IS)
has_attachment = sensor_conf.get(CONF_HAS_ATTACHMENT)
importance = sensor_conf.get(CONF_IMPORTANCE)
email_from = sensor_conf.get(CONF_MAIL_FROM)
is_unread = sensor_conf.get(CONF_IS_UNREAD)
if (
body_contains is not None
or subject_contains is not None
or subject_is is not None
or has_attachment is not None
or importance is not None
or email_from is not None
or is_unread is not None
):
self._add_to_query("ge", "receivedDateTime", datetime.datetime(1900, 5, 1))
self._add_to_query("contains", "body", body_contains)
self._add_to_query("contains", "subject", subject_contains)
self._add_to_query("equals", "subject", subject_is)
self._add_to_query("equals", "hasAttachments", has_attachment)
self._add_to_query("equals", "from", email_from)
self._add_to_query("equals", "IsRead", not is_unread, is_unread)
self._add_to_query("equals", "importance", importance)

def _add_to_query(self, qtype, attribute_name, attribute_value, check_value=True):
if attribute_value is None or check_value is None:
return

if qtype == "ge":
self.query.chain("and").on_attribute(attribute_name).greater_equal(
attribute_value
)
if qtype == "contains":
self.query.chain("and").on_attribute(attribute_name).contains(
attribute_value
)
if qtype == "equals":
self.query.chain("and").on_attribute(attribute_name).equals(attribute_value)


class O365EmailSensor(O365MailSensor, SensorEntity):
"""O365 Email sensor processing."""

def __init__(
self, coordinator, config, sensor_conf, mail_folder, name, entity_id, unique_id
):
"""Initialise the O365 Email sensor."""
super().__init__(
coordinator, config, sensor_conf, mail_folder, name, entity_id, unique_id
)
return self._extra_attributes

is_unread = sensor_conf.get(CONF_IS_UNREAD)
def _handle_coordinator_update(self) -> None:
data = self.coordinator.data[self.entity_key][ATTR_DATA]
attrs = self._get_attributes(data)
attrs.sort(key=itemgetter("received"), reverse=True)
self._state = len(attrs)
self._extra_attributes = {ATTR_DATA: attrs}
self.async_write_ha_state()

if is_unread is not None:
self.query.chain("and").on_attribute("IsRead").equals(not is_unread)
def _get_attributes(self, data):
return [
get_email_attributes(x, self.download_attachments, self.html_body)
for x in data
]


class O365AutoReplySensor(O365Sensor, SensorEntity):
Expand Down Expand Up @@ -208,3 +133,87 @@ def _validate_autoreply_permissions(self):
"Not authorisied to update auto reply - requires permission: "
+ f"{PERM_MAILBOX_SETTINGS}",
)


def _build_base_query(mail_folder, sensor_conf):
"""Build base query for mail."""
download_attachments = sensor_conf.get(CONF_DOWNLOAD_ATTACHMENTS)
query = mail_folder.new_query()
query = query.select(
"sender",
"from",
"subject",
"body",
"receivedDateTime",
"toRecipients",
"ccRecipients",
"has_attachments",
"importance",
"is_read",
)
if download_attachments:
query = query.select(
"attachments",
)
return query


def build_inbox_query(mail_folder, sensor_conf):
"""Build query for email sensor."""
query = _build_base_query(mail_folder, sensor_conf)

is_unread = sensor_conf.get(CONF_IS_UNREAD)

if is_unread is not None:
query.chain("and").on_attribute("IsRead").equals(not is_unread)

return query


def build_query_query(mail_folder, sensor_conf):
"""Build query for query sensor."""
query = _build_base_query(mail_folder, sensor_conf)
query.order_by("receivedDateTime", ascending=False)

body_contains = sensor_conf.get(CONF_BODY_CONTAINS)
subject_contains = sensor_conf.get(CONF_SUBJECT_CONTAINS)
subject_is = sensor_conf.get(CONF_SUBJECT_IS)
has_attachment = sensor_conf.get(CONF_HAS_ATTACHMENT)
importance = sensor_conf.get(CONF_IMPORTANCE)
email_from = sensor_conf.get(CONF_MAIL_FROM)
is_unread = sensor_conf.get(CONF_IS_UNREAD)
if (
body_contains is not None
or subject_contains is not None
or subject_is is not None
or has_attachment is not None
or importance is not None
or email_from is not None
or is_unread is not None
):
query = _add_to_query(
query, "ge", "receivedDateTime", datetime.datetime(1900, 5, 1)
)
query = _add_to_query(query, "contains", "body", body_contains)
query = _add_to_query(query, "contains", "subject", subject_contains)
query = _add_to_query(query, "equals", "subject", subject_is)
query = _add_to_query(query, "equals", "hasAttachments", has_attachment)
query = _add_to_query(query, "equals", "from", email_from)
query = _add_to_query(query, "equals", "IsRead", not is_unread, is_unread)
query = _add_to_query(query, "equals", "importance", importance)

return query


def _add_to_query(query, qtype, attribute_name, attribute_value, check_value=True):
if attribute_value is None or check_value is None:
return

if qtype == "ge":
query.chain("and").on_attribute(attribute_name).greater_equal(attribute_value)
if qtype == "contains":
query.chain("and").on_attribute(attribute_name).contains(attribute_value)
if qtype == "equals":
query.chain("and").on_attribute(attribute_name).equals(attribute_value)

return query
Loading

0 comments on commit c09b5f5

Please sign in to comment.