forked from SteinRobert/homeassistant-vorwerk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
vacuum.py
237 lines (202 loc) · 7.1 KB
/
vacuum.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""Support for Neato Connected Vacuums."""
from __future__ import annotations
import logging
from typing import Any
from pybotvac import Robot
from pybotvac.exceptions import NeatoRobotException
import voluptuous as vol
from homeassistant.components.vacuum import (
ATTR_STATUS,
STATE_CLEANING,
STATE_DOCKED,
STATE_IDLE,
STATE_PAUSED,
SUPPORT_BATTERY,
SUPPORT_CLEAN_SPOT,
SUPPORT_LOCATE,
SUPPORT_PAUSE,
SUPPORT_RETURN_HOME,
SUPPORT_START,
SUPPORT_STATE,
SUPPORT_STOP,
StateVacuumEntity,
)
from homeassistant.const import ATTR_MODE
from homeassistant.helpers import config_validation as cv, entity_platform
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from . import VorwerkState
from .const import (
ATTR_CATEGORY,
ATTR_NAVIGATION,
ATTR_ZONE,
VORWERK_DOMAIN,
VORWERK_ROBOT_API,
VORWERK_ROBOT_COORDINATOR,
VORWERK_ROBOTS,
)
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Bitmask of the vacuum features advertised by every Vorwerk entity in this
# module. The flags are OR-ed together, so their order is irrelevant; they are
# grouped here as: reporting, run control, extras.
SUPPORT_VORWERK = (
    SUPPORT_STATE
    | SUPPORT_BATTERY
    | SUPPORT_START
    | SUPPORT_PAUSE
    | SUPPORT_STOP
    | SUPPORT_RETURN_HOME
    | SUPPORT_CLEAN_SPOT
    | SUPPORT_LOCATE
)
async def async_setup_entry(hass, entry, async_add_entities):
    """Set up Vorwerk vacuum with config entry.

    Creates one ``VorwerkConnectedVacuum`` for every robot stored under
    ``hass.data[VORWERK_DOMAIN][entry.entry_id][VORWERK_ROBOTS]`` and registers
    the ``custom_cleaning`` entity service on the current platform.

    Args:
        hass: Home Assistant instance; only ``hass.data`` is read here.
        entry: Config entry whose ``entry_id`` selects the robots to add.
        async_add_entities: Callback used to hand the new entities to
            Home Assistant.
    """
    # Lazy %-style arguments: the message is only formatted if DEBUG logging
    # is actually enabled, instead of concatenating the string on every call.
    _LOGGER.debug("Adding vorwerk vacuums %s", entry.entry_id)

    robots = hass.data[VORWERK_DOMAIN][entry.entry_id][VORWERK_ROBOTS]
    async_add_entities(
        [
            VorwerkConnectedVacuum(
                robot[VORWERK_ROBOT_API], robot[VORWERK_ROBOT_COORDINATOR]
            )
            for robot in robots
        ],
        # NOTE(review): second positional argument is presumably Home
        # Assistant's ``update_before_add`` flag - confirm against the
        # AddEntitiesCallback signature.
        True,
    )

    platform = entity_platform.current_platform.get()
    assert platform is not None

    # Entity service that dispatches to
    # VorwerkConnectedVacuum.vorwerk_custom_cleaning. The numeric options are
    # validated as positive integers; the zone name is optional.
    platform.async_register_entity_service(
        "custom_cleaning",
        {
            vol.Optional(ATTR_MODE, default=2): cv.positive_int,
            vol.Optional(ATTR_NAVIGATION, default=1): cv.positive_int,
            vol.Optional(ATTR_CATEGORY, default=4): cv.positive_int,
            vol.Optional(ATTR_ZONE): cv.string,
        },
        "vorwerk_custom_cleaning",
    )
class VorwerkConnectedVacuum(CoordinatorEntity, StateVacuumEntity):
    """Representation of a Vorwerk Connected Vacuum.

    Read-only properties are served from the ``VorwerkState`` object passed to
    the constructor; commands are forwarded to the pybotvac ``Robot`` exposed
    by that state object. A ``NeatoRobotException`` raised by a command is
    logged as an error and not re-raised.
    """

    def __init__(
        self, robot_state: VorwerkState, coordinator: DataUpdateCoordinator[Any]
    ) -> None:
        """Initialize the Vorwerk Connected Vacuum.

        Args:
            robot_state: State wrapper for one robot; its ``robot`` attribute
                is the pybotvac ``Robot`` that commands are sent to.
            coordinator: Update coordinator handed to ``CoordinatorEntity``.
        """
        super().__init__(coordinator)
        self.robot: Robot = robot_state.robot
        self._state: VorwerkState = robot_state
        self._name = f"{self.robot.name}"
        self._robot_serial = self.robot.serial
        # NOTE(review): nothing in this class fills this list, so a zone name
        # passed to vorwerk_custom_cleaning can never be resolved unless the
        # list is populated elsewhere - confirm.
        self._robot_boundaries: list = []

    def _log_connection_error(self, ex: NeatoRobotException) -> None:
        """Log a failed robot command with the shared error message."""
        _LOGGER.error(
            "Vorwerk vacuum connection error for '%s': %s", self.entity_id, ex
        )

    @property
    def name(self) -> str:
        """Return the name of the device."""
        return self._name

    @property
    def supported_features(self) -> int:
        """Flag vacuum cleaner robot features that are supported."""
        return SUPPORT_VORWERK

    @property
    def battery_level(self) -> int | None:
        """Return the battery level of the vacuum cleaner.

        Returns ``None`` only when no level is known. An explicit ``None``
        check is used (rather than truthiness) so that a reading of ``0`` is
        reported as ``0`` instead of being treated as unknown.
        """
        level = self._state.battery_level
        return int(level) if level is not None else None

    @property
    def available(self) -> bool:
        """Return if the robot is available."""
        return self._state.available

    @property
    def icon(self) -> str:
        """Return specific icon."""
        return "mdi:robot-vacuum-variant"

    @property
    def state(self) -> str | None:
        """Return the status of the vacuum cleaner, or None without state."""
        return self._state.state if self._state else None

    @property
    def unique_id(self) -> str:
        """Return a unique ID (the robot's serial)."""
        return self._robot_serial

    @property
    def extra_state_attributes(self) -> dict[str, Any]:
        """Return the state attributes of the vacuum cleaner.

        The status attribute is included only when the state object reports
        one.
        """
        data: dict[str, Any] = {}
        if self._state.status is not None:
            data[ATTR_STATUS] = self._state.status
        return data

    @property
    def device_info(self) -> DeviceInfo:
        """Device info for robot."""
        return self._state.device_info

    def start(self) -> None:
        """Start cleaning or resume cleaning.

        An idle or docked robot starts a new run, a paused robot resumes; in
        any other state no command is sent.
        """
        if not self._state:
            return
        try:
            current = self._state.state
            if current in (STATE_IDLE, STATE_DOCKED):
                self.robot.start_cleaning()
            elif current == STATE_PAUSED:
                self.robot.resume_cleaning()
        except NeatoRobotException as ex:
            self._log_connection_error(ex)

    def pause(self) -> None:
        """Pause the vacuum."""
        try:
            self.robot.pause_cleaning()
        except NeatoRobotException as ex:
            self._log_connection_error(ex)

    def return_to_base(self, **kwargs: Any) -> None:
        """Set the vacuum cleaner to return to the dock.

        A robot that is currently cleaning is paused first, then sent to base.
        """
        try:
            if self._state.state == STATE_CLEANING:
                self.robot.pause_cleaning()
            self.robot.send_to_base()
        except NeatoRobotException as ex:
            self._log_connection_error(ex)

    def stop(self, **kwargs: Any) -> None:
        """Stop the vacuum cleaner."""
        try:
            self.robot.stop_cleaning()
        except NeatoRobotException as ex:
            self._log_connection_error(ex)

    def locate(self, **kwargs: Any) -> None:
        """Send the locate command to the robot."""
        try:
            self.robot.locate()
        except NeatoRobotException as ex:
            self._log_connection_error(ex)

    def clean_spot(self, **kwargs: Any) -> None:
        """Start a spot cleaning run."""
        try:
            self.robot.start_spot_cleaning()
        except NeatoRobotException as ex:
            self._log_connection_error(ex)

    def vorwerk_custom_cleaning(
        self, mode: int, navigation: int, category: int, zone: str | None = None
    ) -> None:
        """Zone cleaning service call.

        Args:
            mode: Forwarded unchanged to ``Robot.start_cleaning``.
            navigation: Forwarded unchanged to ``Robot.start_cleaning``.
            category: Forwarded unchanged to ``Robot.start_cleaning``.
            zone: Optional zone name. When given, it is matched as a substring
                against the ``"name"`` of each known boundary; if several
                boundaries match, the last one wins. When no boundary matches,
                an error is logged and no cleaning is started.

        The three numeric arguments are annotated as ``int`` because the
        ``custom_cleaning`` service schema validates them as positive integers.
        """
        boundary_id = None
        if zone is not None:
            for boundary in self._robot_boundaries:
                if zone in boundary["name"]:
                    boundary_id = boundary["id"]
            if boundary_id is None:
                _LOGGER.error(
                    "Zone '%s' was not found for the robot '%s'", zone, self.entity_id
                )
                return

        _LOGGER.info("Start cleaning zone '%s' with robot %s", zone, self.entity_id)
        try:
            self.robot.start_cleaning(mode, navigation, category, boundary_id)
        except NeatoRobotException as ex:
            self._log_connection_error(ex)