This commit is contained in:
ia74 2025-07-20 14:33:33 -05:00
parent 6164cce27c
commit 44c2a2391d
6 changed files with 254 additions and 208 deletions

View file

@ -0,0 +1,158 @@
"""Bring back the sensor attributes from the YAML config."""
from datetime import datetime
from .const import (
binMappings,
cleanBaseMappings,
cycleMappings,
errorMappings,
jobInitiatorMappings,
notReadyMappings,
phaseMappings,
yesNoMappings,
)
def createExtendedAttributes(self) -> dict[str, any]:
"""Return all the given attributes from rest980."""
data = self.coordinator.data or {}
status = data.get("cleanMissionStatus", {})
# Mission State
cycle = status.get("cycle")
phase = status.get("phase")
err = status.get("error")
notReady = status.get("notReady")
initiator = status.get("initiator")
missionStartTime = status.get("mssnStrtTm")
rechargeTime = status.get("rechrgTm")
expireTime = status.get("expireTm")
# Generic Data
softwareVer = data.get("softwareVer")
vacuumHigh = data.get("vacHigh")
carpetBoost = data.get("carpetBoost")
if vacuumHigh is not None:
if not vacuumHigh and not carpetBoost:
robotCarpetBoost = "Eco"
elif vacuumHigh and not carpetBoost:
robotCarpetBoost = "Performance"
else:
robotCarpetBoost = "Auto"
else:
robotCarpetBoost = "n-a"
battery = data.get("batPct")
if "+" in softwareVer:
softwareVer = softwareVer.split("+")[1]
if cycle == "none" and notReady == 39:
extv = "Pending"
elif notReady > 0:
extv = f"Not Ready ({notReady})"
else:
extv = cycleMappings.get(cycle, cycle)
if phase == "charge" and battery == 100:
rPhase = "Idle"
elif cycle == "none" and phase == "stop":
rPhase = "Stopped"
else:
rPhase = phaseMappings.get(phase, phase)
if missionStartTime != 0:
time = datetime.fromtimestamp(missionStartTime)
elapsed = round((datetime.now().timestamp() - time.timestamp()) / 60)
if elapsed > 60:
jobTime = f"{elapsed // 60}h {f'{elapsed % 60:0>2d}'}m"
else:
jobTime = f"{elapsed}m"
else:
jobTime = "n-a"
if rechargeTime != 0:
time = datetime.fromtimestamp(rechargeTime)
resume = round((datetime.now().timestamp() - time.timestamp()) / 60)
if elapsed > 60:
jobResumeTime = f"{resume // 60}h {f'{resume % 60:0>2d}'}m"
else:
jobResumeTime = f"{resume}m"
else:
jobResumeTime = "n-a"
if expireTime != 0:
time = datetime.fromtimestamp(expireTime)
expire = round((datetime.now().timestamp() - time.timestamp()) / 60)
if elapsed > 60:
jobExpireTime = f"{expire // 60}h {f'{expire % 60:0>2d}'}m"
else:
jobExpireTime = f"{expire}m"
else:
jobExpireTime = "n-a"
# Bin
robotBin = data.get("bin")
binFull = robotBin.get("full")
binPresent = robotBin.get("present")
# Dock
dock = data.get("dock")
dockState = dock.get("state")
# Pose
## NOTE: My roomba's firmware does not support this anymore, so I'm blindly guessing based on the previous YAML integration details.
pose = data.get("pose") or {}
theta = pose.get("theta")
point = pose.get("point") or {}
pointX = point.get("x")
pointY = point.get("y")
if theta is not None:
location = f"{pointX}, {pointY}, {theta}"
else:
location = "n-a"
# Networking
signal = data.get("signal")
rssi = signal.get("rssi")
# Runtime Statistics
runtimeStats = data.get("runtimeStats")
sqft = runtimeStats.get("sqft")
hr = runtimeStats.get("hr")
timeMin = runtimeStats.get("min")
# Mission total(s?)
bbmssn = data.get("bbmssn")
numMissions = bbmssn.get("nMssn")
# Run total(s?)
bbrun = data.get("bbrun")
numDirt = bbrun.get("nScrubs")
numEvacs = bbrun.get("nEvacs")
# numEvacs only for I7+/S9+ Models (Clean Base)
pmaps = data.get("pmaps", [])
pmap0id = next(iter(pmaps[0]), None) if pmaps else None
noAutoPasses = data.get("noAutoPasses")
twoPass = data.get("twoPass")
if noAutoPasses is not None and twoPass is not None:
if noAutoPasses is True and twoPass is False:
robotCleanMode = "One"
elif noAutoPasses is True and twoPass is True:
robotCleanMode = "Two"
else:
robotCleanMode = "Auto"
else:
robotCleanMode = "n-a"
return [
("extendedStatus", extv),
("notready_msg", notReadyMappings.get(notReady, notReady)),
("error_msg", errorMappings.get(err, err)),
("battery", f"{battery}%"),
("software_ver", softwareVer),
("phase", rPhase),
("bin", binMappings.get(binFull, binFull)),
("bin_present", yesNoMappings.get(binPresent, binPresent)),
("clean_base", cleanBaseMappings.get(dockState, dockState)),
("location", location),
("rssi", rssi),
("total_area", f"{round(sqft / 10.764 * 100)}"),
("total_time", f"{hr}h {timeMin}m"),
("total_jobs", numMissions),
("dirt_events", numDirt),
("evac_events", numEvacs),
("job_initiator", jobInitiatorMappings.get(initiator, initiator)),
("job_time", jobTime),
("job_recharge", jobResumeTime),
("job_expire", jobExpireTime),
("clean_mode", robotCleanMode),
("carpet_boost", robotCarpetBoost),
("clean_edges", "true" if not data.get("openOnly", False) else "false"),
("maint_due", False),
("pmap0_id", pmap0id),
]

View file

@ -1,11 +1,15 @@
"""The configuration flow for the robot."""
import voluptuous as vol
import asyncio
from homeassistant import config_entries
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN
SCHEMA = vol.Schema({vol.Required("base_url"): str})
class RoombaConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Config flow."""
@ -13,12 +17,33 @@ class RoombaConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
"""Show the user the input for the base url."""
if user_input is not None:
return self.async_create_entry(title="Roomba", data=user_input, options={})
session = async_get_clientsession(self.hass)
data = {}
async with asyncio.timeout(10):
try:
async with session.get(
f"{user_input['base_url']}/api/local/info/state"
) as resp:
data = await resp.json() or {}
if data == {}:
return self.async_show_form(
step_id="user",
data_schema=SCHEMA,
errors=["cannot_connect"],
)
except Exception:
return self.async_show_form(
step_id="user",
data_schema=SCHEMA,
errors=["cannot_connect"],
)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({vol.Required("base_url"): str}),
)
return self.async_create_entry(
title=data.get("name", "Roomba"),
data=user_input,
)
return self.async_show_form(step_id="user", data_schema=SCHEMA)
async def async_step_options(self, user_input=None):
"""I dont know."""

View file

@ -51,6 +51,7 @@ cleanBaseMappings = {
352: "Sealing Problem",
353: "Bag Full",
360: "Comms Problem",
364: "Bin Full Sensors Not Cleared",
}
jobInitiatorMappings = {

View file

@ -25,6 +25,7 @@ async def async_setup_entry(hass: HomeAssistant, entry, async_add_entities):
RoombaCleanBase(coordinator, entry),
RoombaTotalJobs(coordinator, entry),
RoombaMissionStartTime(coordinator, entry),
RoombaMissionElapsedTime(coordinator, entry),
RoombaRechargeTime(coordinator, entry),
RoombaMissionExpireTime(coordinator, entry),
RoombaCarpetBoostMode(coordinator, entry),
@ -232,6 +233,41 @@ class RoombaMissionStartTime(RoombaSensor):
self.async_write_ha_state()
class RoombaMissionElapsedTime(RoombaSensor):
"""Read the mission elapsed time of the Roomba."""
_rs_given_info = ("Job Elapsed Time", "job_elapsed_time")
def __init__(self, coordinator, entry) -> None:
"""Create a new job elapsed time reading."""
super().__init__(coordinator, entry)
self._attr_device_class = SensorDeviceClass.DURATION
self._attr_native_unit_of_measurement = UnitOfTime.MINUTES
self._attr_icon = "mdi:timeline-clock"
def _handle_coordinator_update(self):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
status = data.get("cleanMissionStatus", {})
missionStartTime = status.get("mssnStrtTm") # Unix timestamp in seconds?
if missionStartTime:
self._attr_available = True
try:
elapsed_time = dt_util.utcnow() - dt_util.utc_from_timestamp(
missionStartTime
)
# Convert timedelta to minutes
self._attr_native_value = elapsed_time.total_seconds() / 60
except (TypeError, ValueError):
self._attr_native_value = None
else:
self._attr_native_value = None
self._attr_available = False
self.async_write_ha_state()
class RoombaRechargeTime(RoombaSensor):
"""Read the mission start time of the Roomba."""
@ -366,17 +402,18 @@ class RoombaMissionExpireTime(RoombaSensor):
"""Update sensor when coordinator data changes."""
data = self.coordinator.data or {}
status = data.get("cleanMissionStatus", {})
missionStartTime = status.get("expireTm") # Unix timestamp in seconds?
self._attr_available = False
expireTime = status.get("expireTm") # Unix timestamp in seconds?
if missionStartTime:
if expireTime:
self._attr_available = True
try:
self._attr_native_value = dt_util.utc_from_timestamp(missionStartTime)
self._attr_native_value = dt_util.utc_from_timestamp(expireTime)
except (TypeError, ValueError):
self._attr_native_value = None
self._attr_available = False
else:
self._attr_native_value = None
self._attr_available = False
self.async_write_ha_state()

View file

@ -1,6 +1,7 @@
"""The vacuum."""
from datetime import datetime
import json
import logging
@ -13,17 +14,9 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import (
DOMAIN,
binMappings,
cleanBaseMappings,
cycleMappings,
errorMappings,
jobInitiatorMappings,
notReadyMappings,
phaseMappings,
yesNoMappings,
)
from .const import DOMAIN
from .LegacyCompatibility import createExtendedAttributes
_LOGGER = logging.getLogger(__name__)
@ -61,205 +54,37 @@ class RoombaVacuum(CoordinatorEntity, StateVacuumEntity):
"manufacturer": "iRobot",
}
def _get_default(self, key: str, default: str):
return self.coordinator.data.get(key) if self.coordinator.data else default
@property
def name(self):
"""Return its name."""
return self.coordinator.data.get("name") if self.coordinator.data else "Roomba"
@property
def available(self):
"""Is vacuum available?"""
return self.coordinator.data is not None
@property
def activity(self):
"""Return the state."""
data = self.coordinator.data
if not data:
return None # Return None so HA marks entity as unavailable
def device_info(self):
data = self.coordinator.data or {}
return {
"identifiers": self._attr_device_info.get("identifiers"),
"name": self._attr_device_info.get("name"),
"manufacturer": self._attr_device_info.get("manufacturer"),
"model": f"Roomba {data.get('sku')}",
"sw_version": data.get("softwareVer"),
}
def _handle_coordinator_update(self):
"""Update all attributes."""
data = self.coordinator.data or {}
status = data.get("cleanMissionStatus", {})
cycle = status.get("cycle")
not_ready = status.get("notReady")
self._attr_activity = VacuumActivity.IDLE
if cycle == "none" and not_ready == 39:
return VacuumActivity.IDLE
self._attr_activity = VacuumActivity.IDLE
if not_ready and not_ready > 0:
return VacuumActivity.ERROR
self._attr_activity = VacuumActivity.ERROR
if cycle in ["clean", "quick", "spot", "train"]:
return VacuumActivity.CLEANING
self._attr_activity = VacuumActivity.CLEANING
if cycle in ["evac", "dock"]:
return VacuumActivity.DOCKED
return VacuumActivity.IDLE
self._attr_activity = VacuumActivity.DOCKED
@property
def extra_state_attributes(self):
"""Return all the given attributes from rest980."""
data = self.coordinator.data or {}
status = data.get("cleanMissionStatus", {})
# Mission State
cycle = status.get("cycle")
phase = status.get("phase")
err = status.get("error")
notReady = status.get("notReady")
initiator = status.get("initiator")
missionStartTime = status.get("mssnStrtTm")
rechargeTime = status.get("rechrgTm")
expireTime = status.get("expireTm")
# Generic Data
softwareVer = data.get("softwareVer")
vacuumHigh = data.get("vacHigh")
carpetBoost = data.get("carpetBoost")
if vacuumHigh is not None:
if not vacuumHigh and not carpetBoost:
robotCarpetBoost = "Eco"
elif vacuumHigh and not carpetBoost:
robotCarpetBoost = "Performance"
else:
robotCarpetBoost = "Auto"
else:
robotCarpetBoost = "n-a"
battery = data.get("batPct")
if "+" in softwareVer:
softwareVer = softwareVer.split("+")[1]
if cycle == "none" and notReady == 39:
extv = "Pending"
elif notReady > 0:
extv = f"Not Ready ({notReady})"
else:
extv = self.returnIn(cycleMappings, cycle)
if phase == "charge" and battery == 100:
rPhase = "Idle"
elif cycle == "none" and phase == "stop":
rPhase = "Stopped"
else:
rPhase = self.returnIn(phaseMappings, phase)
if missionStartTime != 0:
time = datetime.fromtimestamp(missionStartTime)
elapsed = round((datetime.now().timestamp() - time.timestamp()) / 60)
if elapsed > 60:
jobTime = f"{elapsed // 60}h {f'{elapsed % 60:0>2d}'}m"
else:
jobTime = f"{elapsed}m"
else:
jobTime = "n-a"
if rechargeTime != 0:
time = datetime.fromtimestamp(rechargeTime)
resume = round((datetime.now().timestamp() - time.timestamp()) / 60)
if elapsed > 60:
jobResumeTime = f"{resume // 60}h {f'{resume % 60:0>2d}'}m"
else:
jobResumeTime = f"{resume}m"
else:
jobResumeTime = "n-a"
if expireTime != 0:
time = datetime.fromtimestamp(expireTime)
expire = round((datetime.now().timestamp() - time.timestamp()) / 60)
if elapsed > 60:
jobExpireTime = f"{expire // 60}h {f'{expire % 60:0>2d}'}m"
else:
jobExpireTime = f"{expire}m"
else:
jobExpireTime = "n-a"
# Bin
robotBin = data.get("bin")
binFull = robotBin.get("full")
binPresent = robotBin.get("present")
# Dock
dock = data.get("dock")
dockState = dock.get("state")
# Pose
## NOTE: My roomba's firmware does not support this anymore, so I'm blindly guessing based on the previous YAML integration details.
pose = data.get("pose") or {}
theta = pose.get("theta")
point = pose.get("point") or {}
pointX = point.get("x")
pointY = point.get("y")
if theta is not None:
location = f"{pointX}, {pointY}, {theta}"
else:
location = "n-a"
# Networking
signal = data.get("signal")
rssi = signal.get("rssi")
# Runtime Statistics
runtimeStats = data.get("runtimeStats")
sqft = runtimeStats.get("sqft")
hr = runtimeStats.get("hr")
timeMin = runtimeStats.get("min")
# Mission total(s?)
bbmssn = data.get("bbmssn")
numMissions = bbmssn.get("nMssn")
# Run total(s?)
bbrun = data.get("bbrun")
numDirt = bbrun.get("nScrubs")
numEvacs = bbrun.get("nEvacs")
# numEvacs only for I7+/S9+ Models (Clean Base)
pmaps = data.get("pmaps", [])
pmap0id = next(iter(pmaps[0]), None) if pmaps else None
noAutoPasses = data.get("noAutoPasses")
twoPass = data.get("twoPass")
if noAutoPasses is not None and twoPass is not None:
if noAutoPasses is True and twoPass is False:
robotCleanMode = "One"
elif noAutoPasses is True and twoPass is True:
robotCleanMode = "Two"
else:
robotCleanMode = "Auto"
else:
robotCleanMode = "n-a"
return [
("extendedStatus", extv),
("notready_msg", self.returnIn(notReadyMappings, notReady)),
("error_msg", self.returnIn(errorMappings, err)),
("battery", battery),
("software_ver", softwareVer),
("phase", rPhase),
("bin", self.returnIn(binMappings, binFull)),
("bin_present", self.returnIn(yesNoMappings, binPresent)),
("clean_base", self.returnIn(cleanBaseMappings, dockState)),
("location", location),
("rssi", rssi),
("total_area", f"{round(sqft / 10.764 * 100)}"),
("total_time", f"{hr}h {timeMin}m"),
("total_jobs", numMissions),
("dirt_events", numDirt),
("evac_events", numEvacs),
("job_initiator", self.returnIn(jobInitiatorMappings, initiator)),
("job_time", jobTime),
("job_recharge", jobResumeTime),
("job_expire", jobExpireTime),
("clean_mode", robotCleanMode),
("carpet_boost", robotCarpetBoost),
("clean_edges", "true" if not data.get("openOnly", False) else "false"),
("maint_due", False),
("pmap0_id", pmap0id),
]
def returnIn(self, map: map, index: any):
"""Default or map value."""
if index in map:
return map[index]
return index
self._attr_available = data != {}
self._attr_battery_level = data.get("batPct", 0)
self._attr_extra_state_attributes = createExtendedAttributes(self)
async def async_clean_spot(self, **kwargs):
"""Spot clean."""