add cloud support & new sensor Cloud Attributes

This commit is contained in:
ia74 2025-07-20 17:21:44 -05:00
parent 5abe164d1f
commit db91093ab7
9 changed files with 657 additions and 30 deletions

View file

@ -7,8 +7,12 @@ Still work in progress, but the vacuum entity has been fully ported over.
## Roadmap ## Roadmap
- [x] Feature parity (minus actions) with vacuum entity - [x] Feature parity (minus actions) with vacuum entity
- [x] Cloud API connection
- [ ] Cloud MQTT connection
- [ ] Actions - [ ] Actions
- [ ] Dynamically grab rooms and add them to the UI - [ ] Dynamically grab rooms and add them to the UI
- [x] Grab room data (optional, cloud only)
- [ ] Create map image
## Why? ## Why?

View file

@ -0,0 +1,477 @@
"""iRobot Cloud API implementation for retrieving pmaps and other cloud data.
Based on reverse engineering of the iRobot mobile app.
"""
import hashlib
import hmac
import logging
import urllib.parse
import uuid
from datetime import UTC, datetime
from typing import Any
import asyncio
import json
import aiohttp
_LOGGER = logging.getLogger(__name__)
class CloudApiError(Exception):
"""Custom exception for Cloud API errors."""
class AuthenticationError(CloudApiError):
"""Authentication related errors."""
class AWSSignatureV4:
"""AWS Signature Version 4 implementation for signing requests."""
def __init__(
self,
access_key_id: str,
secret_access_key: str,
session_token: str | None = None,
):
"""Initialize AWS Signature V4 signer with credentials."""
self.access_key_id = access_key_id
self.secret_access_key = secret_access_key
self.session_token = session_token
def _hmac_sha256(self, key: bytes, data: str) -> bytes:
"""HMAC SHA256 helper."""
return hmac.new(key, data.encode("utf-8"), hashlib.sha256).digest()
def _sha256_hex(self, data: str) -> str:
"""SHA256 hex helper."""
return hashlib.sha256(data.encode("utf-8")).hexdigest()
def _get_signature_key(self, date_stamp: str, region: str, service: str) -> bytes:
"""Generate AWS Signature V4 signing key."""
k_date = self._hmac_sha256(f"AWS4{self.secret_access_key}".encode(), date_stamp)
k_region = self._hmac_sha256(k_date, region)
k_service = self._hmac_sha256(k_region, service)
return self._hmac_sha256(k_service, "aws4_request")
def _get_date_stamp(self, date: datetime) -> str:
"""Generate date stamp YYYYMMDD."""
return date.strftime("%Y%m%d")
def _get_amz_date(self, date: datetime) -> str:
"""Generate x-amz-date YYYYMMDD'T'HHMMSS'Z'."""
return date.strftime("%Y%m%dT%H%M%SZ")
def generate_signed_headers(
self,
method: str,
service: str,
region: str,
host: str,
path: str,
query_params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
payload: str = "",
) -> dict[str, str]:
"""Generate AWS SigV4 signed headers for a request."""
if query_params is None:
query_params = {}
if headers is None:
headers = {}
now = datetime.now(tz=UTC)
amz_date = self._get_amz_date(now)
date_stamp = self._get_date_stamp(now)
# Step 1: HTTP Method
http_method = method.upper()
# Step 2: Canonical URI
canonical_uri = urllib.parse.quote(path, safe="/")
# Step 3: Canonical Query String
sorted_query_keys = sorted(query_params.keys())
canonical_query_string = "&".join(
[
f"{urllib.parse.quote(key, safe='~')}={urllib.parse.quote(str(query_params[key]), safe='~')}"
for key in sorted_query_keys
]
)
# Step 4: Canonical Headers
merged_headers = {"host": host, "x-amz-date": amz_date, **headers}
sorted_header_keys = sorted([k.lower() for k in merged_headers])
canonical_headers = (
"\n".join([f"{key}:{merged_headers[key]}" for key in sorted_header_keys])
+ "\n"
)
signed_headers = ";".join(sorted_header_keys)
# Step 5: Payload hash
payload_hash = self._sha256_hex(payload)
# Step 6: Canonical request
canonical_request = f"{http_method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers}\n{payload_hash}"
# Step 7: String to sign
algorithm = "AWS4-HMAC-SHA256"
credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"
canonical_request_hash = self._sha256_hex(canonical_request)
string_to_sign = (
f"{algorithm}\n{amz_date}\n{credential_scope}\n{canonical_request_hash}"
)
# Step 8: Calculate signature
signing_key = self._get_signature_key(date_stamp, region, service)
signature = hmac.new(
signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
).hexdigest()
# Step 9: Authorization header
authorization_header = f"{algorithm} Credential={self.access_key_id}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}"
final_headers = {**merged_headers, "Authorization": authorization_header}
if self.session_token:
final_headers["x-amz-security-token"] = self.session_token
return final_headers
class iRobotCloudApi:
"""iRobot Cloud API client for authentication and data retrieval."""
def __init__(
self, username: str, password: str, session: aiohttp.ClientSession | None = None
):
"""Initialize iRobot Cloud API client with credentials."""
self.username = username
self.password = password
self.session = session or aiohttp.ClientSession()
self._should_close_session = session is None
# Configuration
self.config = {"appId": str(uuid.uuid4()), "deviceId": str(uuid.uuid4())}
# Authentication data
self.uid = None
self.uid_signature = None
self.signature_timestamp = None
self.credentials = None
self.deployment = None
self.robots = {}
# Headers for requests
self.headers = {
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "iRobot/7.16.2.140449 CFNetwork/1568.100.1.2.1 Darwin/24.0.0",
}
async def __aenter__(self):
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
if self._should_close_session and self.session:
await self.session.close()
async def discover_endpoints(self) -> dict[str, Any]:
"""Discover deployment endpoints."""
discovery_url = (
"https://disc-prod.iot.irobotapi.com/v1/discover/endpoints?country_code=US"
)
async with self.session.get(discovery_url) as response:
if response.status != 200:
raise CloudApiError(f"Discovery failed: {response.status}")
endpoints = await response.json()
self.deployment = endpoints["deployments"][endpoints["current_deployment"]]
_LOGGER.debug("Discovered deployment: %s", endpoints["current_deployment"])
return endpoints
async def login_gigya(self, api_key: str) -> dict[str, Any]:
"""Login to Gigya authentication service."""
gigya_endpoints = await self.discover_endpoints()
gigya = gigya_endpoints["gigya"]
base_acc = f"https://accounts.{gigya['datacenter_domain']}/accounts."
login_data = {
"loginMode": "standard",
"loginID": self.username,
"password": self.password,
"include": "profile,data,emails,subscriptions,preferences,",
"includeUserInfo": "true",
"targetEnv": "mobile",
"source": "showScreenSet",
"sdk": "ios_swift_1.3.0",
"sessionExpiration": "-2",
"apikey": api_key,
}
async with self.session.post(
f"{base_acc}login",
headers=self.headers,
data=urllib.parse.urlencode(login_data),
) as response:
response_text = await response.text()
_LOGGER.debug("Gigya login response status: %d", response.status)
_LOGGER.debug("Gigya login response: %s", response_text)
try:
login_result = json.loads(response_text)
except json.JSONDecodeError as e:
raise AuthenticationError(
f"Invalid JSON response from Gigya login: {response_text}"
) from e
if login_result.get("errorCode", 0) != 0:
raise AuthenticationError(f"Gigya login failed: {login_result}")
# Check if required keys exist
if "UID" not in login_result:
raise AuthenticationError(
f"Missing 'UID' in Gigya response: {login_result}"
)
if "UIDSignature" not in login_result:
raise AuthenticationError(
f"Missing 'UIDSignature' in Gigya response: {login_result}"
)
if "signatureTimestamp" not in login_result:
raise AuthenticationError(
f"Missing 'signatureTimestamp' in Gigya response: {login_result}"
)
self.uid = login_result["UID"]
self.uid_signature = login_result["UIDSignature"]
self.signature_timestamp = login_result["signatureTimestamp"]
_LOGGER.debug(
"Gigya login successful for: %s", login_result["profile"]["email"]
)
return login_result
async def login_irobot(self) -> dict[str, Any]:
"""Login to iRobot cloud service."""
if not self.deployment:
await self.discover_endpoints()
login_data = {
"app_id": f"IOS-{self.config['appId']}",
"app_info": {
"device_id": f"IOS-{self.config['deviceId']}",
"device_name": "iPhone",
"language": "en_US",
"version": "7.16.2",
},
"assume_robot_ownership": "0",
"authorizer_params": {"devices_per_token": 5},
"gigya": {
"signature": self.uid_signature,
"timestamp": self.signature_timestamp,
"uid": self.uid,
},
"multiple_authorizer_token_support": True,
"push_info": {
"platform": "APNS",
"push_token": "eb6ce9172e5fde9fe4c9a2a945b35709f73fb8014eb7449d944c6c89eeb472fb",
"supported_push_types": [
"mkt_mca",
"cr",
"cse",
"bf",
"uota",
"crae",
"ae",
"crbf",
"pm",
"teom",
"te",
"dt",
"tr",
"ir",
"mca",
"mca_pn_hd",
"shcp",
"shar",
"shas",
"scs",
"lv",
"ce",
"ri",
"fu",
],
},
"skip_ownership_check": "0",
}
async with self.session.post(
f"{self.deployment['httpBase']}/v2/login",
headers={"Content-Type": "application/json"},
json=login_data,
) as response:
response_text = await response.text()
_LOGGER.debug("iRobot login response status: %d", response.status)
_LOGGER.debug("iRobot login response: %s", response_text)
try:
login_result = json.loads(response_text)
except json.JSONDecodeError as e:
raise AuthenticationError(
f"Invalid JSON response from iRobot login: {response_text}"
) from e
if login_result.get("errorCode"):
raise AuthenticationError(f"iRobot login failed: {login_result}")
# Check if required keys exist
if "credentials" not in login_result:
raise AuthenticationError(
f"Missing 'credentials' in login response: {login_result}"
)
if "robots" not in login_result:
raise AuthenticationError(
f"Missing 'robots' in login response: {login_result}"
)
self.credentials = login_result["credentials"]
self.robots = login_result["robots"]
_LOGGER.debug("iRobot login successful, found %d robots", len(self.robots))
return login_result
async def authenticate(self) -> dict[str, Any]:
"""Complete authentication flow."""
# Discover endpoints first
endpoints = await self.discover_endpoints()
# Login to Gigya
await self.login_gigya(endpoints["gigya"]["api_key"])
# Login to iRobot
return await self.login_irobot()
async def _aws_request(
self, url: str, params: dict[str, Any] | None = None
) -> dict[str, Any]:
"""Make an authenticated AWS request."""
if not self.credentials:
raise AuthenticationError("Not authenticated. Call authenticate() first.")
region = self.credentials["CognitoId"].split(":")[0]
# Parse URL
parsed_url = urllib.parse.urlparse(url)
# Create AWS signer
signer = AWSSignatureV4(
access_key_id=self.credentials["AccessKeyId"],
secret_access_key=self.credentials["SecretKey"],
session_token=self.credentials["SessionToken"],
)
# Generate signed headers
query_params = params or {}
signed_headers = signer.generate_signed_headers(
method="GET",
service="execute-api",
region=region,
host=parsed_url.netloc,
path=parsed_url.path,
query_params=query_params,
headers={
"accept": "application/json",
"content-type": "application/json",
"user-agent": "aws-sdk-iOS/2.27.6 iOS/18.0.1 en_US",
},
payload="",
)
# Build final URL with query parameters
if query_params:
query_string = urllib.parse.urlencode(query_params)
final_url = f"{url}?{query_string}"
else:
final_url = url
async with self.session.get(final_url, headers=signed_headers) as response:
if response.status != 200:
raise CloudApiError(f"AWS request failed: {response.status}")
return await response.json()
async def get_mission_history(self, blid: str) -> dict[str, Any]:
"""Get mission history for a robot."""
url = f"{self.deployment['httpBaseAuth']}/v1/{blid}/missionhistory"
params = {
"app_id": f"IOS-{self.config['appId']}",
"filterType": "omit_quickly_canceled_not_scheduled",
"supportedDoneCodes": "dndEnd,returnHomeEnd",
}
return await self._aws_request(url, params)
async def get_pmaps(self, blid: str) -> list[dict[str, Any]]:
"""Get persistent maps (pmaps) for a robot."""
url = f"{self.deployment['httpBaseAuth']}/v1/{blid}/pmaps"
params = {"visible": "true", "activeDetails": "2"}
return await self._aws_request(url, params)
async def get_pmap_umf(
self, blid: str, pmap_id: str, version_id: str
) -> dict[str, Any]:
"""Get UMF (Unified Map Format) data for a specific pmap."""
url = f"{self.deployment['httpBaseAuth']}/v1/{blid}/pmaps/{pmap_id}/versions/{version_id}/umf"
params = {"activeDetails": "2"}
return await self._aws_request(url, params)
async def get_robot_data(self, blid: str) -> dict[str, Any]:
"""Get comprehensive robot data including pmaps and mission history."""
if blid not in self.robots:
raise CloudApiError(f"Robot {blid} not found in authenticated robots")
robot_data = {
"robot_info": self.robots[blid],
"mission_history": await self.get_mission_history(blid),
"pmaps": await self.get_pmaps(blid),
}
# Get UMF data for active pmaps
for pmap in robot_data["pmaps"]:
if pmap.get("active_pmapv_id"):
try:
robot_data[f"pmap_umf_{pmap['pmap_id']}"] = await self.get_pmap_umf(
blid, pmap["pmap_id"], pmap["active_pmapv_id"]
)
except CloudApiError as e:
_LOGGER.warning(
"Failed to get UMF for pmap %s: %s", pmap["pmap_id"], e
)
return robot_data
async def get_all_robots_data(self) -> dict[str, dict[str, Any]]:
"""Get data for all authenticated robots."""
if not self.robots:
raise CloudApiError("No robots found. Authenticate first.")
all_data = {}
for blid in self.robots:
try:
all_data[blid] = await self.get_robot_data(blid)
_LOGGER.debug("Retrieved data for robot %s", blid)
except CloudApiError as e:
_LOGGER.error("Failed to get data for robot %s: %s", blid, e)
all_data[blid] = {"error": str(e)}
return all_data

View file

@ -37,3 +37,26 @@ class RoombaSensor(CoordinatorEntity, SensorEntity):
def _get_default(self, key: str, default: str): def _get_default(self, key: str, default: str):
return self.coordinator.data.get(key) if self.coordinator.data else default return self.coordinator.data.get(key) if self.coordinator.data else default
class RoombaCloudSensor(CoordinatorEntity, SensorEntity):
"""Generic Roomba sensor to provide coordinator."""
_rs_given_info: tuple[str, str] = ("Sensor", "sensor")
def __init__(self, coordinator, entry) -> None:
"""Create a new generic sensor."""
super().__init__(coordinator)
_LOGGER.debug("Entry unique_id: %s", entry.unique_id)
self._entry = entry
self._attr_entity_category = EntityCategory.DIAGNOSTIC
self._attr_has_entity_name = True
self._attr_name = self._rs_given_info[0]
self._attr_unique_id = f"{entry.unique_id}_{self._rs_given_info[1]}"
self._attr_device_info = {
"identifiers": {(DOMAIN, entry.unique_id)},
"name": entry.title,
"manufacturer": "iRobot",
}
if not entry.data["cloud_api"]:
self._attr_available = False

View file

@ -1,15 +1,12 @@
"""Roomba integration using an external Rest980 server.""" """Roomba integration using an external Rest980 server."""
import asyncio
import logging import logging
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession from .const import DOMAIN
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator from .coordinator import RoombaDataCoordinator, RoombaCloudCoordinator
from .const import DEFAULT_SCAN_INTERVAL, DOMAIN
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN) CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
@ -18,25 +15,32 @@ _LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Setup Roombas with the Rest980 base url.""" """Setup Roombas with the Rest980 base url."""
coordinator = RoombaDataCoordinator(hass, entry)
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {}) hass.data.setdefault(DOMAIN, {})
url = entry.data["base_url"]
session = async_get_clientsession(hass) # Use HAs shared session
async def async_update_data():
async with asyncio.timeout(10):
async with session.get(f"{url}/api/local/info/state") as resp:
return await resp.json()
coordinator = DataUpdateCoordinator(
hass,
_LOGGER,
name="Roomba REST Data",
update_method=async_update_data,
update_interval=DEFAULT_SCAN_INTERVAL,
)
hass.data[DOMAIN][entry.entry_id + "_coordinator"] = coordinator hass.data[DOMAIN][entry.entry_id + "_coordinator"] = coordinator
await coordinator.update_method() hass.data[DOMAIN][entry.entry_id + "_blid"] = "unknown"
if entry.data["cloud_api"]:
cc = RoombaCloudCoordinator(hass, entry)
await cc.async_config_entry_first_refresh()
hass.data[DOMAIN][entry.entry_id + "_cloud"] = cc
if "blid" not in entry.data:
for blid, robo in cc.data.items():
try:
ifo = robo["robot_info"] or {}
ifosku = ifo.get("sku")
ifoswv = ifo.get("softwareVer")
ifoname = ifo.get("name")
thisname = coordinator.data.get("name", "Roomba")
thisswv = coordinator.data.get("softwareVer")
thissku = coordinator.data.get("sku")
if ifosku == thissku and ifoswv == thisswv and ifoname == thisname:
hass.data[DOMAIN][entry.entry_id + "_blid"] = blid
except Exception as e:
_LOGGER.debug(e)
else:
hass.data[DOMAIN][entry.entry_id + "_cloud"] = {}
# Forward platforms; create tasks but await to ensure no failure? # Forward platforms; create tasks but await to ensure no failure?
await hass.config_entries.async_forward_entry_setups( await hass.config_entries.async_forward_entry_setups(
entry, ["vacuum", "sensor", "switch"] entry, ["vacuum", "sensor", "switch"]

View file

@ -7,8 +7,16 @@ from homeassistant import config_entries
from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN from .const import DOMAIN
from .CloudApi import iRobotCloudApi
SCHEMA = vol.Schema({vol.Required("base_url"): str}) SCHEMA = vol.Schema(
{
vol.Required("base_url"): str,
vol.Required("cloud_api", default=True): bool,
vol.Optional("irobot_username"): str,
vol.Optional("irobot_password"): str,
}
)
class RoombaConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): class RoombaConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
@ -31,6 +39,11 @@ class RoombaConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
data_schema=SCHEMA, data_schema=SCHEMA,
errors=["cannot_connect"], errors=["cannot_connect"],
) )
if user_input["cloud_api"]:
async with iRobotCloudApi(
user_input["irobot_username"], user_input["irobot_password"]
) as api:
await api.authenticate()
except Exception: except Exception:
return self.async_show_form( return self.async_show_form(
step_id="user", step_id="user",

View file

@ -0,0 +1,78 @@
"""Data update coordinator for Roomba REST980."""
import asyncio
import logging
import aiohttp
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DEFAULT_SCAN_INTERVAL
from .CloudApi import iRobotCloudApi
_LOGGER = logging.getLogger(__name__)
class RoombaDataCoordinator(DataUpdateCoordinator):
"""Data coordinator for Roomba REST980 integration."""
def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
"""Initialize my coordinator."""
super().__init__(
hass,
_LOGGER,
name="rest980 API data",
config_entry=config_entry,
update_interval=DEFAULT_SCAN_INTERVAL,
)
self.session = async_get_clientsession(hass) # Use HAs shared session
self.url = config_entry.data["base_url"]
async def _async_update_data(self):
"""Fetch data from API endpoint.
This is the place to pre-process the data to lookup tables
so entities can quickly look up their data.
"""
try:
# Note: asyncio.TimeoutError and aiohttp.ClientError are already
# handled by the data update coordinator.
async with asyncio.timeout(10):
async with self.session.get(f"{self.url}/api/local/info/state") as resp:
resp.raise_for_status()
return await resp.json()
except (aiohttp.ClientError, TimeoutError) as err:
raise UpdateFailed(f"Error communicating with API: {err}") from err
class RoombaCloudCoordinator(DataUpdateCoordinator):
"""Data coordinator for Roomba REST980 integration."""
api: iRobotCloudApi
def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
"""Initialize my coordinator."""
super().__init__(
hass,
_LOGGER,
name="iRobot Cloud API data",
config_entry=config_entry,
update_interval=DEFAULT_SCAN_INTERVAL,
)
self.username = config_entry.data["irobot_username"]
self.password = config_entry.data["irobot_password"]
self.session = async_get_clientsession(hass)
self.api = iRobotCloudApi(self.username, self.password, self.session)
async def _async_setup(self):
await self.api.authenticate()
async def _async_update_data(self):
try:
async with asyncio.timeout(10):
return await self.api.get_all_robots_data()
except (aiohttp.ClientError, TimeoutError) as err:
raise UpdateFailed(f"Error communicating with API: {err}") from err

View file

@ -7,12 +7,13 @@ from homeassistant.helpers.entity import EntityCategory
from homeassistant.util import dt as dt_util from homeassistant.util import dt as dt_util
from .const import DOMAIN, cleanBaseMappings, jobInitiatorMappings, phaseMappings from .const import DOMAIN, cleanBaseMappings, jobInitiatorMappings, phaseMappings
from .RoombaSensor import RoombaSensor from .RoombaSensor import RoombaSensor, RoombaCloudSensor
async def async_setup_entry(hass: HomeAssistant, entry, async_add_entities): async def async_setup_entry(hass: HomeAssistant, entry, async_add_entities):
"""Create the sensors needed to poll Roomba's data.""" """Create the sensors needed to poll Roomba's data."""
coordinator = hass.data[DOMAIN][entry.entry_id + "_coordinator"] coordinator = hass.data[DOMAIN][entry.entry_id + "_coordinator"]
cloudCoordinator = hass.data[DOMAIN][entry.entry_id + "_cloud"]
async_add_entities( async_add_entities(
[ [
RoombaAttributes(coordinator, entry), RoombaAttributes(coordinator, entry),
@ -31,6 +32,7 @@ async def async_setup_entry(hass: HomeAssistant, entry, async_add_entities):
RoombaCarpetBoostMode(coordinator, entry), RoombaCarpetBoostMode(coordinator, entry),
RoombaCleanEdges(coordinator, entry), RoombaCleanEdges(coordinator, entry),
RoombaCleanMode(coordinator, entry), RoombaCleanMode(coordinator, entry),
RoombaCloudAttributes(cloudCoordinator, entry),
], ],
update_before_add=True, update_before_add=True,
) )
@ -89,6 +91,31 @@ class RoombaAttributes(RoombaSensor):
return self.coordinator.data or {} return self.coordinator.data or {}
class RoombaCloudAttributes(RoombaCloudSensor):
"""A simple sensor that returns all given datapoints without modification."""
_rs_given_info = ("Cloud Attributes", "cloud_attributes")
def __init__(self, coordinator, entry) -> None:
"""Initialize."""
super().__init__(coordinator, entry)
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
self._attr_native_value = "OK" if self.coordinator.data else "Unavailable"
self.async_write_ha_state()
@property
def extra_state_attributes(self):
"""Return all the attributes returned by iRobot's cloud."""
return (
self.coordinator.data.get(
self.hass.data[DOMAIN][self._entry.entry_id + "_blid"]
)
or {}
)
class RoombaPhase(RoombaSensor): class RoombaPhase(RoombaSensor):
"""A simple sensor that returns the phase of the Roomba.""" """A simple sensor that returns the phase of the Roomba."""

View file

@ -2,13 +2,15 @@
"config": { "config": {
"step": { "step": {
"user": { "user": {
"title": "Connect to rest980", "title": "Setup Roomba for integration",
"description": "This is needed to grab your roomba's API data.",
"data": { "data": {
"base_url": "rest980 Server Base URL" "base_url": "rest980 Base URL (ex: http://localhost:3000)",
"cloud_api": "Enable cloud features?",
"irobot_username": "iRobot Username",
"irobot_password": "iRobot Password"
}, },
"data_description": { "data_description": {
"base_url": "Example: http://localhost:3000" "cloud_api": "Use iRobot's API to add extra features like dynamic rooms and maps."
} }
} }
}, },
@ -21,4 +23,4 @@
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]" "already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
} }
} }
} }

View file

@ -73,7 +73,6 @@ class RoombaVacuum(CoordinatorEntity, StateVacuumEntity):
self._attr_activity = VacuumActivity.DOCKED self._attr_activity = VacuumActivity.DOCKED
self._attr_available = data != {} self._attr_available = data != {}
self._attr_battery_level = data.get("batPct", 0)
self._attr_extra_state_attributes = createExtendedAttributes(self) self._attr_extra_state_attributes = createExtendedAttributes(self)
self._attr_device_info = { self._attr_device_info = {
"identifiers": self._attr_device_info.get("identifiers"), "identifiers": self._attr_device_info.get("identifiers"),