From adba8907fd52b4bee85bb5491293bd6d19c218b4 Mon Sep 17 00:00:00 2001 From: mark Date: Sun, 2 Mar 2025 00:22:42 +0100 Subject: [PATCH] init --- .gitignore | 1 + Cargo.lock | 262 ++++++++++++++++++++++++++++++++++ Cargo.toml | 7 + src/main.rs | 395 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 665 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..63f7434 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,262 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + +[[package]] +name = "backtrace" +version = "0.3.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + +[[package]] +name = "bytes" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f61dac84819c6588b558454b194026eb1f09c293b9036ae9b159e74e73ab6cf9" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "libc" +version = "0.2.170" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" + +[[package]] +name = "logbufpipe" +version = "0.1.0" +dependencies = [ + "tokio", +] + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "proc-macro2" +version = "1.0.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "socket2" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "syn" +version = "2.0.98" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36147f1a48ae0ec2b5b3bc5b537d267457555a10dc06f3dbc8cb11ba3006d3b1" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tokio" +version = "1.43.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "pin-project-lite", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a210d160f08b701c8721ba1c726c11662f877ea6b7094007e1ca9a1041945034" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..40f76b3 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "logbufpipe" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1.43.0", features = ["rt", "sync", "io-std", "io-util", "fs", "macros", "net"] } diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..40e7653 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,395 @@ +use core::str; +use std::{ + collections::VecDeque, + env::{args, args_os}, + path::PathBuf, + process::ExitCode, + sync::Arc, +}; + +use tokio::{ + io::{stdin, stdout, AsyncBufReadExt, AsyncWriteExt, BufReader, Stdout}, + net::{UnixListener, UnixStream}, + spawn, + sync::Mutex, +}; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> ExitCode { + if args() + .skip(1) + .map(|arg| arg.trim().to_lowercase()) + .any(|arg| arg == "-h" || arg == "--help") + { + eprintln!( + r#"[logbufpipe] +logbufpipe records data from its stdin, immediately forwards it to stdout, +and stores the data in an in-memory queue. when this queue is full, old data +is dropped so that new data can be added. You can connect to logbufpipe +through a unix socket to see the stored data and receive live updates: + +# server: capture data and store it +(echo hi; sleep 5; echo goodbye) | logbufpipe 1M /tmp/lbp1 + +# client: connect to the server and show the output +logbufpipe utf8 1M /tmp/lbp1 + +In server mode, logbufpipe will open a unix socket at the given path +and keep the specified amount of data in memory. This limit is expressed as +a number n, optionally followed by one of K, M, G, or T, representing n bytes, +1024n bytes for K, 1024²n bytes for M, 1024³n bytes for G, or 1024⁴ bytes for T. + +As a client, logbufpipe has multiple modes: +- raw/bytes: output the newest bytes and live data. may output half of a char. +- utf8: same as raw, but skip bytes until a valid utf8 char is encountered. +The size limit does not have to match that of the server. The smaller of the +two limits decides how much old data can be shown. If the path is a normal file +instead of a unix socket, acts as if the server had sent the data in the file. +"# + ); + return ExitCode::SUCCESS; + } + let mut mode = Mode::Server; + enum Mode { + Server, + Raw, + Utf8, + } + impl Mode { + fn arg1(&self) -> usize { + if matches!(self, Self::Server) { + 1 + } else { + 2 + } + } + fn arg2(&self) -> usize { + self.arg1() + 1 + } + fn arg1s(&self) -> &'static str { + if matches!(self, Self::Server) { + "first" + } else { + "second" + } + } + fn arg2s(&self) -> &'static str { + if matches!(self, Self::Server) { + "second" + } else { + "third" + } + } + } + match args() + .nth(1) + .map(|arg| arg.to_lowercase()) + .as_ref() + .map(|arg| arg.as_str()) + { + Some("raw" | "bytes") => mode = Mode::Raw, + Some("utf8" | "utf-8" | "utf_8") => mode = Mode::Utf8, + Some(_) => {} + None => { + eprintln!("[logbufpipe] run logbufpipe --help for help"); + return ExitCode::FAILURE; + } + } + let bytes = if let Some(bytes_str) = args().nth(mode.arg1()) { + let bytes_str = bytes_str.trim(); + if let Ok(bytes) = bytes_str.parse() { + Some(bytes) + } else if let Some(modifier) = bytes_str + .as_bytes() + .get(bytes_str.len().saturating_sub(1)) + .and_then(|m| match *m { + b'K' | b'k' => Some(1024usize), + b'M' | b'm' => 1024usize.checked_mul(1024), + b'G' | b'g' => 1024usize + .checked_mul(1024) + .and_then(|v| v.checked_mul(1024)), + b'T' | b't' => 1024usize + .checked_mul(1024) + .and_then(|v| v.checked_mul(1024)) + .and_then(|v| v.checked_mul(1024)), + _ => None, + }) + { + if let Ok(bytes) = bytes_str[0..bytes_str.len() - 1].trim_end().parse() { + modifier.checked_mul(bytes) + } else if let Ok(bytes) = bytes_str[0..bytes_str.len() - 1].trim_end().parse::() { + let bytes = modifier as f64 * bytes as f64; + if bytes.is_finite() && bytes >= 0.0 && bytes <= usize::MAX as f64 { + Some(bytes as usize) + } else { + None + } + } else { + None + } + } else { + None + } + } else { + None + }; + let bytes = if let Some(bytes) = bytes { + bytes + } else { + eprintln!( + "[logbufpipe] {} argument must be the buffer's size in bytes or as [KMGT], prefixed with u or b for client mode", + mode.arg1s(), + ); + return ExitCode::FAILURE; + }; + + let data = Arc::new(Mutex::new((VecDeque::::with_capacity(bytes), vec![]))); + + if let Some(path) = args_os().nth(mode.arg2()) { + let path = PathBuf::from(path); + match mode { + Mode::Raw | Mode::Utf8 => { + async fn buf_raw(buf: &[u8], stdout: &mut Stdout) { + stdout.write_all(buf).await.unwrap(); + stdout.flush().await.unwrap(); + } + fn buf_utf8_waiting_buf() -> Option<[u8; 4]> { + Some([0, 0, 0, 0]) + } + async fn buf_utf8( + mut buf: &[u8], + stdout: &mut Stdout, + waiting_buf: &mut Option<[u8; 4]>, + ) { + if if let Some(waiting) = waiting_buf { + // ignore bytes until a valid utf8 character is found + if 'waiting_check: { + for (i, b) in buf.iter().enumerate() { + *waiting = [waiting[1], waiting[2], waiting[3], *b]; + for slice in [ + &waiting[3..4], + &waiting[2..4], + &waiting[1..4], + &waiting[0..4], + ] { + if str::from_utf8(slice).is_ok() { + stdout.write_all(slice).await.unwrap(); + buf = &buf[1 + i..]; + break 'waiting_check true; + } + } + } + false + } { + *waiting_buf = None; + true + } else { + false + } + } else { + true + } { + stdout.write_all(&buf).await.unwrap(); + stdout.flush().await.unwrap(); + } + } + match UnixStream::connect(&path).await { + Ok(mut con) => { + con.write_all(format!("{bytes}\n").as_bytes()) + .await + .unwrap(); + let mut con = BufReader::new(con); + let mut stdout = stdout(); + match mode { + Mode::Server => unreachable!(), + Mode::Raw => loop { + let buf = con.fill_buf().await.unwrap(); + if buf.is_empty() { + break; + } + let len = buf.len(); + buf_raw(buf, &mut stdout).await; + con.consume(len); + }, + Mode::Utf8 => { + let mut waiting_buf = buf_utf8_waiting_buf(); + loop { + let buf = con.fill_buf().await.unwrap(); + if buf.is_empty() { + break; + } + let len = buf.len(); + buf_utf8(buf, &mut stdout, &mut waiting_buf).await; + con.consume(len); + } + } + } + } + Err(e) => { + if let Ok(buf) = tokio::fs::read(&path).await { + let mut buf = buf.as_slice(); + if buf.len() > bytes { + buf = &buf[buf.len() - bytes..]; + } + let mut stdout = stdout(); + match mode { + Mode::Server => unreachable!(), + Mode::Raw => { + buf_raw(buf, &mut stdout).await; + } + Mode::Utf8 => { + buf_utf8(buf, &mut stdout, &mut buf_utf8_waiting_buf()).await; + } + } + } else { + eprintln!( + "[logbufpipe] couldn't connect to pipe {}: {e}", + path.display() + ); + return ExitCode::FAILURE; + } + } + } + } + Mode::Server => { + { + let _ = tokio::fs::remove_file(&path).await; + match UnixListener::bind(&path) { + Ok(listener) => { + let mut cleaning_listener = + CleaningUnixListener(Some(listener), path, Arc::clone(&data)); + struct CleaningUnixListener( + Option, + PathBuf, + Arc, Vec)>>, + ); + impl Drop for CleaningUnixListener { + fn drop(&mut self) { + drop(self.0.take()); + let _ = std::fs::remove_file(&self.1); + let _ = std::fs::write( + &self.1, + &*self.2.blocking_lock().0.make_contiguous(), + ); + } + } + spawn(async move { + let listener = cleaning_listener.0.as_mut().unwrap(); + loop { + match listener.accept().await { + Ok((mut client, _)) => { + let data = Arc::clone(&cleaning_listener.2); + spawn(async move { + 'client_con: { + if let Ok(line) = BufReader::new(&mut client) + .lines() + .next_line() + .await + { + let bytes = if let Some(line) = line { + if let Ok(bytes) = + line.trim().parse::() + { + bytes + } else { + break 'client_con; + } + } else { + usize::MAX + }; + let mut lock = data.lock().await; + let (bytes_a, bytes_b) = lock.0.as_slices(); + let (bytes_a, bytes_b) = if bytes + < bytes_b.len() + { + ( + [].as_slice(), + &bytes_b[bytes_b.len() - bytes..], + ) + } else if bytes + < bytes_a.len() + bytes_b.len() + { + ( + &bytes_a[bytes_a.len() + + bytes_b.len() + - bytes..], + bytes_b, + ) + } else { + (bytes_a, bytes_b) + }; + if client.write_all(bytes_a).await.is_ok() + && client + .write_all(bytes_b) + .await + .is_ok() + { + lock.1.push(client); + } + drop(lock); + } + } + }); + } + Err(e) => { + eprintln!("[logbufpipe] pipe broke: {e}"); + break; + } + } + } + }); + } + Err(e) => { + eprintln!("[logbufpipe] couldn't create pipe {}: {e}", path.display()); + return ExitCode::FAILURE; + } + } + } + let mut stdin = BufReader::new(stdin()); + let mut stdout = stdout(); + loop { + if let Ok(buf) = stdin.fill_buf().await { + if buf.is_empty() { + break; + } + let _ = stdout.write_all(&buf).await; + let _ = stdout.flush().await; + let mut lock = data.lock().await; + let mut rm = Vec::new(); + for (i, client) in lock.1.iter_mut().enumerate() { + if client.write_all(buf).await.is_err() { + rm.push(i); + } + } + for i in rm.into_iter().rev() { + lock.1.swap_remove(i); + } + let buffer = &mut lock.0; + let len = buf.len(); + let max_len = bytes.saturating_sub(len); + if max_len > 0 { + let to_remove = buffer.len().saturating_sub(max_len); + if to_remove > 0 { + buffer.drain(0..to_remove); + } + buffer.extend(buf); + } else { + buffer.clear(); + buffer.extend(&buf[len - bytes..]); + } + stdin.consume(len); + } else { + return ExitCode::FAILURE; + } + } + } + } + } else { + eprintln!( + "[logbufpipe] {} argument must be the pipe's path", + mode.arg2s(), + ); + return ExitCode::FAILURE; + }; + ExitCode::SUCCESS +}