Test camera

This commit is contained in:
ia74 2025-07-24 19:00:45 -05:00
parent 8e6f8799fb
commit bfa11c4d59
10 changed files with 658 additions and 31 deletions

View file

@ -3,20 +3,25 @@
Based on reverse engineering of the iRobot mobile app.
"""
import aiofiles
from datetime import UTC, datetime
import hashlib
import hmac
import json
import logging
from pathlib import Path
from typing import Any
import urllib.parse
import uuid
from datetime import UTC, datetime
from typing import Any
import asyncio
import json
import aiohttp
_LOGGER = logging.getLogger(__name__)
# Debug: Save UMF data to file for analysis
DEBUG_SAVE_UMF = True
DEBUG_UMF_PATH = Path("/workspaces/ha-core/config/debug_umf_data.json")
class CloudApiError(Exception):
"""Custom exception for Cloud API errors."""
@ -437,7 +442,12 @@ class iRobotCloudApi:
url = f"{self.deployment['httpBaseAuth']}/v1/{blid}/pmaps/{pmap_id}/versions/{version_id}/umf"
params = {"activeDetails": "2"}
return await self._aws_request(url, params)
umf_data = await self._aws_request(url, params)
# Save UMF data for debugging/camera development
await self._save_umf_data_for_debug(pmap_id, umf_data)
return umf_data
async def get_favorites(self) -> dict[str, Any]:
"""Get favorite cleaning routines."""
@ -492,6 +502,49 @@ class iRobotCloudApi:
all_data["favorites"] = await self.get_favorites()
return all_data
async def _save_umf_data_for_debug(
self, pmap_id: str, umf_data: dict[str, Any]
) -> None:
"""Save UMF data to file for debugging purposes."""
if not DEBUG_SAVE_UMF:
return
try:
# Create debug data structure
debug_data = {
"timestamp": datetime.now(UTC).isoformat(),
"pmap_id": pmap_id,
"umf_data": umf_data,
}
# Load existing data if file exists
existing_data = []
if DEBUG_UMF_PATH.exists():
try:
async with aiofiles.open(DEBUG_UMF_PATH) as f:
content = await f.read()
existing_data = json.loads(content)
if not isinstance(existing_data, list):
existing_data = [existing_data]
except (json.JSONDecodeError, OSError):
existing_data = []
# Add new data
existing_data.append(debug_data)
# Keep only the latest 10 entries to avoid huge files
if len(existing_data) > 10:
existing_data = existing_data[-10:]
# Save back to file
async with aiofiles.open(DEBUG_UMF_PATH, "w") as f:
await f.write(json.dumps(existing_data, indent=2, default=str))
_LOGGER.debug("Saved UMF data for pmap %s to %s", pmap_id, DEBUG_UMF_PATH)
except (OSError, json.JSONEncodeError) as e:
_LOGGER.warning("Failed to save UMF debug data: %s", e)
"""
@ -602,4 +655,5 @@ active_pmapv_details:
quality:
confidence: 70
related_objects:
- '10482956"""
- '1048295640'
"""

View file

@ -2,13 +2,15 @@
import logging
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN
from .coordinator import RoombaDataCoordinator, RoombaCloudCoordinator
import voluptuous as vol
from .coordinator import RoombaCloudCoordinator, RoombaDataCoordinator
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
@ -103,7 +105,7 @@ async def _async_register_services(hass: HomeAssistant) -> None:
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Safely remove Roombas."""
await hass.config_entries.async_unload_platforms(
entry, ["vacuum", "sensor", "switch"]
entry, ["vacuum", "sensor", "switch", "button", "camera"]
)
hass.data[DOMAIN].pop(entry.entry_id + "_coordinator")
return True
@ -125,7 +127,7 @@ async def _async_setup_cloud(
await _async_match_blid(hass, entry, coordinator, cloud_coordinator)
await hass.config_entries.async_forward_entry_setups(
entry, ["switch", "button"]
entry, ["switch", "button", "camera"]
)
except Exception as e: # pylint: disable=broad-except

View file

@ -1,10 +1,12 @@
"""Buttons needed."""
import logging
from homeassistant.components.button import ButtonEntity
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory
import logging
from .const import DOMAIN, regionTypeMappings
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)

View file

@ -0,0 +1,555 @@
"""Camera platform for Roomba map visualization."""
import io
import logging
from typing import Any
from PIL import Image, ImageDraw, ImageFont
from homeassistant.components.camera import Camera
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
# Map rendering constants
MAP_WIDTH = 800
MAP_HEIGHT = 600
BACKGROUND_COLOR = (240, 240, 240) # Light gray
WALL_COLOR = (50, 50, 50) # Dark gray
ROOM_COLORS = [
(173, 216, 230), # Light blue
(144, 238, 144), # Light green
(255, 182, 193), # Light pink
(255, 255, 224), # Light yellow
(221, 160, 221), # Plum
(175, 238, 238), # Pale turquoise
(255, 218, 185), # Peach puff
(230, 230, 250), # Lavender
]
ROOM_BORDER_COLOR = (100, 100, 100) # Gray
TEXT_COLOR = (0, 0, 0) # Black
# Zone colors
KEEPOUT_ZONE_COLOR = (255, 0, 0, 100) # Red with transparency
KEEPOUT_ZONE_BORDER = (200, 0, 0) # Dark red
CLEAN_ZONE_COLOR = (0, 255, 0, 100) # Green with transparency
CLEAN_ZONE_BORDER = (0, 150, 0) # Dark green
OBSERVED_ZONE_COLOR = (255, 165, 0, 80) # Orange with transparency
OBSERVED_ZONE_BORDER = (255, 140, 0) # Dark orange
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up Roomba map camera."""
_LOGGER.debug("Setting up camera platform for entry %s", entry.entry_id)
cloudCoordinator = hass.data[DOMAIN].get(entry.entry_id + "_cloud")
if not cloudCoordinator:
_LOGGER.warning("No cloud coordinator found for camera setup")
return
if not cloudCoordinator.data:
_LOGGER.warning("Cloud coordinator has no data yet for camera setup")
return
entities = []
blid = hass.data[DOMAIN].get(entry.entry_id + "_blid", "unknown")
_LOGGER.debug("Using BLID: %s for camera setup", blid)
if blid != "unknown" and blid in cloudCoordinator.data:
cloud_data = cloudCoordinator.data[blid]
_LOGGER.debug("Found cloud data for BLID %s", blid)
if "pmaps" in cloud_data:
_LOGGER.debug("Found %d pmaps in cloud data", len(cloud_data["pmaps"]))
for pmap in cloud_data["pmaps"]:
pmap_id = pmap.get("pmap_id", "unknown")
umf_key = f"pmap_umf_{pmap_id}"
_LOGGER.debug("Checking for UMF data key: %s", umf_key)
if umf_key in cloud_data:
_LOGGER.info("Creating camera entity for pmap %s", pmap_id)
entities.append(
RoombaMapCamera(
cloudCoordinator, entry, pmap_id, cloud_data[umf_key]
)
)
else:
_LOGGER.warning(
"No UMF data found for pmap %s (key: %s)", pmap_id, umf_key
)
else:
_LOGGER.warning("No pmaps found in cloud data")
else:
_LOGGER.warning("BLID %s not found in cloud data", blid)
if entities:
_LOGGER.info("Adding %d camera entities", len(entities))
async_add_entities(entities)
else:
_LOGGER.warning("No camera entities created")
class RoombaMapCamera(Camera):
"""Camera entity that renders Roomba map data as an image."""
def __init__(
self, coordinator, entry, pmap_id: str, umf_data: dict[str, Any]
) -> None:
"""Initialize the map camera."""
super().__init__()
self._coordinator = coordinator
self._entry = entry
self._pmap_id = pmap_id
self._umf_data = umf_data
# Extract map info
maps = umf_data.get("maps", [])
if maps:
map_data = maps[0]
self._map_header = map_data.get("map_header", {})
self._regions = map_data.get("regions", [])
self._points2d = map_data.get("points2d", []) # Coordinate points
# Extract zone data
self._keepout_zones = map_data.get("keepoutzones", [])
self._clean_zones = map_data.get("zones", [])
self._observed_zones = map_data.get("observed_zones", [])
else:
self._map_header = {}
self._regions = []
self._points2d = []
self._keepout_zones = []
self._clean_zones = []
self._observed_zones = []
# Camera attributes
self._attr_name = f"Roomba Map - {self._map_header.get('name', 'Unknown')}"
self._attr_unique_id = f"{entry.entry_id}_map_{pmap_id}"
# Device info
self._attr_device_info = {
"identifiers": {(DOMAIN, entry.unique_id)},
"name": entry.title,
"manufacturer": "iRobot",
}
def camera_image(
self, width: int | None = None, height: int | None = None
) -> bytes | None:
"""Return camera image."""
try:
return self._render_map()
except Exception as e:
_LOGGER.error("Error rendering map image: %s", e)
return None
def _render_map(self) -> bytes:
"""Render the map as a PNG image."""
# Create image
img = Image.new("RGB", (MAP_WIDTH, MAP_HEIGHT), BACKGROUND_COLOR)
draw = ImageDraw.Draw(img)
if not self._points2d or not self._regions:
# Draw "No Map Data" message
try:
font = ImageFont.truetype("arial.ttf", 24)
except OSError:
font = ImageFont.load_default()
text = "No Map Data Available"
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
x = (MAP_WIDTH - text_width) // 2
y = (MAP_HEIGHT - text_height) // 2
draw.text((x, y), text, fill=TEXT_COLOR, font=font)
else:
# Calculate map bounds from points2d
if self._points2d:
# Extract all coordinates
all_coords = [
point["coordinates"]
for point in self._points2d
if "coordinates" in point
]
if all_coords:
# Find min/max coordinates
x_coords = [coord[0] for coord in all_coords if len(coord) >= 2]
y_coords = [coord[1] for coord in all_coords if len(coord) >= 2]
if x_coords and y_coords:
min_x, max_x = min(x_coords), max(x_coords)
min_y, max_y = min(y_coords), max(y_coords)
# Calculate scale to fit image
map_width = max_x - min_x
map_height = max_y - min_y
if map_width > 0 and map_height > 0:
scale_x = (
MAP_WIDTH - 40
) / map_width # Leave 20px margin on each side
scale_y = (MAP_HEIGHT - 40) / map_height
scale = min(scale_x, scale_y)
# Center the map
offset_x = (
MAP_WIDTH - map_width * scale
) / 2 - min_x * scale
offset_y = (
MAP_HEIGHT - map_height * scale
) / 2 - min_y * scale
# Draw rooms
self._draw_regions(draw, offset_x, offset_y, scale)
# Draw coordinate points (walls/obstacles)
self._draw_points(draw, offset_x, offset_y, scale)
# Draw zones (keepout, clean, observed)
self._draw_zones(draw, offset_x, offset_y, scale)
# Convert to bytes
img_bytes = io.BytesIO()
img.save(img_bytes, format="PNG")
return img_bytes.getvalue()
def _draw_regions(
self, draw: ImageDraw.ImageDraw, offset_x: float, offset_y: float, scale: float
) -> None:
"""Draw room regions on the map."""
for i, region in enumerate(self._regions):
if "geometry" not in region:
continue
geometry = region["geometry"]
if geometry.get("type") != "polygon":
continue
# Get coordinates by ID references
polygon_ids = geometry.get("ids", [])
room_color = ROOM_COLORS[i % len(ROOM_COLORS)]
for polygon_id_list in polygon_ids:
if not isinstance(polygon_id_list, list):
continue
# Find coordinates for this polygon
polygon_coords = []
for coord_id in polygon_id_list:
coord = self._find_coordinate_by_id(coord_id)
if coord:
# Transform coordinate to image space
x = coord[0] * scale + offset_x
y = MAP_HEIGHT - (coord[1] * scale + offset_y) # Flip Y axis
polygon_coords.append((x, y))
if len(polygon_coords) >= 3: # Need at least 3 points for polygon
# Fill polygon
draw.polygon(
polygon_coords,
fill=room_color,
outline=ROOM_BORDER_COLOR,
width=2,
)
# Add room label
room_name = region.get("name", f"Room {i + 1}")
self._draw_room_label(draw, polygon_coords, room_name)
def _draw_points(
self, draw: ImageDraw.ImageDraw, offset_x: float, offset_y: float, scale: float
) -> None:
"""Draw coordinate points (walls, obstacles) on the map."""
for point in self._points2d:
coordinates = point.get("coordinates", [])
if len(coordinates) >= 2:
x = coordinates[0] * scale + offset_x
y = MAP_HEIGHT - (coordinates[1] * scale + offset_y) # Flip Y axis
draw.ellipse([x - 1, y - 1, x + 1, y + 1], fill=WALL_COLOR)
def _find_coordinate_by_id(self, coord_id: str) -> list[float] | None:
"""Find coordinate data by ID reference."""
for point in self._points2d:
if point.get("id") == coord_id:
return point.get("coordinates")
return None
def _draw_room_label(
self,
draw: ImageDraw.ImageDraw,
polygon_coords: list[tuple[float, float]],
text: str,
) -> None:
"""Draw room name label in the center of the polygon."""
if not polygon_coords:
return
# Calculate centroid
x_sum = sum(coord[0] for coord in polygon_coords)
y_sum = sum(coord[1] for coord in polygon_coords)
centroid_x = x_sum / len(polygon_coords)
centroid_y = y_sum / len(polygon_coords)
# Draw text
try:
font = ImageFont.truetype("arial.ttf", 14)
except OSError:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
x = centroid_x - text_width / 2
y = centroid_y - text_height / 2
# Draw text background
draw.rectangle(
[x - 2, y - 2, x + text_width + 2, y + text_height + 2],
fill=(255, 255, 255, 180),
)
draw.text((x, y), text, fill=TEXT_COLOR, font=font)
def _draw_zones(
self, draw: ImageDraw.ImageDraw, offset_x: float, offset_y: float, scale: float
) -> None:
"""Draw keepout zones, clean zones, and observed zones on the map."""
# Draw keepout zones (red)
for zone in self._keepout_zones:
self._draw_zone_polygon(
draw,
zone,
offset_x,
offset_y,
scale,
KEEPOUT_ZONE_COLOR[:3],
KEEPOUT_ZONE_BORDER,
"KEEPOUT",
)
# Draw clean zones (green)
for zone in self._clean_zones:
zone_name = zone.get("name", "Clean Zone")
self._draw_zone_polygon(
draw,
zone,
offset_x,
offset_y,
scale,
CLEAN_ZONE_COLOR[:3],
CLEAN_ZONE_BORDER,
zone_name,
)
# Draw observed zones (orange)
for zone in self._observed_zones:
zone_name = zone.get("name", "Observed")
self._draw_zone_polygon(
draw,
zone,
offset_x,
offset_y,
scale,
OBSERVED_ZONE_COLOR[:3],
OBSERVED_ZONE_BORDER,
zone_name,
)
def _draw_zone_polygon(
self,
draw: ImageDraw.ImageDraw,
zone: dict[str, Any],
offset_x: float,
offset_y: float,
scale: float,
fill_color: tuple[int, int, int],
border_color: tuple[int, int, int],
label: str,
) -> None:
"""Draw a single zone polygon."""
if "geometry" not in zone:
return
geometry = zone["geometry"]
if geometry.get("type") != "polygon":
return
# Get coordinates by ID references
polygon_ids = geometry.get("ids", [])
for polygon_id_list in polygon_ids:
if not isinstance(polygon_id_list, list):
continue
# Find coordinates for this polygon
polygon_coords = []
for coord_id in polygon_id_list:
coord = self._find_coordinate_by_id(coord_id)
if coord:
# Transform coordinate to image space
x = coord[0] * scale + offset_x
y = MAP_HEIGHT - (coord[1] * scale + offset_y) # Flip Y axis
polygon_coords.append((x, y))
if len(polygon_coords) >= 3: # Need at least 3 points for polygon
# Create a semi-transparent overlay for zones
# Since PIL doesn't support alpha in polygon fill directly,
# we'll use a dashed/dotted border style for zones
# Draw polygon outline with dashes
self._draw_dashed_polygon(draw, polygon_coords, border_color, 3)
# Draw zone label
if polygon_coords and label:
# Calculate centroid for label placement
x_sum = sum(coord[0] for coord in polygon_coords)
y_sum = sum(coord[1] for coord in polygon_coords)
centroid_x = x_sum / len(polygon_coords)
centroid_y = y_sum / len(polygon_coords)
self._draw_zone_label(
draw, centroid_x, centroid_y, label, border_color
)
def _draw_dashed_polygon(
self,
draw: ImageDraw.ImageDraw,
coords: list[tuple[float, float]],
color: tuple,
width: int,
) -> None:
"""Draw a dashed polygon outline."""
if len(coords) < 3:
return
# Draw dashed lines between consecutive points
for i in range(len(coords)):
start = coords[i]
end = coords[(i + 1) % len(coords)]
# Calculate distance and draw dashed line
dx = end[0] - start[0]
dy = end[1] - start[1]
distance = (dx**2 + dy**2) ** 0.5
if distance > 0:
# Draw dashes every 10 pixels
dash_length = 10
gap_length = 5
total_length = dash_length + gap_length
steps = int(distance / total_length)
for step in range(steps):
t1 = step * total_length / distance
t2 = min((step * total_length + dash_length) / distance, 1.0)
x1 = start[0] + t1 * dx
y1 = start[1] + t1 * dy
x2 = start[0] + t2 * dx
y2 = start[1] + t2 * dy
draw.line([(x1, y1), (x2, y2)], fill=color, width=width)
def _draw_dashed_line(
self,
draw: ImageDraw.ImageDraw,
start: tuple[float, float],
end: tuple[float, float],
color: tuple[int, int, int],
width: int = 2,
dash_length: int = 10,
) -> None:
"""Draw a dashed line between two points."""
x1, y1 = start
x2, y2 = end
# Calculate line length and direction
dx = x2 - x1
dy = y2 - y1
length = (dx * dx + dy * dy) ** 0.5
if length == 0:
return
# Normalize direction
dx_norm = dx / length
dy_norm = dy / length
# Draw dashes
current_pos = 0
while current_pos < length:
# Calculate dash start and end
dash_start_x = x1 + dx_norm * current_pos
dash_start_y = y1 + dy_norm * current_pos
dash_end_pos = min(current_pos + dash_length, length)
dash_end_x = x1 + dx_norm * dash_end_pos
dash_end_y = y1 + dy_norm * dash_end_pos
# Draw the dash
draw.line(
[(dash_start_x, dash_start_y), (dash_end_x, dash_end_y)],
fill=color,
width=width,
)
# Move to next dash (skip gap)
current_pos += dash_length * 2
def _draw_zone_label(
self,
draw: ImageDraw.ImageDraw,
x: float,
y: float,
text: str,
color: tuple[int, int, int],
) -> None:
"""Draw a zone label at the specified position."""
try:
font = ImageFont.truetype("arial.ttf", 12)
except OSError:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# Center the text
text_x = x - text_width / 2
text_y = y - text_height / 2
# Draw text background (semi-transparent white)
draw.rectangle(
[text_x - 2, text_y - 2, text_x + text_width + 2, text_y + text_height + 2],
fill=(255, 255, 255, 200),
)
# Draw text
draw.text((text_x, text_y), text, fill=color, font=font)
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return camera attributes."""
return {
"pmap_id": self._pmap_id,
"map_name": self._map_header.get("name", "Unknown"),
"resolution": self._map_header.get("resolution", 0),
"area": self._map_header.get("area", 0),
"learning_percentage": self._map_header.get("learning_percentage", 0),
"regions_count": len(self._regions),
"keepout_zones_count": len(self._keepout_zones),
"clean_zones_count": len(self._clean_zones),
"observed_zones_count": len(self._observed_zones),
"points_count": len(self._points2d),
}

View file

@ -1,13 +1,14 @@
"""The configuration flow for the robot."""
import voluptuous as vol
import asyncio
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN
from .CloudApi import iRobotCloudApi
from .const import DOMAIN
SCHEMA = vol.Schema(
{

View file

@ -10,8 +10,8 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DEFAULT_SCAN_INTERVAL
from .CloudApi import iRobotCloudApi
from .const import DEFAULT_SCAN_INTERVAL
_LOGGER = logging.getLogger(__name__)

View file

@ -1,7 +1,10 @@
{
"domain": "roomba_rest980",
"name": "Roomba (Rest980)",
"codeowners": ["@ia74"],
"codeowners": [
"@ia74"
],
"version": "0.0.0",
"config_flow": true,
"dependencies": [],
"dhcp": [
@ -24,9 +27,11 @@
],
"documentation": "https://github.com/ia74/roomba_rest980",
"iot_class": "local_polling",
"issue_tracker": "https://github.com/ia74/roomba_rest980/issues",
"requirements": ["aiohttp"],
"version": "1.4.1",
"requirements": [
"aiohttp",
"Pillow",
"aiofiles"
],
"zeroconf": [
{
"type": "_amzn-alexa._tcp.local.",

View file

@ -9,7 +9,7 @@ from homeassistant.helpers.entity import EntityCategory
from homeassistant.util import dt as dt_util
from .const import DOMAIN, cleanBaseMappings, jobInitiatorMappings, phaseMappings
from .RoombaSensor import RoombaSensor, RoombaCloudSensor
from .RoombaSensor import RoombaCloudSensor, RoombaSensor
_LOGGER = logging.getLogger(__name__)

View file

@ -1,9 +1,11 @@
"""Switches needed."""
import logging
from homeassistant.components.switch import SwitchEntity
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory
import logging
from .const import DOMAIN, regionTypeMappings
_LOGGER = logging.getLogger(__name__)
@ -69,20 +71,13 @@ class RoomSwitch(SwitchEntity):
async def async_turn_on(self, **kwargs):
"""Yes."""
self._is_on = True
if self not in order_switched:
order_switched.append(self)
self.async_write_ha_state()
async def async_turn_off(self, **kwargs):
"""No."""
self._is_on = False
if self in order_switched:
order_switched.remove(self)
self.async_write_ha_state()
def get_region_json(self):
"""Return robot-readable JSON to identify the room to start cleaning it."""
return self._room_json
order_switched: list[RoomSwitch] = []

View file

@ -111,11 +111,24 @@ class RoombaVacuum(CoordinatorEntity, StateVacuumEntity):
regions = []
# Check if we have room selection switches available
if hasattr(self, "_selected_rooms") and self._selected_rooms:
domain_data = self.hass.data.get(DOMAIN, {})
selected_rooms = []
# Find all room switches that are turned on
for key, entity in domain_data.items():
if (
key.startswith("switch.")
and hasattr(entity, "is_on")
and entity.is_on
):
selected_rooms.append(entity)
# If we have specific rooms selected, use targeted cleaning
if selected_rooms:
# Build regions list from selected rooms
regions = [
room.get_region_json()
for room in self._selected_rooms
for room in selected_rooms
if hasattr(room, "get_region_json")
]