improve CacheManager

This commit is contained in:
Mark 2024-01-18 00:27:42 +01:00
parent b3aea8a4c9
commit b0417a72b6
4 changed files with 105 additions and 90 deletions

View File

@ -1,11 +1,10 @@
use std::{ use std::{
collections::BTreeSet,
sync::{ sync::{
atomic::{AtomicU32, AtomicU64}, atomic::{AtomicU32, AtomicU64},
Arc, Mutex, Arc, Mutex,
}, },
thread::{self, JoinHandle}, thread::{self, JoinHandle},
time::{Duration, Instant}, time::Duration,
}; };
use colorize::AnsiColor; use colorize::AnsiColor;
@ -34,12 +33,13 @@ impl CacheManager {
max_avail_mem: Arc::clone(&max_avail_mem), max_avail_mem: Arc::clone(&max_avail_mem),
songs_to_cache: Arc::clone(&songs_to_cache), songs_to_cache: Arc::clone(&songs_to_cache),
thread: Arc::new(thread::spawn(move || { thread: Arc::new(thread::spawn(move || {
let sleep_dur_long = Duration::from_secs(60); let sleep_dur_long = Duration::from_secs(20);
let sleep_dur_short = Duration::from_secs(5); let sleep_dur_short = Duration::from_secs(1);
let mut si = sysinfo::System::new_with_specifics( let mut si = sysinfo::System::new_with_specifics(
sysinfo::RefreshKind::new() sysinfo::RefreshKind::new()
.with_memory(sysinfo::MemoryRefreshKind::new().with_ram()), .with_memory(sysinfo::MemoryRefreshKind::new().with_ram()),
); );
eprintln!("[{}] Starting CacheManager", "INFO".cyan());
let mut sleep_short = true; let mut sleep_short = true;
loop { loop {
thread::sleep(if sleep_short { thread::sleep(if sleep_short {
@ -51,6 +51,7 @@ impl CacheManager {
si.refresh_memory_specifics(sysinfo::MemoryRefreshKind::new().with_ram()); si.refresh_memory_specifics(sysinfo::MemoryRefreshKind::new().with_ram());
let available_memory = si.available_memory(); let available_memory = si.available_memory();
let min_avail_mem = min_avail_mem.load(std::sync::atomic::Ordering::Relaxed); let min_avail_mem = min_avail_mem.load(std::sync::atomic::Ordering::Relaxed);
let low_memory = available_memory < min_avail_mem;
let max_avail_mem = max_avail_mem.load(std::sync::atomic::Ordering::Relaxed); let max_avail_mem = max_avail_mem.load(std::sync::atomic::Ordering::Relaxed);
let songs_to_cache = songs_to_cache.load(std::sync::atomic::Ordering::Relaxed); let songs_to_cache = songs_to_cache.load(std::sync::atomic::Ordering::Relaxed);
@ -58,92 +59,100 @@ impl CacheManager {
let db = database.lock().unwrap(); let db = database.lock().unwrap();
let (_queue_current_song, queue_next_song, ids_to_cache) = let (queue_current_song, queue_next_song, ids_to_cache) = if songs_to_cache <= 2
if songs_to_cache <= 2 { {
let queue_current_song = db.queue.get_current_song().copied(); let queue_current_song = db.queue.get_current_song().copied();
let queue_next_song = db.queue.get_next_song().copied(); let queue_next_song = db.queue.get_next_song().copied();
let ids_to_cache = queue_current_song (
.into_iter() queue_current_song,
.chain(queue_next_song) queue_next_song,
.collect::<Vec<_>>(); match (queue_current_song, queue_next_song) {
(None, None) => vec![],
( (Some(a), None) | (None, Some(a)) => vec![a],
queue_current_song, (Some(a), Some(b)) => {
queue_next_song, if a == b {
match (queue_current_song, queue_next_song) { vec![a]
(None, None) => vec![], } else {
(Some(a), None) | (None, Some(a)) => vec![a], vec![a, b]
(Some(a), Some(b)) => {
if a == b {
vec![a]
} else {
vec![a, b]
}
} }
},
)
} else {
let mut queue = db.queue.clone();
let mut actions = vec![];
let queue_current_song = queue.get_current_song().copied();
queue.advance_index_inner(vec![], &mut actions);
let queue_next_song = if actions.is_empty() {
queue.get_current_song().copied()
} else {
None
};
let mut ids_to_cache = queue_current_song
.into_iter()
.chain(queue_next_song)
.collect::<Vec<_>>();
for _ in 2..songs_to_cache {
queue.advance_index_inner(vec![], &mut actions);
if !actions.is_empty() {
break;
} }
if let Some(id) = queue.get_current_song() { },
if !ids_to_cache.contains(id) { )
ids_to_cache.push(*id); } else {
} let mut queue = db.queue.clone();
} else {
let mut actions = vec![];
let queue_current_song = queue.get_current_song().copied();
queue.advance_index_inner(vec![], &mut actions);
let queue_next_song = if actions.is_empty() {
queue.get_current_song().copied()
} else {
None
};
let mut ids_to_cache = queue_current_song
.into_iter()
.chain(queue_next_song)
.collect::<Vec<_>>();
for _ in 2..songs_to_cache {
queue.advance_index_inner(vec![], &mut actions);
if !actions.is_empty() {
break;
}
if let Some(id) = queue.get_current_song() {
if !ids_to_cache.contains(id) {
ids_to_cache.push(*id);
}
} else {
break;
}
}
(queue_current_song, queue_next_song, ids_to_cache)
};
if low_memory {
let mut found = false;
for (id, song) in db.songs().iter() {
if !ids_to_cache.contains(id) {
if let Ok(true) = song.uncache_data() {
eprintln!(
"[{}] CacheManager :: Uncached bytes for song '{}'.",
"INFO".cyan(),
song.title
);
found = true;
break; break;
} }
} }
}
(queue_current_song, queue_next_song, ids_to_cache) if !found {
}; // also uncache songs that should be cached, but not current/next song
for id in ids_to_cache.iter().rev() {
if available_memory < min_avail_mem { if !(queue_current_song.is_some_and(|i| i == *id)
let mem_to_free = min_avail_mem - available_memory; || queue_next_song.is_some_and(|i| i == *id))
let mut freed_memory = 0; {
for (id, song) in db.songs().iter() { if let Some(song) = db.get_song(id) {
if !ids_to_cache.contains(id) { if let Ok(true) = song.uncache_data() {
let cache = song.cached_data.0.lock().unwrap(); eprintln!(
if let Some(size) = cache.1 { "[{}] CacheManager :: Uncached bytes for song '{}'.",
if let Ok(true) = song.uncache_data() { "INFO".cyan(),
freed_memory += size; song.title
if freed_memory >= mem_to_free as usize { );
found = true;
break; break;
} }
} }
} }
} }
} }
eprintln!( if found {
"[{}] CacheManager :: Uncaching songs freed {:.1} mb of memory", // uncache more songs
if freed_memory >= mem_to_free as usize { sleep_short = true;
"INFO".cyan() }
} else {
sleep_short = true;
"INFO".blue()
},
freed_memory as f32 / 1024.0 / 1024.0
);
} else if available_memory > max_avail_mem { } else if available_memory > max_avail_mem {
// we have some memory left, maybe cache a song (or cache multiple if we know their byte-sizes) // we have some memory left, maybe cache a song (or cache multiple if we know their byte-sizes)
for song in &ids_to_cache { for song in &ids_to_cache {
@ -152,15 +161,14 @@ impl CacheManager {
Err(false) => (), Err(false) => (),
// thread already running, don't start a second one (otherwise we may load too many songs, using too much memory, causing a cache-uncache-cycle) // thread already running, don't start a second one (otherwise we may load too many songs, using too much memory, causing a cache-uncache-cycle)
Err(true) => { Err(true) => {
sleep_short = true;
break; break;
} }
Ok(()) => { Ok(()) => {
eprintln!( eprintln!(
"[{}] CacheManager :: Start caching bytes for song '{}'.", "[{}] CacheManager :: Start caching bytes for song '{}'.",
"INFO".cyan(), "INFO".cyan(),
song.title song.title
); );
sleep_short = true; sleep_short = true;
break; break;
} }

View File

@ -60,12 +60,19 @@ impl Song {
} }
pub fn uncache_data(&self) -> Result<bool, ()> { pub fn uncache_data(&self) -> Result<bool, ()> {
let mut cached = self.cached_data.0.lock().unwrap(); let mut cached = self.cached_data.0.lock().unwrap();
match cached.0.as_ref() { match cached.0.take() {
Some(Ok(_data)) => { Some(Ok(_data)) => Ok(true),
cached.0 = None; Some(Err(thread)) => {
Ok(true) if thread.is_finished() {
// get value from thread and drop it
_ = thread.join();
Ok(true)
} else {
// thread is still running...
cached.0 = Some(Err(thread));
Err(())
}
} }
Some(Err(_thread)) => Err(()),
None => Ok(false), None => Ok(false),
} }
} }

View File

@ -98,7 +98,7 @@ impl Player {
db: &mut Database, db: &mut Database,
command_sender: &Arc<impl Fn(Command) + Send + Sync + 'static>, command_sender: &Arc<impl Fn(Command) + Send + Sync + 'static>,
) { ) {
self.update_uncache_opt(db, command_sender, true) self.update_uncache_opt(db, command_sender, false)
} }
pub fn update_uncache_opt( pub fn update_uncache_opt(
&mut self, &mut self,

View File

@ -9,7 +9,7 @@ use std::{
}; };
use clap::Parser; use clap::Parser;
use musicdb_lib::server::{run_server, run_server_caching_thread_opt}; use musicdb_lib::server::run_server_caching_thread_opt;
use musicdb_lib::data::database::Database; use musicdb_lib::data::database::Database;