feat: playback-via-mpv, rust 2024

This commit is contained in:
Mark
2026-04-15 22:07:49 +02:00
committed by Mark
parent 8d4d418166
commit ab17477285
12 changed files with 460 additions and 94 deletions

View File

@@ -1,7 +1,7 @@
[package]
name = "musicdb-lib"
version = "0.1.0"
edition = "2021"
edition = "2024"
[dependencies]
base64 = "0.22.1"
@@ -18,5 +18,6 @@ playback = []
default-playback = ["playback-via-playback-rs"]
# default-playback = ["playback-via-rodio"]
playback-via-sleep = ["playback"]
playback-via-mpv = ["playback"]
playback-via-playback-rs = ["playback", "dep:playback-rs"]
playback-via-rodio = ["playback", "dep:rodio"]

View File

@@ -1,5 +1,5 @@
use std::{
collections::{HashMap, VecDeque},
collections::{BTreeMap, HashMap, VecDeque},
io::{Read, Write},
path::PathBuf,
};
@@ -164,6 +164,34 @@ where
Ok(o)
}
}
impl<K, V> ToFromBytes for BTreeMap<K, V>
where
K: ToFromBytes + std::cmp::Ord,
V: ToFromBytes,
{
fn to_bytes<T>(&self, s: &mut T) -> Result<(), std::io::Error>
where
T: Write,
{
self.len().to_bytes(s)?;
for (key, val) in self.iter() {
key.to_bytes(s)?;
val.to_bytes(s)?;
}
Ok(())
}
fn from_bytes<T>(s: &mut T) -> Result<Self, std::io::Error>
where
T: Read,
{
let len = ToFromBytes::from_bytes(s)?;
let mut o = Self::new();
for _ in 0..len {
o.insert(ToFromBytes::from_bytes(s)?, ToFromBytes::from_bytes(s)?);
}
Ok(o)
}
}
// - for (i/u)(size/8/16/32/64/128)

View File

@@ -1,3 +1,5 @@
#[cfg(feature = "playback-via-mpv")]
pub mod mpv;
#[cfg(feature = "playback-via-playback-rs")]
pub mod playback_rs;
#[cfg(feature = "playback-via-rodio")]
@@ -8,14 +10,18 @@ pub mod sleep;
pub type PlayerBackendFeat<T> = playback_rs::PlayerBackendPlaybackRs<T>;
#[cfg(feature = "playback-via-rodio")]
pub type PlayerBackendFeat<T> = rodio::PlayerBackendRodio<T>;
#[cfg(feature = "playback-via-sleep")]
pub type PlayerBackendFeat<T> = sleep::PlayerBackendSleep<T>;
#[cfg(feature = "playback-via-mpv")]
pub type PlayerBackendFeat<T> = mpv::PlayerBackendMpv<T>;
use std::{collections::HashMap, ffi::OsStr, sync::Arc};
use crate::{
data::{
SongId,
database::Database,
song::{CachedData, Song},
SongId,
},
server::Action,
};

View File

@@ -0,0 +1,252 @@
use std::{
ffi::OsStr,
io::Write,
os::unix::net::UnixStream,
process::Stdio,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};
use crate::{
data::{SongId, song::Song},
server::{Action, Command},
};
use super::PlayerBackend;
const IPC_QUIT: &[u8] = b"{\"command\":[\"quit\"]}\n";
const IPC_PAUSE: &[u8] = b"{\"command\":[\"set_property\",\"pause\",true]}\n";
const IPC_RESUME: &[u8] = b"{\"command\":[\"set_property\",\"pause\",false]}\n";
const IPC_STOP: &[u8] =
b"{\"command\":[\"set_property\",\"pause\",true]}\n{\"command\":[\"seek\",0,\"absolute\"]}\n";
pub struct PlayerBackendMpv<T> {
id: (u32, u8),
current: Option<(SongId, Option<(UnixStream, bool, Arc<AtomicBool>)>, T)>,
next: Option<(SongId, Option<(UnixStream, bool, Arc<AtomicBool>)>, T)>,
/// unused, but could be used to do something smarter than polling at some point
#[allow(unused)]
command_sender: Option<std::sync::mpsc::Sender<(Command, Option<u64>)>>,
}
impl<T> PlayerBackendMpv<T> {
pub fn new(
command_sender: std::sync::mpsc::Sender<(Command, Option<u64>)>,
) -> Result<Self, Box<dyn std::error::Error>> {
Self::new_with_optional_command_sending(Some(command_sender))
}
pub fn new_without_command_sending() -> Result<Self, Box<dyn std::error::Error>> {
Self::new_with_optional_command_sending(None)
}
pub fn new_with_optional_command_sending(
command_sender: Option<std::sync::mpsc::Sender<(Command, Option<u64>)>>,
) -> Result<Self, Box<dyn std::error::Error>> {
Ok(Self {
id: (std::process::id(), 0),
current: None,
next: None,
command_sender,
})
}
}
impl<T> PlayerBackend<T> for PlayerBackendMpv<T> {
fn load_next_song(
&mut self,
id: SongId,
_song: &Song,
_filename: &OsStr,
bytes: Arc<Vec<u8>>,
_load_duration: bool,
custom_data: T,
) {
if let Some((_, Some((mut ipc, _, quit)), _)) = self.next.take() {
quit.store(true, Ordering::Release);
ipc.write_all(IPC_QUIT).ok();
}
self.id.1 = 1 + (self.id.1 % 9);
let ipc_path = format!("/tmp/musicdb-server-mpv-ipc-{:X}-{}", self.id.0, self.id.1);
match std::process::Command::new("mpv")
.args(["--no-config", "--no-video", "--pause"])
.arg(format!("--input-ipc-server={ipc_path}"))
.args(["--no-terminal", "--cache=yes", "--cache-on-disk=no", "-"])
.stdin(Stdio::piped())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
{
Ok(mut proc) => {
let quit = Arc::new(AtomicBool::new(false));
for i in 1..=34 {
std::thread::sleep(Duration::from_millis(100 * i));
if let Ok(ipc) = UnixStream::connect(&ipc_path) {
self.next = Some((id, Some((ipc, false, Arc::clone(&quit))), custom_data));
break;
}
}
if self.next.is_some()
&& let Some(mut stdin) = proc.stdin.take()
{
let s = self.command_sender.clone();
std::thread::spawn(move || {
stdin.write_all(&bytes).ok();
drop(stdin);
match proc.wait() {
Ok(status) => {
if quit.load(Ordering::Acquire) {
return;
}
quit.store(true, Ordering::Release);
if let Some(s) = &s {
if status.success() {
eprintln!("mpv exited, success");
s.send((Action::NextSong.cmd(0xFFu8), None)).unwrap();
} else {
s.send((
Action::ErrorInfo(
"mpv process crashed!".to_owned(),
format!(
"Exit code: {}",
status
.code()
.map(|n| n.to_string())
.unwrap_or("unknown".to_owned())
),
)
.cmd(0xFFu8),
None,
))
.unwrap();
}
}
}
Err(e) => {
if quit.load(Ordering::Acquire) {
return;
}
quit.store(true, Ordering::Release);
if let Some(s) = &s {
s.send((
Action::ErrorInfo(
"Error waiting for mpv to exit!".to_owned(),
format!("Error: {e}"),
)
.cmd(0xFFu8),
None,
))
.unwrap();
}
}
}
});
} else {
proc.kill().ok();
if let Some(s) = &self.command_sender {
s.send((
Action::ErrorInfo(
"Error waiting for mpv to start!".to_owned(),
"Could not get process' stdin or could not connect to ipc socket."
.to_owned(),
)
.cmd(0xFFu8),
None,
))
.unwrap();
}
}
}
Err(e) => {
if let Some(s) = &self.command_sender {
s.send((
Action::ErrorInfo(
"Error starting mpv process!".to_owned(),
format!("Error: {e}"),
)
.cmd(0xFFu8),
None,
))
.unwrap();
}
}
}
}
fn pause(&mut self) {
if let Some((_, Some((ipc, playing, _)), _)) = &mut self.current {
ipc.write_all(IPC_PAUSE).ok();
ipc.flush().ok();
*playing = false;
}
}
fn stop(&mut self) {
if let Some((_, Some((ipc, playing, _)), _)) = &mut self.current {
ipc.write_all(IPC_STOP).ok();
ipc.flush().ok();
*playing = false;
}
}
fn resume(&mut self) {
if let Some((_, Some((ipc, playing, _)), _)) = &mut self.current {
ipc.write_all(IPC_RESUME).ok();
ipc.flush().ok();
*playing = true;
}
}
fn next(&mut self, play: bool, _load_duration: bool) {
if let Some((_, Some((mut ipc, _, quit)), _)) = self.current.take() {
quit.store(true, Ordering::Release);
ipc.write_all(IPC_QUIT).ok();
}
self.current = self.next.take();
if play {
self.resume();
}
}
fn clear(&mut self) {
self.next(false, false);
self.next(false, false);
}
fn playing(&self) -> bool {
if let Some((_, Some((_, playing, _)), _)) = self.current {
playing
} else {
false
}
}
fn current_song(&self) -> Option<(SongId, bool, &T)> {
self.current
.as_ref()
.map(|(id, _, custom)| (*id, true, custom))
}
fn next_song(&self) -> Option<(SongId, bool, &T)> {
self.next
.as_ref()
.map(|(id, _, custom)| (*id, true, custom))
}
fn gen_data_mut(&mut self) -> (Option<&mut T>, Option<&mut T>) {
(
self.current.as_mut().map(|(_, _, t)| t),
self.next.as_mut().map(|(_, _, t)| t),
)
}
fn song_finished_polling(&self) -> bool {
self.command_sender.is_none()
}
fn song_finished(&self) -> bool {
if self.command_sender.is_none()
&& let Some((_, Some((_, _, quit)), _)) = &self.current
{
quit.load(Ordering::Relaxed)
} else {
false
}
}
fn current_song_duration(&self) -> Option<u64> {
None
}
fn current_song_playback_position(&self) -> Option<u64> {
None
}
}

View File

@@ -5,7 +5,7 @@ use std::{
};
use crate::{
data::{song::Song, SongId},
data::{SongId, song::Song},
server::Command,
};
@@ -28,6 +28,14 @@ enum SongFinished {
impl<T> PlayerBackendSleep<T> {
pub fn new(
command_sender: std::sync::mpsc::Sender<(Command, Option<u64>)>,
) -> Result<Self, Box<dyn std::error::Error>> {
Self::new_with_optional_command_sending(Some(command_sender))
}
pub fn new_without_command_sending() -> Result<Self, Box<dyn std::error::Error>> {
Self::new_with_optional_command_sending(None)
}
pub fn new_with_optional_command_sending(
command_sender: Option<std::sync::mpsc::Sender<(Command, Option<u64>)>>,
) -> Result<Self, Box<dyn std::error::Error>> {
Ok(Self {

View File

@@ -3,7 +3,7 @@ pub mod get;
use std::{
io::{BufRead as _, BufReader, Read, Write},
net::TcpListener,
sync::{mpsc, Arc, Mutex},
sync::{Arc, Mutex, mpsc},
thread,
time::Duration,
};
@@ -15,12 +15,12 @@ use crate::player::Player;
use crate::server::get::handle_one_connection_as_get;
use crate::{
data::{
AlbumId, ArtistId, SongId,
album::Album,
artist::Artist,
database::{Cover, Database, UpdateEndpoint},
queue::Queue,
song::Song,
AlbumId, ArtistId, SongId,
},
load::ToFromBytes,
};
@@ -284,23 +284,15 @@ pub fn run_server_caching_thread_opt(
) {
#[cfg(not(feature = "playback"))]
if play_audio {
panic!("Can't run the server: cannot play audio because the `playback` feature was disabled when compiling, but `play_audio` was set to `true`!");
panic!(
"Can't run the server: cannot play audio because the `playback` feature was disabled when compiling, but `play_audio` was set to `true`!"
);
}
use std::time::Instant;
use crate::data::cache_manager::CacheManager;
#[cfg(feature = "playback-via-playback-rs")]
use crate::player::playback_rs::PlayerBackendPlaybackRs;
#[cfg(feature = "playback-via-rodio")]
use crate::player::rodio::PlayerBackendRodio;
#[cfg(feature = "playback-via-sleep")]
use crate::player::sleep::PlayerBackendSleep;
#[cfg(any(
feature = "playback",
feature = "playback-via-playback-rs",
feature = "playback-via-rodio"
))]
#[cfg(any(feature = "playback"))]
use crate::player::PlayerBackend;
// commands sent to this will be handeled later in this function in an infinite loop.
@@ -309,12 +301,8 @@ pub fn run_server_caching_thread_opt(
#[cfg(feature = "playback")]
let mut player = if play_audio {
#[cfg(feature = "playback-via-sleep")]
let backend = PlayerBackendSleep::new(Some(command_sender.clone())).unwrap();
#[cfg(feature = "playback-via-playback-rs")]
let backend = PlayerBackendPlaybackRs::new(command_sender.clone()).unwrap();
#[cfg(feature = "playback-via-rodio")]
let backend = PlayerBackendRodio::new(command_sender.clone()).unwrap();
use crate::player::PlayerBackendFeat;
let backend = PlayerBackendFeat::new(command_sender.clone()).unwrap();
Some(Player::new(backend))
} else {
None
@@ -336,42 +324,46 @@ pub fn run_server_caching_thread_opt(
Ok(v) => {
let command_sender = command_sender.clone();
let db = Arc::clone(&database);
thread::spawn(move || loop {
if let Ok((connection, _con_addr)) = v.accept() {
let command_sender = command_sender.clone();
let db = Arc::clone(&db);
thread::spawn(move || {
// each connection first has to send one line to tell us what it wants
let mut connection = BufReader::new(connection);
let mut line = String::new();
if connection.read_line(&mut line).is_ok() {
// based on that line, we adjust behavior
match line.as_str().trim() {
// sends all updates to this connection and reads commands from it
"main" => {
let connection = connection.into_inner();
_ = handle_one_connection_as_main(
db,
&mut connection.try_clone().unwrap(),
connection,
thread::spawn(move || {
loop {
if let Ok((connection, _con_addr)) = v.accept() {
let command_sender = command_sender.clone();
let db = Arc::clone(&db);
thread::spawn(move || {
// each connection first has to send one line to tell us what it wants
let mut connection = BufReader::new(connection);
let mut line = String::new();
if connection.read_line(&mut line).is_ok() {
// based on that line, we adjust behavior
match line.as_str().trim() {
// sends all updates to this connection and reads commands from it
"main" => {
let connection = connection.into_inner();
_ = handle_one_connection_as_main(
db,
&mut connection.try_clone().unwrap(),
connection,
&command_sender,
)
}
// reads commands from the connection, but (unlike main) doesn't send any updates
"control" => handle_one_connection_as_control(
&mut connection,
&command_sender,
)
}
// reads commands from the connection, but (unlike main) doesn't send any updates
"control" => handle_one_connection_as_control(
&mut connection,
&command_sender,
None,
),
"get" => _ = handle_one_connection_as_get(db, &mut connection),
_ => {
_ = connection
.into_inner()
.shutdown(std::net::Shutdown::Both)
None,
),
"get" => {
_ = handle_one_connection_as_get(db, &mut connection)
}
_ => {
_ = connection
.into_inner()
.shutdown(std::net::Shutdown::Both)
}
}
}
}
});
});
}
}
});
}