add more modes to client and change server cli

server can now source its database and files from
another server, but it will have its own
queue and appears as separate to clients.

the client now has gui-syncplayer-{local,network}
modes which show the gui and also play the songs.
using a syncplayer-network mode now automatically
enables the cache manager, which should make
waiting for songs to load less frequent.
This commit is contained in:
Mark
2024-05-28 13:20:43 +02:00
parent 7d6d6d295b
commit 61110f5f4a
14 changed files with 405 additions and 139 deletions

View File

@@ -12,5 +12,5 @@ rodio = { version = "0.18.0", optional = true }
sysinfo = "0.30.12"
[features]
default = ["playback"]
# default = ["playback"]
playback = ["dep:rodio"]

View File

@@ -163,7 +163,7 @@ impl CacheManager {
Err(true) => {
break;
}
Ok(()) => {
Ok(true) => {
eprintln!(
"[{}] CacheManager :: Start caching bytes for song '{}'.",
"INFO".cyan(),
@@ -172,6 +172,7 @@ impl CacheManager {
sleep_short = true;
break;
}
Ok(false) => (),
}
}
}

View File

@@ -89,15 +89,16 @@ impl CachedData {
}
/// If no data is cached yet and no caching thread is running, starts a thread to cache the data.
pub fn cache_data_start_thread(&self, db: &Database, song: &Song) -> bool {
self.cache_data_start_thread_or_say_already_running(db, song)
.is_ok()
self.cache_data_start_thread_or_say_already_running(db, song) == Ok(true)
}
/// Ok(true) => thread started,
/// Ok(false) => data was already loaded
pub fn cache_data_start_thread_or_say_already_running(
&self,
db: &Database,
song: &Song,
) -> Result<(), bool> {
self.get_data_or_start_thread_and_say_already_running(db, |_| (), || (), song)
) -> Result<bool, bool> {
self.get_data_or_start_thread_and_say_already_running(db, |_| false, || true, song)
}
/// gets the data if available, or, if no thread is running, starts a thread to get the data.
/// if a thread is running, was started, or recently encountered an error, `None` is returned, otherwise `Some(data)`.
@@ -128,7 +129,7 @@ impl CachedData {
) -> Result<T, bool> {
let mut cd = self.0.lock().unwrap();
match cd.0.as_mut() {
Err(Some(i)) if i.elapsed().as_secs_f32() > 60.0 => return Err(false),
Err(Some(i)) if i.elapsed().as_secs_f32() < 60.0 => return Err(false),
Err(_) => (),
Ok(Err(t)) => {
if t.is_finished() {

View File

@@ -10,6 +10,7 @@ use crate::{
pub struct Player<T: PlayerBackend<SongCustomData>> {
cached: HashMap<SongId, CachedData>,
pub backend: T,
allow_sending_commands: bool,
}
pub struct SongCustomData {
@@ -80,6 +81,14 @@ impl<T: PlayerBackend<SongCustomData>> Player<T> {
Self {
cached: HashMap::new(),
backend,
allow_sending_commands: true,
}
}
pub fn new_client(backend: T) -> Self {
Self {
cached: HashMap::new(),
backend,
allow_sending_commands: false,
}
}
pub fn handle_command(&mut self, command: &Command) {
@@ -108,8 +117,10 @@ impl<T: PlayerBackend<SongCustomData>> Player<T> {
self.update_uncache_opt(db, false)
}
pub fn update_uncache_opt(&mut self, db: &mut Database, allow_uncaching: bool) {
if self.backend.song_finished() {
db.apply_command(Command::NextSong);
if self.allow_sending_commands {
if self.allow_sending_commands && self.backend.song_finished() {
db.apply_command(Command::NextSong);
}
}
let queue_current_song = db.queue.get_current_song().copied();
@@ -125,7 +136,7 @@ impl<T: PlayerBackend<SongCustomData>> Player<T> {
.next_song()
.is_some_and(|(_, _, t)| t.load_duration);
self.backend.next(db.playing, load_duration);
if load_duration {
if self.allow_sending_commands && load_duration {
if let Some(dur) = self.backend.current_song_duration() {
db.apply_command(Command::SetSongDuration(id, dur))
}
@@ -149,7 +160,7 @@ impl<T: PlayerBackend<SongCustomData>> Player<T> {
SongCustomData { load_duration },
);
self.backend.next(db.playing, load_duration);
if load_duration {
if self.allow_sending_commands && load_duration {
if let Some(dur) = self.backend.current_song_duration() {
db.apply_command(Command::SetSongDuration(id, dur))
}
@@ -157,7 +168,7 @@ impl<T: PlayerBackend<SongCustomData>> Player<T> {
} else {
// only show an error if the user tries to play the song.
// otherwise, the error might be spammed.
if db.playing {
if self.allow_sending_commands && db.playing {
db.apply_command(Command::ErrorInfo(
format!("Couldn't load bytes for song {id}"),
format!(
@@ -204,7 +215,7 @@ impl<T: PlayerBackend<SongCustomData>> Player<T> {
if db.playing {
self.backend.resume();
// if we can't resume (i.e. there is no song), send `Pause` command
if !self.backend.playing() {
if self.allow_sending_commands && !self.backend.playing() {
db.apply_command(Command::Pause);
}
} else {

View File

@@ -16,12 +16,20 @@ pub struct PlayerBackendRodio<T> {
stopped: bool,
current: Option<(SongId, Arc<Vec<u8>>, Option<u128>, T)>,
next: Option<(SongId, Arc<Vec<u8>>, Option<MyDecoder>, T)>,
command_sender: std::sync::mpsc::Sender<Command>,
command_sender: Option<std::sync::mpsc::Sender<Command>>,
}
impl<T> PlayerBackendRodio<T> {
pub fn new(
command_sender: std::sync::mpsc::Sender<Command>,
) -> Result<Self, Box<dyn std::error::Error>> {
Self::new_with_optional_command_sending(Some(command_sender))
}
pub fn new_without_command_sending() -> Result<Self, Box<dyn std::error::Error>> {
Self::new_with_optional_command_sending(None)
}
pub fn new_with_optional_command_sending(
command_sender: Option<std::sync::mpsc::Sender<Command>>,
) -> Result<Self, Box<dyn std::error::Error>> {
let (output_stream, output_stream_handle) = rodio::OutputStream::try_default()?;
let sink = Sink::try_new(&output_stream_handle)?;
@@ -48,12 +56,13 @@ impl<T> PlayerBackend<T> for PlayerBackendRodio<T> {
) {
let decoder = decoder_from_bytes(Arc::clone(&bytes));
if let Err(e) = &decoder {
self.command_sender
.send(Command::ErrorInfo(
if let Some(s) = &self.command_sender {
s.send(Command::ErrorInfo(
format!("Couldn't decode song #{id}!"),
format!("Error: '{e}'"),
))
.unwrap();
}
}
self.next = Some((id, bytes, decoder.ok(), custom_data));
}

View File

@@ -229,28 +229,49 @@ pub fn handle_one_connection_as_get(
}
}
"custom-file" => {
if let Some(bytes) = request.next().and_then(|path| {
let db = db.lock().unwrap();
let mut parent = match &db.custom_files {
None => None,
Some(None) => Some(db.lib_directory.clone()),
Some(Some(p)) => Some(p.clone()),
};
// check for malicious paths [TODO: Improve]
if Path::new(path).is_absolute() {
parent = None;
}
if let Some(parent) = parent {
let path = parent.join(path);
if path.starts_with(parent) {
fs::read(path).ok()
if let Some(bytes) =
request.next().and_then(|path| 'load_custom_file_data: {
let db = db.lock().unwrap();
let mut parent = match &db.custom_files {
None => {
if let Some(con) = &db.remote_server_as_song_file_source {
if let Ok(Ok(data)) =
con.lock().unwrap().custom_file(path)
{
break 'load_custom_file_data Some(data);
} else {
None
}
} else {
None
}
}
// if a remote source is present, this means we should ignore it. if no remote source is present, use the lib_dir.
Some(None) => {
if db.remote_server_as_song_file_source.is_none() {
Some(db.lib_directory.clone())
} else {
None
}
}
Some(Some(p)) => Some(p.clone()),
};
// check for malicious paths [TODO: Improve]
if Path::new(path).is_absolute() {
parent = None;
}
if let Some(parent) = parent {
let path = parent.join(path);
if path.starts_with(parent) {
fs::read(path).ok()
} else {
None
}
} else {
None
}
} else {
None
}
}) {
})
{
writeln!(connection.get_mut(), "len: {}", bytes.len())?;
connection.get_mut().write_all(&bytes)?;
} else {

View File

@@ -1,12 +1,18 @@
pub mod get;
use std::{
io::{Read, Write},
io::{BufRead as _, BufReader, Read, Write},
net::{SocketAddr, TcpListener},
sync::{mpsc, Arc, Mutex},
thread,
time::Duration,
};
use colorize::AnsiColor;
#[cfg(feature = "playback")]
use crate::player::Player;
use crate::server::get::handle_one_connection_as_get;
use crate::{
data::{
album::Album,
@@ -18,15 +24,6 @@ use crate::{
},
load::ToFromBytes,
};
#[cfg(feature = "playback")]
use crate::{player::Player, server::get::handle_one_connection_as_get};
#[cfg(feature = "playback")]
use std::{
io::{BufRead, BufReader},
net::{SocketAddr, TcpListener},
thread,
time::Duration,
};
#[derive(Clone, Debug)]
pub enum Command {
@@ -117,33 +114,45 @@ impl Command {
/// a) initialize new connections using db.init_connection() to synchronize the new client
/// b) handle the decoding of messages using Command::from_bytes()
/// c) re-encode all received messages using Command::to_bytes_vec(), send them to the db, and send them to all your clients.
#[cfg(feature = "playback")]
pub fn run_server(
database: Arc<Mutex<Database>>,
addr_tcp: Option<SocketAddr>,
sender_sender: Option<Box<dyn FnOnce(mpsc::Sender<Command>)>>,
play_audio: bool,
) {
run_server_caching_thread_opt(database, addr_tcp, sender_sender, None)
run_server_caching_thread_opt(database, addr_tcp, sender_sender, None, play_audio)
}
#[cfg(feature = "playback")]
pub fn run_server_caching_thread_opt(
database: Arc<Mutex<Database>>,
addr_tcp: Option<SocketAddr>,
sender_sender: Option<Box<dyn FnOnce(mpsc::Sender<Command>)>>,
caching_thread: Option<Box<dyn FnOnce(&mut crate::data::cache_manager::CacheManager)>>,
play_audio: bool,
) {
#[cfg(not(feature = "playback"))]
if play_audio {
panic!("Can't run the server: cannot play audio because the `playback` feature was disabled when compiling, but `play_audio` was set to `true`!");
}
use std::time::Instant;
use crate::{
data::cache_manager::CacheManager,
player::{rodio::PlayerBackendRodio, PlayerBackend},
};
use crate::data::cache_manager::CacheManager;
#[cfg(feature = "playback")]
use crate::player::{rodio::PlayerBackendRodio, PlayerBackend};
// commands sent to this will be handeled later in this function in an infinite loop.
// these commands are sent to the database asap.
let (command_sender, command_receiver) = mpsc::channel();
let mut player = Player::new(PlayerBackendRodio::new(command_sender.clone()).unwrap());
#[cfg(feature = "playback")]
let mut player = if play_audio {
Some(Player::new(
PlayerBackendRodio::new(command_sender.clone()).unwrap(),
))
} else {
None
};
#[allow(unused)]
let cache_manager = if let Some(func) = caching_thread {
let mut cm = CacheManager::new(Arc::clone(&database));
func(&mut cm);
@@ -203,7 +212,12 @@ pub fn run_server_caching_thread_opt(
}
}
}
let song_done_polling = player.backend.song_finished_polling();
#[cfg(feature = "playback")]
let song_done_polling = player
.as_ref()
.is_some_and(|p| p.backend.song_finished_polling());
#[cfg(not(feature = "playback"))]
let song_done_polling = false;
let (dur, check_every) = if song_done_polling {
(Duration::from_millis(50), 200)
} else {
@@ -213,16 +227,23 @@ pub fn run_server_caching_thread_opt(
let mut checkf = true;
loop {
check += 1;
if check >= check_every || checkf || player.backend.song_finished() {
#[cfg(feature = "playback")]
let song_finished = player.as_ref().is_some_and(|p| p.backend.song_finished());
#[cfg(not(feature = "playback"))]
let song_finished = false;
if check >= check_every || checkf || song_finished {
check = 0;
checkf = false;
// at the start and once after every command sent to the server,
let mut db = database.lock().unwrap();
// update the player
if cache_manager.is_some() {
player.update_dont_uncache(&mut db);
} else {
player.update(&mut db);
#[cfg(feature = "playback")]
if let Some(player) = &mut player {
if cache_manager.is_some() {
player.update_dont_uncache(&mut db);
} else {
player.update(&mut db);
}
}
// autosave if necessary
if let Some((first, last)) = db.times_data_modified {
@@ -236,7 +257,10 @@ pub fn run_server_caching_thread_opt(
}
if let Ok(command) = command_receiver.recv_timeout(dur) {
checkf = true;
player.handle_command(&command);
#[cfg(feature = "playback")]
if let Some(player) = &mut player {
player.handle_command(&command);
}
database.lock().unwrap().apply_command(command);
}
}