import asyncio
import inspect
import threading
from typing import Any
from datetime import timedelta
from winrt.system import Array
from winrt.windows.foundation import Uri
from winrt.windows.media import SystemMediaTransportControlsTimelineProperties, SystemMediaTransportControls, \
SystemMediaTransportControlsDisplayUpdater, MediaPlaybackStatus, MediaPlaybackType, MediaPlaybackAutoRepeatMode, \
AutoRepeatModeChangeRequestedEventArgs, SystemMediaTransportControlsButtonPressedEventArgs, \
SystemMediaTransportControlsButton, PlaybackPositionChangeRequestedEventArgs, PlaybackRateChangeRequestedEventArgs, \
SystemMediaTransportControlsPropertyChangedEventArgs, SystemMediaTransportControlsProperty, \
ShuffleEnabledChangeRequestedEventArgs
from winrt.windows.media.playback import MediaPlayer
from winrt.windows.storage.streams import RandomAccessStreamReference
from aionowplaying import BaseInterface, PropertyName, PlaybackPropertyName
from aionowplaying.interface.base import TrackListPropertyName, PlaybackStatus, PlaybackProperties, LoopStatus, \
MediaType
[docs]
def TimeSpan(x_microsec):
    """Build a ``timedelta`` from a duration given in microseconds.

    The name mirrors the WinRT ``TimeSpan`` type; the rest of this module
    passes the result to timeline properties of the media controls.

    :param x_microsec: Duration in microseconds (may be negative).
    :return: The equivalent ``datetime.timedelta``.
    """
    # Positional order of timedelta is (days, seconds, microseconds).
    span = timedelta(0, 0, x_microsec)
    return span
[docs]
class WindowsInterface(BaseInterface):
    """Now-playing backend for Windows built on System Media Transport Controls.

    A ``MediaPlayer`` is created only to obtain its
    ``SystemMediaTransportControls`` (SMTC) object. The ``set_*`` methods push
    state to the SMTC, and SMTC events are forwarded to the ``on_*`` callbacks
    provided by ``BaseInterface`` (or its subclasses). Positions and durations
    are exchanged with callers as integer microseconds.

    SMTC events may arrive on arbitrary threads; coroutine callbacks are
    always scheduled onto the asyncio loop bound to this instance.
    """

    def __init__(self, name):
        """Create the media player, grab its SMTC and subscribe to its events.

        :param name: Forwarded unchanged to ``BaseInterface.__init__``.
        """
        super().__init__(name)
        # Prefer the loop that is actually running. When constructed outside
        # of a coroutine, fall back to the thread's current loop; ``start()``
        # rebinds to the running loop later, so a stale or missing loop here
        # is not fatal.
        try:
            self._loop = asyncio.get_running_loop()
        except RuntimeError:
            try:
                self._loop = asyncio.get_event_loop()
            except RuntimeError:
                self._loop = None
        self._running = True
        # Strong references to in-flight tasks; the event loop itself only
        # keeps weak references, so unreferenced tasks may be collected.
        self._tasks = set()
        self._playback_properties = PlaybackProperties()
        self._player = MediaPlayer()
        # If using manual SMTC control, disable MediaPlayer's automatic integration.
        # See: https://learn.microsoft.com/en-us/windows/uwp/audio-video-camera/system-media-transport-controls
        try:
            self._player.command_manager.is_enabled = False
        except AttributeError:
            # Some projections may not expose CommandManager; ignore.
            pass
        self._controls: SystemMediaTransportControls = self._player.system_media_transport_controls
        self._updater: SystemMediaTransportControlsDisplayUpdater = self._controls.display_updater
        self._timeline = SystemMediaTransportControlsTimelineProperties()
        self._controls.add_auto_repeat_mode_change_requested(self.auto_repeat_mode_change_requested)
        self._controls.add_button_pressed(self.button_pressed)
        self._controls.add_playback_position_change_requested(self.playback_position_change_requested)
        self._controls.add_playback_rate_change_requested(self.playback_rate_change_requested)
        self._controls.add_property_changed(self.property_changed)
        self._controls.add_shuffle_enabled_change_requested(self.shuffle_change_requested)

    # ------------------------------------------------------------------
    # SMTC event handlers
    # ------------------------------------------------------------------

    def button_pressed(self, _, args: SystemMediaTransportControlsButtonPressedEventArgs):
        """Forward an SMTC transport button press to the matching callback.

        The callback names are looked up lazily with ``getattr`` so that a
        button without a corresponding ``on_*`` method is silently ignored.
        NOTE(review): the ``on_play``/``on_pause``/``on_stop``/``on_next``/
        ``on_previous`` names are assumed from the naming of the other
        callbacks used in this class -- confirm against ``BaseInterface``.

        :param _: Event sender (unused).
        :param args: Event arguments carrying the pressed button.
        """
        if not self._running:
            return
        button: SystemMediaTransportControlsButton = args.button
        routes = (
            (SystemMediaTransportControlsButton.PLAY, 'on_play'),
            (SystemMediaTransportControlsButton.PAUSE, 'on_pause'),
            (SystemMediaTransportControlsButton.STOP, 'on_stop'),
            (SystemMediaTransportControlsButton.NEXT, 'on_next'),
            (SystemMediaTransportControlsButton.PREVIOUS, 'on_previous'),
        )
        for candidate, callback_name in routes:
            if button == candidate:
                handler = getattr(self, callback_name, None)
                if handler is not None:
                    self._dispatch(handler)
                return

    def shuffle_change_requested(self, _, args: ShuffleEnabledChangeRequestedEventArgs):
        """Forward a shuffle change request to ``on_shuffle``.

        :param _: Event sender (unused).
        :param args: Event arguments carrying the requested shuffle flag.
        """
        if not self._running:
            return
        shuffle_enabled: bool = args.requested_shuffle_enabled
        self._dispatch(self.on_shuffle, shuffle_enabled)

    def property_changed(self, _, args: SystemMediaTransportControlsPropertyChangedEventArgs):
        """Forward an SMTC sound-level change to ``on_volume``.

        Only ``SOUND_LEVEL`` changes are handled; other properties are ignored.
        NOTE(review): the value passed on is ``sound_level`` exactly as the
        SMTC reports it -- confirm that ``on_volume`` accepts that type.

        :param _: Event sender (unused).
        :param args: Event arguments naming the property that changed.
        """
        if not self._running:
            return
        property_: SystemMediaTransportControlsProperty = args.property
        if property_ == SystemMediaTransportControlsProperty.SOUND_LEVEL:
            self._dispatch(self.on_volume, self._controls.sound_level)

    def playback_rate_change_requested(self, _, args: PlaybackRateChangeRequestedEventArgs):
        """Validate a requested playback rate, notify ``on_rate`` and apply it.

        The request is dropped when ``MinimumRate < MaximumRate`` and the
        requested rate lies outside that range. If the bounds are equal,
        inverted or unset, no range check is applied.

        :param _: Event sender (unused).
        :param args: Event arguments carrying the requested rate.
        """
        if not self._running:
            return
        rate: float = args.requested_playback_rate
        low = self._playback_properties.MinimumRate
        high = self._playback_properties.MaximumRate
        bounded = low is not None and high is not None and low < high
        if bounded and (rate < low or rate > high):
            return
        self._dispatch(self.on_rate, rate)
        self._controls.playback_rate = rate
        self._playback_properties.Rate = rate

    def playback_position_change_requested(self, _, args: PlaybackPositionChangeRequestedEventArgs):
        """Handle a seek request coming from the SMTC.

        The request is ignored unless ``CanSeek`` is set. Otherwise the
        requested position is converted to microseconds, passed to
        ``on_set_position`` (with the current track id) and ``on_seek``, and
        stored as the current ``Position``.
        NOTE(review): ``on_seek`` receives the same absolute position as
        ``on_set_position``; confirm it does not expect a relative offset.

        :param _: Event sender (unused).
        :param args: Event arguments carrying the requested position.
        """
        if not self._running or not self._playback_properties.CanSeek:
            return
        # Floor-dividing by one microsecond keeps the ``days`` component that
        # ``seconds * 10**6 + microseconds`` would silently drop.
        position = args.requested_playback_position // timedelta(microseconds=1)
        metadata = self._playback_properties.Metadata
        track_id = getattr(metadata, 'id_', None)
        self._dispatch(self.on_set_position, track_id, position)
        self._dispatch(self.on_seek, position)
        self._playback_properties.Position = position

    def auto_repeat_mode_change_requested(self, _, args: AutoRepeatModeChangeRequestedEventArgs):
        """Translate a requested repeat mode to ``LoopStatus`` and notify.

        ``LIST`` maps to ``Playlist``, ``TRACK`` to ``Track`` and anything
        else to ``None_``. The result is passed to ``on_loop_status`` and
        stored as the current ``LoopStatus``.

        :param _: Event sender (unused).
        :param args: Event arguments carrying the requested repeat mode.
        """
        if not self._running:
            return
        mode: MediaPlaybackAutoRepeatMode = args.requested_auto_repeat_mode
        if mode == MediaPlaybackAutoRepeatMode.LIST:
            value = LoopStatus.Playlist
        elif mode == MediaPlaybackAutoRepeatMode.TRACK:
            value = LoopStatus.Track
        else:
            value = LoopStatus.None_
        self._dispatch(self.on_loop_status, value)
        self._playback_properties.LoopStatus = value

    # ------------------------------------------------------------------
    # Property setters
    # ------------------------------------------------------------------

    def set_property(self, name: PropertyName, value: Any):
        """Set a root property; delegated entirely to the base class."""
        # SMTC does not expose equivalents for these properties.
        super().set_property(name, value)

    def set_playback_property(self, name: PlaybackPropertyName, value: Any):
        """Store a playback property and mirror it onto the SMTC where possible.

        Properties without an SMTC counterpart (``Volume``, ``MinimumRate``,
        ``MaximumRate``) are only cached locally. Unknown names are passed to
        the base class.

        :param name: The playback property to set.
        :param value: The new value. ``Position`` and ``Duration`` are in
            microseconds.
        """
        props = self._playback_properties
        controls = self._controls
        if name == PlaybackPropertyName.CanControl:
            controls.is_stop_enabled = value
            props.CanControl = value
        elif name == PlaybackPropertyName.CanPlay:
            controls.is_play_enabled = value
            props.CanPlay = value
        elif name == PlaybackPropertyName.CanPause:
            controls.is_pause_enabled = value
            props.CanPause = value
        elif name == PlaybackPropertyName.CanGoNext:
            controls.is_next_enabled = value
            props.CanGoNext = value
        elif name == PlaybackPropertyName.CanGoPrevious:
            controls.is_previous_enabled = value
            props.CanGoPrevious = value
        elif name == PlaybackPropertyName.CanSeek:
            props.CanSeek = value
            # SMTC requires Min/Max seek time to raise position change requests.
            # See: https://learn.microsoft.com/en-us/windows/uwp/audio-video-camera/system-media-transport-controls
            if value:
                self._push_timeline(props.Duration)
        elif name == PlaybackPropertyName.PlaybackStatus:
            if value == PlaybackStatus.Playing:
                controls.playback_status = MediaPlaybackStatus.PLAYING
            elif value == PlaybackStatus.Paused:
                controls.playback_status = MediaPlaybackStatus.PAUSED
            elif value == PlaybackStatus.Stopped:
                controls.playback_status = MediaPlaybackStatus.STOPPED
            props.PlaybackStatus = value
        elif name == PlaybackPropertyName.Metadata:
            self._update_metadata(value)
            props.Metadata = value
        elif name == PlaybackPropertyName.Shuffle:
            controls.shuffle_enabled = value
            props.Shuffle = value
        elif name == PlaybackPropertyName.Rate:
            controls.playback_rate = value
            props.Rate = value
        elif name == PlaybackPropertyName.LoopStatus:
            if value == LoopStatus.None_:
                controls.auto_repeat_mode = MediaPlaybackAutoRepeatMode.NONE
            elif value == LoopStatus.Playlist:
                controls.auto_repeat_mode = MediaPlaybackAutoRepeatMode.LIST
            elif value == LoopStatus.Track:
                controls.auto_repeat_mode = MediaPlaybackAutoRepeatMode.TRACK
            props.LoopStatus = value
        elif name == PlaybackPropertyName.Position:
            self._push_timeline(props.Duration, position=value)
            props.Position = value
        elif name == PlaybackPropertyName.Duration:
            self._push_timeline(value, force_end=True)
            props.Duration = value
        elif name == PlaybackPropertyName.Volume:
            # SMTC exposes SoundLevel as a read-only indicator; no volume setter.
            props.Volume = value
        elif name == PlaybackPropertyName.MinimumRate:
            props.MinimumRate = value
        elif name == PlaybackPropertyName.MaximumRate:
            props.MaximumRate = value
        else:
            super().set_playback_property(name, value)

    def _push_timeline(self, duration, *, position=None, force_end=False):
        """Refresh the cached timeline and send it to the SMTC.

        Start and minimum seek time are always reset to zero.

        :param duration: Track length in microseconds. When falsy, the end
            and maximum seek time are left untouched unless ``force_end``.
        :param position: Optional playback position in microseconds.
        :param force_end: When true and ``duration`` is falsy, reset the end
            and maximum seek time to zero instead of keeping the old values.
        """
        timeline = self._timeline
        zero = TimeSpan(0)
        timeline.start_time = zero
        timeline.min_seek_time = zero
        if position is not None:
            timeline.position = TimeSpan(position)
        if duration:
            end = TimeSpan(duration)
        elif force_end:
            end = zero
        else:
            end = None
        if end is not None:
            timeline.end_time = end
            timeline.max_seek_time = end
        self._controls.update_timeline_properties(timeline)

    def _update_metadata(self, value: PlaybackProperties.MetadataBean):
        """Publish track metadata (type, id, titles, cover) and its timeline.

        Music is the default media type. Artist lists are joined with ``,``.
        Missing (``None``/empty) artist, album artist, cover and duration are
        tolerated.

        :param value: Metadata of the current track.
        """
        # update media info
        updater = self._updater
        title = value.title or ''
        if value.media_type == MediaType.Image:
            updater.type = MediaPlaybackType.IMAGE
            updater.image_properties.title = title
        elif value.media_type == MediaType.Video:
            updater.type = MediaPlaybackType.VIDEO
            updater.video_properties.title = title
        else:
            updater.type = MediaPlaybackType.MUSIC
            music_props = updater.music_properties
            music_props.artist = ','.join(value.artist or ())
            music_props.title = title
            music_props.album_title = value.album or ''
            # Some projections expose album_artist and genres.
            if hasattr(music_props, "album_artist") and value.albumArtist:
                music_props.album_artist = ','.join(value.albumArtist)
        updater.app_media_id = value.id_
        if value.cover:  # not None and not empty
            updater.thumbnail = RandomAccessStreamReference.create_from_uri(Uri(value.cover))
        updater.update()
        # update timeline
        self._push_timeline(value.duration, force_end=True)

    def set_tracklist_property(self, name: TrackListPropertyName, value: Any):
        """Set a track-list property; delegated entirely to the base class."""
        super().set_tracklist_property(name, value)

    # ------------------------------------------------------------------
    # Property getters
    # ------------------------------------------------------------------

    def get_property(self, name: PropertyName) -> Any:
        """Return a root property; delegated entirely to the base class."""
        return super().get_property(name)

    def get_playback_property(self, name: PlaybackPropertyName) -> Any:
        """Return the locally cached value of a playback property.

        The attribute is looked up on the cached ``PlaybackProperties`` using
        ``name.value`` as the attribute name.
        """
        return getattr(self._playback_properties, name.value)

    def get_tracklist_property(self, name: TrackListPropertyName) -> Any:
        """Return a track-list property; delegated entirely to the base class."""
        return super().get_tracklist_property(name)

    # ------------------------------------------------------------------
    # Lifecycle and scheduling
    # ------------------------------------------------------------------

    async def start(self):
        """Bind to the running event loop and (re-)enable event forwarding.

        No background server is needed on Windows; SMTC handlers are already
        registered by ``__init__``.
        """
        # The instance may have been constructed before the loop was started
        # (e.g. ahead of ``asyncio.run``); rebind so callbacks are scheduled
        # on the loop that is actually running.
        self._loop = asyncio.get_running_loop()
        self._running = True

    async def stop(self):
        """Stop forwarding SMTC events to the callbacks."""
        self._running = False

    def _dispatch(self, handler, *args):
        """Invoke ``handler(*args)`` and schedule the result if it is a coroutine.

        Plain callables run synchronously on the calling thread. Checking the
        returned object (instead of the function) also covers wrapped
        coroutine functions such as ``functools.partial`` objects.
        """
        outcome = handler(*args)
        if inspect.iscoroutine(outcome):
            self._run_task(outcome)

    def _run_task(self, task):
        """Schedule coroutine ``task`` on the event loop bound to this instance.

        Windows callbacks may be invoked on a non-main thread, and different
        callbacks may run on different threads. The coroutine is therefore
        scheduled directly only when the caller is already running on the
        target loop; otherwise it is handed over thread-safely. If there is no
        usable loop (none bound or already closed) the coroutine is closed and
        dropped.

        :param task: A coroutine object to run.
        """
        loop = self._loop
        if loop is None or loop.is_closed():
            # Nothing can run it any more; close it to avoid a
            # "coroutine was never awaited" warning.
            task.close()
            return
        try:
            current = asyncio.get_running_loop()
        except RuntimeError:
            current = None
        if current is loop:
            scheduled = loop.create_task(task)
            self._tasks.add(scheduled)
            scheduled.add_done_callback(self._tasks.discard)
        else:
            asyncio.run_coroutine_threadsafe(task, loop)