Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds ability to store attributes in their own columns #33

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ qss:
(map)(Required)
Enables the qss integration. Only allowed once.

split_attributes:
(bool)(Optional)
If set to "true", stores each event attribute in its own column. Otherwise, stores all attributes as a dict in the "attrs" column. (Defaults to False)

host:
(string)(Required)
The URL or IP Address that points to your QuestDB database.
Expand Down
11 changes: 9 additions & 2 deletions custom_components/qss/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
CONF_AUTH_Y_KEY,
CONF_HOST,
CONF_PORT,
CONF_SPLIT_ATTRIBUTES,
DOMAIN,
)
from .event_handling import (
Expand Down Expand Up @@ -57,6 +58,7 @@
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PORT): cv.positive_int,
vol.Optional(CONF_AUTH, default={}): AUTHENTICATION_SCHEMA,
vol.Optional(CONF_SPLIT_ATTRIBUTES, default=False): cv.boolean,
}
)
},
Expand All @@ -79,7 +81,9 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
auth_y_key = conf.get(CONF_AUTH).get(CONF_AUTH_Y_KEY)
db_auth = (auth_kid, auth_d_key, auth_x_key, auth_y_key)

instance = QuestDB(hass=hass, host=db_host, port=db_port, entity_filter=entity_filter, auth=db_auth)
split_attrs = conf.get(CONF_SPLIT_ATTRIBUTES)

instance = QuestDB(hass=hass, host=db_host, port=db_port, entity_filter=entity_filter, auth=db_auth, split_attributes=split_attrs)
instance.async_initialize()
instance.start()

Expand All @@ -96,6 +100,7 @@ def __init__( # noqa: PLR0913
port: int,
entity_filter: Callable[[str], bool],
auth: tuple,
split_attributes: bool,
) -> None:
"""Initialize qss."""
threading.Thread.__init__(self, name="QSS")
Expand All @@ -106,6 +111,8 @@ def __init__( # noqa: PLR0913
self.entity_filter = entity_filter
self.auth = auth

self.split_attributes = split_attributes

self.queue: Any = queue.Queue()
self.qss_ready = asyncio.Future()

Expand Down Expand Up @@ -153,7 +160,7 @@ def notify_hass_started(event: Event): # noqa: ARG001
while True:
event = get_event_from_queue(self.queue)
finish_task_if_empty_event(event, self.queue)
insert_event_data_into_questdb(self.host, self.port, self.auth, event, self.queue)
insert_event_data_into_questdb(self.host, self.port, self.auth, event, self.queue, self.split_attributes)

@callback
def event_listener(self, event: Event):
Expand Down
2 changes: 2 additions & 0 deletions custom_components/qss/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
CONF_AUTH_X_KEY = "x_key"
CONF_AUTH_Y_KEY = "y_key"

CONF_SPLIT_ATTRIBUTES = "split_attributes"

RETRY_WAIT_SECONDS = 5
RETRY_ATTEMPTS = 10

Expand Down
36 changes: 19 additions & 17 deletions custom_components/qss/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,42 @@
_LOGGER = logging.getLogger(__name__)


def _insert_row_with_auth(host: str, port: int, auth: tuple, event: Event) -> None:
def _insert_row_with_auth(host: str, port: int, auth: tuple, event: Event, split_attributes: bool) -> None:
with Sender(host, port, auth=auth, tls=True) as sender:
entity_id = event.data["entity_id"]
state = event.data.get("new_state")
attrs = dict(state.attributes)
columns = {"state": state.state}
if split_attributes:
columns.update(state.attributes)
else:
columns["attrs"] = dumps(dict(state.attributes), sort_keys=True, default=str)
sender.row(
"qss",
symbols={
"entity_id": entity_id,
},
columns={
"state": state.state,
"attributes": dumps(attrs, sort_keys=True, default=str),
},
columns=columns,
at=event.time_fired,
)

sender.flush()


def _insert_row_without_auth(host: str, port: int, event: Event) -> None:
def _insert_row_without_auth(host: str, port: int, event: Event, split_attributes: bool) -> None:
with Sender(host, port) as sender:
entity_id = event.data["entity_id"]
state = event.data.get("new_state")
attrs = dict(state.attributes)
columns = {"state": state.state}
if split_attributes:
columns.update(state.attributes)
else:
columns["attrs"] = dumps(dict(state.attributes), sort_keys=True, default=str)
sender.row(
"qss",
symbols={
"entity_id": entity_id,
},
columns={
"state": state.state,
"attributes": dumps(attrs, sort_keys=True, default=str),
},
columns=columns,
at=event.time_fired,
)

Expand All @@ -57,15 +59,15 @@ def _insert_row_without_auth(host: str, port: int, event: Event) -> None:
wait=wait_fixed(RETRY_WAIT_SECONDS),
retry=retry_if_exception_type(IngressError),
)
def _retry_data_insertion(host: str, port: int, auth: tuple, event: Event) -> None:
def _retry_data_insertion(host: str, port: int, auth: tuple, event: Event, split_attributes: bool) -> None:
"""Use a retry for inserting event data into QuestDB."""
if all(auth):
_insert_row_with_auth(host, port, auth, event)
_insert_row_with_auth(host, port, auth, event, split_attributes)
else:
_insert_row_without_auth(host, port, event)
_insert_row_without_auth(host, port, event, split_attributes)


def insert_event_data_into_questdb(host: str, port: int, auth: tuple, event: Event, queue: Queue) -> None:
def insert_event_data_into_questdb(host: str, port: int, auth: tuple, event: Event, queue: Queue, split_attributes: bool) -> None:
"""Insert given event data into QuestDB."""
_retry_data_insertion(host, port, auth, event)
_retry_data_insertion(host, port, auth, event, split_attributes)
queue.task_done()
Loading