make player backend a generic trait

This commit is contained in:
Mark 2024-05-13 16:58:15 +02:00
parent a316f6282e
commit 46fdb20953
7 changed files with 472 additions and 280 deletions

View File

@ -3,16 +3,15 @@ name = "musicdb-lib"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
awedio = { version = "0.2.0", optional = true } base64 = "0.22.1"
base64 = "0.21.2"
colorize = "0.1.0" colorize = "0.1.0"
rand = "0.8.5" rand = "0.8.5"
rc-u8-reader = "2.0.16" rc-u8-reader = "2.0.16"
sysinfo = "0.30.5" rodio = { version = "0.18.0", optional = true }
tokio = { version = "1.29.1", features = ["sync"] } sysinfo = "0.30.12"
tokio = { version = "1.37.0", features = ["sync"] }
[features] [features]
playback = ["awedio"] default = ["playback"]
playback = ["dep:rodio"]

View File

@ -115,7 +115,7 @@ impl CacheManager {
let mut found = false; let mut found = false;
for (id, song) in db.songs().iter() { for (id, song) in db.songs().iter() {
if !ids_to_cache.contains(id) { if !ids_to_cache.contains(id) {
if let Ok(true) = song.uncache_data() { if let Ok(true) = song.cached_data().uncache_data() {
eprintln!( eprintln!(
"[{}] CacheManager :: Uncached bytes for song '{}' (memory limit).", "[{}] CacheManager :: Uncached bytes for song '{}' (memory limit).",
"INFO".cyan(), "INFO".cyan(),
@ -133,7 +133,7 @@ impl CacheManager {
|| queue_next_song.is_some_and(|i| i == *id)) || queue_next_song.is_some_and(|i| i == *id))
{ {
if let Some(song) = db.get_song(id) { if let Some(song) = db.get_song(id) {
if let Ok(true) = song.uncache_data() { if let Ok(true) = song.cached_data().uncache_data() {
eprintln!( eprintln!(
"[{}] CacheManager :: Uncached bytes for song '{}' (memory limit).", "[{}] CacheManager :: Uncached bytes for song '{}' (memory limit).",
"INFO".cyan(), "INFO".cyan(),
@ -154,7 +154,10 @@ impl CacheManager {
// we have some memory left, maybe cache a song (or cache multiple if we know their byte-sizes) // we have some memory left, maybe cache a song (or cache multiple if we know their byte-sizes)
for song in &ids_to_cache { for song in &ids_to_cache {
if let Some(song) = db.get_song(song) { if let Some(song) = db.get_song(song) {
match song.cache_data_start_thread_or_say_already_running(&db) { match song
.cached_data()
.cache_data_start_thread_or_say_already_running(&db, song)
{
Err(false) => (), Err(false) => (),
// thread already running, don't start a second one (otherwise we may load too many songs, using too much memory, causing a cache-uncache-cycle) // thread already running, don't start a second one (otherwise we may load too many songs, using too much memory, causing a cache-uncache-cycle)
Err(true) => { Err(true) => {
@ -176,7 +179,7 @@ impl CacheManager {
if let Some(song_id) = queue_next_song { if let Some(song_id) = queue_next_song {
if let Some(song) = db.get_song(&song_id) { if let Some(song) = db.get_song(&song_id) {
if song.cache_data_start_thread(&db) { if song.cached_data().cache_data_start_thread(&db, song) {
eprintln!( eprintln!(
"[{}] CacheManager :: Start caching bytes for next song, '{}'.", "[{}] CacheManager :: Start caching bytes for next song, '{}'.",
"INFO".cyan(), "INFO".cyan(),
@ -192,9 +195,9 @@ impl CacheManager {
} else { } else {
cleanup_countdown = cleanup_max; cleanup_countdown = cleanup_max;
for (id, song) in db.songs() { for (id, song) in db.songs() {
if let Some(_size) = song.has_cached_data() { if let Some(_size) = song.cached_data().has_cached_data() {
if !is_in_queue(*id, &db.queue) { if !is_in_queue(*id, &db.queue) {
if let Ok(true) = song.uncache_data() { if let Ok(true) = song.cached_data().uncache_data() {
eprintln!( eprintln!(
"[{}] CacheManager :: Uncached bytes for song '{}' (not in queue).", "[{}] CacheManager :: Uncached bytes for song '{}' (not in queue).",
"INFO".cyan(), "INFO".cyan(),

View File

@ -1,9 +1,11 @@
use std::{ use std::{
fmt::Display, fmt::Display,
io::{Read, Write}, io::{Read, Write},
mem::replace,
path::PathBuf, path::PathBuf,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
thread::JoinHandle, thread::JoinHandle,
time::Instant,
}; };
use colorize::AnsiColor; use colorize::AnsiColor;
@ -55,56 +57,113 @@ impl Song {
file_size, file_size,
duration_millis, duration_millis,
general: GeneralData::default(), general: GeneralData::default(),
cached_data: CachedData(Arc::new(Mutex::new((None, None)))), cached_data: CachedData(Arc::new(Mutex::new((Err(None), None)))),
} }
} }
pub fn cached_data(&self) -> &CachedData {
&self.cached_data
}
}
impl CachedData {
pub fn uncache_data(&self) -> Result<bool, ()> { pub fn uncache_data(&self) -> Result<bool, ()> {
let mut cached = self.cached_data.0.lock().unwrap(); let mut cached = self.0.lock().unwrap();
match cached.0.take() { match replace(&mut cached.0, Err(None)) {
Some(Ok(_data)) => Ok(true), Ok(Ok(_data)) => Ok(true),
Some(Err(thread)) => { Ok(Err(thread)) => {
if thread.is_finished() { if thread.is_finished() {
// get value from thread and drop it // get value from thread and drop it
_ = thread.join(); _ = thread.join();
Ok(true) Ok(true)
} else { } else {
// thread is still running... // thread is still running...
cached.0 = Some(Err(thread)); cached.0 = Ok(Err(thread));
Err(()) Err(())
} }
} }
None => Ok(false), Err(e) => {
cached.0 = Err(e);
Ok(false)
}
} }
} }
/// If no data is cached yet and no caching thread is running, starts a thread to cache the data. /// If no data is cached yet and no caching thread is running, starts a thread to cache the data.
pub fn cache_data_start_thread(&self, db: &Database) -> bool { pub fn cache_data_start_thread(&self, db: &Database, song: &Song) -> bool {
self.cache_data_start_thread_or_say_already_running(db) self.cache_data_start_thread_or_say_already_running(db, song)
.is_ok() .is_ok()
} }
pub fn cache_data_start_thread_or_say_already_running( pub fn cache_data_start_thread_or_say_already_running(
&self, &self,
db: &Database, db: &Database,
song: &Song,
) -> Result<(), bool> { ) -> Result<(), bool> {
let mut cd = self.cached_data.0.lock().unwrap(); self.get_data_or_start_thread_and_say_already_running(db, |_| (), || (), song)
match cd.0.as_ref() { }
None => (), /// gets the data if available, or, if no thread is running, starts a thread to get the data.
Some(Err(t)) => return Err(!t.is_finished()), /// if a thread is running, was started, or recently encountered an error, `None` is returned, otherwise `Some(data)`.
Some(Ok(_)) => return Err(false), pub fn get_data_or_maybe_start_thread(
&self,
db: &Database,
song: &Song,
) -> Option<Arc<Vec<u8>>> {
self.get_data_or_start_thread_and_say_already_running(
db,
|data| Some(Arc::clone(data)),
|| None,
song,
)
.ok()
.and_then(|v| v)
}
/// `Err(true)` if a thread is already running,
/// `Ok(get_data(data))` if there is data,
/// `Ok(started())` if a thread was started,
/// `Err(false)` otherwise (i.e. loading data failed recently, 60 second cooldown between retries is active).
pub fn get_data_or_start_thread_and_say_already_running<T>(
&self,
db: &Database,
get_data: impl FnOnce(&Arc<Vec<u8>>) -> T,
started: impl FnOnce() -> T,
song: &Song,
) -> Result<T, bool> {
let mut cd = self.0.lock().unwrap();
match cd.0.as_mut() {
Err(Some(i)) if i.elapsed().as_secs_f32() > 60.0 => return Err(false),
Err(_) => (),
Ok(Err(t)) => {
if t.is_finished() {
if let Some(bytes) = replace(&mut cd.0, Err(None))
.unwrap()
.unwrap_err()
.join()
.unwrap()
{
cd.0 = Ok(Ok(bytes));
return Ok(get_data(cd.0.as_ref().unwrap().as_ref().unwrap()));
} else {
cd.0 = Err(Some(Instant::now()));
return Err(false);
}
} else {
return Err(true);
}
}
Ok(Ok(bytes)) => return Ok(get_data(&*bytes)),
}; };
let src = if let Some(dlcon) = &db.remote_server_as_song_file_source { let src = if let Some(dlcon) = &db.remote_server_as_song_file_source {
Err((self.id, Arc::clone(dlcon))) Err((song.id, Arc::clone(dlcon)))
} else { } else {
Ok(db.get_path(&self.location)) Ok(db.get_path(&song.location))
}; };
cd.0 = Some(Err(std::thread::spawn(move || { cd.0 = Ok(Err(std::thread::spawn(move || {
let data = Self::load_data(src)?; let data = Self::load_data(src)?;
Some(Arc::new(data)) Some(Arc::new(data))
}))); })));
Ok(()) Ok(started())
} }
/// If the song's data is cached, returns the number of bytes. /// If the song's data is cached, returns the number of bytes.
pub fn has_cached_data(&self) -> Option<usize> { pub fn has_cached_data(&self) -> Option<usize> {
if let Some(Ok(v)) = self.cached_data.0.lock().unwrap().0.as_ref() { if let Ok(Ok(v)) = self.0.lock().unwrap().0.as_ref() {
Some(v.len()) Some(v.len())
} else { } else {
None None
@ -114,43 +173,30 @@ impl Song {
/// If a thread is running to load the data, it is not awaited. /// If a thread is running to load the data, it is not awaited.
/// This function doesn't block. /// This function doesn't block.
pub fn cached_data(&self) -> Option<Arc<Vec<u8>>> { pub fn cached_data(&self) -> Option<Arc<Vec<u8>>> {
if let Some(Ok(v)) = self.cached_data.0.lock().unwrap().0.as_ref() { if let Ok(Ok(v)) = self.0.lock().unwrap().0.as_ref() {
Some(Arc::clone(v)) Some(Arc::clone(v))
} else { } else {
None None
} }
} }
/// Gets or loads the cached data. /// Gets the cached data, if available.
/// If a thread is running to load the data, it *is* awaited. /// If a thread is running to load the data, it is awaited.
/// This function will block until the data is loaded. /// This function doesn't block.
/// If it still returns none, some error must have occured. pub fn cached_data_await(&self) -> Option<Arc<Vec<u8>>> {
pub fn cached_data_now(&self, db: &Database) -> Option<Arc<Vec<u8>>> { let mut cd = self.0.lock().unwrap();
let mut cd = self.cached_data.0.lock().unwrap(); let (out, next) = match replace(&mut cd.0, Err(None)) {
cd.0 = match cd.0.take() { Ok(Ok(bytes)) => (Some(Arc::clone(&bytes)), Ok(Ok(bytes))),
None => { Ok(Err(t)) => {
let src = if let Some(dlcon) = &db.remote_server_as_song_file_source { if let Some(bytes) = t.join().unwrap() {
Err((self.id, Arc::clone(dlcon))) (Some(Arc::clone(&bytes)), Ok(Ok(bytes)))
} else { } else {
Ok(db.get_path(&self.location)) (None, Err(Some(Instant::now())))
}
}
Err(e) => (None, Err(e)),
}; };
if let Some(v) = Self::load_data(src) { cd.0 = next;
Some(Ok(Arc::new(v))) out
} else {
None
}
}
Some(Err(t)) => match t.join() {
Err(_e) => None,
Ok(Some(v)) => Some(Ok(v)),
Ok(None) => None,
},
Some(Ok(v)) => Some(Ok(v)),
};
if let Some(Ok(v)) = &cd.0 {
cd.1 = Some(v.len());
}
drop(cd);
self.cached_data()
} }
fn load_data( fn load_data(
src: Result< src: Result<
@ -236,17 +282,22 @@ impl ToFromBytes for Song {
file_size: ToFromBytes::from_bytes(s)?, file_size: ToFromBytes::from_bytes(s)?,
duration_millis: ToFromBytes::from_bytes(s)?, duration_millis: ToFromBytes::from_bytes(s)?,
general: ToFromBytes::from_bytes(s)?, general: ToFromBytes::from_bytes(s)?,
cached_data: CachedData(Arc::new(Mutex::new((None, None)))), cached_data: CachedData(Arc::new(Mutex::new((Err(None), None)))),
}) })
} }
} }
#[derive(Clone, Debug)] #[derive(Debug)]
pub struct CachedData( pub struct CachedData(
pub Arc< pub Arc<
Mutex<( Mutex<(
Option<Result<Arc<Vec<u8>>, JoinHandle<Option<Arc<Vec<u8>>>>>>, Result<Result<Arc<Vec<u8>>, JoinHandle<Option<Arc<Vec<u8>>>>>, Option<Instant>>,
Option<usize>, Option<usize>,
)>, )>,
>, >,
); );
impl Clone for CachedData {
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}

View File

@ -1,58 +1,86 @@
use std::{ pub mod rodio;
collections::HashSet,
sync::{atomic::AtomicBool, Arc},
};
use awedio::{ use std::{collections::HashMap, ffi::OsStr, sync::Arc};
backends::CpalBackend,
manager::Manager,
sounds::wrappers::{AsyncCompletionNotifier, Controller, Pausable},
Sound,
};
use colorize::AnsiColor;
use rc_u8_reader::ArcU8Reader;
use crate::{ use crate::{
data::{database::Database, SongId}, data::{database::Database, song::CachedData, SongId},
server::Command, server::Command,
}; };
pub struct Player { pub struct Player<T: PlayerBackend<SongCustomData>> {
/// can be unused, but must be present otherwise audio playback breaks cached: HashMap<SongId, CachedData>,
#[allow(unused)] pub backend: T,
backend: CpalBackend,
source: Option<(
Controller<AsyncCompletionNotifier<Pausable<Box<dyn Sound>>>>,
Arc<AtomicBool>,
)>,
manager: Manager,
current_song_id: SongOpt,
cached: HashSet<SongId>,
allow_sending_commands: bool,
} }
pub enum SongOpt { pub struct SongCustomData {
None, load_duration: bool,
Some(SongId), }
/// Will be set to Some or None once handeled pub trait PlayerBackend<T> {
New(Option<SongId>), /// load the next song from its bytes
fn load_next_song(
&mut self,
id: SongId,
filename: &OsStr,
bytes: Arc<Vec<u8>>,
load_duration: bool,
custom_data: T,
);
/// pause playback. if `resume` is called, resume the song where it was paused.
fn pause(&mut self);
/// stop playback. if `resume` is called, restart the song.
fn stop(&mut self);
/// used after pause or stop.
/// does nothing if no song was playing, song was cleared, or a song is already playing.
fn resume(&mut self);
/// stop and discard the currently playing song, then set the next song as the current one.
/// `play` decides whether the next song should start playing or not.
fn next(&mut self, play: bool, load_duration: bool);
/// stop and discard the currently playing and next song.
/// calling `resume` after this was called but before a new song was loaded does nothing.
fn clear(&mut self);
/// Should be `true` after calling `resume()` or `next(true)` if `current_song().is_some()`
fn playing(&self) -> bool;
/// - `None` before a song is loaded or after `clear` was called,
/// - `Some((id, false))` while loading (if loading is done on a separate thread),
/// - `Some((id, true))` if a song is loaded and ready to be played (or loading failed)
/// performance notes: must be fast, as it is called repeatedly
fn current_song(&self) -> Option<(SongId, bool, &T)>;
/// like `has_current_song`.
/// performance notes: must be fast, as it is called repeatedly
fn next_song(&self) -> Option<(SongId, bool, &T)>;
fn gen_data_mut(&mut self) -> (Option<&mut T>, Option<&mut T>);
// if true, call `song_finished` more often. if false, song_finished may also be a constant `false`, but events should be sent directly to the server instead.
// this **must be constant**: it cannot change after the backend was constructed
fn song_finished_polling(&self) -> bool;
/// true if the currently playing song has finished playing.
/// this may return a constant `false` if the playback thread automatically sends a `NextSong` command to the database when this happens.
/// performance notes: must be fast, as it is called repeatedly
fn song_finished(&self) -> bool;
/// If possible, return the current song's duration in milliseconds.
/// It could also just return `None`.
/// If `load_duration` was `false` in either `load_next_song` or `next`, for performance reasons,
/// this should probably return `None` (unless getting the duration is virtually free).
/// `load_duration` can be ignored if you don't want to load the duration anyway, it's just there to prevent you from loading the duration if it won't be used anyway
fn current_song_duration(&self) -> Option<u64>;
/// If known, get the current playback position in the song, in milliseconds.
fn current_song_playback_position(&self) -> Option<u64>;
} }
impl Player { impl<T: PlayerBackend<SongCustomData>> Player<T> {
pub fn new() -> Result<Self, Box<dyn std::error::Error>> { pub fn new(backend: T) -> Self {
let (manager, backend) = awedio::start()?; Self {
Ok(Self { cached: HashMap::new(),
manager,
backend, backend,
source: None,
current_song_id: SongOpt::None,
cached: HashSet::new(),
allow_sending_commands: true,
})
} }
pub fn without_sending_commands(mut self) -> Self {
self.allow_sending_commands = false;
self
} }
pub fn handle_command(&mut self, command: &Command) { pub fn handle_command(&mut self, command: &Command) {
match command { match command {
@ -63,189 +91,136 @@ impl Player {
} }
} }
pub fn pause(&mut self) { pub fn pause(&mut self) {
if let Some((source, _notif)) = &mut self.source { self.backend.pause();
source.set_paused(true);
}
} }
pub fn resume(&mut self) { pub fn resume(&mut self) {
if let Some((source, _notif)) = &mut self.source { self.backend.resume();
source.set_paused(false);
} else if let SongOpt::Some(id) = &self.current_song_id {
// there is no source to resume playback on, but there is a current song
self.current_song_id = SongOpt::New(Some(*id));
}
} }
pub fn stop(&mut self) { pub fn stop(&mut self) {
if let Some((source, _notif)) = &mut self.source { self.backend.stop();
source.set_paused(true);
} }
if let SongOpt::Some(id) | SongOpt::New(Some(id)) = self.current_song_id {
self.current_song_id = SongOpt::New(Some(id)); pub fn update(&mut self, db: &mut Database) {
} else { self.update_uncache_opt(db, true)
self.current_song_id = SongOpt::New(None);
}
}
pub fn update(
&mut self,
db: &mut Database,
command_sender: &Arc<impl Fn(Command) + Send + Sync + 'static>,
) {
self.update_uncache_opt(db, command_sender, true)
} }
/// never uncache songs (this is something the CacheManager has to do if you decide to use this function) /// never uncache songs (this is something the CacheManager has to do if you decide to use this function)
pub fn update_dont_uncache( pub fn update_dont_uncache(&mut self, db: &mut Database) {
&mut self, self.update_uncache_opt(db, false)
db: &mut Database,
command_sender: &Arc<impl Fn(Command) + Send + Sync + 'static>,
) {
self.update_uncache_opt(db, command_sender, false)
}
pub fn update_uncache_opt(
&mut self,
db: &mut Database,
command_sender: &Arc<impl Fn(Command) + Send + Sync + 'static>,
allow_uncaching: bool,
) {
macro_rules! apply_command {
($cmd:expr) => {
if self.allow_sending_commands {
db.apply_command($cmd);
}
};
}
if db.playing && self.source.is_none() {
if let Some(song) = db.queue.get_current_song() {
// db playing, but no source - initialize a source (via SongOpt::New)
self.current_song_id = SongOpt::New(Some(*song));
} else {
// db.playing, but no song in queue...
apply_command!(Command::Stop);
}
} else if let Some((_source, notif)) = &mut self.source {
if notif.load(std::sync::atomic::Ordering::Relaxed) {
// song has finished playing
self.current_song_id = SongOpt::New(db.queue.get_current_song().cloned());
} }
pub fn update_uncache_opt(&mut self, db: &mut Database, allow_uncaching: bool) {
if self.backend.song_finished() {
db.apply_command(Command::NextSong);
} }
// check the queue's current index let queue_current_song = db.queue.get_current_song().copied();
if let SongOpt::None = self.current_song_id { let queue_next_song = db.queue.get_next_song().copied();
if let Some(id) = db.queue.get_current_song() {
self.current_song_id = SongOpt::New(Some(*id));
}
} else if let SongOpt::Some(l_id) = &self.current_song_id {
if let Some(id) = db.queue.get_current_song() {
if *id != *l_id {
self.current_song_id = SongOpt::New(Some(*id));
}
} else {
self.current_song_id = SongOpt::New(None);
}
}
// new current song match (self.backend.current_song().map(|v| v.0), queue_current_song) {
if let SongOpt::New(song_opt) = &self.current_song_id { (None, None) => (),
// stop playback (Some(a), Some(b)) if a == b => (),
// eprintln!("[play] stopping playback"); (_, Some(id)) => {
self.manager.clear(); if self.backend.next_song().map(|v| v.0) == queue_current_song {
if let Some(song_id) = song_opt { let load_duration = self
// start playback again .backend
if let Some(song) = db.get_song(song_id) { .next_song()
// eprintln!("[play] starting playback..."); .is_some_and(|(_, _, t)| t.load_duration);
// add our song self.backend.next(db.playing, load_duration);
let ext = match &song.location.rel_path.extension() { if load_duration {
Some(s) => s.to_str().unwrap_or(""), if let Some(dur) = self.backend.current_song_duration() {
None => "", db.apply_command(Command::SetSongDuration(id, dur))
};
if db.playing {
if let Some(bytes) = song.cached_data_now(db) {
self.cached.insert(song.id);
match Self::sound_from_bytes(ext, bytes) {
Ok(v) => {
let (sound, notif) =
v.pausable().with_async_completion_notifier();
// add it
let (sound, controller) = sound.controllable();
let finished = Arc::new(AtomicBool::new(false));
let fin = Arc::clone(&finished);
let command_sender = Arc::clone(command_sender);
std::thread::spawn(move || {
// `blocking_recv` returns `Err(_)` when the sound is dropped, so the thread won't linger forever
if let Ok(_v) = notif.blocking_recv() {
fin.store(true, std::sync::atomic::Ordering::Relaxed);
command_sender(Command::NextSong);
}
});
self.source = Some((controller, finished));
// and play it
self.manager.play(Box::new(sound));
}
Err(e) => {
eprintln!(
"[{}] [player] Can't play, skipping! {e}",
"INFO".blue()
);
apply_command!(Command::NextSong);
} }
} }
} else { } else if let Some(song) = db.get_song(&id) {
// couldn't load song bytes self.cached.insert(id, song.cached_data().clone());
db.broadcast_update(&Command::ErrorInfo( if let Some(bytes) = song
"NoSongData".to_owned(), .cached_data()
format!("Couldn't load song #{}\n({})", song.id, song.title), .get_data_or_maybe_start_thread(db, song)
)); .or_else(|| song.cached_data().cached_data_await())
apply_command!(Command::NextSong);
}
} else {
self.source = None;
song.cache_data_start_thread(&db);
self.cached.insert(song.id);
}
} else {
panic!("invalid song ID: current_song_id not found in DB!");
}
self.current_song_id = SongOpt::Some(*song_id);
} else {
self.current_song_id = SongOpt::None;
}
let next_song = db.queue.get_next_song().and_then(|v| db.get_song(v));
if allow_uncaching {
for &id in &self.cached {
if Some(id) != next_song.map(|v| v.id)
&& !matches!(self.current_song_id, SongOpt::Some(v) if v == id)
{ {
if let Some(song) = db.songs().get(&id) { let load_duration = song.duration_millis == 0;
if let Ok(_) = song.uncache_data() { self.backend.load_next_song(
self.cached.remove(&id); id,
break; song.location
.rel_path
.file_name()
.unwrap_or_else(|| OsStr::new("")),
bytes,
load_duration,
SongCustomData { load_duration },
);
self.backend.next(db.playing, load_duration);
if load_duration {
if let Some(dur) = self.backend.current_song_duration() {
db.apply_command(Command::SetSongDuration(id, dur))
}
} }
} else { } else {
self.cached.remove(&id); // only show an error if the user tries to play the song.
break; // otherwise, the error might be spammed.
if db.playing {
db.apply_command(Command::ErrorInfo(
format!("Couldn't load bytes for song {id}"),
format!(
"Song: {}\nby {:?} on {:?}",
song.title, song.artist, song.album
),
));
db.apply_command(Command::NextSong);
}
self.backend.clear();
}
} else {
self.backend.clear();
}
}
(Some(_), None) => self.backend.clear(),
}
match (self.backend.next_song().map(|v| v.0), queue_next_song) {
(None, None) => (),
(Some(a), Some(b)) if a == b => (),
(_, Some(id)) => {
if let Some(song) = db.get_song(&id) {
self.cached.insert(id, song.cached_data().clone());
if let Some(bytes) =
song.cached_data().get_data_or_maybe_start_thread(&db, song)
{
let load_duration = song.duration_millis == 0;
self.backend.load_next_song(
id,
song.location
.rel_path
.file_name()
.unwrap_or_else(|| OsStr::new("")),
bytes,
load_duration,
SongCustomData { load_duration },
);
} }
} }
} }
(Some(_), None) => (),
} }
if let Some(song) = next_song { if db.playing != self.backend.playing() {
song.cache_data_start_thread(&db); if db.playing {
self.cached.insert(song.id); self.backend.resume();
// if we can't resume (i.e. there is no song), send `Pause` command
if !self.backend.playing() {
db.apply_command(Command::Pause);
} }
} else {
self.backend.pause();
} }
} }
/// partly identical to awedio/src/sounds/open_file.rs open_file_with_reader(), which is a private function I can't access if allow_uncaching {
fn sound_from_bytes( for (&id, cd) in &self.cached {
extension: &str, if Some(id) != queue_current_song && Some(id) != queue_next_song {
bytes: Arc<Vec<u8>>, if let Ok(_) = cd.uncache_data() {
) -> Result<Box<dyn Sound>, std::io::Error> { self.cached.remove(&id);
let reader = ArcU8Reader::new(bytes); break;
Ok(match extension { }
"wav" => Box::new( }
awedio::sounds::decoders::WavDecoder::new(reader) }
.map_err(|_e| std::io::Error::from(std::io::ErrorKind::InvalidData))?, }
),
"mp3" => Box::new(awedio::sounds::decoders::Mp3Decoder::new(reader)),
_ => return Err(std::io::Error::from(std::io::ErrorKind::Unsupported)),
})
} }
} }

View File

@ -0,0 +1,139 @@
use std::{ffi::OsStr, sync::Arc};
use rc_u8_reader::ArcU8Reader;
use rodio::{decoder::DecoderError, Decoder, OutputStream, OutputStreamHandle, Sink, Source};
use crate::{data::SongId, server::Command};
use super::PlayerBackend;
pub struct PlayerBackendRodio<T> {
#[allow(unused)]
output_stream: OutputStream,
#[allow(unused)]
output_stream_handle: OutputStreamHandle,
sink: Sink,
stopped: bool,
current: Option<(SongId, Arc<Vec<u8>>, Option<u128>, T)>,
next: Option<(SongId, Arc<Vec<u8>>, Option<MyDecoder>, T)>,
command_sender: std::sync::mpsc::Sender<Command>,
}
impl<T> PlayerBackendRodio<T> {
pub fn new(
command_sender: std::sync::mpsc::Sender<Command>,
) -> Result<Self, Box<dyn std::error::Error>> {
let (output_stream, output_stream_handle) = rodio::OutputStream::try_default()?;
let sink = Sink::try_new(&output_stream_handle)?;
Ok(Self {
output_stream,
output_stream_handle,
sink,
stopped: true,
current: None,
next: None,
command_sender,
})
}
}
impl<T> PlayerBackend<T> for PlayerBackendRodio<T> {
fn load_next_song(
&mut self,
id: SongId,
_filename: &OsStr,
bytes: Arc<Vec<u8>>,
_load_duration: bool,
custom_data: T,
) {
let decoder = decoder_from_bytes(Arc::clone(&bytes));
if let Err(e) = &decoder {
self.command_sender
.send(Command::ErrorInfo(
format!("Couldn't decode song #{id}!"),
format!("Error: '{e}'"),
))
.unwrap();
}
self.next = Some((id, bytes, decoder.ok(), custom_data));
}
fn pause(&mut self) {
self.sink.pause();
}
fn stop(&mut self) {
if !self.stopped {
self.sink.clear();
if let Some((_, bytes, _, _)) = &self.current {
if let Ok(decoder) = decoder_from_bytes(Arc::clone(bytes)) {
self.sink.append(decoder);
}
}
}
}
fn resume(&mut self) {
self.stopped = false;
self.sink.play();
}
fn next(&mut self, play: bool, load_duration: bool) {
self.stopped = false;
self.sink.clear();
self.current = self
.next
.take()
.map(|(id, bytes, mut decoder, custom_data)| {
let duration = if let Some(decoder) = decoder.take() {
let duration = if load_duration {
dbg!(decoder.total_duration().map(|v| v.as_millis()))
} else {
None
};
self.sink.append(decoder);
if play {
self.sink.play();
}
duration
} else {
None
};
(id, bytes, duration, custom_data)
});
}
fn clear(&mut self) {
self.sink.clear();
}
fn playing(&self) -> bool {
!(self.sink.is_paused() || self.sink.empty())
}
fn current_song(&self) -> Option<(SongId, bool, &T)> {
self.current.as_ref().map(|(id, _, _, t)| (*id, true, t))
}
fn next_song(&self) -> Option<(SongId, bool, &T)> {
self.next.as_ref().map(|(id, _, _, t)| (*id, true, t))
}
fn gen_data_mut(&mut self) -> (Option<&mut T>, Option<&mut T>) {
(
self.current.as_mut().map(|(_, _, _, t)| t),
self.next.as_mut().map(|(_, _, _, t)| t),
)
}
fn song_finished_polling(&self) -> bool {
true
}
fn song_finished(&self) -> bool {
self.current.is_some() && self.sink.empty()
}
fn current_song_duration(&self) -> Option<u64> {
self.current
.as_ref()
.and_then(|(_, _, dur, _)| dur.map(|v| v as _))
}
fn current_song_playback_position(&self) -> Option<u64> {
None
}
}
type MyDecoder = Decoder<ArcU8Reader<Vec<u8>>>;
fn decoder_from_bytes(bytes: Arc<Vec<u8>>) -> Result<MyDecoder, DecoderError> {
Decoder::new(ArcU8Reader::new(Arc::clone(&bytes))).map(|decoder| decoder)
}

View File

@ -199,7 +199,20 @@ pub fn handle_one_connection_as_get(
.and_then(|id| id.parse().ok()) .and_then(|id| id.parse().ok())
.and_then(|id| { .and_then(|id| {
let db = db.lock().unwrap(); let db = db.lock().unwrap();
db.get_song(&id).and_then(|song| song.cached_data_now(&db)) if let Some(song) = db.get_song(&id) {
let cd = song.cached_data();
if let Some(data) =
cd.get_data_or_maybe_start_thread(&db, song)
{
Some(data)
} else {
let cd = cd.clone();
drop(db);
cd.cached_data_await()
}
} else {
None
}
}) })
{ {
writeln!(connection.get_mut(), "len: {}", bytes.len())?; writeln!(connection.get_mut(), "len: {}", bytes.len())?;

View File

@ -134,9 +134,16 @@ pub fn run_server_caching_thread_opt(
) { ) {
use std::time::Instant; use std::time::Instant;
use crate::data::cache_manager::CacheManager; use crate::{
data::cache_manager::CacheManager,
player::{rodio::PlayerBackendRodio, PlayerBackend},
};
let mut player = Player::new().unwrap(); // commands sent to this will be handeled later in this function in an infinite loop.
// these commands are sent to the database asap.
let (command_sender, command_receiver) = mpsc::channel();
let mut player = Player::new(PlayerBackendRodio::new(command_sender.clone()).unwrap());
let cache_manager = if let Some(func) = caching_thread { let cache_manager = if let Some(func) = caching_thread {
let mut cm = CacheManager::new(Arc::clone(&database)); let mut cm = CacheManager::new(Arc::clone(&database));
func(&mut cm); func(&mut cm);
@ -144,9 +151,6 @@ pub fn run_server_caching_thread_opt(
} else { } else {
None None
}; };
// commands sent to this will be handeled later in this function in an infinite loop.
// these commands are sent to the database asap.
let (command_sender, command_receiver) = mpsc::channel();
if let Some(s) = sender_sender { if let Some(s) = sender_sender {
s.blocking_send(command_sender.clone()).unwrap(); s.blocking_send(command_sender.clone()).unwrap();
} }
@ -199,19 +203,26 @@ pub fn run_server_caching_thread_opt(
} }
} }
} }
let dur = Duration::from_secs(10); let song_done_polling = player.backend.song_finished_polling();
let command_sender = Arc::new(move |cmd| { let (dur, check_every) = if song_done_polling {
_ = command_sender.send(cmd); (Duration::from_millis(50), 200)
}); } else {
(Duration::from_secs(10), 0)
};
let mut check = 0;
let mut checkf = true;
loop { loop {
{ check += 1;
if check >= check_every || checkf || player.backend.song_finished() {
check = 0;
checkf = false;
// at the start and once after every command sent to the server, // at the start and once after every command sent to the server,
let mut db = database.lock().unwrap(); let mut db = database.lock().unwrap();
// update the player // update the player
if cache_manager.is_some() { if cache_manager.is_some() {
player.update_dont_uncache(&mut db, &command_sender); player.update_dont_uncache(&mut db);
} else { } else {
player.update(&mut db, &command_sender); player.update(&mut db);
} }
// autosave if necessary // autosave if necessary
if let Some((first, last)) = db.times_data_modified { if let Some((first, last)) = db.times_data_modified {
@ -224,6 +235,7 @@ pub fn run_server_caching_thread_opt(
} }
} }
if let Ok(command) = command_receiver.recv_timeout(dur) { if let Ok(command) = command_receiver.recv_timeout(dur) {
checkf = true;
player.handle_command(&command); player.handle_command(&command);
database.lock().unwrap().apply_command(command); database.lock().unwrap().apply_command(command);
} }