small improvements idk i forgot i had a git repo for this project

This commit is contained in:
Mark
2023-08-24 16:15:01 +02:00
parent 0ae0126f04
commit 9fbe67012e
17 changed files with 1894 additions and 455 deletions

View File

@@ -1,6 +1,8 @@
pub mod get;
use std::{
eprintln,
io::Write,
io::{BufRead, BufReader, Read, Write},
net::{SocketAddr, TcpListener},
path::PathBuf,
sync::{mpsc, Arc, Mutex},
@@ -12,12 +14,13 @@ use crate::{
data::{
album::Album,
artist::Artist,
database::{Database, UpdateEndpoint},
database::{Cover, Database, UpdateEndpoint},
queue::Queue,
song::Song,
},
load::ToFromBytes,
player::Player,
server::get::handle_one_connection_as_get,
};
#[derive(Clone, Debug)]
@@ -33,12 +36,14 @@ pub enum Command {
QueueInsert(Vec<usize>, usize, Queue),
QueueRemove(Vec<usize>),
QueueGoto(Vec<usize>),
QueueSetShuffle(Vec<usize>, Vec<usize>, usize),
/// .id field is ignored!
AddSong(Song),
/// .id field is ignored!
AddAlbum(Album),
/// .id field is ignored!
AddArtist(Artist),
AddCover(Cover),
ModifySong(Song),
ModifyAlbum(Album),
ModifyArtist(Artist),
@@ -93,33 +98,41 @@ pub fn run_server(
Ok(v) => {
let command_sender = command_sender.clone();
let db = Arc::clone(&database);
// each connection gets its own thread, but they will be idle most of the time (waiting for data on the tcp stream)
thread::spawn(move || loop {
if let Ok((mut connection, con_addr)) = v.accept() {
eprintln!("[info] TCP connection accepted from {con_addr}.");
if let Ok((connection, con_addr)) = v.accept() {
let command_sender = command_sender.clone();
let db = Arc::clone(&db);
thread::spawn(move || {
// sync database
let mut db = db.lock().unwrap();
db.init_connection(&mut connection)?;
// keep the client in sync:
// the db will send all updates to the client once it is added to update_endpoints
db.update_endpoints.push(UpdateEndpoint::Bytes(Box::new(
// try_clone is used here to split a TcpStream into Writer and Reader
connection.try_clone().unwrap(),
)));
// drop the mutex lock
drop(db);
// read updates from the tcp stream and send them to the database, exit on EOF or Err
loop {
if let Ok(command) = Command::from_bytes(&mut connection) {
command_sender.send(command).unwrap();
} else {
break;
eprintln!("[info] TCP connection accepted from {con_addr}.");
// each connection first has to send one line to tell us what it wants
let mut connection = BufReader::new(connection);
let mut line = String::new();
if connection.read_line(&mut line).is_ok() {
// based on that line, we adjust behavior
match line.as_str().trim() {
// sends all updates to this connection and reads commands from it
"main" => {
let connection = connection.into_inner();
_ = handle_one_connection_as_main(
db,
&mut connection.try_clone().unwrap(),
connection,
&command_sender,
)
}
// reads commands from the connection, but (unlike main) doesn't send any updates
"control" => handle_one_connection_as_control(
&mut connection,
&command_sender,
),
"get" => _ = handle_one_connection_as_get(db, &mut connection),
_ => {
_ = connection
.into_inner()
.shutdown(std::net::Shutdown::Both)
}
}
}
Ok::<(), std::io::Error>(())
});
}
});
@@ -141,24 +154,39 @@ pub fn run_server(
}
}
pub trait Connection: Sized + Send + 'static {
type SendError: Send;
fn send_command(&mut self, command: Command) -> Result<(), Self::SendError>;
fn receive_updates(&mut self) -> Result<Vec<Command>, Self::SendError>;
fn receive_update_blocking(&mut self) -> Result<Command, Self::SendError>;
fn move_to_thread<F: FnMut(&mut Self, Command) -> bool + Send + 'static>(
mut self,
mut handler: F,
) -> JoinHandle<Result<Self, Self::SendError>> {
std::thread::spawn(move || loop {
let update = self.receive_update_blocking()?;
if handler(&mut self, update) {
return Ok(self);
}
})
pub fn handle_one_connection_as_main(
db: Arc<Mutex<Database>>,
connection: &mut impl Read,
mut send_to: (impl Write + Sync + Send + 'static),
command_sender: &mpsc::Sender<Command>,
) -> Result<(), std::io::Error> {
// sync database
let mut db = db.lock().unwrap();
db.init_connection(&mut send_to)?;
// keep the client in sync:
// the db will send all updates to the client once it is added to update_endpoints
db.update_endpoints.push(UpdateEndpoint::Bytes(Box::new(
// try_clone is used here to split a TcpStream into Writer and Reader
send_to,
)));
// drop the mutex lock
drop(db);
handle_one_connection_as_control(connection, command_sender);
Ok(())
}
pub fn handle_one_connection_as_control(
connection: &mut impl Read,
command_sender: &mpsc::Sender<Command>,
) {
// read updates from the tcp stream and send them to the database, exit on EOF or Err
loop {
if let Ok(command) = Command::from_bytes(connection) {
command_sender.send(command).unwrap();
} else {
break;
}
}
}
impl ToFromBytes for Command {
fn to_bytes<T>(&self, s: &mut T) -> Result<(), std::io::Error>
where
@@ -200,6 +228,12 @@ impl ToFromBytes for Command {
s.write_all(&[0b00011011])?;
index.to_bytes(s)?;
}
Self::QueueSetShuffle(path, map, next) => {
s.write_all(&[0b10011011])?;
path.to_bytes(s)?;
map.to_bytes(s)?;
next.to_bytes(s)?;
}
Self::AddSong(song) => {
s.write_all(&[0b01010000])?;
song.to_bytes(s)?;
@@ -212,6 +246,10 @@ impl ToFromBytes for Command {
s.write_all(&[0b01011100])?;
artist.to_bytes(s)?;
}
Self::AddCover(cover) => {
s.write_all(&[0b01011101])?;
cover.to_bytes(s)?;
}
Self::ModifySong(song) => {
s.write_all(&[0b10010000])?;
song.to_bytes(s)?;
@@ -259,12 +297,18 @@ impl ToFromBytes for Command {
),
0b00011001 => Self::QueueRemove(ToFromBytes::from_bytes(s)?),
0b00011011 => Self::QueueGoto(ToFromBytes::from_bytes(s)?),
0b10011011 => Self::QueueSetShuffle(
ToFromBytes::from_bytes(s)?,
ToFromBytes::from_bytes(s)?,
ToFromBytes::from_bytes(s)?,
),
0b01010000 => Self::AddSong(ToFromBytes::from_bytes(s)?),
0b01010011 => Self::AddAlbum(ToFromBytes::from_bytes(s)?),
0b01011100 => Self::AddArtist(ToFromBytes::from_bytes(s)?),
0b10010000 => Self::AddSong(ToFromBytes::from_bytes(s)?),
0b10010011 => Self::AddAlbum(ToFromBytes::from_bytes(s)?),
0b10011100 => Self::AddArtist(ToFromBytes::from_bytes(s)?),
0b01011101 => Self::AddCover(ToFromBytes::from_bytes(s)?),
0b00110001 => Self::SetLibraryDirectory(ToFromBytes::from_bytes(s)?),
_ => {
eprintln!("unexpected byte when reading command; stopping playback.");