sequence numbers

This commit is contained in:
Mark
2024-12-08 12:03:24 +01:00
parent d02646406d
commit c3622aca30
18 changed files with 429 additions and 257 deletions

View File

@@ -11,7 +11,10 @@ use std::{
use colorize::AnsiColor;
use rand::thread_rng;
use crate::{load::ToFromBytes, server::Command};
use crate::{
load::ToFromBytes,
server::{Action, Command, Commander},
};
use super::{
album::Album,
@@ -22,6 +25,7 @@ use super::{
};
pub struct Database {
pub seq: Commander,
/// the directory that contains the dbfile, backups, statistics, ...
pub db_dir: PathBuf,
/// the path to the file used to save/load the data. empty if database is in client mode.
@@ -495,75 +499,93 @@ impl Database {
pub fn init_connection<T: Write>(&self, con: &mut T) -> Result<(), std::io::Error> {
// TODO! this is slow because it clones everything - there has to be a better way...
Command::SyncDatabase(
self.artists().iter().map(|v| v.1.clone()).collect(),
self.albums().iter().map(|v| v.1.clone()).collect(),
self.songs().iter().map(|v| v.1.clone()).collect(),
)
.to_bytes(con)?;
Command::QueueUpdate(vec![], self.queue.clone()).to_bytes(con)?;
self.seq
.pack(Action::SyncDatabase(
self.artists().iter().map(|v| v.1.clone()).collect(),
self.albums().iter().map(|v| v.1.clone()).collect(),
self.songs().iter().map(|v| v.1.clone()).collect(),
))
.to_bytes(con)?;
self.seq
.pack(Action::QueueUpdate(vec![], self.queue.clone()))
.to_bytes(con)?;
if self.playing {
Command::Resume.to_bytes(con)?;
self.seq.pack(Action::Resume).to_bytes(con)?;
}
// this allows clients to find out when init_connection is done.
Command::InitComplete.to_bytes(con)?;
self.seq.pack(Action::InitComplete).to_bytes(con)?;
// is initialized now - client can receive updates after this point.
// NOTE: Don't write to connection anymore - the db will dispatch updates on its own.
// we just need to handle commands (receive from the connection).
Ok(())
}
pub fn apply_command(&mut self, mut command: Command) {
/// `apply_action_unchecked_seq(command.action)` if `command.seq` is correct or `0xFF`
pub fn apply_command(&mut self, command: Command) {
if command.seq != self.seq.seq() && command.seq != 0xFF {
eprintln!(
"Invalid sequence number: got {} but expected {}.",
command.seq,
self.seq.seq()
);
return;
}
self.apply_action_unchecked_seq(command.action)
}
pub fn apply_action_unchecked_seq(&mut self, mut action: Action) {
if !self.is_client() {
if let Command::ErrorInfo(t, _) = &mut command {
if let Action::ErrorInfo(t, _) = &mut action {
// clients can send ErrorInfo to the server and it will show up on other clients,
// BUT only the server can set the Title of the ErrorInfo.
t.clear();
}
}
// some commands shouldn't be broadcast. these will broadcast a different command in their specific implementation.
match &command {
match &action {
// Will broadcast `QueueSetShuffle`
Command::QueueShuffle(_) => (),
Action::QueueShuffle(_) => (),
Action::NextSong if self.queue.is_almost_empty() => (),
Action::Pause if !self.playing => (),
Action::Resume if self.playing => (),
// since db.update_endpoints is empty for clients, this won't cause unwanted back and forth
_ => self.broadcast_update(&command),
_ => action = self.broadcast_update(action),
}
match command {
Command::Resume => self.playing = true,
Command::Pause => self.playing = false,
Command::Stop => self.playing = false,
Command::NextSong => {
match action {
Action::Resume => self.playing = true,
Action::Pause => self.playing = false,
Action::Stop => self.playing = false,
Action::NextSong => {
if !Queue::advance_index_db(self) {
// end of queue
self.apply_command(Command::Pause);
self.apply_action_unchecked_seq(Action::Pause);
self.queue.init();
}
}
Command::Save => {
Action::Save => {
if let Err(e) = self.save_database(None) {
eprintln!("[{}] Couldn't save: {e}", "ERR!".red());
}
}
Command::SyncDatabase(a, b, c) => self.sync(a, b, c),
Command::QueueUpdate(index, new_data) => {
Action::SyncDatabase(a, b, c) => self.sync(a, b, c),
Action::QueueUpdate(index, new_data) => {
if let Some(v) = self.queue.get_item_at_index_mut(&index, 0) {
*v = new_data;
}
}
Command::QueueAdd(index, new_data) => {
Action::QueueAdd(index, new_data) => {
if let Some(v) = self.queue.get_item_at_index_mut(&index, 0) {
v.add_to_end(new_data, false);
}
}
Command::QueueInsert(index, pos, new_data) => {
Action::QueueInsert(index, pos, new_data) => {
if let Some(v) = self.queue.get_item_at_index_mut(&index, 0) {
v.insert(new_data, pos, false);
}
}
Command::QueueRemove(index) => {
Action::QueueRemove(index) => {
self.queue.remove_by_index(&index, 0);
}
Command::QueueMove(index_from, mut index_to) => 'queue_move: {
Action::QueueMove(index_from, mut index_to) => 'queue_move: {
if index_to.len() == 0 || index_to.starts_with(&index_from) {
break 'queue_move;
}
@@ -605,7 +627,7 @@ impl Database {
}
}
}
Command::QueueMoveInto(index_from, mut parent_to) => 'queue_move_into: {
Action::QueueMoveInto(index_from, mut parent_to) => 'queue_move_into: {
if parent_to.starts_with(&index_from) {
break 'queue_move_into;
}
@@ -628,8 +650,8 @@ impl Database {
}
}
}
Command::QueueGoto(index) => Queue::set_index_db(self, &index),
Command::QueueShuffle(path) => {
Action::QueueGoto(index) => Queue::set_index_db(self, &index),
Action::QueueShuffle(path) => {
if let Some(elem) = self.queue.get_item_at_index_mut(&path, 0) {
if let QueueContent::Folder(QueueFolder {
index: _,
@@ -640,7 +662,7 @@ impl Database {
{
let mut ord: Vec<usize> = (0..content.len()).collect();
ord.shuffle(&mut thread_rng());
self.apply_command(Command::QueueSetShuffle(path, ord));
self.apply_action_unchecked_seq(Action::QueueSetShuffle(path, ord));
} else {
eprintln!("(QueueShuffle) QueueElement at {path:?} not a folder!");
}
@@ -648,7 +670,7 @@ impl Database {
eprintln!("(QueueShuffle) No QueueElement at {path:?}");
}
}
Command::QueueSetShuffle(path, ord) => {
Action::QueueSetShuffle(path, ord) => {
if let Some(elem) = self.queue.get_item_at_index_mut(&path, 0) {
if let QueueContent::Folder(QueueFolder {
index,
@@ -681,7 +703,7 @@ impl Database {
);
}
}
Command::QueueUnshuffle(path) => {
Action::QueueUnshuffle(path) => {
if let Some(elem) = self.queue.get_item_at_index_mut(&path, 0) {
if let QueueContent::Folder(QueueFolder {
index,
@@ -697,77 +719,77 @@ impl Database {
}
}
}
Command::AddSong(song) => {
Action::AddSong(song) => {
self.add_song_new(song);
}
Command::AddAlbum(album) => {
Action::AddAlbum(album) => {
self.add_album_new(album);
}
Command::AddArtist(artist) => {
Action::AddArtist(artist) => {
self.add_artist_new(artist);
}
Command::AddCover(cover) => _ = self.add_cover_new(cover),
Command::ModifySong(song) => {
Action::AddCover(cover) => _ = self.add_cover_new(cover),
Action::ModifySong(song) => {
_ = self.update_song(song);
}
Command::ModifyAlbum(album) => {
Action::ModifyAlbum(album) => {
_ = self.update_album(album);
}
Command::ModifyArtist(artist) => {
Action::ModifyArtist(artist) => {
_ = self.update_artist(artist);
}
Command::RemoveSong(song) => {
Action::RemoveSong(song) => {
_ = self.remove_song(song);
}
Command::RemoveAlbum(album) => {
Action::RemoveAlbum(album) => {
_ = self.remove_album(album);
}
Command::RemoveArtist(artist) => {
Action::RemoveArtist(artist) => {
_ = self.remove_artist(artist);
}
Command::TagSongFlagSet(id, tag) => {
Action::TagSongFlagSet(id, tag) => {
if let Some(v) = self.get_song_mut(&id) {
if !v.general.tags.contains(&tag) {
v.general.tags.push(tag);
}
}
}
Command::TagSongFlagUnset(id, tag) => {
Action::TagSongFlagUnset(id, tag) => {
if let Some(v) = self.get_song_mut(&id) {
if let Some(i) = v.general.tags.iter().position(|v| v == &tag) {
v.general.tags.remove(i);
}
}
}
Command::TagAlbumFlagSet(id, tag) => {
Action::TagAlbumFlagSet(id, tag) => {
if let Some(v) = self.albums.get_mut(&id) {
if !v.general.tags.contains(&tag) {
v.general.tags.push(tag);
}
}
}
Command::TagAlbumFlagUnset(id, tag) => {
Action::TagAlbumFlagUnset(id, tag) => {
if let Some(v) = self.albums.get_mut(&id) {
if let Some(i) = v.general.tags.iter().position(|v| v == &tag) {
v.general.tags.remove(i);
}
}
}
Command::TagArtistFlagSet(id, tag) => {
Action::TagArtistFlagSet(id, tag) => {
if let Some(v) = self.artists.get_mut(&id) {
if !v.general.tags.contains(&tag) {
v.general.tags.push(tag);
}
}
}
Command::TagArtistFlagUnset(id, tag) => {
Action::TagArtistFlagUnset(id, tag) => {
if let Some(v) = self.artists.get_mut(&id) {
if let Some(i) = v.general.tags.iter().position(|v| v == &tag) {
v.general.tags.remove(i);
}
}
}
Command::TagSongPropertySet(id, key, val) => {
Action::TagSongPropertySet(id, key, val) => {
if let Some(v) = self.get_song_mut(&id) {
let new = format!("{key}{val}");
if let Some(v) = v.general.tags.iter_mut().find(|v| v.starts_with(&key)) {
@@ -777,13 +799,13 @@ impl Database {
}
}
}
Command::TagSongPropertyUnset(id, key) => {
Action::TagSongPropertyUnset(id, key) => {
if let Some(v) = self.get_song_mut(&id) {
let tags = std::mem::replace(&mut v.general.tags, vec![]);
v.general.tags = tags.into_iter().filter(|v| !v.starts_with(&key)).collect();
}
}
Command::TagAlbumPropertySet(id, key, val) => {
Action::TagAlbumPropertySet(id, key, val) => {
if let Some(v) = self.albums.get_mut(&id) {
let new = format!("{key}{val}");
if let Some(v) = v.general.tags.iter_mut().find(|v| v.starts_with(&key)) {
@@ -793,13 +815,13 @@ impl Database {
}
}
}
Command::TagAlbumPropertyUnset(id, key) => {
Action::TagAlbumPropertyUnset(id, key) => {
if let Some(v) = self.albums.get_mut(&id) {
let tags = std::mem::replace(&mut v.general.tags, vec![]);
v.general.tags = tags.into_iter().filter(|v| !v.starts_with(&key)).collect();
}
}
Command::TagArtistPropertySet(id, key, val) => {
Action::TagArtistPropertySet(id, key, val) => {
if let Some(v) = self.artists.get_mut(&id) {
let new = format!("{key}{val}");
if let Some(v) = v.general.tags.iter_mut().find(|v| v.starts_with(&key)) {
@@ -809,21 +831,21 @@ impl Database {
}
}
}
Command::TagArtistPropertyUnset(id, key) => {
Action::TagArtistPropertyUnset(id, key) => {
if let Some(v) = self.artists.get_mut(&id) {
let tags = std::mem::replace(&mut v.general.tags, vec![]);
v.general.tags = tags.into_iter().filter(|v| !v.starts_with(&key)).collect();
}
}
Command::SetSongDuration(id, duration) => {
Action::SetSongDuration(id, duration) => {
if let Some(song) = self.get_song_mut(&id) {
song.duration_millis = duration;
}
}
Command::InitComplete => {
Action::InitComplete => {
self.client_is_init = true;
}
Command::ErrorInfo(..) => {}
Action::ErrorInfo(..) => {}
}
}
}
@@ -842,6 +864,7 @@ impl Database {
/// A client database doesn't need any storage paths and won't perform autosaves.
pub fn new_clientside() -> Self {
Self {
seq: Commander::new(true),
db_dir: PathBuf::new(),
db_file: PathBuf::new(),
lib_directory: PathBuf::new(),
@@ -862,6 +885,7 @@ impl Database {
pub fn new_empty_in_dir(dir: PathBuf, lib_dir: PathBuf) -> Self {
let path = dir.join("dbfile");
Self {
seq: Commander::new(false),
db_dir: dir,
db_file: path,
lib_directory: lib_dir,
@@ -887,6 +911,7 @@ impl Database {
let mut file = BufReader::new(File::open(&path)?);
eprintln!("[{}] loading library from {file:?}", "INFO".cyan());
let s = Self {
seq: Commander::new(false),
db_dir: dir,
db_file: path,
lib_directory,
@@ -948,11 +973,15 @@ impl Database {
self.times_data_modified = None;
Ok(path)
}
pub fn broadcast_update(&mut self, update: &Command) {
pub fn broadcast_update(&mut self, update: Action) -> Action {
match update {
Command::InitComplete => return,
Action::InitComplete => return update,
_ => {}
}
if !self.is_client() {
self.seq.inc();
}
let update = self.seq.pack(update);
let mut remove = vec![];
let mut bytes = None;
let mut arc = None;
@@ -974,7 +1003,7 @@ impl Database {
remove.push(i);
}
}
UpdateEndpoint::Custom(func) => func(update),
UpdateEndpoint::Custom(func) => func(&update),
UpdateEndpoint::CustomArc(func) => {
if arc.is_none() {
arc = Some(Arc::new(update.clone()));
@@ -999,6 +1028,7 @@ impl Database {
self.update_endpoints.remove(i);
}
}
update.action
}
pub fn sync(&mut self, artists: Vec<Artist>, albums: Vec<Album>, songs: Vec<Song>) {
self.modified_data();