improve website, add "sleep" playback backend

This commit is contained in:
Mark
2025-02-28 20:38:30 +01:00
parent d39d030640
commit f1dd980b52
10 changed files with 534 additions and 18 deletions

View File

@@ -14,6 +14,8 @@ serde_json = "1.0"
tokio = { version = "1.37.0", optional = true, features = ["rt"] }
rocket = { version = "0.5.0", optional = true }
html-escape = { version = "0.2.13", optional = true }
rocket_ws = "0.1.1"
rocket_seek_stream = "0.2.6"
[target.aarch64-linux-android.dependencies]
# required for cross-compilation to android to work: link to shared c++ stdlib instead of c++_static
@@ -24,5 +26,6 @@ default = ["website", "default-playback"]
website = ["dep:tokio", "dep:rocket", "dep:html-escape"]
playback = []
default-playback = ["playback", "musicdb-lib/default-playback"]
playback-via-sleep = ["playback", "musicdb-lib/playback-via-sleep"]
playback-via-playback-rs = ["playback", "musicdb-lib/playback-via-playback-rs"]
playback-via-rodio = ["playback", "musicdb-lib/playback-via-rodio"]

View File

@@ -241,11 +241,7 @@ fn main() {
run_server(database, Some(Box::new(move |c| s.send(c).unwrap())))
});
let sender = r.recv().unwrap();
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(web::main(db, sender, *addr));
web::main(db, sender, *addr);
}
} else {
run_server(database, None);

View File

@@ -3,13 +3,20 @@ use std::sync::{mpsc, Arc, Mutex};
use musicdb_lib::data::album::Album;
use musicdb_lib::data::artist::Artist;
use musicdb_lib::data::database::Database;
use musicdb_lib::data::database::{Database, UpdateEndpoint};
use musicdb_lib::data::queue::{Queue, QueueContent, QueueFolder};
use musicdb_lib::data::song::Song;
use musicdb_lib::data::SongId;
use musicdb_lib::server::{Action, Command, Req};
use rocket::futures::{SinkExt, StreamExt};
use rocket::http::ContentType;
use rocket::response::content::RawHtml;
use rocket::{get, routes, Config, State};
use rocket::response::Responder;
use rocket::{get, routes, Config, Response, State};
use rocket_seek_stream::SeekStream;
use rocket_ws::{Message, WebSocket};
use tokio::select;
use tokio::sync::mpsc::Sender;
/*
@@ -41,6 +48,7 @@ const HTML_END: &'static str = "</body></html>";
struct Data {
db: Arc<Mutex<Database>>,
command_sender: mpsc::Sender<(Command, Option<u64>)>,
websocket_connections: Arc<tokio::sync::Mutex<Vec<Sender<Message>>>>,
}
#[get("/")]
@@ -91,7 +99,151 @@ const nowPlayingDiv = document.getElementById("nowPlayingDiv");
const queueDiv = document.getElementById("queueDiv");
var didFinish = false;
var averageLoopTimeMs = 250;
var dbPlaying = null;
function showDbPlaying() {
if (dbPlaying === true) {
document.getElementById("playLiveCurrent").play();
} else if (dbPlaying === false) {
document.getElementById("playLiveCurrent").pause();
} else {
document.getElementById("playLiveCurrent").pause();
}
const postheader = document.getElementById("postheader");
if (postheader) {
if (dbPlaying === true) {
postheader.innerText = "";
} else if (dbPlaying === false) {
postheader.innerText = "(paused)";
} else {
postheader.innerText = "";
}
}
}
async function updateNowPlaying() {
nowPlayingDiv.innerHTML = await (await fetch("/now-playing-html")).text();
showDbPlaying();
}
async function updateQueue() {
queueDiv.innerHTML = await (await fetch("/queue-html")).text();
}
var livePlaybackCurrentId = null;
var livePlaybackNextId = null;
async function updateLivePlaybackIds() {
if (document.getElementById("playLiveEnabled").checked) {
let resp = (await (await fetch("/now-playing-ids")).text()).trim();
let current = null;
let next = null;
if (resp != "") {
if (resp.includes("/")) {
let [c, n] = resp.split("/");
current = c.trim();
next = n.trim();
} else {
current = resp;
}
}
if (current !== livePlaybackCurrentId) {
livePlaybackCurrentId = current;
document.getElementById("playLiveCurrentSrc").src = livePlaybackCurrentId == null ? "" : "/song/" + livePlaybackCurrentId + "/current";
let audioElem = document.getElementById("playLiveCurrent");
audioElem.pause();
audioElem.currentTime = 0;
if (dbPlaying) {
audioElem.setAttribute("autoplay", "");
} else {
audioElem.removeAttribute("autoplay");
}
audioElem.load();
if (dbPlaying) {
audioElem.play();
}
}
if (next !== livePlaybackNextId) {
livePlaybackNextId = next;
document.getElementById("playLiveNextSrc").src = livePlaybackNextId == null ? "" : "/song/" + livePlaybackNextId + "/next";
document.getElementById("playLiveNext").load();
}
} else {
if (livePlaybackCurrentId !== null) {
livePlaybackCurrentId = null;
let audioElem = document.getElementById("playLiveCurrent");
audioElem.pause();
audioElem.currentTime = 0;
document.getElementById("playLiveCurrentSrc").src = "";
audioElem.load();
}
if (livePlaybackNextId !== null) {
livePlaybackNextId = null;
document.getElementById("playLiveNextSrc").src = "";
document.getElementById("playLiveNext").load();
}
}
}
async function runLoop() {
let websocketConnection = null;
try {
websocketConnection = new WebSocket("/ws");
var websocketCounter = 0;
websocketConnection.addEventListener("message", async function(e) {
++websocketCounter;
if (websocketCounter > 2) {
websocketCounter = 0;
}
document.getElementById("warnLag").innerText = "using websocket" + (websocketCounter == 0 ? "." : (websocketCounter == 1 ? ".." : "..."));
switch (e.data.trim()) {
case "init/playing=true":
if (dbPlaying === null) {
dbPlaying = true;
showDbPlaying();
}
break;
case "init/playing=false":
if (dbPlaying === null) {
dbPlaying = false;
showDbPlaying();
}
break;
case "pause":
dbPlaying = false;
showDbPlaying();
break;
case "stop":
dbPlaying = false;
document.getElementById("playLiveCurrent").pause();
document.getElementById("playLiveCurrent").currentTime = 0;
showDbPlaying();
break;
case "resume":
dbPlaying = true;
showDbPlaying();
break;
case "next":
await updateLivePlaybackIds();
await updateNowPlaying();
await updateQueue();
break;
case "update/data":
await updateLivePlaybackIds();
await updateNowPlaying();
await updateQueue();
break;
case "update/queue":
await updateLivePlaybackIds();
await updateNowPlaying();
await updateQueue();
break;
default:
console.log("Unknown websocket message: ", e.data.trim());
break;
}
});
return;
} catch (e) {
console.log("Error in websocket connection:");
console.log(e);
console.log("Falling back to polling.");
websocketConnection = null;
}
while (true) {
await sleep(1000);
didFinish = false;
@@ -103,8 +255,8 @@ async function runLoop() {
await sleep(100);
}
});
nowPlayingDiv.innerHTML = await (await fetch("/now-playing-html")).text();
queueDiv.innerHTML = await (await fetch("/queue-html")).text();
await updateNowPlaying();
await updateQueue();
var elapsedTime = new Date() - startTime;
didFinish = true;
averageLoopTimeMs = ((averageLoopTimeMs * 4) + elapsedTime) / 5;
@@ -115,6 +267,7 @@ runLoop();</script>"#;
let buttons = "<button onclick=\"fetch('/play')\">play</button><button onclick=\"fetch('/pause')\">pause</button><button onclick=\"fetch('/stop')\">stop</button><button onclick=\"fetch('/skip')\">skip</button><button onclick=\"fetch('/clear-queue')\">clear queue</button>";
let search = "<input id=\"searchFieldArtist\" placeholder=\"artist\"><input id=\"searchFieldAlbum\" placeholder=\"album\"><input id=\"searchFieldTitle\" placeholder=\"title\">
<button onclick=\"performSearch()\">search</button><div id=\"searchResultDiv\"></div>";
let playback_live = r#"<div><input id="playLiveEnabled" onchange="updateLivePlaybackIds();" type="checkbox"><audio controls autoplay id="playLiveCurrent"><source id="playLiveCurrentSrc" src=""></audio><audio style="visibility:hidden;" id="playLiveNext"><source id="playLiveNextSrc" src=""></audio></span></div>"#;
let db = data.db.lock().unwrap();
let now_playing = gen_now_playing(&db);
let mut queue = String::new();
@@ -122,13 +275,54 @@ runLoop();</script>"#;
dbg!(&queue);
drop(db);
RawHtml(format!(
"{HTML_START}<title>MusicDb</title>{script}{HTML_SEP}<small><small><div id=\"warnLag\">no javascript? reload to see updated information.</div></small></small><div id=\"nowPlayingDiv\">{now_playing}</div><div>{buttons}</div><div id=\"searchDiv\" style=\"display:none;\">{search}</div><div id=\"queueDiv\">{queue}</div>{script2}{HTML_END}",
"{HTML_START}<title>MusicDb</title>{script}{HTML_SEP}<small><small><div id=\"warnLag\">no javascript? reload to see updated information.</div></small></small><div id=\"nowPlayingDiv\">{now_playing}</div><div>{buttons}</div>{playback_live}<div id=\"searchDiv\" style=\"display:none;\">{search}</div><div id=\"queueDiv\">{queue}</div>{script2}{HTML_END}",
))
}
#[get("/now-playing-html")]
fn now_playing_html(data: &State<Data>) -> RawHtml<String> {
RawHtml(gen_now_playing(&*data.db.lock().unwrap()))
}
#[get("/now-playing-ids")]
fn now_playing_ids(data: &State<Data>) -> String {
let db = data.db.lock().unwrap();
let (c, n) = (
db.queue.get_current_song().copied(),
db.queue.get_next_song().copied(),
);
drop(db);
if let Some(c) = c {
if let Some(n) = n {
format!("{c}/{n}")
} else {
format!("{c}")
}
} else {
"".to_owned()
}
}
#[get("/song/<id>/<name>")]
fn song(data: &State<Data>, id: SongId, name: String) -> Option<SeekStream> {
let db = data.db.lock().unwrap();
if let Some(song) = db.get_song(&id) {
song.cached_data().cache_data_start_thread(&*db, song);
if let Some(bytes) = song.cached_data().cached_data_await() {
drop(db);
Some(SeekStream::new(std::io::Cursor::new(ArcBytes(bytes))))
} else {
None
}
} else {
None
}
}
struct ArcBytes(pub Arc<Vec<u8>>);
impl AsRef<[u8]> for ArcBytes {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}
#[get("/queue-html")]
fn queue_html(data: &State<Data>) -> RawHtml<String> {
let mut str = String::new();
@@ -139,7 +333,7 @@ fn queue_html(data: &State<Data>) -> RawHtml<String> {
fn gen_now_playing(db: &Database) -> String {
if let Some(current_song) = db.queue.get_current_song().and_then(|id| db.get_song(id)) {
format!(
"<h1>Now Playing</h1><h4>{}</h4>",
"<h1>Now Playing <small id=\"postheader\"></small></h1><h4>{}</h4>",
html_escape::encode_safe(&current_song.title),
)
} else {
@@ -545,22 +739,181 @@ fn search(
RawHtml(out)
}
pub async fn main(
#[get("/ws")]
async fn websocket(websocket: WebSocket, state: &State<Data>) -> rocket_ws::Channel<'static> {
// a channel so other threads/tasks can send messages to this websocket client
let (sender, mut receiver) = tokio::sync::mpsc::channel(5);
state.websocket_connections.lock().await.push(sender);
let (db_playing, ()) = tokio::task::block_in_place(|| {
let db = state.db.lock().unwrap();
(db.playing, ())
});
// handle messages from the websocket and from the channel
websocket.channel(move |mut websocket| {
Box::pin(async move {
if db_playing {
let _ = websocket.send(Message::text("init/playing=true")).await;
} else {
let _ = websocket.send(Message::text("init/playing=false")).await;
}
loop {
// async magic:
// handle a message from the websocket client or from other
// threads/tasks in the server, whichever happens first
select! {
message = websocket.next() => {
if let Some(message) = message {
// server received `message` from the websocket client
match message? {
Message::Text(text) => {
// it was a text message, prefix it with "You sent: " and echo
websocket
.send(Message::text(format!("You sent: {text}")))
.await?
}
Message::Binary(_bytes) => {
// it was a binary message, ignore it
}
Message::Ping(payload) => {
websocket.send(Message::Pong(payload)).await?
}
Message::Close(close) => {
websocket.close(close).await?;
break;
}
// these messages get ignored
Message::Pong(_) | Message::Frame(_) => (),
}
} else {
// websocket connection was closed
break;
}
},
message_to_be_sent = receiver.recv() => {
if let Some(message) = message_to_be_sent {
// server received `message` from another thread/task
websocket.send(message).await?;
} else {
// channel has been closed, close websocket connection too
websocket.close(None).await?;
break;
}
},
}
}
Ok(())
})
})
}
pub fn main(
db: Arc<Mutex<Database>>,
command_sender: mpsc::Sender<(Command, Option<u64>)>,
addr: SocketAddr,
) {
let websocket_connections = Arc::new(tokio::sync::Mutex::new(vec![]));
let data = Data {
db: Arc::clone(&db),
command_sender,
websocket_connections: Arc::clone(&websocket_connections),
};
let mut db = db.lock().unwrap();
let udepid = db.update_endpoints_id;
db.update_endpoints_id += 1;
db.update_endpoints.push((
udepid,
UpdateEndpoint::Custom(Box::new(move |cmd| {
let mut msgs = vec![];
fn action(a: &Action, msgs: &mut Vec<Message>) {
match a {
Action::Resume => msgs.push(Message::text("resume")),
Action::Pause => msgs.push(Message::text("pause")),
Action::Stop => msgs.push(Message::text("stop")),
Action::NextSong => msgs.push(Message::text("next")),
Action::SyncDatabase(..)
| Action::AddSong(..)
| Action::AddAlbum(..)
| Action::AddArtist(..)
| Action::AddCover(..)
| Action::ModifySong(..)
| Action::ModifyAlbum(..)
| Action::ModifyArtist(..)
| Action::RemoveSong(..)
| Action::RemoveAlbum(..)
| Action::RemoveArtist(..)
| Action::SetSongDuration(..)
| Action::TagSongFlagSet(..)
| Action::TagSongFlagUnset(..)
| Action::TagAlbumFlagSet(..)
| Action::TagAlbumFlagUnset(..)
| Action::TagArtistFlagSet(..)
| Action::TagArtistFlagUnset(..)
| Action::TagSongPropertySet(..)
| Action::TagSongPropertyUnset(..)
| Action::TagAlbumPropertySet(..)
| Action::TagAlbumPropertyUnset(..)
| Action::TagArtistPropertySet(..)
| Action::TagArtistPropertyUnset(..) => msgs.push(Message::text("update/data")),
Action::QueueUpdate(..)
| Action::QueueAdd(..)
| Action::QueueInsert(..)
| Action::QueueRemove(..)
| Action::QueueMove(..)
| Action::QueueMoveInto(..)
| Action::QueueGoto(..)
| Action::QueueShuffle(..)
| Action::QueueSetShuffle(..)
| Action::QueueUnshuffle(..) => msgs.push(Message::text("update/queue")),
Action::Multiple(actions) => {
for inner in actions {
action(inner, msgs);
}
}
Action::InitComplete
| Action::Save
| Action::ErrorInfo(..)
| Action::Denied(..) => {}
}
}
action(&cmd.action, &mut msgs);
if !msgs.is_empty() {
let mut ws_cons = websocket_connections.blocking_lock();
let mut rm = vec![];
for msg in msgs {
rm.clear();
for (i, con) in ws_cons.iter_mut().enumerate() {
if con.blocking_send(msg.clone()).is_err() {
rm.push(i);
}
}
for i in rm.iter().rev() {
ws_cons.remove(*i);
}
}
}
})),
));
drop(db);
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async_main(data, addr));
}
pub async fn async_main(data: Data, addr: SocketAddr) {
rocket::build()
.configure(Config {
address: addr.ip(),
port: addr.port(),
..Default::default()
})
.manage(Data { db, command_sender })
.manage(data)
.mount(
"/",
routes![
index,
websocket,
play,
pause,
stop,
@@ -571,7 +924,9 @@ pub async fn main(
add_song,
search,
now_playing_html,
queue_html
now_playing_ids,
song,
queue_html,
],
)
.launch()