hacs-connectd/custom_components/connectd/__init__.py
2025-12-15 11:08:23 -06:00

117 lines
4.1 KiB
Python

"""connectd integration for home assistant."""
from __future__ import annotations
import asyncio
import logging
from datetime import timedelta
import aiohttp
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
# module-level logger; also handed to the coordinator's base class.
_LOGGER = logging.getLogger(__name__)
# integration domain: used as the key under hass.data and as the coordinator name.
DOMAIN = "connectd"
# platforms forwarded on entry setup and unloaded again on entry unload.
PLATFORMS = [Platform.SENSOR]
# passed to the coordinator as its update_interval.
SCAN_INTERVAL = timedelta(minutes=1)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """create the connectd coordinator for a config entry and start its platforms.

    the coordinator is built from the entry's ``host`` and ``port`` values,
    its first refresh is awaited, and only then is it stored under
    ``hass.data[DOMAIN][entry.entry_id]`` and the platforms forwarded.

    returns:
        always ``True`` once the platforms have been forwarded.
    """
    config = entry.data
    coordinator = ConnectdDataUpdateCoordinator(hass, config["host"], config["port"])
    await coordinator.async_config_entry_first_refresh()

    # one coordinator per config entry, keyed by the entry id.
    hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator

    await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
    return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """unload the platforms of a config entry and drop its stored coordinator.

    returns:
        the result of unloading the platforms; the coordinator is removed
        from ``hass.data[DOMAIN]`` only when that result is truthy.
    """
    unloaded = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
    if not unloaded:
        # platforms are still loaded, so keep the coordinator in place.
        return unloaded

    hass.data[DOMAIN].pop(entry.entry_id)
    return unloaded
class ConnectdDataUpdateCoordinator(DataUpdateCoordinator):
    """coordinator that polls the connectd http api.

    each refresh reads the required ``/api/stats`` and ``/api/state``
    endpoints plus three optional ones (``priority_matches``,
    ``top_humans``, ``user``) and returns them together as one dict.
    """

    def __init__(self, hass: HomeAssistant, host: str, port: int) -> None:
        """initialize.

        args:
            hass: home assistant instance, passed through to the base class.
            host: host part of the connectd base url.
            port: port part of the connectd base url.
        """
        self.host = host
        self.port = port
        self.base_url = f"http://{host}:{port}"
        super().__init__(
            hass,
            _LOGGER,
            name=DOMAIN,
            update_interval=SCAN_INTERVAL,
        )

    async def _async_get_required(self, session: aiohttp.ClientSession, name: str):
        """return the json body of ``/api/<name>``.

        raises:
            UpdateFailed: if the response status is not 200.
        """
        async with session.get(f"{self.base_url}/api/{name}") as resp:
            if resp.status != 200:
                raise UpdateFailed(f"error fetching {name}: {resp.status}")
            return await resp.json()

    async def _async_get_optional(self, session: aiohttp.ClientSession, name: str):
        """return the json body of ``/api/<name>``, or ``{}`` when unavailable.

        optional endpoints are best-effort: a non-200 status or any error
        while requesting/decoding yields an empty dict instead of failing
        the whole refresh. failures are logged at debug level rather than
        being swallowed silently.
        """
        try:
            async with session.get(f"{self.base_url}/api/{name}") as resp:
                if resp.status == 200:
                    return await resp.json()
                _LOGGER.debug(
                    "optional endpoint %s returned status %s", name, resp.status
                )
        except Exception as err:  # deliberately broad: endpoint is optional
            _LOGGER.debug("optional endpoint %s failed: %s", name, err)
        return {}

    async def _async_update_data(self):
        """fetch data from connectd api.

        returns:
            dict with the keys ``stats``, ``state``, ``priority_matches``,
            ``top_humans`` and ``user``; the last three default to ``{}``.

        raises:
            UpdateFailed: if a required endpoint answers with a non-200
                status, the requests exceed the 10 second budget, or any
                other error occurs while talking to connectd.
        """
        try:
            # one 10 second budget covers all five requests together.
            async with asyncio.timeout(10):
                async with aiohttp.ClientSession() as session:
                    stats = await self._async_get_required(session, "stats")
                    state = await self._async_get_required(session, "state")
                    priority_matches = await self._async_get_optional(
                        session, "priority_matches"
                    )
                    top_humans = await self._async_get_optional(session, "top_humans")
                    user = await self._async_get_optional(session, "user")
                    return {
                        "stats": stats,
                        "state": state,
                        "priority_matches": priority_matches,
                        "top_humans": top_humans,
                        "user": user,
                    }
        except UpdateFailed:
            # already carries a precise message; without this clause the
            # generic handler below would re-wrap it as "unexpected error".
            raise
        except TimeoutError as err:
            # str(err) is empty for timeouts, so give an explicit message.
            raise UpdateFailed("timeout communicating with connectd") from err
        except aiohttp.ClientError as err:
            raise UpdateFailed(f"error communicating with connectd: {err}") from err
        except Exception as err:
            raise UpdateFailed(f"unexpected error: {err}") from err