improve song caching. can be optionally enabled using --advanced-caching in the server. see --help for help.

This commit is contained in:
Mark
2024-01-17 22:16:06 +01:00
parent 26b06fa336
commit 3ffd6b6377
8 changed files with 329 additions and 42 deletions

View File

@@ -11,6 +11,7 @@ base64 = "0.21.2"
colorize = "0.1.0"
rand = "0.8.5"
rc-u8-reader = "2.0.16"
sysinfo = "0.30.5"
tokio = { version = "1.29.1", features = ["sync"] }
[features]

View File

@@ -0,0 +1,204 @@
use std::{
collections::BTreeSet,
sync::{
atomic::{AtomicU32, AtomicU64},
Arc, Mutex,
},
thread::{self, JoinHandle},
time::{Duration, Instant},
};
use colorize::AnsiColor;
use super::database::Database;
// CacheManage will never uncache the currently playing song or the song that will be played next.
pub struct CacheManager {
/// Amount of bytes. If free system memory drops below this number, initiate cleanup.
pub min_avail_mem: Arc<AtomicU64>,
/// Amount of bytes. If free system memory is greater than this number, consider caching more songs.
pub max_avail_mem: Arc<AtomicU64>,
pub songs_to_cache: Arc<AtomicU32>,
thread: Arc<JoinHandle<()>>,
}
impl CacheManager {
pub fn new(database: Arc<Mutex<Database>>) -> Self {
let min_avail_mem = Arc::new(AtomicU64::new(1024 * 1024 * 1024));
let max_avail_mem = Arc::new(AtomicU64::new(1024 * 1024 * 2048));
// if < 2, does the same as 2.
let songs_to_cache = Arc::new(AtomicU32::new(10));
Self {
min_avail_mem: Arc::clone(&min_avail_mem),
max_avail_mem: Arc::clone(&max_avail_mem),
songs_to_cache: Arc::clone(&songs_to_cache),
thread: Arc::new(thread::spawn(move || {
let sleep_dur_long = Duration::from_secs(60);
let sleep_dur_short = Duration::from_secs(5);
let mut si = sysinfo::System::new_with_specifics(
sysinfo::RefreshKind::new()
.with_memory(sysinfo::MemoryRefreshKind::new().with_ram()),
);
let mut sleep_short = true;
loop {
thread::sleep(if sleep_short {
sleep_dur_short
} else {
sleep_dur_long
});
sleep_short = false;
si.refresh_memory_specifics(sysinfo::MemoryRefreshKind::new().with_ram());
let available_memory = si.available_memory();
let min_avail_mem = min_avail_mem.load(std::sync::atomic::Ordering::Relaxed);
let max_avail_mem = max_avail_mem.load(std::sync::atomic::Ordering::Relaxed);
let songs_to_cache = songs_to_cache.load(std::sync::atomic::Ordering::Relaxed);
// let db_lock_start_time = Instant::now();
let db = database.lock().unwrap();
let (_queue_current_song, queue_next_song, ids_to_cache) =
if songs_to_cache <= 2 {
let queue_current_song = db.queue.get_current_song().copied();
let queue_next_song = db.queue.get_next_song().copied();
let ids_to_cache = queue_current_song
.into_iter()
.chain(queue_next_song)
.collect::<Vec<_>>();
(
queue_current_song,
queue_next_song,
match (queue_current_song, queue_next_song) {
(None, None) => vec![],
(Some(a), None) | (None, Some(a)) => vec![a],
(Some(a), Some(b)) => {
if a == b {
vec![a]
} else {
vec![a, b]
}
}
},
)
} else {
let mut queue = db.queue.clone();
let mut actions = vec![];
let queue_current_song = queue.get_current_song().copied();
queue.advance_index_inner(vec![], &mut actions);
let queue_next_song = if actions.is_empty() {
queue.get_current_song().copied()
} else {
None
};
let mut ids_to_cache = queue_current_song
.into_iter()
.chain(queue_next_song)
.collect::<Vec<_>>();
for _ in 2..songs_to_cache {
queue.advance_index_inner(vec![], &mut actions);
if !actions.is_empty() {
break;
}
if let Some(id) = queue.get_current_song() {
if !ids_to_cache.contains(id) {
ids_to_cache.push(*id);
}
} else {
break;
}
}
(queue_current_song, queue_next_song, ids_to_cache)
};
if available_memory < min_avail_mem {
let mem_to_free = min_avail_mem - available_memory;
let mut freed_memory = 0;
for (id, song) in db.songs().iter() {
if !ids_to_cache.contains(id) {
let cache = song.cached_data.0.lock().unwrap();
if let Some(size) = cache.1 {
if let Ok(true) = song.uncache_data() {
freed_memory += size;
if freed_memory >= mem_to_free as usize {
break;
}
}
}
}
}
eprintln!(
"[{}] CacheManager :: Uncaching songs freed {:.1} mb of memory",
if freed_memory >= mem_to_free as usize {
"INFO".cyan()
} else {
sleep_short = true;
"INFO".blue()
},
freed_memory as f32 / 1024.0 / 1024.0
);
} else if available_memory > max_avail_mem {
// we have some memory left, maybe cache a song (or cache multiple if we know their byte-sizes)
for song in &ids_to_cache {
if let Some(song) = db.get_song(song) {
match song.cache_data_start_thread_or_say_already_running(&db) {
Err(false) => (),
// thread already running, don't start a second one (otherwise we may load too many songs, using too much memory, causing a cache-uncache-cycle)
Err(true) => {
sleep_short = true;
break;
}
Ok(()) => {
eprintln!(
"[{}] CacheManager :: Start caching bytes for song '{}'.",
"INFO".cyan(),
song.title
);
sleep_short = true;
break;
}
}
}
}
}
if let Some(song_id) = queue_next_song {
if let Some(song) = db.get_song(&song_id) {
if song.cache_data_start_thread(&db) {
eprintln!(
"[{}] CacheManager :: Start caching bytes for next song, '{}'.",
"INFO".cyan(),
song.title
);
}
}
}
}
})),
}
}
/// Songs will be removed from cache if `available_memory < min_avail_mem`.
/// New songs will only be cached if `available_memory > max_avail_mem`.
/// `min` and `max` in MiB (1024*1024 Bytes)
pub fn set_memory_mib(&self, min: u64, max: u64) {
self.min_avail_mem
.store(1024 * 1024 * min, std::sync::atomic::Ordering::Relaxed);
self.max_avail_mem
.store(1024 * 1024 * max, std::sync::atomic::Ordering::Relaxed);
}
/// How many songs to load ahead of time. `< 2` behaves like `2`.
/// Songs will be cached slowly over time.
/// New songs will only be cached if `available_memory > max_avail_mem`.
pub fn set_cache_songs_count(&self, count: u32) {
self.songs_to_cache
.store(count, std::sync::atomic::Ordering::Relaxed);
}
}

View File

@@ -7,6 +7,7 @@ use crate::load::ToFromBytes;
pub mod album;
pub mod artist;
pub mod cache_manager;
pub mod database;
pub mod queue;
pub mod song;

View File

@@ -354,7 +354,7 @@ impl Queue {
}
}
}
fn advance_index_inner(
pub fn advance_index_inner(
&mut self,
mut path: Vec<usize>,
actions: &mut Vec<QueueAction>,

View File

@@ -31,7 +31,7 @@ pub struct Song {
/// None => No cached data
/// Some(Err) => No cached data yet, but a thread is working on loading it.
/// Some(Ok(data)) => Cached data is available.
pub cached_data: Arc<Mutex<Option<Result<Arc<Vec<u8>>, JoinHandle<Option<Arc<Vec<u8>>>>>>>>,
pub cached_data: CachedData,
}
impl Song {
pub fn new(
@@ -55,47 +55,51 @@ impl Song {
file_size,
duration_millis,
general: GeneralData::default(),
cached_data: Arc::new(Mutex::new(None)),
cached_data: CachedData(Arc::new(Mutex::new((None, None)))),
}
}
pub fn uncache_data(&self) -> Result<(), ()> {
let mut cached = self.cached_data.lock().unwrap();
match cached.as_ref() {
pub fn uncache_data(&self) -> Result<bool, ()> {
let mut cached = self.cached_data.0.lock().unwrap();
match cached.0.as_ref() {
Some(Ok(_data)) => {
*cached = None;
Ok(())
cached.0 = None;
Ok(true)
}
Some(Err(_thread)) => Err(()),
None => Ok(()),
None => Ok(false),
}
}
/// If no data is cached yet and no caching thread is running, starts a thread to cache the data.
pub fn cache_data_start_thread(&self, db: &Database) -> bool {
let mut cd = self.cached_data.lock().unwrap();
let start_thread = match cd.as_ref() {
None => true,
Some(Err(_)) | Some(Ok(_)) => false,
self.cache_data_start_thread_or_say_already_running(db)
.is_ok()
}
pub fn cache_data_start_thread_or_say_already_running(
&self,
db: &Database,
) -> Result<(), bool> {
let mut cd = self.cached_data.0.lock().unwrap();
match cd.0.as_ref() {
None => (),
Some(Err(_)) => return Err(false),
Some(Ok(_)) => return Err(true),
};
if start_thread {
let src = if let Some(dlcon) = &db.remote_server_as_song_file_source {
Err((self.id, Arc::clone(dlcon)))
} else {
Ok(db.get_path(&self.location))
};
*cd = Some(Err(std::thread::spawn(move || {
let data = Self::load_data(src)?;
Some(Arc::new(data))
})));
true
let src = if let Some(dlcon) = &db.remote_server_as_song_file_source {
Err((self.id, Arc::clone(dlcon)))
} else {
false
}
Ok(db.get_path(&self.location))
};
cd.0 = Some(Err(std::thread::spawn(move || {
let data = Self::load_data(src)?;
Some(Arc::new(data))
})));
Ok(())
}
/// Gets the cached data, if available.
/// If a thread is running to load the data, it is not awaited.
/// This function doesn't block.
pub fn cached_data(&self) -> Option<Arc<Vec<u8>>> {
if let Some(Ok(v)) = self.cached_data.lock().unwrap().as_ref() {
if let Some(Ok(v)) = self.cached_data.0.lock().unwrap().0.as_ref() {
Some(Arc::clone(v))
} else {
None
@@ -106,8 +110,8 @@ impl Song {
/// This function will block until the data is loaded.
/// If it still returns none, some error must have occured.
pub fn cached_data_now(&self, db: &Database) -> Option<Arc<Vec<u8>>> {
let mut cd = self.cached_data.lock().unwrap();
*cd = match cd.take() {
let mut cd = self.cached_data.0.lock().unwrap();
cd.0 = match cd.0.take() {
None => {
let src = if let Some(dlcon) = &db.remote_server_as_song_file_source {
Err((self.id, Arc::clone(dlcon)))
@@ -127,6 +131,9 @@ impl Song {
},
Some(Ok(v)) => Some(Ok(v)),
};
if let Some(Ok(v)) = &cd.0 {
cd.1 = Some(v.len());
}
drop(cd);
self.cached_data()
}
@@ -214,7 +221,17 @@ impl ToFromBytes for Song {
file_size: ToFromBytes::from_bytes(s)?,
duration_millis: ToFromBytes::from_bytes(s)?,
general: ToFromBytes::from_bytes(s)?,
cached_data: Arc::new(Mutex::new(None)),
cached_data: CachedData(Arc::new(Mutex::new((None, None)))),
})
}
}
#[derive(Clone, Debug)]
pub struct CachedData(
pub Arc<
Mutex<(
Option<Result<Arc<Vec<u8>>, JoinHandle<Option<Arc<Vec<u8>>>>>>,
Option<usize>,
)>,
>,
);

View File

@@ -89,6 +89,22 @@ impl Player {
&mut self,
db: &mut Database,
command_sender: &Arc<impl Fn(Command) + Send + Sync + 'static>,
) {
self.update_uncache_opt(db, command_sender, true)
}
/// never uncache songs (this is something the CacheManager has to do if you decide to use this function)
pub fn update_dont_uncache(
&mut self,
db: &mut Database,
command_sender: &Arc<impl Fn(Command) + Send + Sync + 'static>,
) {
self.update_uncache_opt(db, command_sender, true)
}
pub fn update_uncache_opt(
&mut self,
db: &mut Database,
command_sender: &Arc<impl Fn(Command) + Send + Sync + 'static>,
allow_uncaching: bool,
) {
macro_rules! apply_command {
($cmd:expr) => {
@@ -193,18 +209,20 @@ impl Player {
self.current_song_id = SongOpt::None;
}
let next_song = db.queue.get_next_song().and_then(|v| db.get_song(v));
for &id in &self.cached {
if Some(id) != next_song.map(|v| v.id)
&& !matches!(self.current_song_id, SongOpt::Some(v) if v == id)
{
if let Some(song) = db.songs().get(&id) {
if let Ok(()) = song.uncache_data() {
if allow_uncaching {
for &id in &self.cached {
if Some(id) != next_song.map(|v| v.id)
&& !matches!(self.current_song_id, SongOpt::Some(v) if v == id)
{
if let Some(song) = db.songs().get(&id) {
if let Ok(_) = song.uncache_data() {
self.cached.remove(&id);
break;
}
} else {
self.cached.remove(&id);
break;
}
} else {
self.cached.remove(&id);
break;
}
}
}

View File

@@ -114,10 +114,28 @@ pub fn run_server(
database: Arc<Mutex<Database>>,
addr_tcp: Option<SocketAddr>,
sender_sender: Option<tokio::sync::mpsc::Sender<mpsc::Sender<Command>>>,
) {
run_server_caching_thread_opt(database, addr_tcp, sender_sender, None)
}
#[cfg(feature = "playback")]
pub fn run_server_caching_thread_opt(
database: Arc<Mutex<Database>>,
addr_tcp: Option<SocketAddr>,
sender_sender: Option<tokio::sync::mpsc::Sender<mpsc::Sender<Command>>>,
caching_thread: Option<Box<dyn FnOnce(&mut crate::data::cache_manager::CacheManager)>>,
) {
use std::time::Instant;
use crate::data::cache_manager::CacheManager;
let mut player = Player::new().unwrap();
let cache_manager = if let Some(func) = caching_thread {
let mut cm = CacheManager::new(Arc::clone(&database));
func(&mut cm);
Some(cm)
} else {
None
};
// commands sent to this will be handeled later in this function in an infinite loop.
// these commands are sent to the database asap.
let (command_sender, command_receiver) = mpsc::channel();
@@ -182,7 +200,11 @@ pub fn run_server(
// at the start and once after every command sent to the server,
let mut db = database.lock().unwrap();
// update the player
player.update(&mut db, &command_sender);
if cache_manager.is_some() {
player.update_dont_uncache(&mut db, &command_sender);
} else {
player.update(&mut db, &command_sender);
}
// autosave if necessary
if let Some((first, last)) = db.times_data_modified {
let now = Instant::now();