diff --git a/custom_components/mqtt_vacuum_camera/manifest.json b/custom_components/mqtt_vacuum_camera/manifest.json index 50f46dc6..08218915 100755 --- a/custom_components/mqtt_vacuum_camera/manifest.json +++ b/custom_components/mqtt_vacuum_camera/manifest.json @@ -8,5 +8,5 @@ "iot_class": "local_polling", "issue_tracker": "https://github.com/sca075/mqtt_vacuum_camera/issues", "requirements": ["pillow>=10.3.0,<10.5.0", "numpy"], - "version": "2024.08.1" + "version": "2024.08.2" } diff --git a/custom_components/mqtt_vacuum_camera/utils/auto_crop.py b/custom_components/mqtt_vacuum_camera/utils/auto_crop.py index 7383d84d..9168fb8b 100644 --- a/custom_components/mqtt_vacuum_camera/utils/auto_crop.py +++ b/custom_components/mqtt_vacuum_camera/utils/auto_crop.py @@ -1,5 +1,5 @@ """Auto Crop Class for trimming and zooming images. -Version: 2024.08.1""" +Version: 2024.08.2""" from __future__ import annotations @@ -68,7 +68,6 @@ def _calculate_trimmed_dimensions(self): - (self.imh.trim_up + self.imh.offset_top) ), ) - # Ensure shared reference dimensions are updated if hasattr(self.imh.shared, "image_ref_height") and hasattr( self.imh.shared, "image_ref_width" @@ -135,8 +134,6 @@ async def async_image_margins( self, image_array: NumpyArray, detect_colour: Color ) -> tuple[int, int, int, int]: """Crop the image based on the auto crop area.""" - """async_auto_trim_and_zoom_image""" - nonzero_coords = np.column_stack(np.where(image_array != list(detect_colour))) # Calculate the trim box based on the first and last occurrences min_y, min_x, _ = NumpyArray.min(nonzero_coords, axis=0) @@ -157,7 +154,6 @@ async def async_check_if_zoom_is_on( rand256: bool = False, ) -> NumpyArray: """Check if the image need to be zoom.""" - """async_auto_trim_and_zoom_image""" if ( zoom @@ -169,15 +165,17 @@ async def async_check_if_zoom_is_on( f"{self.file_name}: Zooming the image on room {self.imh.robot_in_room['room']}." ) if rand256: - trim_left = int(self.imh.robot_in_room["left"] / 10) - margin_size - trim_right = int(self.imh.robot_in_room["right"] / 10) + margin_size - trim_up = int(self.imh.robot_in_room["up"] / 10) - margin_size - trim_down = int(self.imh.robot_in_room["down"] / 10) + margin_size + trim_left = round(self.imh.robot_in_room["right"] / 10) - margin_size + trim_right = round(self.imh.robot_in_room["left"] / 10) + margin_size + trim_up = round(self.imh.robot_in_room["down"] / 10) - margin_size + trim_down = round(self.imh.robot_in_room["up"] / 10) + margin_size else: trim_left = self.imh.robot_in_room["left"] - margin_size trim_right = self.imh.robot_in_room["right"] + margin_size trim_up = self.imh.robot_in_room["up"] - margin_size trim_down = self.imh.robot_in_room["down"] + margin_size + trim_left, trim_right = sorted([trim_left, trim_right]) + trim_up, trim_down = sorted([trim_up, trim_down]) trimmed = image_array[trim_up:trim_down, trim_left:trim_right] else: # Apply the auto-calculated trims to the rotated image @@ -191,7 +189,6 @@ async def async_rotate_the_image( self, trimmed: NumpyArray, rotate: int ) -> NumpyArray: """Rotate the image and return the new array.""" - """async_auto_trim_and_zoom_image""" if rotate == 90: rotated = rot90(trimmed) self.imh.crop_area = [ diff --git a/custom_components/mqtt_vacuum_camera/utils/img_data.py b/custom_components/mqtt_vacuum_camera/utils/img_data.py index c506dabd..4f9bafbf 100755 --- a/custom_components/mqtt_vacuum_camera/utils/img_data.py +++ b/custom_components/mqtt_vacuum_camera/utils/img_data.py @@ -3,7 +3,7 @@ ImageData is part of the Image_Handler used functions to search data in the json provided for the creation of the new camera frame -Version: v2024.08.0 +Version: v2024.08.2 """ from __future__ import annotations @@ -86,9 +86,6 @@ def find_layers( if layer_type not in layer_dict: layer_dict[layer_type] = [] layer_dict[layer_type].append(json_obj.get("compressedPixels", [])) - # Hopefully will not brake anything. - # if layer_type == "floor": - # active_list.append("floor") if layer_type == "segment": active_list.append(int(active_type["active"])) @@ -190,7 +187,7 @@ async def async_get_rooms_coordinates( if rand: x, y, _ = entry # Extract x and y coordinates max_x = max(max_x, x) # Update max x coordinate - max_y = max(max_y, y) # Update max y coordinate + max_y = max(max_y, y + pixel_size) # Update max y coordinate min_x = min(min_x, x) # Update min x coordinate min_y = min(min_y, y) # Update min y coordinate else: diff --git a/custom_components/mqtt_vacuum_camera/valetudo/MQTT/connector.py b/custom_components/mqtt_vacuum_camera/valetudo/MQTT/connector.py index 5713af95..4e6ca88a 100755 --- a/custom_components/mqtt_vacuum_camera/valetudo/MQTT/connector.py +++ b/custom_components/mqtt_vacuum_camera/valetudo/MQTT/connector.py @@ -1,5 +1,5 @@ """ -Version: v2024.08.1 +Version: v2024.08.2 - Removed the PNG decode, the json is extracted from map-data instead of map-data-hass. - Tested no influence on the camera performance. - Added gzip library used in Valetudo RE data compression. @@ -270,20 +270,23 @@ async def rrm_handle_active_segments(self, msg) -> None: if command == "segmented_cleanup": segment_ids = command_status.get("segment_ids", []) - # Retrieve room data from RoomStore - rooms_data = await RoomStore().async_get_rooms_data(self._file_name) - # Sort the rooms data by room ID same as rooms data in attributes. - rooms_data = dict(sorted(rooms_data.items(), key=lambda item: int(item[0]))) - rrm_active_segments = [0] * len( - rooms_data - ) # Initialize based on the number of rooms + # Retrieve the shared room data instead of RoomStore or destinations + shared_rooms_data = self._shared.map_rooms + + # Create a mapping of room ID to its index based on the shared rooms data + room_id_to_index = { + room_id: idx for idx, room_id in enumerate(shared_rooms_data) + } + + # Initialize rrm_active_segments with zeros based on the number of rooms in shared_rooms_data + rrm_active_segments = [0] * len(shared_rooms_data) + + # Update the rrm_active_segments based on segment_ids for segment_id in segment_ids: - room_name = rooms_data.get(str(segment_id)) - if room_name: - # Convert room ID to index; since dict doesn't preserve order, find index manually - room_idx = list(rooms_data.keys()).index(str(segment_id)) - rrm_active_segments[room_idx] = 1 + room_index = room_id_to_index.get(segment_id) + if room_index is not None: + rrm_active_segments[room_index] = 1 self._shared.rand256_active_zone = rrm_active_segments _LOGGER.debug(f"Updated Active Segments: {rrm_active_segments}") diff --git a/custom_components/mqtt_vacuum_camera/valetudo/rand256/image_handler.py b/custom_components/mqtt_vacuum_camera/valetudo/rand256/image_handler.py index d374a40c..80ee9fe6 100755 --- a/custom_components/mqtt_vacuum_camera/valetudo/rand256/image_handler.py +++ b/custom_components/mqtt_vacuum_camera/valetudo/rand256/image_handler.py @@ -2,7 +2,7 @@ Image Handler Module for Valetudo Re Vacuums. It returns the PIL PNG image frame relative to the Map Data extrapolated from the vacuum json. It also returns calibration, rooms data to the card and other images information to the camera. -Version: v2024.08.1 +Version: v2024.08.2 """ from __future__ import annotations @@ -366,28 +366,25 @@ def _check_robot_position(x: int, y: int) -> bool: "angle": angle, "in_room": self.robot_in_room["room"], } - # ##Still working on this count to fix it on 2024.08.2 - # ##The issue is now that somehow the trimming dimensions are not okay. - # ##This cause the camera to crash when resizing the image. - _LOGGER.info(f"{self.file_name}: Auto Zoom is currently disabled.") - # self.active_zones = self.shared.rand256_active_zone - # if self.active_zones and ( - # self.robot_in_room["id"] in range(len(self.active_zones)) - # ): # issue #100 Index out of range - # self.zooming = bool(self.active_zones[self.robot_in_room["id"]]) - # else: - # self.zooming = False + self.active_zones = self.shared.rand256_active_zone + if self.active_zones and ( + (self.robot_in_room["id"]) in range(len(self.active_zones)) + ): # issue #100 Index out of range + self.zooming = bool(self.active_zones[(self.robot_in_room["id"])]) + else: + self.zooming = False return temp # else we need to search and use the async method _LOGGER.debug(f"{self.file_name} changed room.. searching..") - room_count = 0 + room_count = -1 last_room = None if self.rooms_pos: if self.robot_in_room: last_room = self.robot_in_room for room in self.rooms_pos: corners = room["corners"] + room_count += 1 self.robot_in_room = { "id": room_count, "left": corners[0][0], @@ -396,7 +393,6 @@ def _check_robot_position(x: int, y: int) -> bool: "down": corners[2][1], "room": room["name"], } - room_count += 1 # Check if the robot coordinates are inside the room's corners if _check_robot_position(robot_x, robot_y): temp = { diff --git a/custom_components/mqtt_vacuum_camera/valetudo/rand256/reimg_draw.py b/custom_components/mqtt_vacuum_camera/valetudo/rand256/reimg_draw.py index 0054535c..c5d7b198 100644 --- a/custom_components/mqtt_vacuum_camera/valetudo/rand256/reimg_draw.py +++ b/custom_components/mqtt_vacuum_camera/valetudo/rand256/reimg_draw.py @@ -1,7 +1,7 @@ """ Image Draw Class for Valetudo Rand256 Image Handling. This class is used to simplify the ImageHandler class. -Version: 2024.08.1 +Version: 2024.08.2 """ from __future__ import annotations @@ -144,7 +144,6 @@ async def _draw_segments( room_id = 0 rooms_list = [color_wall] - _LOGGER.info(f"{self.file_name}: Drawing segments. {len(segment_data)}") if not segment_data: _LOGGER.info(f"{self.file_name}: No segments data found.") return room_id, img_np_array @@ -154,10 +153,6 @@ async def _draw_segments( for pixels in segment_data: room_color = self.img_h.shared.rooms_colors[room_id] rooms_list.append(room_color) - _LOGGER.debug( - f"Room {room_id} color: {room_color}, " - f"{tuple(self.img_h.active_zones)}" - ) if ( self.img_h.active_zones and len(self.img_h.active_zones) > room_id diff --git a/custom_components/mqtt_vacuum_camera/valetudo/rand256/rrparser.py b/custom_components/mqtt_vacuum_camera/valetudo/rand256/rrparser.py index 5b963c70..226cbb10 100755 --- a/custom_components/mqtt_vacuum_camera/valetudo/rand256/rrparser.py +++ b/custom_components/mqtt_vacuum_camera/valetudo/rand256/rrparser.py @@ -1,474 +1,392 @@ """ -Version: v2024.04 +Version: v2024.08.2 - This parser is the python version of @rand256 valetudo_mapper. -- This class is extracting the vacuum map_data. +- This class is extracting the vacuum binary map_data. - Additional functions are to get in our image_handler the images datas. """ +from enum import Enum import math import struct +from typing import Any, Dict, List, Optional from homeassistant.core import callback +# noinspection PyTypeChecker class RRMapParser: + """Parse the map data from the Rand256 vacuum.""" + def __init__(self): self.map_data = None - TOOLS = {"DIMENSION_PIXELS": 1024, "DIMENSION_MM": 50 * 1024} - - TYPES = { - "CHARGER_LOCATION": 1, - "IMAGE": 2, - "PATH": 3, - "GOTO_PATH": 4, - "GOTO_PREDICTED_PATH": 5, - "CURRENTLY_CLEANED_ZONES": 6, - "GOTO_TARGET": 7, - "ROBOT_POSITION": 8, - "FORBIDDEN_ZONES": 9, - "VIRTUAL_WALLS": 10, - "CURRENTLY_CLEANED_BLOCKS": 11, - "FORBIDDEN_MOP_ZONES": 12, - "DIGEST": 1024, - } + class Tools: + """Tools for the RRMapParser.""" + + DIMENSION_PIXELS = 1024 + DIMENSION_MM = 50 * 1024 + + class Types(Enum): + """Types of blocks in the RRMapParser.""" + + CHARGER_LOCATION = 1 + IMAGE = 2 + PATH = 3 + GOTO_PATH = 4 + GOTO_PREDICTED_PATH = 5 + CURRENTLY_CLEANED_ZONES = 6 + GOTO_TARGET = 7 + ROBOT_POSITION = 8 + FORBIDDEN_ZONES = 9 + VIRTUAL_WALLS = 10 + CURRENTLY_CLEANED_BLOCKS = 11 + FORBIDDEN_MOP_ZONES = 12 + DIGEST = 1024 @staticmethod - def parseBlock(buf, offset, result=None, pixels=False): + def parse_block( + buf: bytes, + offset: int, + result: Optional[Dict[int, Any]] = None, + pixels: bool = False, + ) -> Dict[int, Any]: + """Parse a block of data from the map data.""" result = result or {} if len(buf) <= offset: return result - g3offset = 0 - type_ = struct.unpack("= 12 else 0 ), } - elif type_ == RRMapParser.TYPES["IMAGE"]: - if hlength > 24: - g3offset = 4 - parameters = { - "segments": { - "count": ( - struct.unpack(" 0 - and parameters["dimensions"]["width"] > 0 - ): - for i in range(length): - segment_type = ( + elif type_ == RRMapParser.Types.CURRENTLY_CLEANED_ZONES.value: + result[type_] = RRMapParser._parse_cleaned_zones(buf, offset, length) + elif type_ in ( + RRMapParser.Types.FORBIDDEN_ZONES.value, + RRMapParser.Types.FORBIDDEN_MOP_ZONES.value, + RRMapParser.Types.VIRTUAL_WALLS.value, + ): + result[type_] = RRMapParser._parse_forbidden_zones(buf, offset, length) + return RRMapParser.parse_block(buf, offset + length + hlength, result, pixels) + + @staticmethod + def _parse_image_block( + buf: bytes, + offset: int, + length: int, + hlength: int, + result: Dict[int, Any], + pixels: bool, + ) -> None: + """Parse the image block of the map data.""" + g3offset = 4 if hlength > 24 else 0 + parameters = { + "segments": { + "count": ( + struct.unpack(" 0 + and parameters["dimensions"]["width"] > 0 + ): + for i in range(length): + segment_type = ( + struct.unpack( + "> 3 ) - if segment_type == 0: - continue - elif segment_type == 1: + if s == 0 and pixels: + parameters["pixels"]["floor"].append(i) + elif s != 0: + if s not in parameters["segments"]["id"]: + parameters["segments"]["id"].append(s) + parameters["segments"]["pixels_seg_" + str(s)] = [] if pixels: - parameters["pixels"]["walls"].append(i) - else: - if pixels: - s = ( - struct.unpack( - "> 3 - if s == 0: - parameters["pixels"]["floor"].append(i) - if s != 0: - if s not in parameters["segments"]["id"]: - parameters["segments"]["id"].append(s) - parameters["segments"]["pixels_seg_" + str(s)] = [] - if pixels: - parameters["segments"][ - "pixels_seg_" + str(s) - ].append(i) - result[type_] = parameters - elif type_ in [ - RRMapParser.TYPES["PATH"], - RRMapParser.TYPES["GOTO_PATH"], - RRMapParser.TYPES["GOTO_PREDICTED_PATH"], - ]: - points = [] - for i in range(0, length, 4): - points.append( - [ - struct.unpack(" Dict[str, Any]: + """Parse a path block of the map data.""" + points = [ + [ + struct.unpack(" List[List[int]]: + """Parse the cleaned zones block of the map data.""" + zone_count = struct.unpack(" 0: - for i in range(0, length, 8): - zones.append( - [ - struct.unpack( - " 0 + else [] + ) - elif type_ in [ - RRMapParser.TYPES["FORBIDDEN_ZONES"], - RRMapParser.TYPES["FORBIDDEN_MOP_ZONES"], - RRMapParser.TYPES["VIRTUAL_WALLS"], - ]: - forbiddenZoneCount = struct.unpack( - " 0: - for i in range(0, length, 16): - forbiddenZones.append( - [ - struct.unpack( - " List[List[int]]: + """Parse the forbidden zones block of the map data.""" + zone_count = struct.unpack(" 0 + else [] + ) @callback - def PARSE(self, mapBuf): - if mapBuf[0x00] == 0x72 and mapBuf[0x01] == 0x72: - parsedMapData = { - "header_length": struct.unpack(" Dict[str, Any]: + """Parse the map data.""" + if map_buf[0:2] == b"rr": + return { + "header_length": struct.unpack(" Optional[Dict[str, Any]]: + """Parse the complete map data.""" + if not self.PARSE(map_buf).get("map_index"): return None - else: - parsedMapData = {} - blocks = self.parseBlock(mapBuf, 0x14, None, pixels) - if blocks[RRMapParser.TYPES["IMAGE"]]: - parsedMapData["image"] = blocks[RRMapParser.TYPES["IMAGE"]] + parsed_map_data = {} + blocks = self.parse_block(map_buf, 0x14, None, pixels) + + if RRMapParser.Types.IMAGE.value in blocks: + parsed_map_data["image"] = blocks[RRMapParser.Types.IMAGE.value] for item in [ - {"type": RRMapParser.TYPES["PATH"], "path": "path"}, + {"type": RRMapParser.Types.PATH.value, "path": "path"}, { - "type": RRMapParser.TYPES["GOTO_PREDICTED_PATH"], + "type": RRMapParser.Types.GOTO_PREDICTED_PATH.value, "path": "goto_predicted_path", }, ]: if item["type"] in blocks: - parsedMapData[item["path"]] = blocks[item["type"]] - parsedMapData[item["path"]]["points"] = [ - [point[0], RRMapParser.TOOLS["DIMENSION_MM"] - point[1]] - for point in parsedMapData[item["path"]]["points"] + parsed_map_data[item["path"]] = blocks[item["type"]] + parsed_map_data[item["path"]]["points"] = [ + [point[0], RRMapParser.Tools.DIMENSION_MM - point[1]] + for point in parsed_map_data[item["path"]]["points"] ] - if len(parsedMapData[item["path"]]["points"]) >= 2: - parsedMapData[item["path"]]["current_angle"] = math.degrees( + if len(parsed_map_data[item["path"]]["points"]) >= 2: + parsed_map_data[item["path"]]["current_angle"] = math.degrees( math.atan2( - parsedMapData[item["path"]]["points"][-1][1] - - parsedMapData[item["path"]]["points"][-2][1], - parsedMapData[item["path"]]["points"][-1][0] - - parsedMapData[item["path"]]["points"][-2][0], + parsed_map_data[item["path"]]["points"][-1][1] + - parsed_map_data[item["path"]]["points"][-2][1], + parsed_map_data[item["path"]]["points"][-1][0] + - parsed_map_data[item["path"]]["points"][-2][0], ) ) - if RRMapParser.TYPES["CHARGER_LOCATION"] in blocks: - charger = blocks[RRMapParser.TYPES["CHARGER_LOCATION"]]["position"] - charger[0] = RRMapParser.TOOLS["DIMENSION_MM"] - charger[0] - charger[1] = RRMapParser.TOOLS["DIMENSION_MM"] - charger[1] - parsedMapData["charger"] = charger - if RRMapParser.TYPES["ROBOT_POSITION"] in blocks: - robot = blocks[RRMapParser.TYPES["ROBOT_POSITION"]]["position"] - rob_angle = blocks[RRMapParser.TYPES["ROBOT_POSITION"]]["angle"] - robot[0] = RRMapParser.TOOLS["DIMENSION_MM"] - robot[0] - robot[1] = RRMapParser.TOOLS["DIMENSION_MM"] - robot[1] - parsedMapData["robot"] = robot - parsedMapData["robot_angle"] = ( - rob_angle - if "robot" in parsedMapData - else ( - parsedMapData["path"]["current_angle"] - if "path" in parsedMapData - else 0 - ) - ) - if RRMapParser.TYPES["GOTO_TARGET"] in blocks: - parsedMapData["goto_target"] = blocks[ - RRMapParser.TYPES["GOTO_TARGET"] - ]["position"] - parsedMapData["goto_target"][1] = ( - RRMapParser.TOOLS["DIMENSION_MM"] - - parsedMapData["goto_target"][1] - ) - if RRMapParser.TYPES["CURRENTLY_CLEANED_ZONES"] in blocks: - parsedMapData["currently_cleaned_zones"] = blocks[ - RRMapParser.TYPES["CURRENTLY_CLEANED_ZONES"] - ] - parsedMapData["currently_cleaned_zones"] = [ - [ - zone[0], - RRMapParser.TOOLS["DIMENSION_MM"] - zone[1], - zone[2], - RRMapParser.TOOLS["DIMENSION_MM"] - zone[3], - ] - for zone in parsedMapData["currently_cleaned_zones"] - ] - if RRMapParser.TYPES["FORBIDDEN_ZONES"] in blocks: - parsedMapData["forbidden_zones"] = blocks[ - RRMapParser.TYPES["FORBIDDEN_ZONES"] - ] - parsedMapData["forbidden_zones"] = [ - [ - zone[0], - RRMapParser.TOOLS["DIMENSION_MM"] - zone[1], - zone[2], - RRMapParser.TOOLS["DIMENSION_MM"] - zone[3], - zone[4], - RRMapParser.TOOLS["DIMENSION_MM"] - zone[5], - zone[6], - RRMapParser.TOOLS["DIMENSION_MM"] - zone[7], - ] - for zone in parsedMapData["forbidden_zones"] - ] - if RRMapParser.TYPES["VIRTUAL_WALLS"] in blocks: - parsedMapData["virtual_walls"] = blocks[ - RRMapParser.TYPES["VIRTUAL_WALLS"] - ] - parsedMapData["virtual_walls"] = [ - [ - wall[0], - RRMapParser.TOOLS["DIMENSION_MM"] - wall[1], - wall[2], - RRMapParser.TOOLS["DIMENSION_MM"] - wall[3], - ] - for wall in parsedMapData["virtual_walls"] - ] - if RRMapParser.TYPES["CURRENTLY_CLEANED_BLOCKS"] in blocks: - parsedMapData["currently_cleaned_blocks"] = blocks[ - RRMapParser.TYPES["CURRENTLY_CLEANED_BLOCKS"] - ] - if RRMapParser.TYPES["FORBIDDEN_MOP_ZONES"] in blocks: - parsedMapData["forbidden_mop_zones"] = blocks[ - RRMapParser.TYPES["FORBIDDEN_MOP_ZONES"] - ] - parsedMapData["forbidden_mop_zones"] = [ - [ - zone[0], - RRMapParser.TOOLS["DIMENSION_MM"] - zone[1], - zone[2], - RRMapParser.TOOLS["DIMENSION_MM"] - zone[3], - zone[4], - RRMapParser.TOOLS["DIMENSION_MM"] - zone[5], - zone[6], - RRMapParser.TOOLS["DIMENSION_MM"] - zone[7], - ] - for zone in parsedMapData["forbidden_mop_zones"] - ] - - return parsedMapData - - def parse_data(self, payload=None, pixels=False): - self.map_data = self.PARSE(payload) - self.map_data.update(self.PARSEDATA(payload, pixels)) + if RRMapParser.Types.CHARGER_LOCATION.value in blocks: + charger = blocks[RRMapParser.Types.CHARGER_LOCATION.value]["position"] + # Assume no transformation needed here + parsed_map_data["charger"] = charger + + if RRMapParser.Types.ROBOT_POSITION.value in blocks: + robot = blocks[RRMapParser.Types.ROBOT_POSITION.value]["position"] + rob_angle = blocks[RRMapParser.Types.ROBOT_POSITION.value]["angle"] + # Assume no transformation needed here + parsed_map_data["robot"] = robot + parsed_map_data["robot_angle"] = rob_angle + + if RRMapParser.Types.GOTO_TARGET.value in blocks: + parsed_map_data["goto_target"] = blocks[ + RRMapParser.Types.GOTO_TARGET.value + ]["position"] + # Assume no transformation needed here + + if RRMapParser.Types.CURRENTLY_CLEANED_ZONES.value in blocks: + parsed_map_data["currently_cleaned_zones"] = blocks[ + RRMapParser.Types.CURRENTLY_CLEANED_ZONES.value + ] + parsed_map_data["currently_cleaned_zones"] = [ + [ + zone[0], + RRMapParser.Tools.DIMENSION_MM - zone[1], + zone[2], + RRMapParser.Tools.DIMENSION_MM - zone[3], + ] + for zone in parsed_map_data["currently_cleaned_zones"] + ] + + if RRMapParser.Types.FORBIDDEN_ZONES.value in blocks: + parsed_map_data["forbidden_zones"] = blocks[ + RRMapParser.Types.FORBIDDEN_ZONES.value + ] + parsed_map_data["forbidden_zones"] = [ + [ + zone[0], + RRMapParser.Tools.DIMENSION_MM - zone[1], + zone[2], + RRMapParser.Tools.DIMENSION_MM - zone[3], + zone[4], + RRMapParser.Tools.DIMENSION_MM - zone[5], + zone[6], + RRMapParser.Tools.DIMENSION_MM - zone[7], + ] + for zone in parsed_map_data["forbidden_zones"] + ] + + if RRMapParser.Types.VIRTUAL_WALLS.value in blocks: + parsed_map_data["virtual_walls"] = blocks[ + RRMapParser.Types.VIRTUAL_WALLS.value + ] + parsed_map_data["virtual_walls"] = [ + [ + wall[0], + RRMapParser.Tools.DIMENSION_MM - wall[1], + wall[2], + RRMapParser.Tools.DIMENSION_MM - wall[3], + ] + for wall in parsed_map_data["virtual_walls"] + ] + + if RRMapParser.Types.CURRENTLY_CLEANED_BLOCKS.value in blocks: + parsed_map_data["currently_cleaned_blocks"] = blocks[ + RRMapParser.Types.CURRENTLY_CLEANED_BLOCKS.value + ] + + if RRMapParser.Types.FORBIDDEN_MOP_ZONES.value in blocks: + parsed_map_data["forbidden_mop_zones"] = blocks[ + RRMapParser.Types.FORBIDDEN_MOP_ZONES.value + ] + parsed_map_data["forbidden_mop_zones"] = [ + [ + zone[0], + RRMapParser.Tools.DIMENSION_MM - zone[1], + zone[2], + RRMapParser.Tools.DIMENSION_MM - zone[3], + zone[4], + RRMapParser.Tools.DIMENSION_MM - zone[5], + zone[6], + RRMapParser.Tools.DIMENSION_MM - zone[7], + ] + for zone in parsed_map_data["forbidden_mop_zones"] + ] + + return parsed_map_data + + def parse_data( + self, payload: Optional[bytes] = None, pixels: bool = False + ) -> Optional[Dict[str, Any]]: + """Get the map data from MQTT and return the json.""" + if payload: + self.map_data = self.PARSE(payload) + self.map_data.update(self.PARSEDATA(payload, pixels) or {}) return self.map_data - def get_image(self): + def get_image(self) -> Dict[str, Any]: + """Get the image data from the map data.""" return self.map_data.get("image", {}) - def get_path(self): - return self.map_data.get("path", {}) - - def get_goto_predicted_path(self): - return self.map_data.get("goto_predicted_path", {}) - - def get_charger_position(self): - return self.map_data.get("charger", {}) - - def get_robot_position(self): - return self.map_data.get("robot", {}) - - def get_robot_angle(self): - angle = (self.map_data.get("robot_angle", 0) + 450) % 360 - return angle, self.map_data.get("robot_angle", 0) - - def get_goto_target(self): - return self.map_data.get("goto_target", {}) - - def get_currently_cleaned_zones(self): - return self.map_data.get("currently_cleaned_zones", []) - - def get_forbidden_zones(self): - return self.map_data.get("forbidden_zones", []) - - def get_virtual_walls(self): - return self.map_data.get("virtual_walls", []) - - def get_currently_cleaned_blocks(self): - return self.map_data.get("currently_cleaned_blocks", []) - - def get_forbidden_mop_zones(self): - return self.map_data.get("forbidden_mop_zones", []) - - def get_image_size(self): - image = self.get_image() - if image: - dimensions = image.get("dimensions", {}) - return dimensions.get("width", 0), dimensions.get("height", 0) - return 0, 0 - - def get_image_position(self): - image = self.get_image() - if image: - dimensions = image.get("position", {}) - return dimensions.get("top", 0), dimensions.get("left", 0) - return 0, 0 - - def get_floor(self): - img = self.get_image() - return img.get("pixels", {}).get("floor", []) - - def get_walls(self): - img = self.get_image() - return img.get("pixels", {}).get("walls", []) - - def get_segments(self): - img = self.get_image() - segments = img.get("pixels", {}).get("segments", []) - segment_count = img.get("segments", {}).get("count", 0) - - # Only return segments if the count is greater than 0 - if segment_count > 0: - return segments - else: - return [] - @staticmethod def get_int32(data: bytes, address: int) -> int: - return ( - ((data[address + 0] << 0) & 0xFF) - | ((data[address + 1] << 8) & 0xFFFF) - | ((data[address + 2] << 16) & 0xFFFFFF) - | ((data[address + 3] << 24) & 0xFFFFFFFF) - ) + """Get a 32-bit integer from the data.""" + return struct.unpack_from("