This commit is contained in:
ia74 2025-07-15 17:33:16 -05:00
commit 87ef490eed
10 changed files with 1066 additions and 0 deletions

View file

@ -0,0 +1,39 @@
"""A generic sensor to provide the coordinator and device info."""
import logging
from homeassistant.components.sensor import SensorEntity
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
class RoombaSensor(CoordinatorEntity, SensorEntity):
"""Generic Roomba sensor to provide coordinator."""
_rs_given_info: tuple[str, str] = ("Sensor", "sensor")
def __init__(self, coordinator, entry) -> None:
"""Create a new generic sensor."""
super().__init__(coordinator)
_LOGGER.debug("Entry unique_id: %s", entry.unique_id)
self._entry = entry
self._attr_entity_category = EntityCategory.DIAGNOSTIC
self._attr_has_entity_name = True
self._attr_name = self._rs_given_info[0]
self._attr_unique_id = f"{entry.unique_id}_{self._rs_given_info[1]}"
self._attr_device_info = {
"identifiers": {(DOMAIN, entry.unique_id)},
"name": entry.title,
"manufacturer": "iRobot",
}
def returnIn(self, mapping: dict[str, str], index: str) -> str:
"""Default or map value."""
mapping.get(index, index)
def _get_default(self, key: str, default: str):
return self.coordinator.data.get(key) if self.coordinator.data else default

View file

@ -0,0 +1,54 @@
"""Roomba integration using an external Rest980 server."""
import asyncio
import logging
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import DEFAULT_SCAN_INTERVAL, DOMAIN
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Setup Roombas with the Rest980 base url."""
hass.data.setdefault(DOMAIN, {})
url = entry.data["base_url"]
session = async_get_clientsession(hass) # Use HAs shared session
async def async_update_data():
async with asyncio.timeout(10):
async with session.get(f"{url}/api/local/info/state") as resp:
return await resp.json()
coordinator = DataUpdateCoordinator(
hass,
_LOGGER,
name="Roomba REST Data",
update_method=async_update_data,
update_interval=DEFAULT_SCAN_INTERVAL,
)
hass.data[DOMAIN][entry.entry_id + "_coordinator"] = coordinator
await coordinator.update_method()
# Forward platforms; create tasks but await to ensure no failure?
await hass.config_entries.async_forward_entry_setups(
entry, ["vacuum", "sensor", "switch"]
)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Safely remove Roombas."""
await hass.config_entries.async_unload_platforms(
entry, ["vacuum", "sensor", "switch"]
)
hass.data[DOMAIN].pop(entry.entry_id + "_coordinator")
return True

View file

@ -0,0 +1,29 @@
"""The configuration flow for the robot."""
import voluptuous as vol
from homeassistant import config_entries
from .const import DOMAIN
class RoombaConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Config flow."""
async def async_step_user(self, user_input=None):
"""Show the user the input for the base url."""
if user_input is not None:
return self.async_create_entry(title="Roomba", data=user_input, options={})
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({vol.Required("base_url"): str}),
)
async def async_step_options(self, user_input=None):
"""I dont know."""
return self.async_create_entry(
title="Room Switches Configured via UI",
data=self.options,
options=self.options,
)

62
roomba_rest980/const.py Normal file
View file

@ -0,0 +1,62 @@
"""Constants for the Roomba (Rest980) integration."""
from datetime import timedelta
DOMAIN = "roomba_rest980"
DEFAULT_SCAN_INTERVAL = timedelta(seconds=10) # or whatever interval you want
notReadyMappings = {
0: "n-a",
2: "Uneven Ground",
15: "Low Battery",
39: "Pending",
48: "Path Blocked",
}
errorMappings = {0: "n-a", 15: "Reboot Required", 18: "Docking Issue"}
cycleMappings = {
"clean": "Clean",
"quick": "Clean (Quick)",
"spot": "Spot",
"evac": "Emptying",
"dock": "Docking",
"train": "Training",
"none": "Ready",
}
phaseMappings = {
"charge": "Charge",
"run": "Run",
"evac": "Empty",
"stop": "Paused",
"stuck": "Stuck",
"hmUsrDock": "Sent Home",
"hmMidMsn": "Mid Dock",
"hmPostMsn": "Final Dock",
"idle": "Idle", # Added for RoombaPhase
"stopped": "Stopped", # Added for RoombaPhase
}
binMappings = {True: "Full", False: "Not Full"}
yesNoMappings = {True: "Yes", False: "No"}
cleanBaseMappings = {
300: "Ready",
301: "Ready",
302: "Empty",
303: "Empty",
350: "Bag Missing",
351: "Clogged",
352: "Sealing Problem",
353: "Bag Full",
360: "Comms Problem",
}
jobInitiatorMappings = {
"schedule": "iRobot Schedule",
"rmtApp": "iRobot App",
"manual": "Robot",
"localApp": "HA",
"none": "None", # Added for RoombaJobInitiator
}

View file

@ -0,0 +1,39 @@
{
"domain": "roomba_rest980",
"name": "Roomba (Rest980)",
"codeowners": ["@ia74"],
"version": "0.0.0",
"config_flow": true,
"dependencies": [],
"dhcp": [
{
"hostname": "irobot-*",
"macaddress": "501479*"
},
{
"hostname": "roomba-*",
"macaddress": "80A589*"
},
{
"hostname": "roomba-*",
"macaddress": "DCF505*"
},
{
"hostname": "roomba-*",
"macaddress": "204EF6*"
}
],
"documentation": "https://github.com/ia74/roomba_rest980",
"iot_class": "local_polling",
"requirements": ["aiohttp"],
"zeroconf": [
{
"type": "_amzn-alexa._tcp.local.",
"name": "irobot-*"
},
{
"type": "_amzn-alexa._tcp.local.",
"name": "roomba-*"
}
]
}

446
roomba_rest980/sensor.py Normal file
View file

@ -0,0 +1,446 @@
"""Create sensors that poll Roomba's data."""
from homeassistant.components.sensor import SensorDeviceClass
from homeassistant.const import PERCENTAGE, UnitOfArea, UnitOfTime
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory
from homeassistant.util import dt as dt_util
from .const import DOMAIN, cleanBaseMappings, jobInitiatorMappings, phaseMappings
from .RoombaSensor import RoombaSensor
async def async_setup_entry(hass: HomeAssistant, entry, async_add_entities):
"""Create the sensors needed to poll Roomba's data."""
coordinator = hass.data[DOMAIN][entry.entry_id + "_coordinator"]
async_add_entities(
[
RoombaAttributes(coordinator, entry),
RoombaBatterySensor(coordinator, entry),
RoombaBinSensor(coordinator, entry),
RoombaJobInitiator(coordinator, entry),
RoombaPhase(coordinator, entry),
RoombaTotalArea(coordinator, entry),
RoombaTotalTime(coordinator, entry),
RoombaCleanBase(coordinator, entry),
RoombaTotalJobs(coordinator, entry),
RoombaMissionStartTime(coordinator, entry),
RoombaRechargeTime(coordinator, entry),
RoombaMissionExpireTime(coordinator, entry),
RoombaCarpetBoostMode(coordinator, entry),
RoombaCleanEdges(coordinator, entry),
RoombaCleanMode(coordinator, entry),
],
update_before_add=True,
)
class RoombaBatterySensor(RoombaSensor):
"""Read the battery level of the Roomba."""
_rs_given_info = ("Battery", "battery")
def __init__(self, coordinator, entry) -> None:
"""Create a new battery level sensor."""
super().__init__(coordinator, entry)
self._attr_native_unit_of_measurement = PERCENTAGE
self._attr_entity_category = EntityCategory.DIAGNOSTIC
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
self._attr_native_value = data.get("batPct", 0)
self.async_write_ha_state()
@property
def extra_state_attributes(self):
"""Return all the attributes returned by rest980."""
return self._get_default("batInfo", {})
@property
def icon(self):
"""Return a dynamic icon based on battery percentage."""
batLevel = self.native_value or 0
if batLevel >= 95:
return "mdi:battery"
if batLevel >= 60:
return "mdi:battery-60"
if batLevel >= 30:
return "mdi:battery-30"
if batLevel < 30:
return "mdi:battery-alert"
return "mdi:battery"
class RoombaAttributes(RoombaSensor):
"""A simple sensor that returns all given datapoints without modification."""
_rs_given_info = ("Attributes", "attributes")
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
self._attr_native_value = "OK" if self.coordinator.data else "Unavailable"
self.async_write_ha_state()
@property
def extra_state_attributes(self):
"""Return all the attributes returned by rest980."""
return self.coordinator.data or {}
class RoombaPhase(RoombaSensor):
"""A simple sensor that returns the phase of the Roomba."""
_rs_given_info = ("Phase", "phase")
def __init__(self, coordinator, entry) -> None:
"""Initialize."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_options = list(phaseMappings.values())
self._attr_entity_category = EntityCategory.DIAGNOSTIC
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
status = data.get("cleanMissionStatus", {})
# Mission State
cycle = status.get("cycle")
phase = status.get("phase")
battery = data.get("batPct")
if phase == "charge" and battery == 100:
rPhase = "Idle"
elif cycle == "none" and phase == "stop":
rPhase = "Stopped"
else:
rPhase = phaseMappings.get(phase, "Unknown")
self._attr_native_value = rPhase
self.async_write_ha_state()
@property
def icon(self):
"""Return the current phase of the Roomba."""
data = self.coordinator.data or {}
status = data.get("cleanMissionStatus", {})
# Mission State
cycle = status.get("cycle")
phase = status.get("phase")
if cycle == "none" and phase == "stop":
return "mdi:progress-alert"
return "mdi:progress-helper"
class RoombaCleanBase(RoombaSensor):
"""A simple sensor that returns the phase of the Roomba."""
_rs_given_info = ("Clean Base", "clean_base")
def __init__(self, coordinator, entry) -> None:
"""Initialize."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_options = list(cleanBaseMappings.values())
self._attr_entity_category = EntityCategory.DIAGNOSTIC
self._attr_icon = "mdi:trash-can"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
dock = data.get("dock")
dockState = dock.get("state")
self._attr_native_value = cleanBaseMappings.get(dockState, "Unknown")
self.async_write_ha_state()
class RoombaBinSensor(RoombaSensor):
"""Read the bin data of the Roomba."""
_rs_given_info = ("Bin", "bin")
def __init__(self, coordinator, entry) -> None:
"""Create a new battery level sensor."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_options = ["Not Full", "Full"]
self._attr_entity_category = EntityCategory.DIAGNOSTIC
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
self._attr_native_value = (
"Full" if self._get_default("bin", {}).get("full") else "Not Full"
)
self.async_write_ha_state()
@property
def icon(self):
"""Return a dynamic icon based on bin being full or not."""
full: bool = self._get_default("bin", {}).get("full")
return "mdi:trash-can-outline" if not full else "mdi:trash-can"
class RoombaJobInitiator(RoombaSensor):
"""Read the job initiator of the Roomba."""
_rs_given_info = ("Job Initiator", "job_initiator")
def __init__(self, coordinator, entry) -> None:
"""Create a new job initiator reading."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_options = list(jobInitiatorMappings.values())
self._attr_entity_category = EntityCategory.DIAGNOSTIC
self._attr_icon = "mdi:cursor-pointer"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
status = data.get("cleanMissionStatus", {})
initiator = status.get("initiator") or "none"
self._attr_native_value = jobInitiatorMappings.get(initiator, "Unknown")
self.async_write_ha_state()
class RoombaMissionStartTime(RoombaSensor):
"""Read the mission start time of the Roomba."""
_rs_given_info = ("Job Start Time", "job_start_time")
def __init__(self, coordinator, entry) -> None:
"""Create a new job start time reading."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.TIMESTAMP
self._attr_icon = "mdi:clock-start"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
status = data.get("cleanMissionStatus", {})
missionStartTime = status.get("mssnStrtTm") # Unix timestamp in seconds?
if missionStartTime:
self._attr_available = True
try:
self._attr_native_value = dt_util.utc_from_timestamp(missionStartTime)
except (TypeError, ValueError):
self._attr_native_value = None
else:
self._attr_native_value = None
self._attr_available = False
self.async_write_ha_state()
class RoombaRechargeTime(RoombaSensor):
"""Read the mission start time of the Roomba."""
_rs_given_info = ("Recharge Time", "job_recharge_time")
def __init__(self, coordinator, entry) -> None:
"""Create a new job recharge time reading."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.TIMESTAMP
self._attr_icon = "mdi:battery-clock"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
status = data.get("cleanMissionStatus", {})
missionStartTime = status.get("rechrgTm") # Unix timestamp in seconds?
if missionStartTime:
self._attr_available = True
try:
self._attr_native_value = dt_util.utc_from_timestamp(missionStartTime)
except (TypeError, ValueError):
self._attr_native_value = None
else:
self._attr_native_value = None
self._attr_available = False
self.async_write_ha_state()
class RoombaCarpetBoostMode(RoombaSensor):
"""Read the mission start time of the Roomba."""
_rs_given_info = ("Carpet Boost", "carpet_boost")
def __init__(self, coordinator, entry) -> None:
"""Create a new job carpet boost mode."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_options = ["Eco", "Performance", "Auto", "n-a"]
self._attr_icon = "mdi:rug"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
vacuumHigh = data.get("vacHigh")
carpetBoost = data.get("carpetBoost")
if vacuumHigh is not None:
if not vacuumHigh and not carpetBoost:
self._attr_native_value = "Eco"
elif vacuumHigh and not carpetBoost:
self._attr_native_value = "Performance"
else:
self._attr_native_value = "Auto"
else:
self._attr_native_value = "n-a"
self.async_write_ha_state()
class RoombaCleanEdges(RoombaSensor):
"""Read the mission start time of the Roomba."""
_rs_given_info = ("Clean Edges", "clean_edges")
def __init__(self, coordinator, entry) -> None:
"""Create a new clean_edges sensor."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_options = ["Yes", "No", "n-a"]
self._attr_icon = "mdi:wall"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
openOnly = data.get("openOnly")
if openOnly is not None:
if openOnly:
self._attr_native_value = "No"
else:
self._attr_native_value = "Yes"
else:
self._attr_native_value = "n-a"
self.async_write_ha_state()
class RoombaCleanMode(RoombaSensor):
"""Read the clean mode of the Roomba."""
_rs_given_info = ("Clean Mode", "clean_mode")
def __init__(self, coordinator, entry) -> None:
"""Create a new clean_edges sensor."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_options = ["One", "Two", "Auto", "n-a"]
self._attr_icon = "mdi:broom"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
noAutoPasses = data.get("noAutoPasses")
twoPass = data.get("twoPass")
if noAutoPasses is not None and twoPass is not None:
if noAutoPasses is True and twoPass is False:
self._attr_native_value = "One"
elif noAutoPasses is True and twoPass is True:
self._attr_native_value = "Two"
else:
self._attr_native_value = "Auto"
else:
self._attr_native_value = "n-a"
self.async_write_ha_state()
class RoombaMissionExpireTime(RoombaSensor):
"""Read the mission start time of the Roomba."""
_rs_given_info = ("Job Expire Time", "job_expire_time")
def __init__(self, coordinator, entry) -> None:
"""Create a new job recharge time reading."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.TIMESTAMP
self._attr_icon = "mdi:timeline-alert"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
status = data.get("cleanMissionStatus", {})
missionStartTime = status.get("expireTm") # Unix timestamp in seconds?
self._attr_available = False
if missionStartTime:
self._attr_available = True
try:
self._attr_native_value = dt_util.utc_from_timestamp(missionStartTime)
except (TypeError, ValueError):
self._attr_native_value = None
else:
self._attr_native_value = None
self.async_write_ha_state()
class RoombaTotalArea(RoombaSensor):
"""Read the job initiator of the Roomba."""
_rs_given_info = ("Total Area", "total_area")
def __init__(self, coordinator, entry) -> None:
"""Create a new job initiator reading."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.AREA
self._attr_native_unit_of_measurement = UnitOfArea.SQUARE_METERS
self._attr_entity_category = EntityCategory.DIAGNOSTIC
self._attr_icon = "mdi:texture-box"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
runtimeStats = data.get("runtimeStats") or {}
sqft = runtimeStats.get("sqft")
self._attr_native_value = sqft
self.async_write_ha_state()
class RoombaTotalTime(RoombaSensor):
"""Read the total time the Roomba."""
_rs_given_info = ("Total Time", "total_time")
def __init__(self, coordinator, entry) -> None:
"""Create a new job initiator reading."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.DURATION
self._attr_native_unit_of_measurement = UnitOfTime.MINUTES
self._attr_entity_category = EntityCategory.DIAGNOSTIC
self._attr_icon = "mdi:clock-time-five"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
runtimeStats = data.get("runtimeStats") or {}
hr = runtimeStats.get("hr")
timeMin = runtimeStats.get("min")
self._attr_native_value = (hr * 60) + timeMin
self.async_write_ha_state()
class RoombaTotalJobs(RoombaSensor):
"""Read the total jobs from the Roomba."""
_rs_given_info = ("Total Jobs", "total_jobs")
def __init__(self, coordinator, entry) -> None:
"""Create a new job initiator reading."""
super().__init__(coordinator, entry)
self._attr_native_unit_of_measurement = UnitOfTime.MINUTES
self._attr_entity_category = EntityCategory.DIAGNOSTIC
self._attr_icon = "mdi:transmission-tower"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
bbmssn = data.get("bbmssn") or {}
self._attr_native_value = bbmssn.get("nMssn")
self.async_write_ha_state()

View file

@ -0,0 +1,12 @@
# services.yaml
vacuum_action:
fields:
command:
required: true
example: "dock"
vacuum_clean:
fields:
payload:
required: true
example: '{"regions": [{"region_id": "11", "type": "rid"}]}'

View file

@ -0,0 +1,24 @@
{
"config": {
"step": {
"user": {
"title": "Connect to rest980",
"description": "This is needed to grab your roomba's API data.",
"data": {
"base_url": "rest980 Server Base URL"
},
"data_description": {
"base_url": "Example: http://localhost:3000"
}
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
}
}
}

59
roomba_rest980/switch.py Normal file
View file

@ -0,0 +1,59 @@
"""Switches needed."""
from homeassistant.components.switch import SwitchEntity
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory
from .const import DOMAIN
ROOMS = {
"Kitchen": {"region_id": "11", "type": "rid"},
"Living Room": {"region_id": "9", "type": "rid"},
"Dining Room": {"region_id": "1", "type": "rid"},
"Hallway": {"region_id": "10", "type": "rid"},
"Office": {"region_id": "12", "type": "rid"},
}
async def async_setup_entry(hass: HomeAssistant, entry, async_add_entities):
"""Create the switches to identify cleanable rooms."""
entities = [RoomSwitch(entry, name, data) for name, data in ROOMS.items()]
for ent in entities:
hass.data[DOMAIN][f"switch.{ent.unique_id}"] = ent
async_add_entities(entities)
class RoomSwitch(SwitchEntity):
"""A switch entity to determine whether or not a room should be cleaned by the vacuum."""
def __init__(self, entry, name, data) -> None:
"""Creates a switch entity for rooms."""
self._attr_name = f"Clean {name}"
self._attr_unique_id = f"{entry.entry_id}_{name.lower().replace(' ', '_')}"
self._is_on = False
self._room_json = data
self._attr_entity_category = EntityCategory.CONFIG
self._attr_device_info = {
"identifiers": {(DOMAIN, entry.unique_id)},
"name": entry.title,
"manufacturer": "iRobot",
}
@property
def is_on(self):
"""Does the user want the room to be cleaned?"""
return self._is_on
async def async_turn_on(self, **kwargs):
"""Yes."""
self._is_on = True
self.async_write_ha_state()
async def async_turn_off(self, **kwargs):
"""No."""
self._is_on = False
self.async_write_ha_state()
def get_region_json(self):
"""I'm not sure what this does to be honest."""
return self._room_json

302
roomba_rest980/vacuum.py Normal file
View file

@ -0,0 +1,302 @@
"""The vacuum."""
from datetime import datetime
import json
import logging
from homeassistant.components.vacuum import (
StateVacuumEntity,
VacuumActivity,
VacuumEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import (
DOMAIN,
binMappings,
cleanBaseMappings,
cycleMappings,
errorMappings,
jobInitiatorMappings,
notReadyMappings,
phaseMappings,
yesNoMappings,
)
_LOGGER = logging.getLogger(__name__)
SUPPORT_ROBOT = (
VacuumEntityFeature.START
| VacuumEntityFeature.RETURN_HOME
| VacuumEntityFeature.CLEAN_SPOT
| VacuumEntityFeature.MAP
| VacuumEntityFeature.SEND_COMMAND
| VacuumEntityFeature.STATE
| VacuumEntityFeature.STATUS
)
async def async_setup_entry(hass: HomeAssistant, entry, async_add_entities):
"""Create the vacuum."""
coordinator = hass.data[DOMAIN][entry.entry_id + "_coordinator"]
await coordinator.async_config_entry_first_refresh()
async_add_entities([RoombaVacuum(hass, coordinator, entry)])
class RoombaVacuum(CoordinatorEntity, StateVacuumEntity):
"""The Rest980 controlled vacuum."""
def __init__(self, hass: HomeAssistant, coordinator, entry: ConfigEntry) -> None:
"""Setup the robot."""
super().__init__(coordinator)
self.hass = hass
self._entry: ConfigEntry = entry
self._attr_supported_features = SUPPORT_ROBOT
self._attr_unique_id = f"{entry.unique_id}_vacuum"
self._attr_device_info = {
"identifiers": {(DOMAIN, entry.unique_id)},
"name": entry.title,
"manufacturer": "iRobot",
}
def _get_default(self, key: str, default: str):
return self.coordinator.data.get(key) if self.coordinator.data else default
@property
def name(self):
"""Return its name."""
return self.coordinator.data.get("name") if self.coordinator.data else "Roomba"
@property
def available(self):
"""Is vacuum available?"""
return self.coordinator.data is not None
@property
def activity(self):
"""Return the state."""
data = self.coordinator.data
if not data:
return None # Return None so HA marks entity as unavailable
status = data.get("cleanMissionStatus", {})
cycle = status.get("cycle")
not_ready = status.get("notReady")
if cycle == "none" and not_ready == 39:
return VacuumActivity.IDLE
if not_ready and not_ready > 0:
return VacuumActivity.ERROR
if cycle in ["clean", "quick", "spot", "train"]:
return VacuumActivity.CLEANING
if cycle in ["evac", "dock"]:
return VacuumActivity.DOCKED
return VacuumActivity.IDLE
@property
def extra_state_attributes(self):
"""Return all the given attributes from rest980."""
data = self.coordinator.data or {}
status = data.get("cleanMissionStatus", {})
# Mission State
cycle = status.get("cycle")
phase = status.get("phase")
err = status.get("error")
notReady = status.get("notReady")
initiator = status.get("initiator")
missionStartTime = status.get("mssnStrtTm")
rechargeTime = status.get("rechrgTm")
expireTime = status.get("expireTm")
# Generic Data
softwareVer = data.get("softwareVer")
vacuumHigh = data.get("vacHigh")
carpetBoost = data.get("carpetBoost")
if vacuumHigh is not None:
if not vacuumHigh and not carpetBoost:
robotCarpetBoost = "Eco"
elif vacuumHigh and not carpetBoost:
robotCarpetBoost = "Performance"
else:
robotCarpetBoost = "Auto"
else:
robotCarpetBoost = "n-a"
battery = data.get("batPct")
if "+" in softwareVer:
softwareVer = softwareVer.split("+")[1]
if cycle == "none" and notReady == 39:
extv = "Pending"
elif notReady > 0:
extv = f"Not Ready ({notReady})"
else:
extv = self.returnIn(cycleMappings, cycle)
if phase == "charge" and battery == 100:
rPhase = "Idle"
elif cycle == "none" and phase == "stop":
rPhase = "Stopped"
else:
rPhase = self.returnIn(phaseMappings, phase)
if missionStartTime != 0:
time = datetime.fromtimestamp(missionStartTime)
elapsed = round((datetime.now().timestamp() - time.timestamp()) / 60)
if elapsed > 60:
jobTime = f"{elapsed // 60}h {f'{elapsed % 60:0>2d}'}m"
else:
jobTime = f"{elapsed}m"
else:
jobTime = "n-a"
if rechargeTime != 0:
time = datetime.fromtimestamp(rechargeTime)
resume = round((datetime.now().timestamp() - time.timestamp()) / 60)
if elapsed > 60:
jobResumeTime = f"{resume // 60}h {f'{resume % 60:0>2d}'}m"
else:
jobResumeTime = f"{resume}m"
else:
jobResumeTime = "n-a"
if expireTime != 0:
time = datetime.fromtimestamp(expireTime)
expire = round((datetime.now().timestamp() - time.timestamp()) / 60)
if elapsed > 60:
jobExpireTime = f"{expire // 60}h {f'{expire % 60:0>2d}'}m"
else:
jobExpireTime = f"{expire}m"
else:
jobExpireTime = "n-a"
# Bin
robotBin = data.get("bin")
binFull = robotBin.get("full")
binPresent = robotBin.get("present")
# Dock
dock = data.get("dock")
dockState = dock.get("state")
# Pose
## NOTE: My roomba's firmware does not support this anymore, so I'm blindly guessing based on the previous YAML integration details.
pose = data.get("pose") or {}
theta = pose.get("theta")
point = pose.get("point") or {}
pointX = point.get("x")
pointY = point.get("y")
if theta is not None:
location = f"{pointX}, {pointY}, {theta}"
else:
location = "n-a"
# Networking
signal = data.get("signal")
rssi = signal.get("rssi")
# Runtime Statistics
runtimeStats = data.get("runtimeStats")
sqft = runtimeStats.get("sqft")
hr = runtimeStats.get("hr")
timeMin = runtimeStats.get("min")
# Mission total(s?)
bbmssn = data.get("bbmssn")
numMissions = bbmssn.get("nMssn")
# Run total(s?)
bbrun = data.get("bbrun")
numDirt = bbrun.get("nScrubs")
numEvacs = bbrun.get("nEvacs")
# numEvacs only for I7+/S9+ Models (Clean Base)
pmaps = data.get("pmaps", [])
pmap0id = next(iter(pmaps[0]), None) if pmaps else None
noAutoPasses = data.get("noAutoPasses")
twoPass = data.get("twoPass")
if noAutoPasses is not None and twoPass is not None:
if noAutoPasses is True and twoPass is False:
robotCleanMode = "One"
elif noAutoPasses is True and twoPass is True:
robotCleanMode = "Two"
else:
robotCleanMode = "Auto"
else:
robotCleanMode = "n-a"
return [
("extendedStatus", extv),
("notready_msg", self.returnIn(notReadyMappings, notReady)),
("error_msg", self.returnIn(errorMappings, err)),
("battery", battery),
("software_ver", softwareVer),
("phase", rPhase),
("bin", self.returnIn(binMappings, binFull)),
("bin_present", self.returnIn(yesNoMappings, binPresent)),
("clean_base", self.returnIn(cleanBaseMappings, dockState)),
("location", location),
("rssi", rssi),
("total_area", f"{round(sqft / 10.764 * 100)}"),
("total_time", f"{hr}h {timeMin}m"),
("total_jobs", numMissions),
("dirt_events", numDirt),
("evac_events", numEvacs),
("job_initiator", self.returnIn(jobInitiatorMappings, initiator)),
("job_time", jobTime),
("job_recharge", jobResumeTime),
("job_expire", jobExpireTime),
("clean_mode", robotCleanMode),
("carpet_boost", robotCarpetBoost),
("clean_edges", "true" if not data.get("openOnly", False) else "false"),
("maint_due", False),
("pmap0_id", pmap0id),
]
def returnIn(self, map: map, index: any):
"""Default or map value."""
if index in map:
return map[index]
return index
async def async_clean_spot(self, **kwargs):
"""Spot clean."""
async def async_start(self):
"""Start cleaning floors, check if any are selected or just clean everything."""
payload = []
for entity in self.hass.states.async_all("switch"):
if entity.entity_id.startswith("switch.clean_") and entity.state == "on":
switch_obj = self.hass.data[DOMAIN].get(entity.entity_id)
if switch_obj:
payload.append(switch_obj.get_region_json())
if payload:
# TODO: FIX THIS FIX THIS IT NEEDS TO BE DYNAMIC NOT THIS GARBAGE
# TODO: FIX THIS FIX THIS IT NEEDS TO BE DYNAMIC NOT THIS GARBAGE
# TODO: FIX THIS FIX THIS IT NEEDS TO BE DYNAMIC NOT THIS GARBAGE
await self.hass.services.async_call(
DOMAIN,
"vacuum_clean",
service_data={
"payload": json.dumps(
{
"ordered": 1,
"pmap_id": "BGQxV6zGTmCsalWFHr-S5g",
"regions": payload,
}
)
},
blocking=True,
)
else:
_LOGGER.warning("No rooms selected for cleaning")
async def async_return_to_base(self):
"""Calls the Roomba back to its dock."""
await self.hass.services.async_call(
DOMAIN, "vacuum_action", service_data={"command": "dock"}, blocking=True
)