get rid of tokio dependency in musicdb-lib,

and add more UpdateEndpoint options
This commit is contained in:
Mark 2024-05-14 21:24:01 +02:00
parent 0c0c535d2c
commit 55e0e02622
4 changed files with 62 additions and 37 deletions

View File

@ -10,7 +10,6 @@ rand = "0.8.5"
rc-u8-reader = "2.0.16" rc-u8-reader = "2.0.16"
rodio = { version = "0.18.0", optional = true } rodio = { version = "0.18.0", optional = true }
sysinfo = "0.30.12" sysinfo = "0.30.12"
tokio = { version = "1.37.0", features = ["sync"] }
[features] [features]
default = ["playback"] default = ["playback"]

View File

@ -60,8 +60,9 @@ impl<T: Read + Write + Send> ClientIo for T {}
pub enum UpdateEndpoint { pub enum UpdateEndpoint {
Bytes(Box<dyn Write + Sync + Send>), Bytes(Box<dyn Write + Sync + Send>),
CmdChannel(mpsc::Sender<Arc<Command>>), CmdChannel(mpsc::Sender<Arc<Command>>),
CmdChannelTokio(tokio::sync::mpsc::UnboundedSender<Arc<Command>>),
Custom(Box<dyn FnMut(&Command) + Send>), Custom(Box<dyn FnMut(&Command) + Send>),
CustomArc(Box<dyn FnMut(&Arc<Command>) + Send>),
CustomBytes(Box<dyn FnMut(&[u8]) + Send>),
} }
impl Database { impl Database {
@ -970,15 +971,19 @@ impl Database {
remove.push(i); remove.push(i);
} }
} }
UpdateEndpoint::CmdChannelTokio(sender) => { UpdateEndpoint::Custom(func) => func(update),
UpdateEndpoint::CustomArc(func) => {
if arc.is_none() { if arc.is_none() {
arc = Some(Arc::new(update.clone())); arc = Some(Arc::new(update.clone()));
} }
if sender.send(arc.clone().unwrap()).is_err() { func(arc.as_ref().unwrap())
remove.push(i);
} }
UpdateEndpoint::CustomBytes(func) => {
if bytes.is_none() {
bytes = Some(update.to_bytes_vec());
}
func(bytes.as_ref().unwrap())
} }
UpdateEndpoint::Custom(func) => func(update),
} }
} }
if !remove.is_empty() { if !remove.is_empty() {

View File

@ -3,6 +3,7 @@ use std::{
io::{BufRead, BufReader, Read, Write}, io::{BufRead, BufReader, Read, Write},
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::{Arc, Mutex}, sync::{Arc, Mutex},
time::Instant,
}; };
use crate::data::{database::Database, CoverId, SongId}; use crate::data::{database::Database, CoverId, SongId};
@ -128,10 +129,14 @@ impl<T: Write + Read> Client<T> {
} }
writeln!(self.0.get_mut(), "{}", con_get_encode_string(&str))?; writeln!(self.0.get_mut(), "{}", con_get_encode_string(&str))?;
self.0.get_mut().flush()?; self.0.get_mut().flush()?;
loop {
let mut response = String::new(); let mut response = String::new();
self.0.read_line(&mut response)?; self.0.read_line(&mut response)?;
let len_line = response.trim(); let len_line = response.trim();
if len_line.starts_with("len: ") { if len_line.starts_with('%') {
eprintln!("[Find Unused Song Files] Status: {}", len_line[1..].trim());
} else {
break if len_line.starts_with("len: ") {
if let Ok(len) = len_line[4..].trim().parse() { if let Ok(len) = len_line[4..].trim().parse() {
let mut out = Vec::with_capacity(len); let mut out = Vec::with_capacity(len);
for _ in 0..len { for _ in 0..len {
@ -152,6 +157,8 @@ impl<T: Write + Read> Client<T> {
} }
} else { } else {
Ok(Err(format!("bad len-line: {len_line}"))) Ok(Err(format!("bad len-line: {len_line}")))
};
};
} }
} }
} }
@ -307,9 +314,13 @@ pub fn handle_one_connection_as_get(
let unused = find_unused_song_files( let unused = find_unused_song_files(
&db, &db,
&lib_dir, &lib_dir,
&FindUnusedSongFilesConfig { &mut FindUnusedSongFilesConfig {
extensions: extensions extensions: extensions
.unwrap_or_else(|| Some(vec![".mp3".to_owned()])), .unwrap_or_else(|| Some(vec![".mp3".to_owned()])),
w: connection.get_mut(),
last_write: Instant::now(),
new: 0,
songs: 0,
}, },
); );
writeln!(connection.get_mut(), "len: {}", unused.len())?; writeln!(connection.get_mut(), "len: {}", unused.len())?;
@ -365,22 +376,26 @@ pub fn con_get_encode_string(line: &str) -> String {
fn find_unused_song_files( fn find_unused_song_files(
db: &Arc<Mutex<Database>>, db: &Arc<Mutex<Database>>,
path: &impl AsRef<Path>, path: &impl AsRef<Path>,
cfg: &FindUnusedSongFilesConfig, cfg: &mut FindUnusedSongFilesConfig,
) -> Vec<PathBuf> { ) -> Vec<PathBuf> {
let mut files = vec![]; let mut files = vec![];
find_unused_song_files_internal(db, path, &"", cfg, &mut files, &mut vec![], true); find_unused_song_files_internal(db, path, &"", cfg, &mut files, &mut vec![], true);
files files
} }
struct FindUnusedSongFilesConfig { struct FindUnusedSongFilesConfig<'a> {
extensions: Option<Vec<String>>, extensions: Option<Vec<String>>,
w: &'a mut dyn Write,
last_write: Instant,
new: usize,
songs: usize,
} }
fn find_unused_song_files_internal( fn find_unused_song_files_internal(
db: &Arc<Mutex<Database>>, db: &Arc<Mutex<Database>>,
path: &impl AsRef<Path>, path: &impl AsRef<Path>,
rel_path: &impl AsRef<Path>, rel_path: &impl AsRef<Path>,
cfg: &FindUnusedSongFilesConfig, cfg: &mut FindUnusedSongFilesConfig,
unused_files: &mut Vec<PathBuf>, unused_files: &mut Vec<PathBuf>,
files_buf: &mut Vec<PathBuf>, files_buf: &mut Vec<PathBuf>,
is_final: bool, is_final: bool,
@ -421,6 +436,7 @@ fn find_unused_song_files_internal(
} }
if (is_final && files_buf.len() > 0) || files_buf.len() > 50 { if (is_final && files_buf.len() > 0) || files_buf.len() > 50 {
let db = db.lock().unwrap(); let db = db.lock().unwrap();
cfg.songs += files_buf.len();
for song in db.songs().values() { for song in db.songs().values() {
if let Some(i) = files_buf if let Some(i) = files_buf
.iter() .iter()
@ -429,6 +445,11 @@ fn find_unused_song_files_internal(
files_buf.remove(i); files_buf.remove(i);
} }
} }
cfg.new += files_buf.len();
unused_files.extend(std::mem::replace(files_buf, vec![]).into_iter()); unused_files.extend(std::mem::replace(files_buf, vec![]).into_iter());
if cfg.last_write.elapsed().as_secs_f32() > 1.0 {
cfg.last_write = Instant::now();
_ = writeln!(cfg.w, "%{}/{}", cfg.new, cfg.songs);
}
} }
} }

View File

@ -121,7 +121,7 @@ impl Command {
pub fn run_server( pub fn run_server(
database: Arc<Mutex<Database>>, database: Arc<Mutex<Database>>,
addr_tcp: Option<SocketAddr>, addr_tcp: Option<SocketAddr>,
sender_sender: Option<tokio::sync::mpsc::Sender<mpsc::Sender<Command>>>, sender_sender: Option<Box<dyn FnOnce(mpsc::Sender<Command>)>>,
) { ) {
run_server_caching_thread_opt(database, addr_tcp, sender_sender, None) run_server_caching_thread_opt(database, addr_tcp, sender_sender, None)
} }
@ -129,7 +129,7 @@ pub fn run_server(
pub fn run_server_caching_thread_opt( pub fn run_server_caching_thread_opt(
database: Arc<Mutex<Database>>, database: Arc<Mutex<Database>>,
addr_tcp: Option<SocketAddr>, addr_tcp: Option<SocketAddr>,
sender_sender: Option<tokio::sync::mpsc::Sender<mpsc::Sender<Command>>>, sender_sender: Option<Box<dyn FnOnce(mpsc::Sender<Command>)>>,
caching_thread: Option<Box<dyn FnOnce(&mut crate::data::cache_manager::CacheManager)>>, caching_thread: Option<Box<dyn FnOnce(&mut crate::data::cache_manager::CacheManager)>>,
) { ) {
use std::time::Instant; use std::time::Instant;
@ -152,7 +152,7 @@ pub fn run_server_caching_thread_opt(
None None
}; };
if let Some(s) = sender_sender { if let Some(s) = sender_sender {
s.blocking_send(command_sender.clone()).unwrap(); s(command_sender.clone())
} }
database.lock().unwrap().command_sender = Some(command_sender.clone()); database.lock().unwrap().command_sender = Some(command_sender.clone());
if let Some(addr) = addr_tcp { if let Some(addr) = addr_tcp {