fin-assistant/media_source.py
2021-05-07 19:36:17 +02:00

359 lines
12 KiB
Python

from __future__ import annotations
import logging
from typing import Tuple
from homeassistant.components.media_source.error import MediaSourceError, Unresolvable
from homeassistant.components.media_source.models import (
BrowseMediaSource,
MediaSource,
MediaSourceItem,
PlayMedia,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.components.media_player import BrowseError, BrowseMedia
from homeassistant.components.media_source.const import MEDIA_MIME_TYPES, URI_SCHEME
from homeassistant.const import ( # pylint: disable=import-error
CONF_URL,
)
from homeassistant.components.media_player.const import (
MEDIA_CLASS_ALBUM,
MEDIA_CLASS_ARTIST,
MEDIA_CLASS_CHANNEL,
MEDIA_CLASS_DIRECTORY,
MEDIA_CLASS_EPISODE,
MEDIA_CLASS_MOVIE,
MEDIA_CLASS_MUSIC,
MEDIA_CLASS_PLAYLIST,
MEDIA_CLASS_SEASON,
MEDIA_CLASS_TRACK,
MEDIA_CLASS_TV_SHOW,
MEDIA_TYPE_ALBUM,
MEDIA_TYPE_ARTIST,
MEDIA_TYPE_CHANNEL,
MEDIA_TYPE_EPISODE,
MEDIA_TYPE_MOVIE,
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_SEASON,
MEDIA_TYPE_TRACK,
MEDIA_TYPE_TVSHOW,
)
from . import JellyfinClientManager, JellyfinDevice, autolog
from .const import (
DOMAIN,
USER_APP_NAME,
)
# Media types listed as playable: album, artist and track.
PLAYABLE_MEDIA_TYPES = [
MEDIA_TYPE_ALBUM,
MEDIA_TYPE_ARTIST,
MEDIA_TYPE_TRACK,
]
# Media class constant associated with each container-like media type.
CONTAINER_TYPES_SPECIFIC_MEDIA_CLASS = {
MEDIA_TYPE_ALBUM: MEDIA_CLASS_ALBUM,
MEDIA_TYPE_ARTIST: MEDIA_CLASS_ARTIST,
MEDIA_TYPE_PLAYLIST: MEDIA_CLASS_PLAYLIST,
MEDIA_TYPE_SEASON: MEDIA_CLASS_SEASON,
MEDIA_TYPE_TVSHOW: MEDIA_CLASS_TV_SHOW,
}
# Media class constant associated with each child media type.
CHILD_TYPE_MEDIA_CLASS = {
MEDIA_TYPE_SEASON: MEDIA_CLASS_SEASON,
MEDIA_TYPE_ALBUM: MEDIA_CLASS_ALBUM,
MEDIA_TYPE_ARTIST: MEDIA_CLASS_ARTIST,
MEDIA_TYPE_MOVIE: MEDIA_CLASS_MOVIE,
MEDIA_TYPE_PLAYLIST: MEDIA_CLASS_PLAYLIST,
MEDIA_TYPE_TRACK: MEDIA_CLASS_TRACK,
MEDIA_TYPE_TVSHOW: MEDIA_CLASS_TV_SHOW,
MEDIA_TYPE_CHANNEL: MEDIA_CLASS_CHANNEL,
MEDIA_TYPE_EPISODE: MEDIA_CLASS_EPISODE,
}
# Separator between the two parts of the identifiers built in this module
# ("<media type>~~<item id>").
IDENTIFIER_SPLIT = "~~"
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
class UnknownMediaType(BrowseError):
    """Browse error for a media type this module does not know."""
async def async_get_media_source(hass: HomeAssistant):
    """Set up the Jellyfin media source.

    Uses the first config entry registered for this integration's domain and
    the client manager stored for that entry's URL in ``hass.data``.

    Args:
        hass: The Home Assistant instance.

    Returns:
        A ``JellyfinSource`` bound to ``hass`` and to that client manager.
    """
    first_entry = hass.config_entries.async_entries(DOMAIN)[0]
    domain_data = hass.data[DOMAIN]
    server_url = first_entry.data[CONF_URL]
    manager: JellyfinClientManager = domain_data[server_url]["manager"]
    return JellyfinSource(hass, manager)
class JellyfinSource(MediaSource):
    """Media source that browses and resolves media through a Jellyfin client manager."""

    def __init__(self, hass: HomeAssistant, manager: JellyfinClientManager):
        """Initialize the Jellyfin source.

        Args:
            hass: The Home Assistant instance.
            manager: Client manager used for every Jellyfin request made by
                this source.
        """
        super().__init__(DOMAIN)
        self.hass = hass
        self.jelly_cm = manager

    @staticmethod
    def _device_profile() -> dict:
        """Build the device profile sent along with the playback-info request."""
        return {
            "Name": USER_APP_NAME,
            "MaxStreamingBitrate": 25000 * 1000,
            "MusicStreamingTranscodingBitrate": 1280000,
            "TimelineOffsetSeconds": 5,
            "TranscodingProfiles": [
                {"Type": "Audio"},
                {
                    "Container": "mp4",
                    "Type": "Video",
                    "Protocol": "http",
                    "AudioCodec": "aac,mp3,opus,flac,vorbis",
                    "VideoCodec": "h264,mpeg4,mpeg2video",
                    "MaxAudioChannels": "6",
                },
                {"Container": "jpeg", "Type": "Photo"},
            ],
            "DirectPlayProfiles": [],
            "ResponseProfiles": [],
            "ContainerProfiles": [],
            "CodecProfiles": [],
            "SubtitleProfiles": [
                {"Format": "srt", "Method": "External"},
                {"Format": "srt", "Method": "Embed"},
                {"Format": "ass", "Method": "External"},
                {"Format": "ass", "Method": "Embed"},
                {"Format": "sub", "Method": "Embed"},
                {"Format": "sub", "Method": "External"},
                {"Format": "ssa", "Method": "Embed"},
                {"Format": "ssa", "Method": "External"},
                {"Format": "smi", "Method": "Embed"},
                {"Format": "smi", "Method": "External"},
                # Jellyfin currently refuses to serve pgssub, dvdsub and pgs
                # as external subtitles, so only "Embed" is declared for them.
                {"Format": "pgssub", "Method": "Embed"},
                {"Format": "dvdsub", "Method": "Embed"},
                {"Format": "pgs", "Method": "Embed"},
            ],
        }

    @staticmethod
    def _source_weight(media_source: dict) -> float:
        """Score a media source: direct-play support dominates, bitrate breaks ties."""
        direct_play = media_source.get("SupportsDirectPlay") or 0
        bitrate = media_source.get("Bitrate") or 0
        return direct_play * 50000 + bitrate / 1000

    async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
        """Resolve a media item to a playable item.

        Asks the client manager for playback info, picks the highest-scoring
        media source and returns its transcoding URL prefixed with the server
        URL.

        Args:
            item: Media source item whose identifier has the form
                ``<media type>~~<item id>``.

        Returns:
            A ``PlayMedia`` pointing at the transcoding URL. The MIME type is
            always reported as ``video/mp4``.

        Raises:
            Unresolvable: if the identifier is missing or malformed, no media
                source is returned, or the selected source offers no
                transcoding URL.
        """
        autolog("<<<")

        if not item or not item.identifier:
            raise Unresolvable("No media identifier to resolve")

        parts = parse_mediasource_identifier(item.identifier)
        if len(parts) != 2:
            raise Unresolvable(f"Malformed media identifier: {item.identifier}")
        media_content_id = parts[1]

        playback_info = await self.jelly_cm.get_play_info(
            media_content_id, self._device_profile()
        )
        _LOGGER.debug("playbackinfo: %s", playback_info)

        media_sources = (playback_info or {}).get("MediaSources") or []
        # max() keeps the first of equally weighted sources and, unlike a
        # "weight > 0" scan, still selects a source whose weight is zero.
        selected = max(media_sources, key=self._source_weight, default=None)
        if selected is None:
            raise Unresolvable(f"No media source available for {media_content_id}")

        transcoding_url = selected.get("TranscodingUrl")
        if not selected.get("SupportsTranscoding") or not transcoding_url:
            raise Unresolvable(f"No transcoding URL available for {media_content_id}")

        url = self.jelly_cm.get_server_url() + transcoding_url
        _LOGGER.debug("cast url: %s", url)
        return PlayMedia(url, "video/mp4")

    async def async_browse_media(
        self, item: MediaSourceItem, media_types: Tuple[str] = MEDIA_MIME_TYPES
    ) -> BrowseMediaSource:
        """Browse media.

        Args:
            item: Media source item to browse; an empty identifier browses
                the top level.
            media_types: Accepted for interface compatibility; not used here.

        Returns:
            The browse tree node built by ``async_library_items``.
        """
        autolog("<<<")
        media_content_type, media_content_id = async_parse_identifier(item)
        return await async_library_items(
            self.jelly_cm, media_content_type, media_content_id
        )
@callback
def async_parse_identifier(
    item: MediaSourceItem,
) -> tuple[str | None, str | None]:
    """Return the item's identifier as a pair, or ``(None, None)`` if it is empty.

    The same identifier string is returned in both positions; it is not split
    here.
    """
    identifier = item.identifier
    if identifier:
        return identifier, identifier
    # Nothing to browse yet: the caller treats this as the top level.
    return None, None
def Type2Mediatype(type):
    """Translate an item type name into the media type constant used when browsing.

    Args:
        type: Item type name, e.g. ``"Movie"`` or ``"MusicAlbum"``.

    Returns:
        The matching ``MEDIA_TYPE_*`` constant. Folder-like types (box sets,
        folders, collection folders, playlists) map to
        ``MEDIA_CLASS_DIRECTORY``.

    Raises:
        KeyError: if ``type`` is not one of the names listed below.
    """
    media_type_by_item_type = {
        # Video
        "Movie": MEDIA_TYPE_MOVIE,
        "Series": MEDIA_TYPE_TVSHOW,
        "Season": MEDIA_TYPE_SEASON,
        "Episode": MEDIA_TYPE_EPISODE,
        # Music
        "Music": MEDIA_TYPE_ALBUM,
        "MusicArtist": MEDIA_TYPE_ARTIST,
        "MusicAlbum": MEDIA_TYPE_ALBUM,
        "Audio": MEDIA_TYPE_TRACK,
        # Folder-like containers
        "BoxSet": MEDIA_CLASS_DIRECTORY,
        "Folder": MEDIA_CLASS_DIRECTORY,
        "CollectionFolder": MEDIA_CLASS_DIRECTORY,
        "Playlist": MEDIA_CLASS_DIRECTORY,
    }
    return media_type_by_item_type[type]
def Type2Mediaclass(type):
    """Translate an item type name into the media class constant used when browsing.

    Args:
        type: Item type name, e.g. ``"Movie"`` or ``"MusicAlbum"``.

    Returns:
        The matching ``MEDIA_CLASS_*`` constant.

    Raises:
        KeyError: if ``type`` is not one of the names listed below.
    """
    media_class_by_item_type = {
        # Video
        "Movie": MEDIA_CLASS_MOVIE,
        "Series": MEDIA_CLASS_TV_SHOW,
        "Season": MEDIA_CLASS_SEASON,
        "Episode": MEDIA_CLASS_EPISODE,
        # Music
        "MusicArtist": MEDIA_CLASS_ARTIST,
        "MusicAlbum": MEDIA_CLASS_ALBUM,
        "Audio": MEDIA_CLASS_TRACK,
        # Shown as plain directories
        "Music": MEDIA_CLASS_DIRECTORY,
        "BoxSet": MEDIA_CLASS_DIRECTORY,
        "Folder": MEDIA_CLASS_DIRECTORY,
        "CollectionFolder": MEDIA_CLASS_DIRECTORY,
        "Playlist": MEDIA_CLASS_DIRECTORY,
    }
    return media_class_by_item_type[type]
def IsPlayable(type):
    """Tell whether an item of the given type name is offered as playable.

    Args:
        type: Item type name, e.g. ``"Movie"`` or ``"Folder"``.

    Returns:
        ``True`` for playable types, ``False`` for the known non-playable ones.

    Raises:
        KeyError: if ``type`` is not a known item type name.
    """
    playable = {
        "Movie",
        "Series",
        "Season",
        "Episode",
        "BoxSet",
        "Playlist",
        "MusicArtist",
        "MusicAlbum",
        "Audio",
    }
    not_playable = {"Music", "Folder", "CollectionFolder"}

    if type in playable:
        return True
    if type in not_playable:
        return False
    # Unknown names keep failing the same way a dict lookup would.
    raise KeyError(type)
def parse_mediasource_identifier(identifier: str):
    """Split a media identifier into its media type and item id.

    A leading ``URI_SCHEME + DOMAIN + "/"`` prefix is removed first, so both
    full media-source URIs and bare identifiers are accepted.

    Args:
        identifier: Identifier of the form ``<media type>~~<item id>``,
            optionally preceded by the media-source URI prefix.

    Returns:
        A list ``[media_type, item_id]``. If the identifier contains no
        separator, the list has a single element.
    """
    prefix = f"{URI_SCHEME}{DOMAIN}/"
    if identifier.startswith(prefix):
        identifier = identifier[len(prefix):]
    # Split on the first separator only: callers unpack exactly two parts, so
    # any further separator must stay inside the item id.
    return identifier.split(IDENTIFIER_SPLIT, 1)
async def async_library_items(jelly_cm: JellyfinClientManager, media_content_type_in=None, media_content_id_in=None) -> BrowseMediaSource:
    """
    Create response payload to describe contents of a specific library.

    Used by async_browse_media.

    Args:
        jelly_cm: Client manager used to fetch items and artwork URLs.
        media_content_type_in: ``None`` to browse the top level; any other
            value means ``media_content_id_in`` carries the identifier.
        media_content_id_in: Identifier of the form
            ``<media type>~~<item id>``, optionally with the media-source
            URI prefix.

    Returns:
        A ``BrowseMediaSource`` node. For the top level and for container
        types its ``children`` list holds one entry per returned item; for
        any other type the node itself describes the first returned item.
    """
    _LOGGER.debug(">> async_library_items: %s", media_content_id_in)

    # Media types that are browsed as containers of further items.
    container_types = (
        MEDIA_CLASS_DIRECTORY,
        MEDIA_TYPE_ARTIST,
        MEDIA_TYPE_ALBUM,
        MEDIA_TYPE_PLAYLIST,
        MEDIA_TYPE_TVSHOW,
        MEDIA_TYPE_SEASON,
    )

    if media_content_type_in is None:
        media_content_type = None
        media_content_id = None
    else:
        media_content_type, media_content_id = parse_mediasource_identifier(media_content_id_in)
    _LOGGER.debug(">> %s / %s", media_content_type, media_content_id)

    is_root = media_content_type in (None, "library")
    lists_children = is_root or media_content_type in container_types

    if is_root:
        query = None
        library_info = BrowseMediaSource(
            domain=DOMAIN,
            identifier=f"library{IDENTIFIER_SPLIT}library",
            media_class=MEDIA_CLASS_DIRECTORY,
            media_content_type="library",
            title="Media Library",
            can_play=False,
            can_expand=True,
            children=[],
        )
    elif lists_children:
        query = {"ParentId": media_content_id}
        parent_item = await jelly_cm.get_item(media_content_id)
        library_info = BrowseMediaSource(
            domain=DOMAIN,
            identifier=f"{media_content_type}{IDENTIFIER_SPLIT}{media_content_id}",
            # Report a media *class*, not the media type: the two differ for
            # TV shows. Plain directories keep the directory class.
            media_class=CONTAINER_TYPES_SPECIFIC_MEDIA_CLASS.get(
                media_content_type, MEDIA_CLASS_DIRECTORY
            ),
            media_content_type=media_content_type,
            title=parent_item["Name"],
            can_play=IsPlayable(parent_item["Type"]),
            can_expand=True,
            thumbnail=await jelly_cm.get_artwork_url(media_content_id),
            children=[],
        )
    else:
        query = {"Id": media_content_id}
        # Placeholder node; it is filled in from the first returned item below.
        library_info = BrowseMediaSource(
            domain=DOMAIN,
            identifier=f"{media_content_type}{IDENTIFIER_SPLIT}{media_content_id}",
            media_class=MEDIA_CLASS_DIRECTORY,
            media_content_type=media_content_type,
            title="",
            can_play=True,
            can_expand=False,
            thumbnail=await jelly_cm.get_artwork_url(media_content_id),
            children=[],
        )

    items = await jelly_cm.get_items(query)

    if lists_children:
        for item in items:
            item_type = item["Type"]
            media_type = Type2Mediatype(item_type)
            library_info.children.append(BrowseMediaSource(
                domain=DOMAIN,
                identifier=f"{media_type}{IDENTIFIER_SPLIT}{item['Id']}",
                media_class=Type2Mediaclass(item_type),
                media_content_type=media_type,
                title=item["Name"],
                can_play=IsPlayable(item_type),
                # Only folders can be browsed further.
                can_expand=bool(item["IsFolder"]),
                children=[],
                thumbnail=await jelly_cm.get_artwork_url(item["Id"]),
            ))
    else:
        # Single-item lookup: only the first returned item is used.
        item = next(iter(items), None)
        if item is not None:
            item_type = item["Type"]
            media_type = Type2Mediatype(item_type)
            library_info.domain = DOMAIN
            # Plain str / bool values: a trailing comma here would store
            # 1-tuples, and a 1-tuple can_play is always truthy.
            library_info.identifier = f"{media_type}{IDENTIFIER_SPLIT}{item['Id']}"
            library_info.title = item["Name"]
            library_info.media_content_type = media_type
            library_info.media_class = Type2Mediaclass(item_type)
            library_info.can_expand = False
            library_info.can_play = IsPlayable(item_type)

    return library_info