commit 87ef490eed2731daa96ee41193c4f038a21c4a21 Author: ia74 <68617740+ia74@users.noreply.github.com> Date: Tue Jul 15 17:33:16 2025 -0500 opl diff --git a/roomba_rest980/RoombaSensor.py b/roomba_rest980/RoombaSensor.py new file mode 100644 index 0000000..7aa07be --- /dev/null +++ b/roomba_rest980/RoombaSensor.py @@ -0,0 +1,39 @@ +"""A generic sensor to provide the coordinator and device info.""" + +import logging + +from homeassistant.components.sensor import SensorEntity +from homeassistant.helpers.entity import EntityCategory +from homeassistant.helpers.update_coordinator import CoordinatorEntity + +from .const import DOMAIN + +_LOGGER = logging.getLogger(__name__) + + +class RoombaSensor(CoordinatorEntity, SensorEntity): + """Generic Roomba sensor to provide coordinator.""" + + _rs_given_info: tuple[str, str] = ("Sensor", "sensor") + + def __init__(self, coordinator, entry) -> None: + """Create a new generic sensor.""" + super().__init__(coordinator) + _LOGGER.debug("Entry unique_id: %s", entry.unique_id) + self._entry = entry + self._attr_entity_category = EntityCategory.DIAGNOSTIC + self._attr_has_entity_name = True + self._attr_name = self._rs_given_info[0] + self._attr_unique_id = f"{entry.unique_id}_{self._rs_given_info[1]}" + self._attr_device_info = { + "identifiers": {(DOMAIN, entry.unique_id)}, + "name": entry.title, + "manufacturer": "iRobot", + } + + def returnIn(self, mapping: dict[str, str], index: str) -> str: + """Default or map value.""" + mapping.get(index, index) + + def _get_default(self, key: str, default: str): + return self.coordinator.data.get(key) if self.coordinator.data else default diff --git a/roomba_rest980/__init__.py b/roomba_rest980/__init__.py new file mode 100644 index 0000000..52aa849 --- /dev/null +++ b/roomba_rest980/__init__.py @@ -0,0 +1,54 @@ +"""Roomba integration using an external Rest980 server.""" + +import asyncio +import logging + +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant +from homeassistant.helpers import config_validation as cv +from homeassistant.helpers.aiohttp_client import async_get_clientsession +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator + +from .const import DEFAULT_SCAN_INTERVAL, DOMAIN + +CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN) + +_LOGGER = logging.getLogger(__name__) + + +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Setup Roombas with the Rest980 base url.""" + hass.data.setdefault(DOMAIN, {}) + url = entry.data["base_url"] + session = async_get_clientsession(hass) # Use HA’s shared session + + async def async_update_data(): + async with asyncio.timeout(10): + async with session.get(f"{url}/api/local/info/state") as resp: + return await resp.json() + + coordinator = DataUpdateCoordinator( + hass, + _LOGGER, + name="Roomba REST Data", + update_method=async_update_data, + update_interval=DEFAULT_SCAN_INTERVAL, + ) + hass.data[DOMAIN][entry.entry_id + "_coordinator"] = coordinator + await coordinator.update_method() + + # Forward platforms; create tasks but await to ensure no failure? + await hass.config_entries.async_forward_entry_setups( + entry, ["vacuum", "sensor", "switch"] + ) + + return True + + +async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Safely remove Roombas.""" + await hass.config_entries.async_unload_platforms( + entry, ["vacuum", "sensor", "switch"] + ) + hass.data[DOMAIN].pop(entry.entry_id + "_coordinator") + return True diff --git a/roomba_rest980/config_flow.py b/roomba_rest980/config_flow.py new file mode 100644 index 0000000..df7c544 --- /dev/null +++ b/roomba_rest980/config_flow.py @@ -0,0 +1,29 @@ +"""The configuration flow for the robot.""" + +import voluptuous as vol + +from homeassistant import config_entries + +from .const import DOMAIN + + +class RoombaConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): + """Config flow.""" + + async def async_step_user(self, user_input=None): + """Show the user the input for the base url.""" + if user_input is not None: + return self.async_create_entry(title="Roomba", data=user_input, options={}) + + return self.async_show_form( + step_id="user", + data_schema=vol.Schema({vol.Required("base_url"): str}), + ) + + async def async_step_options(self, user_input=None): + """I dont know.""" + return self.async_create_entry( + title="Room Switches Configured via UI", + data=self.options, + options=self.options, + ) diff --git a/roomba_rest980/const.py b/roomba_rest980/const.py new file mode 100644 index 0000000..d8bfa0f --- /dev/null +++ b/roomba_rest980/const.py @@ -0,0 +1,62 @@ +"""Constants for the Roomba (Rest980) integration.""" + +from datetime import timedelta + +DOMAIN = "roomba_rest980" +DEFAULT_SCAN_INTERVAL = timedelta(seconds=10) # or whatever interval you want + +notReadyMappings = { + 0: "n-a", + 2: "Uneven Ground", + 15: "Low Battery", + 39: "Pending", + 48: "Path Blocked", +} +errorMappings = {0: "n-a", 15: "Reboot Required", 18: "Docking Issue"} + +cycleMappings = { + "clean": "Clean", + "quick": "Clean (Quick)", + "spot": "Spot", + "evac": "Emptying", + "dock": "Docking", + "train": "Training", + "none": "Ready", +} + +phaseMappings = { + "charge": "Charge", + "run": "Run", + "evac": "Empty", + "stop": "Paused", + "stuck": "Stuck", + "hmUsrDock": "Sent Home", + "hmMidMsn": "Mid Dock", + "hmPostMsn": "Final Dock", + "idle": "Idle", # Added for RoombaPhase + "stopped": "Stopped", # Added for RoombaPhase +} + +binMappings = {True: "Full", False: "Not Full"} + +yesNoMappings = {True: "Yes", False: "No"} + +cleanBaseMappings = { + 300: "Ready", + 301: "Ready", + 302: "Empty", + 303: "Empty", + 350: "Bag Missing", + 351: "Clogged", + 352: "Sealing Problem", + 353: "Bag Full", + 360: "Comms Problem", +} + +jobInitiatorMappings = { + "schedule": "iRobot Schedule", + "rmtApp": "iRobot App", + "manual": "Robot", + "localApp": "HA", + "none": "None", # Added for RoombaJobInitiator +} diff --git a/roomba_rest980/manifest.json b/roomba_rest980/manifest.json new file mode 100644 index 0000000..ce02a06 --- /dev/null +++ b/roomba_rest980/manifest.json @@ -0,0 +1,39 @@ +{ + "domain": "roomba_rest980", + "name": "Roomba (Rest980)", + "codeowners": ["@ia74"], + "version": "0.0.0", + "config_flow": true, + "dependencies": [], + "dhcp": [ + { + "hostname": "irobot-*", + "macaddress": "501479*" + }, + { + "hostname": "roomba-*", + "macaddress": "80A589*" + }, + { + "hostname": "roomba-*", + "macaddress": "DCF505*" + }, + { + "hostname": "roomba-*", + "macaddress": "204EF6*" + } + ], + "documentation": "https://github.com/ia74/roomba_rest980", + "iot_class": "local_polling", + "requirements": ["aiohttp"], + "zeroconf": [ + { + "type": "_amzn-alexa._tcp.local.", + "name": "irobot-*" + }, + { + "type": "_amzn-alexa._tcp.local.", + "name": "roomba-*" + } + ] +} diff --git a/roomba_rest980/sensor.py b/roomba_rest980/sensor.py new file mode 100644 index 0000000..082a00f --- /dev/null +++ b/roomba_rest980/sensor.py @@ -0,0 +1,446 @@ +"""Create sensors that poll Roomba's data.""" + +from homeassistant.components.sensor import SensorDeviceClass +from homeassistant.const import PERCENTAGE, UnitOfArea, UnitOfTime +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity import EntityCategory +from homeassistant.util import dt as dt_util + +from .const import DOMAIN, cleanBaseMappings, jobInitiatorMappings, phaseMappings +from .RoombaSensor import RoombaSensor + + +async def async_setup_entry(hass: HomeAssistant, entry, async_add_entities): + """Create the sensors needed to poll Roomba's data.""" + coordinator = hass.data[DOMAIN][entry.entry_id + "_coordinator"] + async_add_entities( + [ + RoombaAttributes(coordinator, entry), + RoombaBatterySensor(coordinator, entry), + RoombaBinSensor(coordinator, entry), + RoombaJobInitiator(coordinator, entry), + RoombaPhase(coordinator, entry), + RoombaTotalArea(coordinator, entry), + RoombaTotalTime(coordinator, entry), + RoombaCleanBase(coordinator, entry), + RoombaTotalJobs(coordinator, entry), + RoombaMissionStartTime(coordinator, entry), + RoombaRechargeTime(coordinator, entry), + RoombaMissionExpireTime(coordinator, entry), + RoombaCarpetBoostMode(coordinator, entry), + RoombaCleanEdges(coordinator, entry), + RoombaCleanMode(coordinator, entry), + ], + update_before_add=True, + ) + + +class RoombaBatterySensor(RoombaSensor): + """Read the battery level of the Roomba.""" + + _rs_given_info = ("Battery", "battery") + + def __init__(self, coordinator, entry) -> None: + """Create a new battery level sensor.""" + super().__init__(coordinator, entry) + self._attr_native_unit_of_measurement = PERCENTAGE + self._attr_entity_category = EntityCategory.DIAGNOSTIC + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + data = self.coordinator.data or {} + self._attr_native_value = data.get("batPct", 0) + self.async_write_ha_state() + + @property + def extra_state_attributes(self): + """Return all the attributes returned by rest980.""" + return self._get_default("batInfo", {}) + + @property + def icon(self): + """Return a dynamic icon based on battery percentage.""" + batLevel = self.native_value or 0 + if batLevel >= 95: + return "mdi:battery" + if batLevel >= 60: + return "mdi:battery-60" + if batLevel >= 30: + return "mdi:battery-30" + if batLevel < 30: + return "mdi:battery-alert" + return "mdi:battery" + + +class RoombaAttributes(RoombaSensor): + """A simple sensor that returns all given datapoints without modification.""" + + _rs_given_info = ("Attributes", "attributes") + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + self._attr_native_value = "OK" if self.coordinator.data else "Unavailable" + self.async_write_ha_state() + + @property + def extra_state_attributes(self): + """Return all the attributes returned by rest980.""" + return self.coordinator.data or {} + + +class RoombaPhase(RoombaSensor): + """A simple sensor that returns the phase of the Roomba.""" + + _rs_given_info = ("Phase", "phase") + + def __init__(self, coordinator, entry) -> None: + """Initialize.""" + super().__init__(coordinator, entry) + self._attr_device_class = SensorDeviceClass.ENUM + self._attr_options = list(phaseMappings.values()) + self._attr_entity_category = EntityCategory.DIAGNOSTIC + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + data = self.coordinator.data or {} + status = data.get("cleanMissionStatus", {}) + # Mission State + cycle = status.get("cycle") + phase = status.get("phase") + battery = data.get("batPct") + + if phase == "charge" and battery == 100: + rPhase = "Idle" + elif cycle == "none" and phase == "stop": + rPhase = "Stopped" + else: + rPhase = phaseMappings.get(phase, "Unknown") + self._attr_native_value = rPhase + self.async_write_ha_state() + + @property + def icon(self): + """Return the current phase of the Roomba.""" + data = self.coordinator.data or {} + status = data.get("cleanMissionStatus", {}) + # Mission State + cycle = status.get("cycle") + phase = status.get("phase") + + if cycle == "none" and phase == "stop": + return "mdi:progress-alert" + return "mdi:progress-helper" + + +class RoombaCleanBase(RoombaSensor): + """A simple sensor that returns the phase of the Roomba.""" + + _rs_given_info = ("Clean Base", "clean_base") + + def __init__(self, coordinator, entry) -> None: + """Initialize.""" + super().__init__(coordinator, entry) + self._attr_device_class = SensorDeviceClass.ENUM + self._attr_options = list(cleanBaseMappings.values()) + self._attr_entity_category = EntityCategory.DIAGNOSTIC + self._attr_icon = "mdi:trash-can" + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + data = self.coordinator.data or {} + dock = data.get("dock") + dockState = dock.get("state") + self._attr_native_value = cleanBaseMappings.get(dockState, "Unknown") + self.async_write_ha_state() + + +class RoombaBinSensor(RoombaSensor): + """Read the bin data of the Roomba.""" + + _rs_given_info = ("Bin", "bin") + + def __init__(self, coordinator, entry) -> None: + """Create a new battery level sensor.""" + super().__init__(coordinator, entry) + self._attr_device_class = SensorDeviceClass.ENUM + self._attr_options = ["Not Full", "Full"] + self._attr_entity_category = EntityCategory.DIAGNOSTIC + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + self._attr_native_value = ( + "Full" if self._get_default("bin", {}).get("full") else "Not Full" + ) + self.async_write_ha_state() + + @property + def icon(self): + """Return a dynamic icon based on bin being full or not.""" + full: bool = self._get_default("bin", {}).get("full") + return "mdi:trash-can-outline" if not full else "mdi:trash-can" + + +class RoombaJobInitiator(RoombaSensor): + """Read the job initiator of the Roomba.""" + + _rs_given_info = ("Job Initiator", "job_initiator") + + def __init__(self, coordinator, entry) -> None: + """Create a new job initiator reading.""" + super().__init__(coordinator, entry) + self._attr_device_class = SensorDeviceClass.ENUM + self._attr_options = list(jobInitiatorMappings.values()) + self._attr_entity_category = EntityCategory.DIAGNOSTIC + self._attr_icon = "mdi:cursor-pointer" + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + data = self.coordinator.data or {} + status = data.get("cleanMissionStatus", {}) + initiator = status.get("initiator") or "none" + self._attr_native_value = jobInitiatorMappings.get(initiator, "Unknown") + self.async_write_ha_state() + + +class RoombaMissionStartTime(RoombaSensor): + """Read the mission start time of the Roomba.""" + + _rs_given_info = ("Job Start Time", "job_start_time") + + def __init__(self, coordinator, entry) -> None: + """Create a new job start time reading.""" + super().__init__(coordinator, entry) + self._attr_device_class = SensorDeviceClass.TIMESTAMP + self._attr_icon = "mdi:clock-start" + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + data = self.coordinator.data or {} + status = data.get("cleanMissionStatus", {}) + missionStartTime = status.get("mssnStrtTm") # Unix timestamp in seconds? + + if missionStartTime: + self._attr_available = True + try: + self._attr_native_value = dt_util.utc_from_timestamp(missionStartTime) + except (TypeError, ValueError): + self._attr_native_value = None + else: + self._attr_native_value = None + self._attr_available = False + + self.async_write_ha_state() + + +class RoombaRechargeTime(RoombaSensor): + """Read the mission start time of the Roomba.""" + + _rs_given_info = ("Recharge Time", "job_recharge_time") + + def __init__(self, coordinator, entry) -> None: + """Create a new job recharge time reading.""" + super().__init__(coordinator, entry) + self._attr_device_class = SensorDeviceClass.TIMESTAMP + self._attr_icon = "mdi:battery-clock" + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + data = self.coordinator.data or {} + status = data.get("cleanMissionStatus", {}) + missionStartTime = status.get("rechrgTm") # Unix timestamp in seconds? + + if missionStartTime: + self._attr_available = True + try: + self._attr_native_value = dt_util.utc_from_timestamp(missionStartTime) + except (TypeError, ValueError): + self._attr_native_value = None + else: + self._attr_native_value = None + self._attr_available = False + + self.async_write_ha_state() + + +class RoombaCarpetBoostMode(RoombaSensor): + """Read the mission start time of the Roomba.""" + + _rs_given_info = ("Carpet Boost", "carpet_boost") + + def __init__(self, coordinator, entry) -> None: + """Create a new job carpet boost mode.""" + super().__init__(coordinator, entry) + self._attr_device_class = SensorDeviceClass.ENUM + self._attr_options = ["Eco", "Performance", "Auto", "n-a"] + self._attr_icon = "mdi:rug" + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + data = self.coordinator.data or {} + vacuumHigh = data.get("vacHigh") + carpetBoost = data.get("carpetBoost") + + if vacuumHigh is not None: + if not vacuumHigh and not carpetBoost: + self._attr_native_value = "Eco" + elif vacuumHigh and not carpetBoost: + self._attr_native_value = "Performance" + else: + self._attr_native_value = "Auto" + else: + self._attr_native_value = "n-a" + + self.async_write_ha_state() + + +class RoombaCleanEdges(RoombaSensor): + """Read the mission start time of the Roomba.""" + + _rs_given_info = ("Clean Edges", "clean_edges") + + def __init__(self, coordinator, entry) -> None: + """Create a new clean_edges sensor.""" + super().__init__(coordinator, entry) + self._attr_device_class = SensorDeviceClass.ENUM + self._attr_options = ["Yes", "No", "n-a"] + self._attr_icon = "mdi:wall" + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + data = self.coordinator.data or {} + openOnly = data.get("openOnly") + + if openOnly is not None: + if openOnly: + self._attr_native_value = "No" + else: + self._attr_native_value = "Yes" + else: + self._attr_native_value = "n-a" + + self.async_write_ha_state() + + +class RoombaCleanMode(RoombaSensor): + """Read the clean mode of the Roomba.""" + + _rs_given_info = ("Clean Mode", "clean_mode") + + def __init__(self, coordinator, entry) -> None: + """Create a new clean_edges sensor.""" + super().__init__(coordinator, entry) + self._attr_device_class = SensorDeviceClass.ENUM + self._attr_options = ["One", "Two", "Auto", "n-a"] + self._attr_icon = "mdi:broom" + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + data = self.coordinator.data or {} + noAutoPasses = data.get("noAutoPasses") + twoPass = data.get("twoPass") + if noAutoPasses is not None and twoPass is not None: + if noAutoPasses is True and twoPass is False: + self._attr_native_value = "One" + elif noAutoPasses is True and twoPass is True: + self._attr_native_value = "Two" + else: + self._attr_native_value = "Auto" + else: + self._attr_native_value = "n-a" + + self.async_write_ha_state() + + +class RoombaMissionExpireTime(RoombaSensor): + """Read the mission start time of the Roomba.""" + + _rs_given_info = ("Job Expire Time", "job_expire_time") + + def __init__(self, coordinator, entry) -> None: + """Create a new job recharge time reading.""" + super().__init__(coordinator, entry) + self._attr_device_class = SensorDeviceClass.TIMESTAMP + self._attr_icon = "mdi:timeline-alert" + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + data = self.coordinator.data or {} + status = data.get("cleanMissionStatus", {}) + missionStartTime = status.get("expireTm") # Unix timestamp in seconds? + self._attr_available = False + + if missionStartTime: + self._attr_available = True + try: + self._attr_native_value = dt_util.utc_from_timestamp(missionStartTime) + except (TypeError, ValueError): + self._attr_native_value = None + else: + self._attr_native_value = None + + self.async_write_ha_state() + + +class RoombaTotalArea(RoombaSensor): + """Read the job initiator of the Roomba.""" + + _rs_given_info = ("Total Area", "total_area") + + def __init__(self, coordinator, entry) -> None: + """Create a new job initiator reading.""" + super().__init__(coordinator, entry) + self._attr_device_class = SensorDeviceClass.AREA + self._attr_native_unit_of_measurement = UnitOfArea.SQUARE_METERS + self._attr_entity_category = EntityCategory.DIAGNOSTIC + self._attr_icon = "mdi:texture-box" + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + data = self.coordinator.data or {} + runtimeStats = data.get("runtimeStats") or {} + sqft = runtimeStats.get("sqft") + self._attr_native_value = sqft + self.async_write_ha_state() + + +class RoombaTotalTime(RoombaSensor): + """Read the total time the Roomba.""" + + _rs_given_info = ("Total Time", "total_time") + + def __init__(self, coordinator, entry) -> None: + """Create a new job initiator reading.""" + super().__init__(coordinator, entry) + self._attr_device_class = SensorDeviceClass.DURATION + self._attr_native_unit_of_measurement = UnitOfTime.MINUTES + self._attr_entity_category = EntityCategory.DIAGNOSTIC + self._attr_icon = "mdi:clock-time-five" + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + data = self.coordinator.data or {} + runtimeStats = data.get("runtimeStats") or {} + hr = runtimeStats.get("hr") + timeMin = runtimeStats.get("min") + self._attr_native_value = (hr * 60) + timeMin + self.async_write_ha_state() + + +class RoombaTotalJobs(RoombaSensor): + """Read the total jobs from the Roomba.""" + + _rs_given_info = ("Total Jobs", "total_jobs") + + def __init__(self, coordinator, entry) -> None: + """Create a new job initiator reading.""" + super().__init__(coordinator, entry) + self._attr_native_unit_of_measurement = UnitOfTime.MINUTES + self._attr_entity_category = EntityCategory.DIAGNOSTIC + self._attr_icon = "mdi:transmission-tower" + + def _handle_coordinator_update(self): + """Update sensor when coordinator data changes.""" + data = self.coordinator.data or {} + bbmssn = data.get("bbmssn") or {} + self._attr_native_value = bbmssn.get("nMssn") + self.async_write_ha_state() diff --git a/roomba_rest980/services.yaml b/roomba_rest980/services.yaml new file mode 100644 index 0000000..22b3f84 --- /dev/null +++ b/roomba_rest980/services.yaml @@ -0,0 +1,12 @@ +# services.yaml +vacuum_action: + fields: + command: + required: true + example: "dock" + +vacuum_clean: + fields: + payload: + required: true + example: '{"regions": [{"region_id": "11", "type": "rid"}]}' diff --git a/roomba_rest980/strings.json b/roomba_rest980/strings.json new file mode 100644 index 0000000..5c3693f --- /dev/null +++ b/roomba_rest980/strings.json @@ -0,0 +1,24 @@ +{ + "config": { + "step": { + "user": { + "title": "Connect to rest980", + "description": "This is needed to grab your roomba's API data.", + "data": { + "base_url": "rest980 Server Base URL" + }, + "data_description": { + "base_url": "Example: http://localhost:3000" + } + } + }, + "error": { + "cannot_connect": "[%key:common::config_flow::error::cannot_connect%]", + "invalid_auth": "[%key:common::config_flow::error::invalid_auth%]", + "unknown": "[%key:common::config_flow::error::unknown%]" + }, + "abort": { + "already_configured": "[%key:common::config_flow::abort::already_configured_device%]" + } + } +} diff --git a/roomba_rest980/switch.py b/roomba_rest980/switch.py new file mode 100644 index 0000000..a663e3c --- /dev/null +++ b/roomba_rest980/switch.py @@ -0,0 +1,59 @@ +"""Switches needed.""" + +from homeassistant.components.switch import SwitchEntity +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity import EntityCategory + +from .const import DOMAIN + +ROOMS = { + "Kitchen": {"region_id": "11", "type": "rid"}, + "Living Room": {"region_id": "9", "type": "rid"}, + "Dining Room": {"region_id": "1", "type": "rid"}, + "Hallway": {"region_id": "10", "type": "rid"}, + "Office": {"region_id": "12", "type": "rid"}, +} + + +async def async_setup_entry(hass: HomeAssistant, entry, async_add_entities): + """Create the switches to identify cleanable rooms.""" + entities = [RoomSwitch(entry, name, data) for name, data in ROOMS.items()] + for ent in entities: + hass.data[DOMAIN][f"switch.{ent.unique_id}"] = ent + async_add_entities(entities) + + +class RoomSwitch(SwitchEntity): + """A switch entity to determine whether or not a room should be cleaned by the vacuum.""" + + def __init__(self, entry, name, data) -> None: + """Creates a switch entity for rooms.""" + self._attr_name = f"Clean {name}" + self._attr_unique_id = f"{entry.entry_id}_{name.lower().replace(' ', '_')}" + self._is_on = False + self._room_json = data + self._attr_entity_category = EntityCategory.CONFIG + self._attr_device_info = { + "identifiers": {(DOMAIN, entry.unique_id)}, + "name": entry.title, + "manufacturer": "iRobot", + } + + @property + def is_on(self): + """Does the user want the room to be cleaned?""" + return self._is_on + + async def async_turn_on(self, **kwargs): + """Yes.""" + self._is_on = True + self.async_write_ha_state() + + async def async_turn_off(self, **kwargs): + """No.""" + self._is_on = False + self.async_write_ha_state() + + def get_region_json(self): + """I'm not sure what this does to be honest.""" + return self._room_json diff --git a/roomba_rest980/vacuum.py b/roomba_rest980/vacuum.py new file mode 100644 index 0000000..5569241 --- /dev/null +++ b/roomba_rest980/vacuum.py @@ -0,0 +1,302 @@ +"""The vacuum.""" + +from datetime import datetime +import json +import logging + +from homeassistant.components.vacuum import ( + StateVacuumEntity, + VacuumActivity, + VacuumEntityFeature, +) +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant +from homeassistant.helpers.update_coordinator import CoordinatorEntity + +from .const import ( + DOMAIN, + binMappings, + cleanBaseMappings, + cycleMappings, + errorMappings, + jobInitiatorMappings, + notReadyMappings, + phaseMappings, + yesNoMappings, +) + +_LOGGER = logging.getLogger(__name__) + +SUPPORT_ROBOT = ( + VacuumEntityFeature.START + | VacuumEntityFeature.RETURN_HOME + | VacuumEntityFeature.CLEAN_SPOT + | VacuumEntityFeature.MAP + | VacuumEntityFeature.SEND_COMMAND + | VacuumEntityFeature.STATE + | VacuumEntityFeature.STATUS +) + + +async def async_setup_entry(hass: HomeAssistant, entry, async_add_entities): + """Create the vacuum.""" + coordinator = hass.data[DOMAIN][entry.entry_id + "_coordinator"] + await coordinator.async_config_entry_first_refresh() + async_add_entities([RoombaVacuum(hass, coordinator, entry)]) + + +class RoombaVacuum(CoordinatorEntity, StateVacuumEntity): + """The Rest980 controlled vacuum.""" + + def __init__(self, hass: HomeAssistant, coordinator, entry: ConfigEntry) -> None: + """Setup the robot.""" + super().__init__(coordinator) + self.hass = hass + self._entry: ConfigEntry = entry + self._attr_supported_features = SUPPORT_ROBOT + self._attr_unique_id = f"{entry.unique_id}_vacuum" + self._attr_device_info = { + "identifiers": {(DOMAIN, entry.unique_id)}, + "name": entry.title, + "manufacturer": "iRobot", + } + + def _get_default(self, key: str, default: str): + return self.coordinator.data.get(key) if self.coordinator.data else default + + @property + def name(self): + """Return its name.""" + return self.coordinator.data.get("name") if self.coordinator.data else "Roomba" + + @property + def available(self): + """Is vacuum available?""" + return self.coordinator.data is not None + + @property + def activity(self): + """Return the state.""" + data = self.coordinator.data + if not data: + return None # Return None so HA marks entity as unavailable + + status = data.get("cleanMissionStatus", {}) + cycle = status.get("cycle") + not_ready = status.get("notReady") + + if cycle == "none" and not_ready == 39: + return VacuumActivity.IDLE + if not_ready and not_ready > 0: + return VacuumActivity.ERROR + if cycle in ["clean", "quick", "spot", "train"]: + return VacuumActivity.CLEANING + if cycle in ["evac", "dock"]: + return VacuumActivity.DOCKED + return VacuumActivity.IDLE + + @property + def extra_state_attributes(self): + """Return all the given attributes from rest980.""" + data = self.coordinator.data or {} + status = data.get("cleanMissionStatus", {}) + # Mission State + cycle = status.get("cycle") + phase = status.get("phase") + err = status.get("error") + notReady = status.get("notReady") + initiator = status.get("initiator") + missionStartTime = status.get("mssnStrtTm") + rechargeTime = status.get("rechrgTm") + expireTime = status.get("expireTm") + + # Generic Data + softwareVer = data.get("softwareVer") + vacuumHigh = data.get("vacHigh") + carpetBoost = data.get("carpetBoost") + + if vacuumHigh is not None: + if not vacuumHigh and not carpetBoost: + robotCarpetBoost = "Eco" + elif vacuumHigh and not carpetBoost: + robotCarpetBoost = "Performance" + else: + robotCarpetBoost = "Auto" + else: + robotCarpetBoost = "n-a" + + battery = data.get("batPct") + if "+" in softwareVer: + softwareVer = softwareVer.split("+")[1] + + if cycle == "none" and notReady == 39: + extv = "Pending" + elif notReady > 0: + extv = f"Not Ready ({notReady})" + else: + extv = self.returnIn(cycleMappings, cycle) + + if phase == "charge" and battery == 100: + rPhase = "Idle" + elif cycle == "none" and phase == "stop": + rPhase = "Stopped" + else: + rPhase = self.returnIn(phaseMappings, phase) + + if missionStartTime != 0: + time = datetime.fromtimestamp(missionStartTime) + elapsed = round((datetime.now().timestamp() - time.timestamp()) / 60) + if elapsed > 60: + jobTime = f"{elapsed // 60}h {f'{elapsed % 60:0>2d}'}m" + else: + jobTime = f"{elapsed}m" + else: + jobTime = "n-a" + + if rechargeTime != 0: + time = datetime.fromtimestamp(rechargeTime) + resume = round((datetime.now().timestamp() - time.timestamp()) / 60) + if elapsed > 60: + jobResumeTime = f"{resume // 60}h {f'{resume % 60:0>2d}'}m" + else: + jobResumeTime = f"{resume}m" + else: + jobResumeTime = "n-a" + + if expireTime != 0: + time = datetime.fromtimestamp(expireTime) + expire = round((datetime.now().timestamp() - time.timestamp()) / 60) + if elapsed > 60: + jobExpireTime = f"{expire // 60}h {f'{expire % 60:0>2d}'}m" + else: + jobExpireTime = f"{expire}m" + else: + jobExpireTime = "n-a" + # Bin + robotBin = data.get("bin") + binFull = robotBin.get("full") + binPresent = robotBin.get("present") + + # Dock + dock = data.get("dock") + dockState = dock.get("state") + + # Pose + ## NOTE: My roomba's firmware does not support this anymore, so I'm blindly guessing based on the previous YAML integration details. + pose = data.get("pose") or {} + theta = pose.get("theta") + point = pose.get("point") or {} + pointX = point.get("x") + pointY = point.get("y") + if theta is not None: + location = f"{pointX}, {pointY}, {theta}" + else: + location = "n-a" + + # Networking + signal = data.get("signal") + rssi = signal.get("rssi") + + # Runtime Statistics + runtimeStats = data.get("runtimeStats") + sqft = runtimeStats.get("sqft") + hr = runtimeStats.get("hr") + timeMin = runtimeStats.get("min") + + # Mission total(s?) + bbmssn = data.get("bbmssn") + numMissions = bbmssn.get("nMssn") + # Run total(s?) + bbrun = data.get("bbrun") + numDirt = bbrun.get("nScrubs") + numEvacs = bbrun.get("nEvacs") + # numEvacs only for I7+/S9+ Models (Clean Base) + + pmaps = data.get("pmaps", []) + pmap0id = next(iter(pmaps[0]), None) if pmaps else None + + noAutoPasses = data.get("noAutoPasses") + twoPass = data.get("twoPass") + if noAutoPasses is not None and twoPass is not None: + if noAutoPasses is True and twoPass is False: + robotCleanMode = "One" + elif noAutoPasses is True and twoPass is True: + robotCleanMode = "Two" + else: + robotCleanMode = "Auto" + else: + robotCleanMode = "n-a" + + return [ + ("extendedStatus", extv), + ("notready_msg", self.returnIn(notReadyMappings, notReady)), + ("error_msg", self.returnIn(errorMappings, err)), + ("battery", battery), + ("software_ver", softwareVer), + ("phase", rPhase), + ("bin", self.returnIn(binMappings, binFull)), + ("bin_present", self.returnIn(yesNoMappings, binPresent)), + ("clean_base", self.returnIn(cleanBaseMappings, dockState)), + ("location", location), + ("rssi", rssi), + ("total_area", f"{round(sqft / 10.764 * 100)}m²"), + ("total_time", f"{hr}h {timeMin}m"), + ("total_jobs", numMissions), + ("dirt_events", numDirt), + ("evac_events", numEvacs), + ("job_initiator", self.returnIn(jobInitiatorMappings, initiator)), + ("job_time", jobTime), + ("job_recharge", jobResumeTime), + ("job_expire", jobExpireTime), + ("clean_mode", robotCleanMode), + ("carpet_boost", robotCarpetBoost), + ("clean_edges", "true" if not data.get("openOnly", False) else "false"), + ("maint_due", False), + ("pmap0_id", pmap0id), + ] + + def returnIn(self, map: map, index: any): + """Default or map value.""" + if index in map: + return map[index] + return index + + async def async_clean_spot(self, **kwargs): + """Spot clean.""" + + async def async_start(self): + """Start cleaning floors, check if any are selected or just clean everything.""" + payload = [] + + for entity in self.hass.states.async_all("switch"): + if entity.entity_id.startswith("switch.clean_") and entity.state == "on": + switch_obj = self.hass.data[DOMAIN].get(entity.entity_id) + if switch_obj: + payload.append(switch_obj.get_region_json()) + + if payload: + # TODO: FIX THIS FIX THIS IT NEEDS TO BE DYNAMIC NOT THIS GARBAGE + # TODO: FIX THIS FIX THIS IT NEEDS TO BE DYNAMIC NOT THIS GARBAGE + # TODO: FIX THIS FIX THIS IT NEEDS TO BE DYNAMIC NOT THIS GARBAGE + await self.hass.services.async_call( + DOMAIN, + "vacuum_clean", + service_data={ + "payload": json.dumps( + { + "ordered": 1, + "pmap_id": "BGQxV6zGTmCsalWFHr-S5g", + "regions": payload, + } + ) + }, + blocking=True, + ) + else: + _LOGGER.warning("No rooms selected for cleaning") + + async def async_return_to_base(self): + """Calls the Roomba back to its dock.""" + await self.hass.services.async_call( + DOMAIN, "vacuum_action", service_data={"command": "dock"}, blocking=True + )