From 55e0e0262256b47408f27931d2aa8dfa9f22e691 Mon Sep 17 00:00:00 2001 From: Mark <> Date: Tue, 14 May 2024 21:24:01 +0200 Subject: [PATCH] get rid of tokio dependency in musicdb-lib, and add more UpdateEndpoint options --- musicdb-lib/Cargo.toml | 1 - musicdb-lib/src/data/database.rs | 17 +++++--- musicdb-lib/src/server/get.rs | 75 ++++++++++++++++++++------------ musicdb-lib/src/server/mod.rs | 6 +-- 4 files changed, 62 insertions(+), 37 deletions(-) diff --git a/musicdb-lib/Cargo.toml b/musicdb-lib/Cargo.toml index 6cd8a49..b7a870d 100644 --- a/musicdb-lib/Cargo.toml +++ b/musicdb-lib/Cargo.toml @@ -10,7 +10,6 @@ rand = "0.8.5" rc-u8-reader = "2.0.16" rodio = { version = "0.18.0", optional = true } sysinfo = "0.30.12" -tokio = { version = "1.37.0", features = ["sync"] } [features] default = ["playback"] diff --git a/musicdb-lib/src/data/database.rs b/musicdb-lib/src/data/database.rs index 91bc741..10b9e3b 100755 --- a/musicdb-lib/src/data/database.rs +++ b/musicdb-lib/src/data/database.rs @@ -60,8 +60,9 @@ impl ClientIo for T {} pub enum UpdateEndpoint { Bytes(Box), CmdChannel(mpsc::Sender>), - CmdChannelTokio(tokio::sync::mpsc::UnboundedSender>), Custom(Box), + CustomArc(Box) + Send>), + CustomBytes(Box), } impl Database { @@ -970,15 +971,19 @@ impl Database { remove.push(i); } } - UpdateEndpoint::CmdChannelTokio(sender) => { + UpdateEndpoint::Custom(func) => func(update), + UpdateEndpoint::CustomArc(func) => { if arc.is_none() { arc = Some(Arc::new(update.clone())); } - if sender.send(arc.clone().unwrap()).is_err() { - remove.push(i); - } + func(arc.as_ref().unwrap()) + } + UpdateEndpoint::CustomBytes(func) => { + if bytes.is_none() { + bytes = Some(update.to_bytes_vec()); + } + func(bytes.as_ref().unwrap()) } - UpdateEndpoint::Custom(func) => func(update), } } if !remove.is_empty() { diff --git a/musicdb-lib/src/server/get.rs b/musicdb-lib/src/server/get.rs index c0229ef..265d3ea 100755 --- a/musicdb-lib/src/server/get.rs +++ b/musicdb-lib/src/server/get.rs @@ -3,6 +3,7 @@ use std::{ io::{BufRead, BufReader, Read, Write}, path::{Path, PathBuf}, sync::{Arc, Mutex}, + time::Instant, }; use crate::data::{database::Database, CoverId, SongId}; @@ -128,30 +129,36 @@ impl Client { } writeln!(self.0.get_mut(), "{}", con_get_encode_string(&str))?; self.0.get_mut().flush()?; - let mut response = String::new(); - self.0.read_line(&mut response)?; - let len_line = response.trim(); - if len_line.starts_with("len: ") { - if let Ok(len) = len_line[4..].trim().parse() { - let mut out = Vec::with_capacity(len); - for _ in 0..len { - let mut line = String::new(); - self.0.read_line(&mut line)?; - let line = line.trim_end_matches(['\n', '\r']); - if line.starts_with('#') { - out.push((line[1..].to_owned(), false)) - } else if line.starts_with('!') { - out.push((line[1..].to_owned(), true)) - } else { - return Ok(Err(format!("bad line-format: {line}"))); - } - } - Ok(Ok(out)) + loop { + let mut response = String::new(); + self.0.read_line(&mut response)?; + let len_line = response.trim(); + if len_line.starts_with('%') { + eprintln!("[Find Unused Song Files] Status: {}", len_line[1..].trim()); } else { - Ok(Err(format!("bad len in len-line: {len_line}"))) - } - } else { - Ok(Err(format!("bad len-line: {len_line}"))) + break if len_line.starts_with("len: ") { + if let Ok(len) = len_line[4..].trim().parse() { + let mut out = Vec::with_capacity(len); + for _ in 0..len { + let mut line = String::new(); + self.0.read_line(&mut line)?; + let line = line.trim_end_matches(['\n', '\r']); + if line.starts_with('#') { + out.push((line[1..].to_owned(), false)) + } else if line.starts_with('!') { + out.push((line[1..].to_owned(), true)) + } else { + return Ok(Err(format!("bad line-format: {line}"))); + } + } + Ok(Ok(out)) + } else { + Ok(Err(format!("bad len in len-line: {len_line}"))) + } + } else { + Ok(Err(format!("bad len-line: {len_line}"))) + }; + }; } } } @@ -307,9 +314,13 @@ pub fn handle_one_connection_as_get( let unused = find_unused_song_files( &db, &lib_dir, - &FindUnusedSongFilesConfig { + &mut FindUnusedSongFilesConfig { extensions: extensions .unwrap_or_else(|| Some(vec![".mp3".to_owned()])), + w: connection.get_mut(), + last_write: Instant::now(), + new: 0, + songs: 0, }, ); writeln!(connection.get_mut(), "len: {}", unused.len())?; @@ -365,22 +376,26 @@ pub fn con_get_encode_string(line: &str) -> String { fn find_unused_song_files( db: &Arc>, path: &impl AsRef, - cfg: &FindUnusedSongFilesConfig, + cfg: &mut FindUnusedSongFilesConfig, ) -> Vec { let mut files = vec![]; find_unused_song_files_internal(db, path, &"", cfg, &mut files, &mut vec![], true); files } -struct FindUnusedSongFilesConfig { +struct FindUnusedSongFilesConfig<'a> { extensions: Option>, + w: &'a mut dyn Write, + last_write: Instant, + new: usize, + songs: usize, } fn find_unused_song_files_internal( db: &Arc>, path: &impl AsRef, rel_path: &impl AsRef, - cfg: &FindUnusedSongFilesConfig, + cfg: &mut FindUnusedSongFilesConfig, unused_files: &mut Vec, files_buf: &mut Vec, is_final: bool, @@ -421,6 +436,7 @@ fn find_unused_song_files_internal( } if (is_final && files_buf.len() > 0) || files_buf.len() > 50 { let db = db.lock().unwrap(); + cfg.songs += files_buf.len(); for song in db.songs().values() { if let Some(i) = files_buf .iter() @@ -429,6 +445,11 @@ fn find_unused_song_files_internal( files_buf.remove(i); } } + cfg.new += files_buf.len(); unused_files.extend(std::mem::replace(files_buf, vec![]).into_iter()); + if cfg.last_write.elapsed().as_secs_f32() > 1.0 { + cfg.last_write = Instant::now(); + _ = writeln!(cfg.w, "%{}/{}", cfg.new, cfg.songs); + } } } diff --git a/musicdb-lib/src/server/mod.rs b/musicdb-lib/src/server/mod.rs index 3f3ed5a..62dbe18 100755 --- a/musicdb-lib/src/server/mod.rs +++ b/musicdb-lib/src/server/mod.rs @@ -121,7 +121,7 @@ impl Command { pub fn run_server( database: Arc>, addr_tcp: Option, - sender_sender: Option>>, + sender_sender: Option)>>, ) { run_server_caching_thread_opt(database, addr_tcp, sender_sender, None) } @@ -129,7 +129,7 @@ pub fn run_server( pub fn run_server_caching_thread_opt( database: Arc>, addr_tcp: Option, - sender_sender: Option>>, + sender_sender: Option)>>, caching_thread: Option>, ) { use std::time::Instant; @@ -152,7 +152,7 @@ pub fn run_server_caching_thread_opt( None }; if let Some(s) = sender_sender { - s.blocking_send(command_sender.clone()).unwrap(); + s(command_sender.clone()) } database.lock().unwrap().command_sender = Some(command_sender.clone()); if let Some(addr) = addr_tcp {