logbufpipe/src/main.rs
2025-03-02 00:22:42 +01:00

396 lines
17 KiB
Rust

use core::str;
use std::{
collections::VecDeque,
env::{args, args_os},
path::PathBuf,
process::ExitCode,
sync::Arc,
};
use tokio::{
io::{stdin, stdout, AsyncBufReadExt, AsyncWriteExt, BufReader, Stdout},
net::{UnixListener, UnixStream},
spawn,
sync::Mutex,
};
#[tokio::main(flavor = "current_thread")]
async fn main() -> ExitCode {
// Show the usage text and exit successfully when any argument is `-h` or `--help`
// (case-insensitive, surrounding whitespace ignored).
// `args_os` is used because `args` panics while iterating over an argument that is not
// valid Unicode; since this check walks over every argument, a non-UTF-8 socket path
// would otherwise crash the program right here.
if args_os().skip(1).any(|arg| {
    // An argument that is not valid Unicode cannot be one of the help flags.
    arg.to_str().map_or(false, |arg| {
        let arg = arg.trim().to_lowercase();
        arg == "-h" || arg == "--help"
    })
}) {
    eprintln!(
        r#"[logbufpipe]
logbufpipe records data from its stdin, immediately forwards it to stdout,
and stores the data in an in-memory queue. when this queue is full, old data
is dropped so that new data can be added. You can connect to logbufpipe
through a unix socket to see the stored data and receive live updates:
# server: capture data and store it
(echo hi; sleep 5; echo goodbye) | logbufpipe 1M /tmp/lbp1
# client: connect to the server and show the output
logbufpipe utf8 1M /tmp/lbp1
In server mode, logbufpipe will open a unix socket at the given path
and keep the specified amount of data in memory. This limit is expressed as
a number n, optionally followed by one of K, M, G, or T, representing n bytes,
1024n bytes for K, 1024²n bytes for M, 1024³n bytes for G, or 1024⁴n bytes for T.
As a client, logbufpipe has multiple modes:
- raw/bytes: output the newest bytes and live data. may output half of a char.
- utf8: same as raw, but skip bytes until a valid utf8 char is encountered.
The size limit does not have to match that of the server. The smaller of the
two limits decides how much old data can be shown. If the path is a normal file
instead of a unix socket, acts as if the server had sent the data in the file.
"#
    );
    return ExitCode::SUCCESS;
}
// Server mode is the default; the match on the first argument below may switch it to a client mode.
let mut mode = Mode::Server;
/// How this process operates: as the recording server, or as one of the client modes.
enum Mode {
    /// Record stdin, forward it to stdout and serve it on the unix socket.
    Server,
    /// Client that prints the received bytes unchanged.
    Raw,
    /// Client that skips leading bytes until a valid UTF-8 sequence is seen.
    Utf8,
}
impl Mode {
    /// Index (into the process arguments) of the buffer size argument.
    /// Client modes have a mode keyword in front, which shifts it by one.
    fn arg1(&self) -> usize {
        match self {
            Self::Server => 1,
            Self::Raw | Self::Utf8 => 2,
        }
    }
    /// Index of the socket path argument; always directly after the size argument.
    fn arg2(&self) -> usize {
        1 + self.arg1()
    }
    /// Ordinal word for the size argument's position, used in error messages.
    fn arg1s(&self) -> &'static str {
        match self {
            Self::Server => "first",
            Self::Raw | Self::Utf8 => "second",
        }
    }
    /// Ordinal word for the path argument's position, used in error messages.
    fn arg2s(&self) -> &'static str {
        match self {
            Self::Server => "second",
            Self::Raw | Self::Utf8 => "third",
        }
    }
}
match args()
.nth(1)
.map(|arg| arg.to_lowercase())
.as_ref()
.map(|arg| arg.as_str())
{
Some("raw" | "bytes") => mode = Mode::Raw,
Some("utf8" | "utf-8" | "utf_8") => mode = Mode::Utf8,
Some(_) => {}
None => {
eprintln!("[logbufpipe] run logbufpipe --help for help");
return ExitCode::FAILURE;
}
}
/// Parses a size such as `4096`, `512K`, `1.5M` or `2g` into a number of bytes.
///
/// A plain unsigned integer is taken as-is. Otherwise the last byte must be one of
/// K/M/G/T (either case), standing for a power of 1024, and the text in front of it
/// must parse as an unsigned integer or as a float. Returns `None` for anything else,
/// for negative or non-finite results, and when the result does not fit in `usize`.
fn parse_size(text: &str) -> Option<usize> {
    let text = text.trim();
    if let Ok(plain) = text.parse::<usize>() {
        return Some(plain);
    }
    let (&suffix, _) = text.as_bytes().split_last()?;
    let exponent = match suffix {
        b'K' | b'k' => 1,
        b'M' | b'm' => 2,
        b'G' | b'g' => 3,
        b'T' | b't' => 4,
        _ => return None,
    };
    // Checked, because the larger units do not fit in a narrow `usize`.
    let unit = 1024usize.checked_pow(exponent)?;
    // The suffix is a single ASCII byte, so cutting it off stays on a char boundary.
    let number = text[..text.len() - 1].trim_end();
    if let Ok(count) = number.parse::<usize>() {
        return unit.checked_mul(count);
    }
    let scaled = unit as f64 * number.parse::<f64>().ok()?;
    (scaled.is_finite() && scaled >= 0.0 && scaled <= usize::MAX as f64).then(|| scaled as usize)
}
// `None` when the size argument is missing or malformed.
let bytes = args().nth(mode.arg1()).and_then(|raw| parse_size(&raw));
// Stop with a usage hint when the size argument is missing or could not be parsed.
// The message names only the accepted forms: the client mode is chosen by a separate
// keyword argument, not by a prefix on the size.
let Some(bytes) = bytes else {
    eprintln!(
        "[logbufpipe] {} argument must be the buffer's size in bytes or as <num>[KMGT]",
        mode.arg1s(),
    );
    return ExitCode::FAILURE;
};
// Shared state: the queue of the most recent bytes and the sockets of connected clients.
// Only the server fills the queue, so only the server preallocates it. In client mode
// `bytes` is just a limit sent to the server; reserving that much memory there would be
// wasted and makes large limits fail with a capacity overflow or allocation failure.
let capacity = if matches!(mode, Mode::Server) { bytes } else { 0 };
let data = Arc::new(Mutex::new((VecDeque::<u8>::with_capacity(capacity), vec![])));
if let Some(path) = args_os().nth(mode.arg2()) {
let path = PathBuf::from(path);
match mode {
Mode::Raw | Mode::Utf8 => {
/// Writes `buf` to `stdout` unchanged and flushes right away.
/// Panics if writing or flushing fails.
async fn buf_raw(buf: &[u8], stdout: &mut Stdout) {
    let written = stdout.write_all(buf).await;
    written.unwrap();
    let flushed = stdout.flush().await;
    flushed.unwrap();
}
/// Initial state for `buf_utf8`: a zero-filled window of the last four bytes,
/// wrapped in `Some` to mark that no valid UTF-8 sequence has been seen yet.
fn buf_utf8_waiting_buf() -> Option<[u8; 4]> {
    Some([0u8; 4])
}
/// Writes `buf` to `stdout`, but while `waiting_buf` is `Some` first skips bytes until
/// the stream is aligned to a valid UTF-8 sequence.
///
/// `waiting_buf` holds the last four bytes seen so far and persists between calls.
/// Each incoming byte is shifted into that window, and the window's suffixes (shortest
/// first) are tested for valid UTF-8. The first valid suffix is written, `waiting_buf`
/// becomes `None`, and the rest of `buf` (as well as every later call) is written
/// unfiltered and flushed. If no valid suffix is found in this call, nothing is written.
/// Panics if writing or flushing fails.
async fn buf_utf8(mut buf: &[u8], stdout: &mut Stdout, waiting_buf: &mut Option<[u8; 4]>) {
    if let Some(window) = waiting_buf {
        // Position in `buf` right after the byte that completed the first valid sequence.
        let mut resume_at = None;
        'scan: for (idx, &byte) in buf.iter().enumerate() {
            // Drop the oldest byte and append the new one at the end.
            window.rotate_left(1);
            window[3] = byte;
            // Suffixes of length 1, 2, 3 and 4, in that order.
            for start in (0..4).rev() {
                let candidate = &window[start..];
                if str::from_utf8(candidate).is_ok() {
                    stdout.write_all(candidate).await.unwrap();
                    resume_at = Some(idx + 1);
                    break 'scan;
                }
            }
        }
        // Still not aligned: keep waiting and output nothing for this chunk.
        let Some(offset) = resume_at else {
            return;
        };
        buf = &buf[offset..];
        *waiting_buf = None;
    }
    stdout.write_all(buf).await.unwrap();
    stdout.flush().await.unwrap();
}
// Client: stream from the server's socket, or fall back to treating the path as a
// regular file when connecting fails.
match UnixStream::connect(&path).await {
    Ok(mut con) => {
        // The request is a single line holding the maximum number of old bytes wanted.
        let request = format!("{bytes}\n");
        con.write_all(request.as_bytes()).await.unwrap();
        let mut con = BufReader::new(con);
        let mut stdout = stdout();
        // Alignment state, only consulted in utf8 mode.
        let mut waiting_buf = buf_utf8_waiting_buf();
        // Relay every chunk until the server closes the connection (empty read).
        loop {
            let chunk = con.fill_buf().await.unwrap();
            let len = chunk.len();
            if len == 0 {
                break;
            }
            match mode {
                Mode::Server => unreachable!(),
                Mode::Raw => buf_raw(chunk, &mut stdout).await,
                Mode::Utf8 => buf_utf8(chunk, &mut stdout, &mut waiting_buf).await,
            }
            con.consume(len);
        }
    }
    Err(e) => {
        // Not a reachable socket: try to read the path as a plain file instead, and
        // report the original connection error if that fails too.
        let Ok(contents) = tokio::fs::read(&path).await else {
            eprintln!(
                "[logbufpipe] couldn't connect to pipe {}: {e}",
                path.display()
            );
            return ExitCode::FAILURE;
        };
        // Show at most the last `bytes` bytes of the file.
        let tail = &contents[contents.len().saturating_sub(bytes)..];
        let mut stdout = stdout();
        match mode {
            Mode::Server => unreachable!(),
            Mode::Raw => buf_raw(tail, &mut stdout).await,
            Mode::Utf8 => buf_utf8(tail, &mut stdout, &mut buf_utf8_waiting_buf()).await,
        }
    }
}
}
Mode::Server => {
{
// Best effort: delete whatever currently exists at the socket path before binding.
// The result is deliberately ignored (for example when there is nothing to delete).
let _ = tokio::fs::remove_file(&path).await;
match UnixListener::bind(&path) {
Ok(listener) => {
// Bundle the listener with the socket path and a handle to the shared buffer; the
// bundle's `Drop` impl uses the latter two once the listener goes away.
let mut cleaning_listener =
CleaningUnixListener(Some(listener), path, Arc::clone(&data));
/// The listening socket (field 0), its filesystem path (field 1) and the shared
/// buffer/client state (field 2).
///
/// When dropped, it closes the listener, removes the socket file, and then writes the
/// buffered bytes to the same path as a regular file.
struct CleaningUnixListener(
    Option<UnixListener>,
    PathBuf,
    Arc<Mutex<(VecDeque<u8>, Vec<UnixStream>)>>,
);
impl Drop for CleaningUnixListener {
    fn drop(&mut self) {
        // Close the socket before touching its path.
        drop(self.0.take());
        let _ = std::fs::remove_file(&self.1);
        // `try_lock` instead of `blocking_lock`: the latter panics when this value is
        // dropped from inside the runtime (e.g. after the accept loop stops on an
        // error), and it would wait forever if a task that is no longer being polled
        // still held the lock. If the lock is unavailable, the dump is skipped.
        if let Ok(mut state) = self.2.try_lock() {
            let _ = std::fs::write(&self.1, &*state.0.make_contiguous());
        }
    }
}
// Background task: accept clients for as long as the listener works. Each client gets
// its own task that answers the initial request and then registers the socket for
// live updates.
spawn(async move {
    let listener = cleaning_listener.0.as_mut().unwrap();
    loop {
        let mut client = match listener.accept().await {
            Ok((stream, _addr)) => stream,
            Err(e) => {
                eprintln!("[logbufpipe] pipe broke: {e}");
                break;
            }
        };
        let data = Arc::clone(&cleaning_listener.2);
        spawn(async move {
            // The client's first line is the maximum number of old bytes it wants.
            let Ok(request) = BufReader::new(&mut client).lines().next_line().await else {
                return;
            };
            let limit = match request {
                Some(text) => match text.trim().parse::<usize>() {
                    Ok(limit) => limit,
                    // Malformed request: drop the client.
                    Err(_) => return,
                },
                // Stream ended without a request line: no limit.
                None => usize::MAX,
            };
            let mut state = data.lock().await;
            // The queue is exposed as two slices (older part first); skip as many of
            // the oldest bytes as needed so that at most `limit` bytes remain.
            let (front, back) = state.0.as_slices();
            let skip = (front.len() + back.len()).saturating_sub(limit);
            let front_skip = skip.min(front.len());
            let back_skip = skip - front_skip;
            let (front, back) = (&front[front_skip..], &back[back_skip..]);
            // Only a client that received the backlog is kept for live updates.
            if client.write_all(front).await.is_ok() && client.write_all(back).await.is_ok() {
                state.1.push(client);
            }
        });
    }
});
}
Err(e) => {
eprintln!("[logbufpipe] couldn't create pipe {}: {e}", path.display());
return ExitCode::FAILURE;
}
}
}
let mut stdin = BufReader::new(stdin());
let mut stdout = stdout();
// Server main loop: pass each chunk of stdin to stdout and to all connected clients,
// then append it to the bounded buffer. Ends at EOF on stdin.
loop {
    // A read error on stdin ends the program with a failure code.
    let Ok(buf) = stdin.fill_buf().await else {
        return ExitCode::FAILURE;
    };
    let len = buf.len();
    if len == 0 {
        break;
    }
    // Errors on stdout are ignored; recording and serving continue regardless.
    let _ = stdout.write_all(buf).await;
    let _ = stdout.flush().await;
    let mut lock = data.lock().await;
    let (buffer, clients) = &mut *lock;
    // Remember which clients failed so they can be removed after the pass.
    let mut failed = Vec::new();
    for (idx, client) in clients.iter_mut().enumerate() {
        if client.write_all(buf).await.is_err() {
            failed.push(idx);
        }
    }
    // Highest index first, so `swap_remove` never moves an entry that is still
    // scheduled for removal.
    while let Some(idx) = failed.pop() {
        clients.swap_remove(idx);
    }
    if len < bytes {
        // Make room: drop the oldest bytes so that the total stays within `bytes`.
        let excess = buffer.len().saturating_sub(bytes - len);
        buffer.drain(..excess);
        buffer.extend(buf);
    } else {
        // The chunk alone fills the buffer: keep only its last `bytes` bytes.
        buffer.clear();
        buffer.extend(&buf[len - bytes..]);
    }
    stdin.consume(len);
}
}
}
} else {
eprintln!(
"[logbufpipe] {} argument must be the pipe's path",
mode.arg2s(),
);
return ExitCode::FAILURE;
};
ExitCode::SUCCESS
}