From 46fdb20953cb96e380568cfa4ffbce251811920f Mon Sep 17 00:00:00 2001 From: Mark <> Date: Mon, 13 May 2024 16:58:15 +0200 Subject: [PATCH] make player backend a generic trait --- musicdb-lib/Cargo.toml | 13 +- musicdb-lib/src/data/cache_manager.rs | 15 +- musicdb-lib/src/data/song.rs | 151 ++++++---- musicdb-lib/src/player/mod.rs | 383 ++++++++++++-------------- musicdb-lib/src/player/rodio.rs | 139 ++++++++++ musicdb-lib/src/server/get.rs | 15 +- musicdb-lib/src/server/mod.rs | 36 ++- 7 files changed, 472 insertions(+), 280 deletions(-) create mode 100644 musicdb-lib/src/player/rodio.rs diff --git a/musicdb-lib/Cargo.toml b/musicdb-lib/Cargo.toml index bb3ed5e..6cd8a49 100644 --- a/musicdb-lib/Cargo.toml +++ b/musicdb-lib/Cargo.toml @@ -3,16 +3,15 @@ name = "musicdb-lib" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] -awedio = { version = "0.2.0", optional = true } -base64 = "0.21.2" +base64 = "0.22.1" colorize = "0.1.0" rand = "0.8.5" rc-u8-reader = "2.0.16" -sysinfo = "0.30.5" -tokio = { version = "1.29.1", features = ["sync"] } +rodio = { version = "0.18.0", optional = true } +sysinfo = "0.30.12" +tokio = { version = "1.37.0", features = ["sync"] } [features] -playback = ["awedio"] +default = ["playback"] +playback = ["dep:rodio"] diff --git a/musicdb-lib/src/data/cache_manager.rs b/musicdb-lib/src/data/cache_manager.rs index 002899b..3dcac24 100644 --- a/musicdb-lib/src/data/cache_manager.rs +++ b/musicdb-lib/src/data/cache_manager.rs @@ -115,7 +115,7 @@ impl CacheManager { let mut found = false; for (id, song) in db.songs().iter() { if !ids_to_cache.contains(id) { - if let Ok(true) = song.uncache_data() { + if let Ok(true) = song.cached_data().uncache_data() { eprintln!( "[{}] CacheManager :: Uncached bytes for song '{}' (memory limit).", "INFO".cyan(), @@ -133,7 +133,7 @@ impl CacheManager { || queue_next_song.is_some_and(|i| i == *id)) { if let Some(song) = db.get_song(id) { - if let Ok(true) = song.uncache_data() { + if let Ok(true) = song.cached_data().uncache_data() { eprintln!( "[{}] CacheManager :: Uncached bytes for song '{}' (memory limit).", "INFO".cyan(), @@ -154,7 +154,10 @@ impl CacheManager { // we have some memory left, maybe cache a song (or cache multiple if we know their byte-sizes) for song in &ids_to_cache { if let Some(song) = db.get_song(song) { - match song.cache_data_start_thread_or_say_already_running(&db) { + match song + .cached_data() + .cache_data_start_thread_or_say_already_running(&db, song) + { Err(false) => (), // thread already running, don't start a second one (otherwise we may load too many songs, using too much memory, causing a cache-uncache-cycle) Err(true) => { @@ -176,7 +179,7 @@ impl CacheManager { if let Some(song_id) = queue_next_song { if let Some(song) = db.get_song(&song_id) { - if song.cache_data_start_thread(&db) { + if song.cached_data().cache_data_start_thread(&db, song) { eprintln!( "[{}] CacheManager :: Start caching bytes for next song, '{}'.", "INFO".cyan(), @@ -192,9 +195,9 @@ impl CacheManager { } else { cleanup_countdown = cleanup_max; for (id, song) in db.songs() { - if let Some(_size) = song.has_cached_data() { + if let Some(_size) = song.cached_data().has_cached_data() { if !is_in_queue(*id, &db.queue) { - if let Ok(true) = song.uncache_data() { + if let Ok(true) = song.cached_data().uncache_data() { eprintln!( "[{}] CacheManager :: Uncached bytes for song '{}' (not in queue).", "INFO".cyan(), diff --git a/musicdb-lib/src/data/song.rs b/musicdb-lib/src/data/song.rs index f4517f3..bb4e2ec 100755 --- a/musicdb-lib/src/data/song.rs +++ b/musicdb-lib/src/data/song.rs @@ -1,9 +1,11 @@ use std::{ fmt::Display, io::{Read, Write}, + mem::replace, path::PathBuf, sync::{Arc, Mutex}, thread::JoinHandle, + time::Instant, }; use colorize::AnsiColor; @@ -55,56 +57,113 @@ impl Song { file_size, duration_millis, general: GeneralData::default(), - cached_data: CachedData(Arc::new(Mutex::new((None, None)))), + cached_data: CachedData(Arc::new(Mutex::new((Err(None), None)))), } } + + pub fn cached_data(&self) -> &CachedData { + &self.cached_data + } +} +impl CachedData { pub fn uncache_data(&self) -> Result { - let mut cached = self.cached_data.0.lock().unwrap(); - match cached.0.take() { - Some(Ok(_data)) => Ok(true), - Some(Err(thread)) => { + let mut cached = self.0.lock().unwrap(); + match replace(&mut cached.0, Err(None)) { + Ok(Ok(_data)) => Ok(true), + Ok(Err(thread)) => { if thread.is_finished() { // get value from thread and drop it _ = thread.join(); Ok(true) } else { // thread is still running... - cached.0 = Some(Err(thread)); + cached.0 = Ok(Err(thread)); Err(()) } } - None => Ok(false), + Err(e) => { + cached.0 = Err(e); + Ok(false) + } } } /// If no data is cached yet and no caching thread is running, starts a thread to cache the data. - pub fn cache_data_start_thread(&self, db: &Database) -> bool { - self.cache_data_start_thread_or_say_already_running(db) + pub fn cache_data_start_thread(&self, db: &Database, song: &Song) -> bool { + self.cache_data_start_thread_or_say_already_running(db, song) .is_ok() } pub fn cache_data_start_thread_or_say_already_running( &self, db: &Database, + song: &Song, ) -> Result<(), bool> { - let mut cd = self.cached_data.0.lock().unwrap(); - match cd.0.as_ref() { - None => (), - Some(Err(t)) => return Err(!t.is_finished()), - Some(Ok(_)) => return Err(false), + self.get_data_or_start_thread_and_say_already_running(db, |_| (), || (), song) + } + /// gets the data if available, or, if no thread is running, starts a thread to get the data. + /// if a thread is running, was started, or recently encountered an error, `None` is returned, otherwise `Some(data)`. + pub fn get_data_or_maybe_start_thread( + &self, + db: &Database, + song: &Song, + ) -> Option>> { + self.get_data_or_start_thread_and_say_already_running( + db, + |data| Some(Arc::clone(data)), + || None, + song, + ) + .ok() + .and_then(|v| v) + } + /// `Err(true)` if a thread is already running, + /// `Ok(get_data(data))` if there is data, + /// `Ok(started())` if a thread was started, + /// `Err(false)` otherwise (i.e. loading data failed recently, 60 second cooldown between retries is active). + pub fn get_data_or_start_thread_and_say_already_running( + &self, + db: &Database, + get_data: impl FnOnce(&Arc>) -> T, + started: impl FnOnce() -> T, + song: &Song, + ) -> Result { + let mut cd = self.0.lock().unwrap(); + match cd.0.as_mut() { + Err(Some(i)) if i.elapsed().as_secs_f32() > 60.0 => return Err(false), + Err(_) => (), + Ok(Err(t)) => { + if t.is_finished() { + if let Some(bytes) = replace(&mut cd.0, Err(None)) + .unwrap() + .unwrap_err() + .join() + .unwrap() + { + cd.0 = Ok(Ok(bytes)); + return Ok(get_data(cd.0.as_ref().unwrap().as_ref().unwrap())); + } else { + cd.0 = Err(Some(Instant::now())); + return Err(false); + } + } else { + return Err(true); + } + } + Ok(Ok(bytes)) => return Ok(get_data(&*bytes)), }; let src = if let Some(dlcon) = &db.remote_server_as_song_file_source { - Err((self.id, Arc::clone(dlcon))) + Err((song.id, Arc::clone(dlcon))) } else { - Ok(db.get_path(&self.location)) + Ok(db.get_path(&song.location)) }; - cd.0 = Some(Err(std::thread::spawn(move || { + cd.0 = Ok(Err(std::thread::spawn(move || { let data = Self::load_data(src)?; Some(Arc::new(data)) }))); - Ok(()) + Ok(started()) } /// If the song's data is cached, returns the number of bytes. pub fn has_cached_data(&self) -> Option { - if let Some(Ok(v)) = self.cached_data.0.lock().unwrap().0.as_ref() { + if let Ok(Ok(v)) = self.0.lock().unwrap().0.as_ref() { Some(v.len()) } else { None @@ -114,43 +173,30 @@ impl Song { /// If a thread is running to load the data, it is not awaited. /// This function doesn't block. pub fn cached_data(&self) -> Option>> { - if let Some(Ok(v)) = self.cached_data.0.lock().unwrap().0.as_ref() { + if let Ok(Ok(v)) = self.0.lock().unwrap().0.as_ref() { Some(Arc::clone(v)) } else { None } } - /// Gets or loads the cached data. - /// If a thread is running to load the data, it *is* awaited. - /// This function will block until the data is loaded. - /// If it still returns none, some error must have occured. - pub fn cached_data_now(&self, db: &Database) -> Option>> { - let mut cd = self.cached_data.0.lock().unwrap(); - cd.0 = match cd.0.take() { - None => { - let src = if let Some(dlcon) = &db.remote_server_as_song_file_source { - Err((self.id, Arc::clone(dlcon))) + /// Gets the cached data, if available. + /// If a thread is running to load the data, it is awaited. + /// This function doesn't block. + pub fn cached_data_await(&self) -> Option>> { + let mut cd = self.0.lock().unwrap(); + let (out, next) = match replace(&mut cd.0, Err(None)) { + Ok(Ok(bytes)) => (Some(Arc::clone(&bytes)), Ok(Ok(bytes))), + Ok(Err(t)) => { + if let Some(bytes) = t.join().unwrap() { + (Some(Arc::clone(&bytes)), Ok(Ok(bytes))) } else { - Ok(db.get_path(&self.location)) - }; - if let Some(v) = Self::load_data(src) { - Some(Ok(Arc::new(v))) - } else { - None + (None, Err(Some(Instant::now()))) } } - Some(Err(t)) => match t.join() { - Err(_e) => None, - Ok(Some(v)) => Some(Ok(v)), - Ok(None) => None, - }, - Some(Ok(v)) => Some(Ok(v)), + Err(e) => (None, Err(e)), }; - if let Some(Ok(v)) = &cd.0 { - cd.1 = Some(v.len()); - } - drop(cd); - self.cached_data() + cd.0 = next; + out } fn load_data( src: Result< @@ -236,17 +282,22 @@ impl ToFromBytes for Song { file_size: ToFromBytes::from_bytes(s)?, duration_millis: ToFromBytes::from_bytes(s)?, general: ToFromBytes::from_bytes(s)?, - cached_data: CachedData(Arc::new(Mutex::new((None, None)))), + cached_data: CachedData(Arc::new(Mutex::new((Err(None), None)))), }) } } -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct CachedData( pub Arc< Mutex<( - Option>, JoinHandle>>>>>, + Result>, JoinHandle>>>>, Option>, Option, )>, >, ); +impl Clone for CachedData { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } +} diff --git a/musicdb-lib/src/player/mod.rs b/musicdb-lib/src/player/mod.rs index 0df444d..daacd01 100755 --- a/musicdb-lib/src/player/mod.rs +++ b/musicdb-lib/src/player/mod.rs @@ -1,58 +1,86 @@ -use std::{ - collections::HashSet, - sync::{atomic::AtomicBool, Arc}, -}; +pub mod rodio; -use awedio::{ - backends::CpalBackend, - manager::Manager, - sounds::wrappers::{AsyncCompletionNotifier, Controller, Pausable}, - Sound, -}; -use colorize::AnsiColor; -use rc_u8_reader::ArcU8Reader; +use std::{collections::HashMap, ffi::OsStr, sync::Arc}; use crate::{ - data::{database::Database, SongId}, + data::{database::Database, song::CachedData, SongId}, server::Command, }; -pub struct Player { - /// can be unused, but must be present otherwise audio playback breaks - #[allow(unused)] - backend: CpalBackend, - source: Option<( - Controller>>>, - Arc, - )>, - manager: Manager, - current_song_id: SongOpt, - cached: HashSet, - allow_sending_commands: bool, +pub struct Player> { + cached: HashMap, + pub backend: T, } -pub enum SongOpt { - None, - Some(SongId), - /// Will be set to Some or None once handeled - New(Option), +pub struct SongCustomData { + load_duration: bool, +} +pub trait PlayerBackend { + /// load the next song from its bytes + fn load_next_song( + &mut self, + id: SongId, + filename: &OsStr, + bytes: Arc>, + load_duration: bool, + custom_data: T, + ); + + /// pause playback. if `resume` is called, resume the song where it was paused. + fn pause(&mut self); + /// stop playback. if `resume` is called, restart the song. + fn stop(&mut self); + /// used after pause or stop. + /// does nothing if no song was playing, song was cleared, or a song is already playing. + fn resume(&mut self); + + /// stop and discard the currently playing song, then set the next song as the current one. + /// `play` decides whether the next song should start playing or not. + fn next(&mut self, play: bool, load_duration: bool); + /// stop and discard the currently playing and next song. + /// calling `resume` after this was called but before a new song was loaded does nothing. + fn clear(&mut self); + + /// Should be `true` after calling `resume()` or `next(true)` if `current_song().is_some()` + fn playing(&self) -> bool; + + /// - `None` before a song is loaded or after `clear` was called, + /// - `Some((id, false))` while loading (if loading is done on a separate thread), + /// - `Some((id, true))` if a song is loaded and ready to be played (or loading failed) + /// performance notes: must be fast, as it is called repeatedly + fn current_song(&self) -> Option<(SongId, bool, &T)>; + /// like `has_current_song`. + /// performance notes: must be fast, as it is called repeatedly + fn next_song(&self) -> Option<(SongId, bool, &T)>; + + fn gen_data_mut(&mut self) -> (Option<&mut T>, Option<&mut T>); + + // if true, call `song_finished` more often. if false, song_finished may also be a constant `false`, but events should be sent directly to the server instead. + // this **must be constant**: it cannot change after the backend was constructed + fn song_finished_polling(&self) -> bool; + + /// true if the currently playing song has finished playing. + /// this may return a constant `false` if the playback thread automatically sends a `NextSong` command to the database when this happens. + /// performance notes: must be fast, as it is called repeatedly + fn song_finished(&self) -> bool; + + /// If possible, return the current song's duration in milliseconds. + /// It could also just return `None`. + /// If `load_duration` was `false` in either `load_next_song` or `next`, for performance reasons, + /// this should probably return `None` (unless getting the duration is virtually free). + /// `load_duration` can be ignored if you don't want to load the duration anyway, it's just there to prevent you from loading the duration if it won't be used anyway + fn current_song_duration(&self) -> Option; + + /// If known, get the current playback position in the song, in milliseconds. + fn current_song_playback_position(&self) -> Option; } -impl Player { - pub fn new() -> Result> { - let (manager, backend) = awedio::start()?; - Ok(Self { - manager, +impl> Player { + pub fn new(backend: T) -> Self { + Self { + cached: HashMap::new(), backend, - source: None, - current_song_id: SongOpt::None, - cached: HashSet::new(), - allow_sending_commands: true, - }) - } - pub fn without_sending_commands(mut self) -> Self { - self.allow_sending_commands = false; - self + } } pub fn handle_command(&mut self, command: &Command) { match command { @@ -63,189 +91,136 @@ impl Player { } } pub fn pause(&mut self) { - if let Some((source, _notif)) = &mut self.source { - source.set_paused(true); - } + self.backend.pause(); } pub fn resume(&mut self) { - if let Some((source, _notif)) = &mut self.source { - source.set_paused(false); - } else if let SongOpt::Some(id) = &self.current_song_id { - // there is no source to resume playback on, but there is a current song - self.current_song_id = SongOpt::New(Some(*id)); - } + self.backend.resume(); } pub fn stop(&mut self) { - if let Some((source, _notif)) = &mut self.source { - source.set_paused(true); - } - if let SongOpt::Some(id) | SongOpt::New(Some(id)) = self.current_song_id { - self.current_song_id = SongOpt::New(Some(id)); - } else { - self.current_song_id = SongOpt::New(None); - } + self.backend.stop(); } - pub fn update( - &mut self, - db: &mut Database, - command_sender: &Arc, - ) { - self.update_uncache_opt(db, command_sender, true) + + pub fn update(&mut self, db: &mut Database) { + self.update_uncache_opt(db, true) } /// never uncache songs (this is something the CacheManager has to do if you decide to use this function) - pub fn update_dont_uncache( - &mut self, - db: &mut Database, - command_sender: &Arc, - ) { - self.update_uncache_opt(db, command_sender, false) + pub fn update_dont_uncache(&mut self, db: &mut Database) { + self.update_uncache_opt(db, false) } - pub fn update_uncache_opt( - &mut self, - db: &mut Database, - command_sender: &Arc, - allow_uncaching: bool, - ) { - macro_rules! apply_command { - ($cmd:expr) => { - if self.allow_sending_commands { - db.apply_command($cmd); - } - }; - } - if db.playing && self.source.is_none() { - if let Some(song) = db.queue.get_current_song() { - // db playing, but no source - initialize a source (via SongOpt::New) - self.current_song_id = SongOpt::New(Some(*song)); - } else { - // db.playing, but no song in queue... - apply_command!(Command::Stop); - } - } else if let Some((_source, notif)) = &mut self.source { - if notif.load(std::sync::atomic::Ordering::Relaxed) { - // song has finished playing - self.current_song_id = SongOpt::New(db.queue.get_current_song().cloned()); - } + pub fn update_uncache_opt(&mut self, db: &mut Database, allow_uncaching: bool) { + if self.backend.song_finished() { + db.apply_command(Command::NextSong); } - // check the queue's current index - if let SongOpt::None = self.current_song_id { - if let Some(id) = db.queue.get_current_song() { - self.current_song_id = SongOpt::New(Some(*id)); - } - } else if let SongOpt::Some(l_id) = &self.current_song_id { - if let Some(id) = db.queue.get_current_song() { - if *id != *l_id { - self.current_song_id = SongOpt::New(Some(*id)); - } - } else { - self.current_song_id = SongOpt::New(None); - } - } + let queue_current_song = db.queue.get_current_song().copied(); + let queue_next_song = db.queue.get_next_song().copied(); - // new current song - if let SongOpt::New(song_opt) = &self.current_song_id { - // stop playback - // eprintln!("[play] stopping playback"); - self.manager.clear(); - if let Some(song_id) = song_opt { - // start playback again - if let Some(song) = db.get_song(song_id) { - // eprintln!("[play] starting playback..."); - // add our song - let ext = match &song.location.rel_path.extension() { - Some(s) => s.to_str().unwrap_or(""), - None => "", - }; - if db.playing { - if let Some(bytes) = song.cached_data_now(db) { - self.cached.insert(song.id); - match Self::sound_from_bytes(ext, bytes) { - Ok(v) => { - let (sound, notif) = - v.pausable().with_async_completion_notifier(); - // add it - let (sound, controller) = sound.controllable(); - let finished = Arc::new(AtomicBool::new(false)); - let fin = Arc::clone(&finished); - let command_sender = Arc::clone(command_sender); - std::thread::spawn(move || { - // `blocking_recv` returns `Err(_)` when the sound is dropped, so the thread won't linger forever - if let Ok(_v) = notif.blocking_recv() { - fin.store(true, std::sync::atomic::Ordering::Relaxed); - command_sender(Command::NextSong); - } - }); - self.source = Some((controller, finished)); - // and play it - self.manager.play(Box::new(sound)); - } - Err(e) => { - eprintln!( - "[{}] [player] Can't play, skipping! {e}", - "INFO".blue() - ); - apply_command!(Command::NextSong); - } + match (self.backend.current_song().map(|v| v.0), queue_current_song) { + (None, None) => (), + (Some(a), Some(b)) if a == b => (), + (_, Some(id)) => { + if self.backend.next_song().map(|v| v.0) == queue_current_song { + let load_duration = self + .backend + .next_song() + .is_some_and(|(_, _, t)| t.load_duration); + self.backend.next(db.playing, load_duration); + if load_duration { + if let Some(dur) = self.backend.current_song_duration() { + db.apply_command(Command::SetSongDuration(id, dur)) + } + } + } else if let Some(song) = db.get_song(&id) { + self.cached.insert(id, song.cached_data().clone()); + if let Some(bytes) = song + .cached_data() + .get_data_or_maybe_start_thread(db, song) + .or_else(|| song.cached_data().cached_data_await()) + { + let load_duration = song.duration_millis == 0; + self.backend.load_next_song( + id, + song.location + .rel_path + .file_name() + .unwrap_or_else(|| OsStr::new("")), + bytes, + load_duration, + SongCustomData { load_duration }, + ); + self.backend.next(db.playing, load_duration); + if load_duration { + if let Some(dur) = self.backend.current_song_duration() { + db.apply_command(Command::SetSongDuration(id, dur)) } - } else { - // couldn't load song bytes - db.broadcast_update(&Command::ErrorInfo( - "NoSongData".to_owned(), - format!("Couldn't load song #{}\n({})", song.id, song.title), - )); - apply_command!(Command::NextSong); } } else { - self.source = None; - song.cache_data_start_thread(&db); - self.cached.insert(song.id); + // only show an error if the user tries to play the song. + // otherwise, the error might be spammed. + if db.playing { + db.apply_command(Command::ErrorInfo( + format!("Couldn't load bytes for song {id}"), + format!( + "Song: {}\nby {:?} on {:?}", + song.title, song.artist, song.album + ), + )); + db.apply_command(Command::NextSong); + } + self.backend.clear(); } } else { - panic!("invalid song ID: current_song_id not found in DB!"); + self.backend.clear(); } - self.current_song_id = SongOpt::Some(*song_id); - } else { - self.current_song_id = SongOpt::None; } - let next_song = db.queue.get_next_song().and_then(|v| db.get_song(v)); - if allow_uncaching { - for &id in &self.cached { - if Some(id) != next_song.map(|v| v.id) - && !matches!(self.current_song_id, SongOpt::Some(v) if v == id) + (Some(_), None) => self.backend.clear(), + } + match (self.backend.next_song().map(|v| v.0), queue_next_song) { + (None, None) => (), + (Some(a), Some(b)) if a == b => (), + (_, Some(id)) => { + if let Some(song) = db.get_song(&id) { + self.cached.insert(id, song.cached_data().clone()); + if let Some(bytes) = + song.cached_data().get_data_or_maybe_start_thread(&db, song) { - if let Some(song) = db.songs().get(&id) { - if let Ok(_) = song.uncache_data() { - self.cached.remove(&id); - break; - } - } else { - self.cached.remove(&id); - break; - } + let load_duration = song.duration_millis == 0; + self.backend.load_next_song( + id, + song.location + .rel_path + .file_name() + .unwrap_or_else(|| OsStr::new("")), + bytes, + load_duration, + SongCustomData { load_duration }, + ); } } } - if let Some(song) = next_song { - song.cache_data_start_thread(&db); - self.cached.insert(song.id); + (Some(_), None) => (), + } + if db.playing != self.backend.playing() { + if db.playing { + self.backend.resume(); + // if we can't resume (i.e. there is no song), send `Pause` command + if !self.backend.playing() { + db.apply_command(Command::Pause); + } + } else { + self.backend.pause(); + } + } + + if allow_uncaching { + for (&id, cd) in &self.cached { + if Some(id) != queue_current_song && Some(id) != queue_next_song { + if let Ok(_) = cd.uncache_data() { + self.cached.remove(&id); + break; + } + } } } } - - /// partly identical to awedio/src/sounds/open_file.rs open_file_with_reader(), which is a private function I can't access - fn sound_from_bytes( - extension: &str, - bytes: Arc>, - ) -> Result, std::io::Error> { - let reader = ArcU8Reader::new(bytes); - Ok(match extension { - "wav" => Box::new( - awedio::sounds::decoders::WavDecoder::new(reader) - .map_err(|_e| std::io::Error::from(std::io::ErrorKind::InvalidData))?, - ), - "mp3" => Box::new(awedio::sounds::decoders::Mp3Decoder::new(reader)), - _ => return Err(std::io::Error::from(std::io::ErrorKind::Unsupported)), - }) - } } diff --git a/musicdb-lib/src/player/rodio.rs b/musicdb-lib/src/player/rodio.rs new file mode 100644 index 0000000..7e75a25 --- /dev/null +++ b/musicdb-lib/src/player/rodio.rs @@ -0,0 +1,139 @@ +use std::{ffi::OsStr, sync::Arc}; + +use rc_u8_reader::ArcU8Reader; +use rodio::{decoder::DecoderError, Decoder, OutputStream, OutputStreamHandle, Sink, Source}; + +use crate::{data::SongId, server::Command}; + +use super::PlayerBackend; + +pub struct PlayerBackendRodio { + #[allow(unused)] + output_stream: OutputStream, + #[allow(unused)] + output_stream_handle: OutputStreamHandle, + sink: Sink, + stopped: bool, + current: Option<(SongId, Arc>, Option, T)>, + next: Option<(SongId, Arc>, Option, T)>, + command_sender: std::sync::mpsc::Sender, +} + +impl PlayerBackendRodio { + pub fn new( + command_sender: std::sync::mpsc::Sender, + ) -> Result> { + let (output_stream, output_stream_handle) = rodio::OutputStream::try_default()?; + let sink = Sink::try_new(&output_stream_handle)?; + Ok(Self { + output_stream, + output_stream_handle, + sink, + stopped: true, + current: None, + next: None, + command_sender, + }) + } +} + +impl PlayerBackend for PlayerBackendRodio { + fn load_next_song( + &mut self, + id: SongId, + _filename: &OsStr, + bytes: Arc>, + _load_duration: bool, + custom_data: T, + ) { + let decoder = decoder_from_bytes(Arc::clone(&bytes)); + if let Err(e) = &decoder { + self.command_sender + .send(Command::ErrorInfo( + format!("Couldn't decode song #{id}!"), + format!("Error: '{e}'"), + )) + .unwrap(); + } + self.next = Some((id, bytes, decoder.ok(), custom_data)); + } + fn pause(&mut self) { + self.sink.pause(); + } + fn stop(&mut self) { + if !self.stopped { + self.sink.clear(); + if let Some((_, bytes, _, _)) = &self.current { + if let Ok(decoder) = decoder_from_bytes(Arc::clone(bytes)) { + self.sink.append(decoder); + } + } + } + } + fn resume(&mut self) { + self.stopped = false; + self.sink.play(); + } + fn next(&mut self, play: bool, load_duration: bool) { + self.stopped = false; + self.sink.clear(); + self.current = self + .next + .take() + .map(|(id, bytes, mut decoder, custom_data)| { + let duration = if let Some(decoder) = decoder.take() { + let duration = if load_duration { + dbg!(decoder.total_duration().map(|v| v.as_millis())) + } else { + None + }; + self.sink.append(decoder); + if play { + self.sink.play(); + } + duration + } else { + None + }; + (id, bytes, duration, custom_data) + }); + } + fn clear(&mut self) { + self.sink.clear(); + } + fn playing(&self) -> bool { + !(self.sink.is_paused() || self.sink.empty()) + } + fn current_song(&self) -> Option<(SongId, bool, &T)> { + self.current.as_ref().map(|(id, _, _, t)| (*id, true, t)) + } + fn next_song(&self) -> Option<(SongId, bool, &T)> { + self.next.as_ref().map(|(id, _, _, t)| (*id, true, t)) + } + fn gen_data_mut(&mut self) -> (Option<&mut T>, Option<&mut T>) { + ( + self.current.as_mut().map(|(_, _, _, t)| t), + self.next.as_mut().map(|(_, _, _, t)| t), + ) + } + fn song_finished_polling(&self) -> bool { + true + } + fn song_finished(&self) -> bool { + self.current.is_some() && self.sink.empty() + } + fn current_song_duration(&self) -> Option { + self.current + .as_ref() + .and_then(|(_, _, dur, _)| dur.map(|v| v as _)) + } + fn current_song_playback_position(&self) -> Option { + None + } +} + +type MyDecoder = Decoder>>; + +fn decoder_from_bytes(bytes: Arc>) -> Result { + Decoder::new(ArcU8Reader::new(Arc::clone(&bytes))).map(|decoder| decoder) +} diff --git a/musicdb-lib/src/server/get.rs b/musicdb-lib/src/server/get.rs index 04131db..c0229ef 100755 --- a/musicdb-lib/src/server/get.rs +++ b/musicdb-lib/src/server/get.rs @@ -199,7 +199,20 @@ pub fn handle_one_connection_as_get( .and_then(|id| id.parse().ok()) .and_then(|id| { let db = db.lock().unwrap(); - db.get_song(&id).and_then(|song| song.cached_data_now(&db)) + if let Some(song) = db.get_song(&id) { + let cd = song.cached_data(); + if let Some(data) = + cd.get_data_or_maybe_start_thread(&db, song) + { + Some(data) + } else { + let cd = cd.clone(); + drop(db); + cd.cached_data_await() + } + } else { + None + } }) { writeln!(connection.get_mut(), "len: {}", bytes.len())?; diff --git a/musicdb-lib/src/server/mod.rs b/musicdb-lib/src/server/mod.rs index 49a607b..3f3ed5a 100755 --- a/musicdb-lib/src/server/mod.rs +++ b/musicdb-lib/src/server/mod.rs @@ -134,9 +134,16 @@ pub fn run_server_caching_thread_opt( ) { use std::time::Instant; - use crate::data::cache_manager::CacheManager; + use crate::{ + data::cache_manager::CacheManager, + player::{rodio::PlayerBackendRodio, PlayerBackend}, + }; - let mut player = Player::new().unwrap(); + // commands sent to this will be handeled later in this function in an infinite loop. + // these commands are sent to the database asap. + let (command_sender, command_receiver) = mpsc::channel(); + + let mut player = Player::new(PlayerBackendRodio::new(command_sender.clone()).unwrap()); let cache_manager = if let Some(func) = caching_thread { let mut cm = CacheManager::new(Arc::clone(&database)); func(&mut cm); @@ -144,9 +151,6 @@ pub fn run_server_caching_thread_opt( } else { None }; - // commands sent to this will be handeled later in this function in an infinite loop. - // these commands are sent to the database asap. - let (command_sender, command_receiver) = mpsc::channel(); if let Some(s) = sender_sender { s.blocking_send(command_sender.clone()).unwrap(); } @@ -199,19 +203,26 @@ pub fn run_server_caching_thread_opt( } } } - let dur = Duration::from_secs(10); - let command_sender = Arc::new(move |cmd| { - _ = command_sender.send(cmd); - }); + let song_done_polling = player.backend.song_finished_polling(); + let (dur, check_every) = if song_done_polling { + (Duration::from_millis(50), 200) + } else { + (Duration::from_secs(10), 0) + }; + let mut check = 0; + let mut checkf = true; loop { - { + check += 1; + if check >= check_every || checkf || player.backend.song_finished() { + check = 0; + checkf = false; // at the start and once after every command sent to the server, let mut db = database.lock().unwrap(); // update the player if cache_manager.is_some() { - player.update_dont_uncache(&mut db, &command_sender); + player.update_dont_uncache(&mut db); } else { - player.update(&mut db, &command_sender); + player.update(&mut db); } // autosave if necessary if let Some((first, last)) = db.times_data_modified { @@ -224,6 +235,7 @@ pub fn run_server_caching_thread_opt( } } if let Ok(command) = command_receiver.recv_timeout(dur) { + checkf = true; player.handle_command(&command); database.lock().unwrap().apply_command(command); }