396 lines
17 KiB
Rust
396 lines
17 KiB
Rust
use core::str;
|
|
use std::{
|
|
collections::VecDeque,
|
|
env::{args, args_os},
|
|
path::PathBuf,
|
|
process::ExitCode,
|
|
sync::Arc,
|
|
};
|
|
|
|
use tokio::{
|
|
io::{stdin, stdout, AsyncBufReadExt, AsyncWriteExt, BufReader, Stdout},
|
|
net::{UnixListener, UnixStream},
|
|
spawn,
|
|
sync::Mutex,
|
|
};
|
|
|
|
#[tokio::main(flavor = "current_thread")]
|
|
async fn main() -> ExitCode {
|
|
if args()
|
|
.skip(1)
|
|
.map(|arg| arg.trim().to_lowercase())
|
|
.any(|arg| arg == "-h" || arg == "--help")
|
|
{
|
|
eprintln!(
|
|
r#"[logbufpipe]
|
|
logbufpipe records data from its stdin, immediately forwards it to stdout,
|
|
and stores the data in an in-memory queue. when this queue is full, old data
|
|
is dropped so that new data can be added. You can connect to logbufpipe
|
|
through a unix socket to see the stored data and receive live updates:
|
|
|
|
# server: capture data and store it
|
|
(echo hi; sleep 5; echo goodbye) | logbufpipe 1M /tmp/lbp1
|
|
|
|
# client: connect to the server and show the output
|
|
logbufpipe utf8 1M /tmp/lbp1
|
|
|
|
In server mode, logbufpipe will open a unix socket at the given path
|
|
and keep the specified amount of data in memory. This limit is expressed as
|
|
a number n, optionally followed by one of K, M, G, or T, representing n bytes,
|
|
1024n bytes for K, 1024²n bytes for M, 1024³n bytes for G, or 1024⁴ bytes for T.
|
|
|
|
As a client, logbufpipe has multiple modes:
|
|
- raw/bytes: output the newest bytes and live data. may output half of a char.
|
|
- utf8: same as raw, but skip bytes until a valid utf8 char is encountered.
|
|
The size limit does not have to match that of the server. The smaller of the
|
|
two limits decides how much old data can be shown. If the path is a normal file
|
|
instead of a unix socket, acts as if the server had sent the data in the file.
|
|
"#
|
|
);
|
|
return ExitCode::SUCCESS;
|
|
}
|
|
let mut mode = Mode::Server;
|
|
enum Mode {
|
|
Server,
|
|
Raw,
|
|
Utf8,
|
|
}
|
|
impl Mode {
|
|
fn arg1(&self) -> usize {
|
|
if matches!(self, Self::Server) {
|
|
1
|
|
} else {
|
|
2
|
|
}
|
|
}
|
|
fn arg2(&self) -> usize {
|
|
self.arg1() + 1
|
|
}
|
|
fn arg1s(&self) -> &'static str {
|
|
if matches!(self, Self::Server) {
|
|
"first"
|
|
} else {
|
|
"second"
|
|
}
|
|
}
|
|
fn arg2s(&self) -> &'static str {
|
|
if matches!(self, Self::Server) {
|
|
"second"
|
|
} else {
|
|
"third"
|
|
}
|
|
}
|
|
}
|
|
match args()
|
|
.nth(1)
|
|
.map(|arg| arg.to_lowercase())
|
|
.as_ref()
|
|
.map(|arg| arg.as_str())
|
|
{
|
|
Some("raw" | "bytes") => mode = Mode::Raw,
|
|
Some("utf8" | "utf-8" | "utf_8") => mode = Mode::Utf8,
|
|
Some(_) => {}
|
|
None => {
|
|
eprintln!("[logbufpipe] run logbufpipe --help for help");
|
|
return ExitCode::FAILURE;
|
|
}
|
|
}
|
|
let bytes = if let Some(bytes_str) = args().nth(mode.arg1()) {
|
|
let bytes_str = bytes_str.trim();
|
|
if let Ok(bytes) = bytes_str.parse() {
|
|
Some(bytes)
|
|
} else if let Some(modifier) = bytes_str
|
|
.as_bytes()
|
|
.get(bytes_str.len().saturating_sub(1))
|
|
.and_then(|m| match *m {
|
|
b'K' | b'k' => Some(1024usize),
|
|
b'M' | b'm' => 1024usize.checked_mul(1024),
|
|
b'G' | b'g' => 1024usize
|
|
.checked_mul(1024)
|
|
.and_then(|v| v.checked_mul(1024)),
|
|
b'T' | b't' => 1024usize
|
|
.checked_mul(1024)
|
|
.and_then(|v| v.checked_mul(1024))
|
|
.and_then(|v| v.checked_mul(1024)),
|
|
_ => None,
|
|
})
|
|
{
|
|
if let Ok(bytes) = bytes_str[0..bytes_str.len() - 1].trim_end().parse() {
|
|
modifier.checked_mul(bytes)
|
|
} else if let Ok(bytes) = bytes_str[0..bytes_str.len() - 1].trim_end().parse::<f64>() {
|
|
let bytes = modifier as f64 * bytes as f64;
|
|
if bytes.is_finite() && bytes >= 0.0 && bytes <= usize::MAX as f64 {
|
|
Some(bytes as usize)
|
|
} else {
|
|
None
|
|
}
|
|
} else {
|
|
None
|
|
}
|
|
} else {
|
|
None
|
|
}
|
|
} else {
|
|
None
|
|
};
|
|
let bytes = if let Some(bytes) = bytes {
|
|
bytes
|
|
} else {
|
|
eprintln!(
|
|
"[logbufpipe] {} argument must be the buffer's size in bytes or as <num>[KMGT], prefixed with u or b for client mode",
|
|
mode.arg1s(),
|
|
);
|
|
return ExitCode::FAILURE;
|
|
};
|
|
|
|
let data = Arc::new(Mutex::new((VecDeque::<u8>::with_capacity(bytes), vec![])));
|
|
|
|
if let Some(path) = args_os().nth(mode.arg2()) {
|
|
let path = PathBuf::from(path);
|
|
match mode {
|
|
Mode::Raw | Mode::Utf8 => {
|
|
async fn buf_raw(buf: &[u8], stdout: &mut Stdout) {
|
|
stdout.write_all(buf).await.unwrap();
|
|
stdout.flush().await.unwrap();
|
|
}
|
|
fn buf_utf8_waiting_buf() -> Option<[u8; 4]> {
|
|
Some([0, 0, 0, 0])
|
|
}
|
|
async fn buf_utf8(
|
|
mut buf: &[u8],
|
|
stdout: &mut Stdout,
|
|
waiting_buf: &mut Option<[u8; 4]>,
|
|
) {
|
|
if if let Some(waiting) = waiting_buf {
|
|
// ignore bytes until a valid utf8 character is found
|
|
if 'waiting_check: {
|
|
for (i, b) in buf.iter().enumerate() {
|
|
*waiting = [waiting[1], waiting[2], waiting[3], *b];
|
|
for slice in [
|
|
&waiting[3..4],
|
|
&waiting[2..4],
|
|
&waiting[1..4],
|
|
&waiting[0..4],
|
|
] {
|
|
if str::from_utf8(slice).is_ok() {
|
|
stdout.write_all(slice).await.unwrap();
|
|
buf = &buf[1 + i..];
|
|
break 'waiting_check true;
|
|
}
|
|
}
|
|
}
|
|
false
|
|
} {
|
|
*waiting_buf = None;
|
|
true
|
|
} else {
|
|
false
|
|
}
|
|
} else {
|
|
true
|
|
} {
|
|
stdout.write_all(&buf).await.unwrap();
|
|
stdout.flush().await.unwrap();
|
|
}
|
|
}
|
|
match UnixStream::connect(&path).await {
|
|
Ok(mut con) => {
|
|
con.write_all(format!("{bytes}\n").as_bytes())
|
|
.await
|
|
.unwrap();
|
|
let mut con = BufReader::new(con);
|
|
let mut stdout = stdout();
|
|
match mode {
|
|
Mode::Server => unreachable!(),
|
|
Mode::Raw => loop {
|
|
let buf = con.fill_buf().await.unwrap();
|
|
if buf.is_empty() {
|
|
break;
|
|
}
|
|
let len = buf.len();
|
|
buf_raw(buf, &mut stdout).await;
|
|
con.consume(len);
|
|
},
|
|
Mode::Utf8 => {
|
|
let mut waiting_buf = buf_utf8_waiting_buf();
|
|
loop {
|
|
let buf = con.fill_buf().await.unwrap();
|
|
if buf.is_empty() {
|
|
break;
|
|
}
|
|
let len = buf.len();
|
|
buf_utf8(buf, &mut stdout, &mut waiting_buf).await;
|
|
con.consume(len);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Err(e) => {
|
|
if let Ok(buf) = tokio::fs::read(&path).await {
|
|
let mut buf = buf.as_slice();
|
|
if buf.len() > bytes {
|
|
buf = &buf[buf.len() - bytes..];
|
|
}
|
|
let mut stdout = stdout();
|
|
match mode {
|
|
Mode::Server => unreachable!(),
|
|
Mode::Raw => {
|
|
buf_raw(buf, &mut stdout).await;
|
|
}
|
|
Mode::Utf8 => {
|
|
buf_utf8(buf, &mut stdout, &mut buf_utf8_waiting_buf()).await;
|
|
}
|
|
}
|
|
} else {
|
|
eprintln!(
|
|
"[logbufpipe] couldn't connect to pipe {}: {e}",
|
|
path.display()
|
|
);
|
|
return ExitCode::FAILURE;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Mode::Server => {
|
|
{
|
|
let _ = tokio::fs::remove_file(&path).await;
|
|
match UnixListener::bind(&path) {
|
|
Ok(listener) => {
|
|
let mut cleaning_listener =
|
|
CleaningUnixListener(Some(listener), path, Arc::clone(&data));
|
|
struct CleaningUnixListener(
|
|
Option<UnixListener>,
|
|
PathBuf,
|
|
Arc<Mutex<(VecDeque<u8>, Vec<UnixStream>)>>,
|
|
);
|
|
impl Drop for CleaningUnixListener {
|
|
fn drop(&mut self) {
|
|
drop(self.0.take());
|
|
let _ = std::fs::remove_file(&self.1);
|
|
let _ = std::fs::write(
|
|
&self.1,
|
|
&*self.2.blocking_lock().0.make_contiguous(),
|
|
);
|
|
}
|
|
}
|
|
spawn(async move {
|
|
let listener = cleaning_listener.0.as_mut().unwrap();
|
|
loop {
|
|
match listener.accept().await {
|
|
Ok((mut client, _)) => {
|
|
let data = Arc::clone(&cleaning_listener.2);
|
|
spawn(async move {
|
|
'client_con: {
|
|
if let Ok(line) = BufReader::new(&mut client)
|
|
.lines()
|
|
.next_line()
|
|
.await
|
|
{
|
|
let bytes = if let Some(line) = line {
|
|
if let Ok(bytes) =
|
|
line.trim().parse::<usize>()
|
|
{
|
|
bytes
|
|
} else {
|
|
break 'client_con;
|
|
}
|
|
} else {
|
|
usize::MAX
|
|
};
|
|
let mut lock = data.lock().await;
|
|
let (bytes_a, bytes_b) = lock.0.as_slices();
|
|
let (bytes_a, bytes_b) = if bytes
|
|
< bytes_b.len()
|
|
{
|
|
(
|
|
[].as_slice(),
|
|
&bytes_b[bytes_b.len() - bytes..],
|
|
)
|
|
} else if bytes
|
|
< bytes_a.len() + bytes_b.len()
|
|
{
|
|
(
|
|
&bytes_a[bytes_a.len()
|
|
+ bytes_b.len()
|
|
- bytes..],
|
|
bytes_b,
|
|
)
|
|
} else {
|
|
(bytes_a, bytes_b)
|
|
};
|
|
if client.write_all(bytes_a).await.is_ok()
|
|
&& client
|
|
.write_all(bytes_b)
|
|
.await
|
|
.is_ok()
|
|
{
|
|
lock.1.push(client);
|
|
}
|
|
drop(lock);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
Err(e) => {
|
|
eprintln!("[logbufpipe] pipe broke: {e}");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
Err(e) => {
|
|
eprintln!("[logbufpipe] couldn't create pipe {}: {e}", path.display());
|
|
return ExitCode::FAILURE;
|
|
}
|
|
}
|
|
}
|
|
let mut stdin = BufReader::new(stdin());
|
|
let mut stdout = stdout();
|
|
loop {
|
|
if let Ok(buf) = stdin.fill_buf().await {
|
|
if buf.is_empty() {
|
|
break;
|
|
}
|
|
let _ = stdout.write_all(&buf).await;
|
|
let _ = stdout.flush().await;
|
|
let mut lock = data.lock().await;
|
|
let mut rm = Vec::new();
|
|
for (i, client) in lock.1.iter_mut().enumerate() {
|
|
if client.write_all(buf).await.is_err() {
|
|
rm.push(i);
|
|
}
|
|
}
|
|
for i in rm.into_iter().rev() {
|
|
lock.1.swap_remove(i);
|
|
}
|
|
let buffer = &mut lock.0;
|
|
let len = buf.len();
|
|
let max_len = bytes.saturating_sub(len);
|
|
if max_len > 0 {
|
|
let to_remove = buffer.len().saturating_sub(max_len);
|
|
if to_remove > 0 {
|
|
buffer.drain(0..to_remove);
|
|
}
|
|
buffer.extend(buf);
|
|
} else {
|
|
buffer.clear();
|
|
buffer.extend(&buf[len - bytes..]);
|
|
}
|
|
stdin.consume(len);
|
|
} else {
|
|
return ExitCode::FAILURE;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
eprintln!(
|
|
"[logbufpipe] {} argument must be the pipe's path",
|
|
mode.arg2s(),
|
|
);
|
|
return ExitCode::FAILURE;
|
|
};
|
|
ExitCode::SUCCESS
|
|
}
|