- add errors to auth & automatically reauth

- make mapping cleaner
- fix device info not fully showing
- add not ready and error sensors
This commit is contained in:
ia74 2025-08-12 17:17:59 -05:00
parent 699310c93c
commit b9c9e1a897
10 changed files with 252 additions and 234 deletions

3
cloudReverse.md Normal file
View file

@ -0,0 +1,3 @@
# work in progress
This will be an expanded write-up on how I've found the iRobot Cloud API works.

View file

@ -3,19 +3,18 @@
Based on reverse engineering of the iRobot mobile app.
"""
import aiofiles
from datetime import UTC, datetime
import hashlib
import hmac
import json
from json.decoder import JSONDecodeError
import logging
from pathlib import Path
from typing import Any
import urllib.parse
import uuid
from json.decoder import JSONDecodeError
import aiofiles
import aiohttp
_LOGGER = logging.getLogger(__name__)
@ -152,7 +151,10 @@ class iRobotCloudApi:
"""iRobot Cloud API client for authentication and data retrieval."""
def __init__(
self, username: str, password: str, session: aiohttp.ClientSession | None = None
self,
username: str,
password: str,
session: aiohttp.ClientSession | None = None,
):
"""Initialize iRobot Cloud API client with credentials."""
self.username = username
@ -340,9 +342,10 @@ class iRobotCloudApi:
# Check if required keys exist
if "credentials" not in login_result:
raise AuthenticationError(
f"Missing 'credentials' in login response: {login_result}"
)
msg = login_result["errorMessage"] or login_result
if "mqtt slot" in msg:
msg = f"The cloud authentication has been rate-limited. Try closing the iRobot app or trying again later. ({login_result['errorMessage']})"
raise AuthenticationError(f"{msg}")
if "robots" not in login_result:
raise AuthenticationError(
@ -546,116 +549,3 @@ class iRobotCloudApi:
except (OSError, Exception) as e:
_LOGGER.warning("Failed to save UMF debug data: %s", e)
"""
active_pmapv_details:
active_pmapv:
pmap_id: BGQxV6zGTmCsalWFHr-S5g
pmapv_id: 250720T215523
create_time: 1753048538
proc_state: OK_Processed
creator: robot
nMssn: 1182
mission_id: 01K0MC4XWG0DKT67MCSGGG4924
learning_percentage: 100
last_user_pmapv_id: 250718T074805
last_user_ts: 1752824885
shareability: 1
robot_cap:
maps: 3
pmaps: 10
robot_id: B61489C9D5104793AFEA1F26C91B61DF
map_header:
id: BGQxV6zGTmCsalWFHr-S5g
version: 250720T215523
name: Main Floor
learning_percentage: 100
create_time: 1753048538
resolution: 0.105
user_orientation_rad: 1.5634
robot_orientation_rad: 3.1457
area: 38.1418
nmssn: 1182
mission_id: 01K0MC4XWG0DKT67MCSGGG4924
regions:
- id: '11'
name: Kitchen
region_type: kitchen
policies:
odoa_mode: 0
odoa_feats: {}
disabled_operating_modes: 0
override_operating_modes: 0
time_estimates:
- unit: seconds
estimate: 420
confidence: GOOD_CONFIDENCE
params:
noAutoPasses: true
twoPass: true
- unit: seconds
estimate: 210
confidence: GOOD_CONFIDENCE
params:
noAutoPasses: true
twoPass: false
- id: '15'
name: ''
region_type: unspecified
policies:
odoa_mode: 0
odoa_feats: {}
disabled_operating_modes: 0
override_operating_modes: 0
time_estimates:
- unit: seconds
estimate: 458
confidence: GOOD_CONFIDENCE
params:
noAutoPasses: true
twoPass: true
- unit: seconds
estimate: 229
confidence: GOOD_CONFIDENCE
params:
noAutoPasses: true
twoPass: false
- id: '10'
name: Hallway
region_type: hallway
policies:
odoa_mode: 0
odoa_feats: {}
disabled_operating_modes: 0
override_operating_modes: 0
time_estimates:
- unit: seconds
estimate: 1282
confidence: GOOD_CONFIDENCE
params:
noAutoPasses: true
twoPass: true
- unit: seconds
estimate: 641
confidence: GOOD_CONFIDENCE
params:
noAutoPasses: true
twoPass: false
zones: []
keepoutzones: []
observed_zones:
- id: '1449649421'
extent_type: rug
quality:
confidence: 70
related_objects:
- '1449649421'
- id: '1048295640'
extent_type: rug
quality:
confidence: 70
related_objects:
- '1048295640'
"""

View file

@ -2,10 +2,7 @@
from datetime import datetime
from homeassistant.helpers import issue_registry as ir
from .const import (
DOMAIN,
binMappings,
cleanBaseMappings,
cycleMappings,

View file

@ -3,10 +3,10 @@
import logging
from homeassistant.components.sensor import SensorEntity
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers import device_registry as dr
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)

View file

@ -9,6 +9,7 @@ from PIL import Image, ImageDraw, ImageFont
from homeassistant.components.camera import Camera
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import DOMAIN, regionTypeMappings
@ -130,15 +131,26 @@ class RoombaMapCamera(Camera):
self._observed_zones = []
# Camera attributes
self._attr_name = f"Roomba Map - {self._map_header.get('name', 'Unknown')}"
name = self._map_header.get("name", "Unknown")
if len(name) == 0:
name = "New Map"
self._attr_name = name
self._attr_unique_id = f"{entry.entry_id}_map_{pmap_id}"
# Device info
self._attr_device_info = {
"identifiers": {(DOMAIN, entry.unique_id)},
"name": entry.title,
"manufacturer": "iRobot",
}
@property
def device_info(self) -> DeviceInfo:
"""Return the Roomba's device information."""
data = self._coordinator.data or {}
rdata = data[self._entry.runtime_data.robot_blid]["robot_info"]
return DeviceInfo(
identifiers={(DOMAIN, self._entry.unique_id)},
name=rdata["name"],
manufacturer="iRobot",
model="Roomba",
model_id=rdata["sku"],
sw_version=rdata["softwareVer"],
)
def camera_image(
self, width: int | None = None, height: int | None = None
@ -171,9 +183,8 @@ class RoombaMapCamera(Camera):
y = (MAP_HEIGHT - text_height) // 2
draw.text((x, y), text, fill=TEXT_COLOR, font=font)
else:
# Calculate map bounds from points2d
if self._points2d:
elif self._points2d:
# Extract all coordinates
all_coords = [
point["coordinates"]
@ -202,12 +213,8 @@ class RoombaMapCamera(Camera):
scale = min(scale_x, scale_y)
# Center the map
offset_x = (
MAP_WIDTH - map_width * scale
) / 2 - min_x * scale
offset_y = (
MAP_HEIGHT - map_height * scale
) / 2 - min_y * scale
offset_x = (MAP_WIDTH - map_width * scale) / 2 - min_x * scale
offset_y = (MAP_HEIGHT - map_height * scale) / 2 - min_y * scale
# Draw rooms
self._draw_regions(draw, offset_x, offset_y, scale)
@ -216,7 +223,7 @@ class RoombaMapCamera(Camera):
self._draw_points(draw, offset_x, offset_y, scale)
# Draw zones (keepout, clean, observed)
self._draw_zones(draw, offset_x, offset_y, scale)
img = self._draw_zones(img, offset_x, offset_y, scale)
# Convert to bytes
img_bytes = io.BytesIO()
@ -275,7 +282,7 @@ class RoombaMapCamera(Camera):
if len(coordinates) >= 2:
x = coordinates[0] * scale + offset_x
y = MAP_HEIGHT - (coordinates[1] * scale + offset_y) # Flip Y axis
draw.ellipse([x - 1, y - 1, x + 1, y + 1], fill=WALL_COLOR)
# draw.ellipse([x - 1, y - 1, x + 1, y + 1], fill=WALL_COLOR)
def _find_coordinate_by_id(self, coord_id: str) -> list[float] | None:
"""Find coordinate data by ID reference."""
@ -321,14 +328,15 @@ class RoombaMapCamera(Camera):
draw.text((x, y), text, fill=TEXT_COLOR, font=font)
def _draw_zones(
self, draw: ImageDraw.ImageDraw, offset_x: float, offset_y: float, scale: float
) -> None:
self, img: Image.Image, offset_x: float, offset_y: float, scale: float
) -> Image.Image:
"""Draw keepout zones, clean zones, and observed zones on the map."""
current_img = img
# Draw keepout zones (red)
for zone in self._keepout_zones:
self._draw_zone_polygon(
draw,
current_img = self._draw_zone_polygon(
current_img,
zone,
offset_x,
offset_y,
@ -341,8 +349,8 @@ class RoombaMapCamera(Camera):
# Draw clean zones (green)
for zone in self._clean_zones:
zone_name = zone.get("name", "Clean Zone")
self._draw_zone_polygon(
draw,
current_img = self._draw_zone_polygon(
current_img,
zone,
offset_x,
offset_y,
@ -355,8 +363,8 @@ class RoombaMapCamera(Camera):
# Draw observed zones (orange)
for zone in self._observed_zones:
zone_name = zone.get("name", "Observed")
self._draw_zone_polygon(
draw,
current_img = self._draw_zone_polygon(
current_img,
zone,
offset_x,
offset_y,
@ -366,9 +374,11 @@ class RoombaMapCamera(Camera):
zone_name,
)
return current_img
def _draw_zone_polygon(
self,
draw: ImageDraw.ImageDraw,
img: Image.Image,
zone: dict[str, Any],
offset_x: float,
offset_y: float,
@ -376,17 +386,18 @@ class RoombaMapCamera(Camera):
fill_color: tuple[int, int, int],
border_color: tuple[int, int, int],
label: str,
) -> None:
) -> Image.Image:
"""Draw a single zone polygon."""
if "geometry" not in zone:
return
return img
geometry = zone["geometry"]
if geometry.get("type") != "polygon":
return
return img
# Get coordinates by ID references
polygon_ids = geometry.get("ids", [])
current_img = img
for polygon_id_list in polygon_ids:
if not isinstance(polygon_id_list, list):
@ -403,11 +414,17 @@ class RoombaMapCamera(Camera):
polygon_coords.append((x, y))
if len(polygon_coords) >= 3: # Need at least 3 points for polygon
# Create a semi-transparent overlay for zones
# Since PIL doesn't support alpha in polygon fill directly,
# we'll use a dashed/dotted border style for zones
# Check if this is a keepout zone to apply transparency
is_keepout = label in {"KEEP OUT", "Observed"}
# Draw polygon outline with dashes
if is_keepout:
# Draw semi-transparent keepout zone using overlay technique
current_img = self._draw_transparent_polygon(
current_img, polygon_coords, fill_color, border_color
)
else:
# For other zones, use dashed border style
draw = ImageDraw.Draw(current_img)
self._draw_dashed_polygon(draw, polygon_coords, border_color, 3)
# Draw zone label
@ -418,10 +435,51 @@ class RoombaMapCamera(Camera):
centroid_x = x_sum / len(polygon_coords)
centroid_y = y_sum / len(polygon_coords)
draw = ImageDraw.Draw(current_img)
self._draw_zone_label(
draw, centroid_x, centroid_y, label, border_color
)
return current_img
def _draw_transparent_polygon(
self,
base_img: Image.Image,
coords: list[tuple[float, float]],
fill_color: tuple[int, int, int],
border_color: tuple[int, int, int],
) -> Image.Image:
"""Draw a semi-transparent polygon by creating an overlay and blending.
Returns the blended image that should replace the original.
"""
if len(coords) < 3:
return base_img
# Create a transparent overlay
overlay = Image.new("RGBA", base_img.size, (0, 0, 0, 0))
overlay_draw = ImageDraw.Draw(overlay)
# Draw the polygon on the overlay with transparency
transparent_fill = (*fill_color, 100) # ~39% opacity
overlay_draw.polygon(
coords, fill=transparent_fill, outline=(*border_color, 255), width=2
)
# Convert base image to RGBA for blending
if base_img.mode != "RGBA":
base_rgba = base_img.convert("RGBA")
else:
base_rgba = base_img.copy()
# Blend the overlay with the base image
blended = Image.alpha_composite(base_rgba, overlay)
# Return the blended image in the original mode
if base_img.mode == "RGB":
return blended.convert("RGB")
return blended
def _draw_dashed_polygon(
self,
draw: ImageDraw.ImageDraw,

View file

@ -39,6 +39,10 @@ class RoombaConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
_proposed_name: str
_user_data: dict[str, any]
async def async_step_reauth(self, entry_data: dict[str, any]) -> ConfigFlowResult:
"""Perform reauth upon an API authentication error."""
await self.hass.config_entries.async_reload(self._reauth_entry_id)
async def async_step_cloud(self, user_input=None) -> ConfigFlowResult:
"""Show user the setup for the cloud API."""
if user_input is not None:

View file

@ -8,9 +8,13 @@ import aiohttp
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.helpers.update_coordinator import (
ConfigEntryAuthFailed,
DataUpdateCoordinator,
UpdateFailed,
)
from .CloudApi import iRobotCloudApi
from .CloudApi import iRobotCloudApi, AuthenticationError, CloudApiError
from .const import DEFAULT_SCAN_INTERVAL
_LOGGER = logging.getLogger(__name__)
@ -68,7 +72,10 @@ class RoombaCloudCoordinator(DataUpdateCoordinator):
self.api = iRobotCloudApi(self.username, self.password, self.session)
async def _async_setup(self):
try:
await self.api.authenticate()
except (AuthenticationError, CloudApiError) as err:
raise ConfigEntryAuthFailed(f"Cloud API error: {err}") from err
async def _async_update_data(self):
try:

View file

@ -1,9 +1,7 @@
{
"domain": "roomba_rest980",
"name": "Roomba (Rest980)",
"codeowners": [
"@ia74"
],
"codeowners": ["@ia74"],
"config_flow": true,
"dependencies": [],
"dhcp": [
@ -28,10 +26,8 @@
"iot_class": "local_polling",
"issue_tracker": "https://github.com/ia74/roomba_rest980/issues",
"quality_scale": "bronze",
"requirements": [
"aiofiles==24.1.0"
],
"version": "1.10.0",
"requirements": ["aiofiles==24.1.0"],
"version": "1.11.0",
"zeroconf": [
{
"type": "_amzn-alexa._tcp.local.",

View file

@ -8,7 +8,13 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory
from homeassistant.util import dt as dt_util
from .const import DOMAIN, cleanBaseMappings, jobInitiatorMappings, phaseMappings
from .const import (
cleanBaseMappings,
errorMappings,
jobInitiatorMappings,
notReadyMappings,
phaseMappings,
)
from .RoombaSensor import RoombaCloudSensor, RoombaSensor
_LOGGER = logging.getLogger(__name__)
@ -41,7 +47,7 @@ async def async_setup_entry(hass: HomeAssistant, entry, async_add_entities):
)
if cloud_entities:
async_add_entities(cloud_entities)
entry.runtime_data.err = RoombaError(coordinator, entry)
async_add_entities(
[
RoombaAttributes(coordinator, entry),
@ -60,6 +66,8 @@ async def async_setup_entry(hass: HomeAssistant, entry, async_add_entities):
RoombaCarpetBoostMode(coordinator, entry),
RoombaCleanEdges(coordinator, entry),
RoombaCleanMode(coordinator, entry),
RoombaNotReady(coordinator, entry),
entry.runtime_data.err,
RoombaCloudAttributes(cloudCoordinator, entry),
],
update_before_add=True,
@ -106,11 +114,19 @@ class RoombaBatterySensor(RoombaSensor):
class RoombaAttributes(RoombaSensor):
"""A simple sensor that returns all given datapoints without modification."""
_rs_given_info = ("Attributes", "attributes")
_rs_given_info = ("Local Raw Attributes", "attributes")
def __init__(self, coordinator, entry) -> None:
"""Create the sensor."""
super().__init__(coordinator, entry)
self._attr_icon = "mdi:lan-connect"
self._attr_entity_registry_enabled_default = True
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
self._attr_native_value = "OK" if self.coordinator.data else "Unavailable"
self._attr_native_value = (
"Available" if self.coordinator.data else "Unavailable"
)
self.async_write_ha_state()
@property
@ -122,15 +138,19 @@ class RoombaAttributes(RoombaSensor):
class RoombaCloudAttributes(RoombaCloudSensor):
"""A simple sensor that returns all given datapoints without modification."""
_rs_given_info = ("Cloud Attributes", "cloud_attributes")
_rs_given_info = ("Cloud Raw Attributes", "cloud_attributes")
def __init__(self, coordinator, entry) -> None:
"""Initialize."""
"""Create the sensor."""
super().__init__(coordinator, entry)
self._attr_icon = "mdi:cloud-braces"
self._attr_entity_registry_enabled_default = True
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
self._attr_native_value = "OK" if self.coordinator.data else "Unavailable"
self._attr_native_value = (
"Available" if self.coordinator.data else "Unavailable"
)
self.async_write_ha_state()
@property
@ -327,6 +347,7 @@ class RoombaMissionElapsedTime(RoombaSensor):
missionStartTime = status.get("mssnStrtTm") # Unix timestamp in seconds?
if missionStartTime:
if missionStartTime != 0:
self._attr_available = True
try:
elapsed_time = dt_util.utcnow() - dt_util.utc_from_timestamp(
@ -361,6 +382,7 @@ class RoombaRechargeTime(RoombaSensor):
estimatedRechargeTime = status.get("rechrgTm") # Unix timestamp in seconds?
if estimatedRechargeTime:
if estimatedRechargeTime != 0:
self._attr_available = True
try:
self._attr_native_value = dt_util.utc_from_timestamp(
@ -434,6 +456,48 @@ class RoombaCleanEdges(RoombaSensor):
self.async_write_ha_state()
class RoombaError(RoombaSensor):
"""Read the error message of the Roomba."""
_rs_given_info = ("Error", "error")
def __init__(self, coordinator, entry) -> None:
"""Create a new error sensor."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_options = list(errorMappings.values())
self._attr_icon = "mdi:robot-vacuum-alert"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
nr = data.get("cleanMissionStatus", {}).get("error")
self._attr_native_value = errorMappings.get(nr, nr)
self.async_write_ha_state()
class RoombaNotReady(RoombaSensor):
"""Read the not ready message of the Roomba."""
_rs_given_info = ("Not Ready", "not_ready")
def __init__(self, coordinator, entry) -> None:
"""Create a new not_ready sensor."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_options = list(notReadyMappings.values())
self._attr_icon = "mdi:clock-alert"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
nr = data.get("cleanMissionStatus", {}).get("notReady")
self._attr_native_value = notReadyMappings.get(nr, nr)
self.async_write_ha_state()
class RoombaCleanMode(RoombaSensor):
"""Read the clean mode of the Roomba."""
@ -482,6 +546,7 @@ class RoombaMissionExpireTime(RoombaSensor):
expireTime = status.get("expireTm") # Unix timestamp in seconds?
if expireTime:
if expireTime != 0:
self._attr_available = True
try:
self._attr_native_value = dt_util.utc_from_timestamp(expireTime)

View file

@ -8,8 +8,8 @@ from homeassistant.components.vacuum import (
VacuumEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
@ -165,12 +165,10 @@ class RoombaVacuum(CoordinatorEntity, StateVacuumEntity):
},
blocking=True,
)
except (KeyError, AttributeError, ValueError) as e:
except (KeyError, AttributeError, ValueError, Exception) as e:
_LOGGER.error("Failed to start cleaning due to configuration error: %s", e)
except Exception as e: # pylint: disable=broad-except
_LOGGER.error("Failed to start cleaning: %s", e)
async def async_stop(self):
async def async_stop(self) -> None:
"""Stop the action."""
await self.hass.services.async_call(
DOMAIN,