init
This commit is contained in:
commit
adba8907fd
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
/target
|
262
Cargo.lock
generated
Normal file
262
Cargo.lock
generated
Normal file
@ -0,0 +1,262 @@
|
|||||||
|
# This file is automatically @generated by Cargo.
|
||||||
|
# It is not intended for manual editing.
|
||||||
|
version = 4
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "addr2line"
|
||||||
|
version = "0.24.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
|
||||||
|
dependencies = [
|
||||||
|
"gimli",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "adler2"
|
||||||
|
version = "2.0.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "backtrace"
|
||||||
|
version = "0.3.74"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
|
||||||
|
dependencies = [
|
||||||
|
"addr2line",
|
||||||
|
"cfg-if",
|
||||||
|
"libc",
|
||||||
|
"miniz_oxide",
|
||||||
|
"object",
|
||||||
|
"rustc-demangle",
|
||||||
|
"windows-targets",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "bytes"
|
||||||
|
version = "1.10.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f61dac84819c6588b558454b194026eb1f09c293b9036ae9b159e74e73ab6cf9"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "cfg-if"
|
||||||
|
version = "1.0.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "gimli"
|
||||||
|
version = "0.31.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "libc"
|
||||||
|
version = "0.2.170"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "logbufpipe"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "memchr"
|
||||||
|
version = "2.7.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "miniz_oxide"
|
||||||
|
version = "0.8.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5"
|
||||||
|
dependencies = [
|
||||||
|
"adler2",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "mio"
|
||||||
|
version = "1.0.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"wasi",
|
||||||
|
"windows-sys",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "object"
|
||||||
|
version = "0.36.7"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87"
|
||||||
|
dependencies = [
|
||||||
|
"memchr",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pin-project-lite"
|
||||||
|
version = "0.2.16"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "proc-macro2"
|
||||||
|
version = "1.0.93"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99"
|
||||||
|
dependencies = [
|
||||||
|
"unicode-ident",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "quote"
|
||||||
|
version = "1.0.38"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rustc-demangle"
|
||||||
|
version = "0.1.24"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "socket2"
|
||||||
|
version = "0.5.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"windows-sys",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "syn"
|
||||||
|
version = "2.0.98"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "36147f1a48ae0ec2b5b3bc5b537d267457555a10dc06f3dbc8cb11ba3006d3b1"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"unicode-ident",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio"
|
||||||
|
version = "1.43.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
|
||||||
|
dependencies = [
|
||||||
|
"backtrace",
|
||||||
|
"bytes",
|
||||||
|
"libc",
|
||||||
|
"mio",
|
||||||
|
"pin-project-lite",
|
||||||
|
"socket2",
|
||||||
|
"tokio-macros",
|
||||||
|
"windows-sys",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-macros"
|
||||||
|
version = "2.5.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "unicode-ident"
|
||||||
|
version = "1.0.16"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a210d160f08b701c8721ba1c726c11662f877ea6b7094007e1ca9a1041945034"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "wasi"
|
||||||
|
version = "0.11.0+wasi-snapshot-preview1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows-sys"
|
||||||
|
version = "0.52.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
|
||||||
|
dependencies = [
|
||||||
|
"windows-targets",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows-targets"
|
||||||
|
version = "0.52.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
|
||||||
|
dependencies = [
|
||||||
|
"windows_aarch64_gnullvm",
|
||||||
|
"windows_aarch64_msvc",
|
||||||
|
"windows_i686_gnu",
|
||||||
|
"windows_i686_gnullvm",
|
||||||
|
"windows_i686_msvc",
|
||||||
|
"windows_x86_64_gnu",
|
||||||
|
"windows_x86_64_gnullvm",
|
||||||
|
"windows_x86_64_msvc",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows_aarch64_gnullvm"
|
||||||
|
version = "0.52.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows_aarch64_msvc"
|
||||||
|
version = "0.52.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows_i686_gnu"
|
||||||
|
version = "0.52.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows_i686_gnullvm"
|
||||||
|
version = "0.52.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows_i686_msvc"
|
||||||
|
version = "0.52.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows_x86_64_gnu"
|
||||||
|
version = "0.52.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows_x86_64_gnullvm"
|
||||||
|
version = "0.52.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows_x86_64_msvc"
|
||||||
|
version = "0.52.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
|
7
Cargo.toml
Normal file
7
Cargo.toml
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
[package]
|
||||||
|
name = "logbufpipe"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
tokio = { version = "1.43.0", features = ["rt", "sync", "io-std", "io-util", "fs", "macros", "net"] }
|
395
src/main.rs
Normal file
395
src/main.rs
Normal file
@ -0,0 +1,395 @@
|
|||||||
|
use core::str;
|
||||||
|
use std::{
|
||||||
|
collections::VecDeque,
|
||||||
|
env::{args, args_os},
|
||||||
|
path::PathBuf,
|
||||||
|
process::ExitCode,
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
|
use tokio::{
|
||||||
|
io::{stdin, stdout, AsyncBufReadExt, AsyncWriteExt, BufReader, Stdout},
|
||||||
|
net::{UnixListener, UnixStream},
|
||||||
|
spawn,
|
||||||
|
sync::Mutex,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[tokio::main(flavor = "current_thread")]
|
||||||
|
async fn main() -> ExitCode {
|
||||||
|
if args()
|
||||||
|
.skip(1)
|
||||||
|
.map(|arg| arg.trim().to_lowercase())
|
||||||
|
.any(|arg| arg == "-h" || arg == "--help")
|
||||||
|
{
|
||||||
|
eprintln!(
|
||||||
|
r#"[logbufpipe]
|
||||||
|
logbufpipe records data from its stdin, immediately forwards it to stdout,
|
||||||
|
and stores the data in an in-memory queue. when this queue is full, old data
|
||||||
|
is dropped so that new data can be added. You can connect to logbufpipe
|
||||||
|
through a unix socket to see the stored data and receive live updates:
|
||||||
|
|
||||||
|
# server: capture data and store it
|
||||||
|
(echo hi; sleep 5; echo goodbye) | logbufpipe 1M /tmp/lbp1
|
||||||
|
|
||||||
|
# client: connect to the server and show the output
|
||||||
|
logbufpipe utf8 1M /tmp/lbp1
|
||||||
|
|
||||||
|
In server mode, logbufpipe will open a unix socket at the given path
|
||||||
|
and keep the specified amount of data in memory. This limit is expressed as
|
||||||
|
a number n, optionally followed by one of K, M, G, or T, representing n bytes,
|
||||||
|
1024n bytes for K, 1024²n bytes for M, 1024³n bytes for G, or 1024⁴ bytes for T.
|
||||||
|
|
||||||
|
As a client, logbufpipe has multiple modes:
|
||||||
|
- raw/bytes: output the newest bytes and live data. may output half of a char.
|
||||||
|
- utf8: same as raw, but skip bytes until a valid utf8 char is encountered.
|
||||||
|
The size limit does not have to match that of the server. The smaller of the
|
||||||
|
two limits decides how much old data can be shown. If the path is a normal file
|
||||||
|
instead of a unix socket, acts as if the server had sent the data in the file.
|
||||||
|
"#
|
||||||
|
);
|
||||||
|
return ExitCode::SUCCESS;
|
||||||
|
}
|
||||||
|
let mut mode = Mode::Server;
|
||||||
|
enum Mode {
|
||||||
|
Server,
|
||||||
|
Raw,
|
||||||
|
Utf8,
|
||||||
|
}
|
||||||
|
impl Mode {
|
||||||
|
fn arg1(&self) -> usize {
|
||||||
|
if matches!(self, Self::Server) {
|
||||||
|
1
|
||||||
|
} else {
|
||||||
|
2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn arg2(&self) -> usize {
|
||||||
|
self.arg1() + 1
|
||||||
|
}
|
||||||
|
fn arg1s(&self) -> &'static str {
|
||||||
|
if matches!(self, Self::Server) {
|
||||||
|
"first"
|
||||||
|
} else {
|
||||||
|
"second"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn arg2s(&self) -> &'static str {
|
||||||
|
if matches!(self, Self::Server) {
|
||||||
|
"second"
|
||||||
|
} else {
|
||||||
|
"third"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
match args()
|
||||||
|
.nth(1)
|
||||||
|
.map(|arg| arg.to_lowercase())
|
||||||
|
.as_ref()
|
||||||
|
.map(|arg| arg.as_str())
|
||||||
|
{
|
||||||
|
Some("raw" | "bytes") => mode = Mode::Raw,
|
||||||
|
Some("utf8" | "utf-8" | "utf_8") => mode = Mode::Utf8,
|
||||||
|
Some(_) => {}
|
||||||
|
None => {
|
||||||
|
eprintln!("[logbufpipe] run logbufpipe --help for help");
|
||||||
|
return ExitCode::FAILURE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let bytes = if let Some(bytes_str) = args().nth(mode.arg1()) {
|
||||||
|
let bytes_str = bytes_str.trim();
|
||||||
|
if let Ok(bytes) = bytes_str.parse() {
|
||||||
|
Some(bytes)
|
||||||
|
} else if let Some(modifier) = bytes_str
|
||||||
|
.as_bytes()
|
||||||
|
.get(bytes_str.len().saturating_sub(1))
|
||||||
|
.and_then(|m| match *m {
|
||||||
|
b'K' | b'k' => Some(1024usize),
|
||||||
|
b'M' | b'm' => 1024usize.checked_mul(1024),
|
||||||
|
b'G' | b'g' => 1024usize
|
||||||
|
.checked_mul(1024)
|
||||||
|
.and_then(|v| v.checked_mul(1024)),
|
||||||
|
b'T' | b't' => 1024usize
|
||||||
|
.checked_mul(1024)
|
||||||
|
.and_then(|v| v.checked_mul(1024))
|
||||||
|
.and_then(|v| v.checked_mul(1024)),
|
||||||
|
_ => None,
|
||||||
|
})
|
||||||
|
{
|
||||||
|
if let Ok(bytes) = bytes_str[0..bytes_str.len() - 1].trim_end().parse() {
|
||||||
|
modifier.checked_mul(bytes)
|
||||||
|
} else if let Ok(bytes) = bytes_str[0..bytes_str.len() - 1].trim_end().parse::<f64>() {
|
||||||
|
let bytes = modifier as f64 * bytes as f64;
|
||||||
|
if bytes.is_finite() && bytes >= 0.0 && bytes <= usize::MAX as f64 {
|
||||||
|
Some(bytes as usize)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
let bytes = if let Some(bytes) = bytes {
|
||||||
|
bytes
|
||||||
|
} else {
|
||||||
|
eprintln!(
|
||||||
|
"[logbufpipe] {} argument must be the buffer's size in bytes or as <num>[KMGT], prefixed with u or b for client mode",
|
||||||
|
mode.arg1s(),
|
||||||
|
);
|
||||||
|
return ExitCode::FAILURE;
|
||||||
|
};
|
||||||
|
|
||||||
|
let data = Arc::new(Mutex::new((VecDeque::<u8>::with_capacity(bytes), vec![])));
|
||||||
|
|
||||||
|
if let Some(path) = args_os().nth(mode.arg2()) {
|
||||||
|
let path = PathBuf::from(path);
|
||||||
|
match mode {
|
||||||
|
Mode::Raw | Mode::Utf8 => {
|
||||||
|
async fn buf_raw(buf: &[u8], stdout: &mut Stdout) {
|
||||||
|
stdout.write_all(buf).await.unwrap();
|
||||||
|
stdout.flush().await.unwrap();
|
||||||
|
}
|
||||||
|
fn buf_utf8_waiting_buf() -> Option<[u8; 4]> {
|
||||||
|
Some([0, 0, 0, 0])
|
||||||
|
}
|
||||||
|
async fn buf_utf8(
|
||||||
|
mut buf: &[u8],
|
||||||
|
stdout: &mut Stdout,
|
||||||
|
waiting_buf: &mut Option<[u8; 4]>,
|
||||||
|
) {
|
||||||
|
if if let Some(waiting) = waiting_buf {
|
||||||
|
// ignore bytes until a valid utf8 character is found
|
||||||
|
if 'waiting_check: {
|
||||||
|
for (i, b) in buf.iter().enumerate() {
|
||||||
|
*waiting = [waiting[1], waiting[2], waiting[3], *b];
|
||||||
|
for slice in [
|
||||||
|
&waiting[3..4],
|
||||||
|
&waiting[2..4],
|
||||||
|
&waiting[1..4],
|
||||||
|
&waiting[0..4],
|
||||||
|
] {
|
||||||
|
if str::from_utf8(slice).is_ok() {
|
||||||
|
stdout.write_all(slice).await.unwrap();
|
||||||
|
buf = &buf[1 + i..];
|
||||||
|
break 'waiting_check true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
} {
|
||||||
|
*waiting_buf = None;
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
} {
|
||||||
|
stdout.write_all(&buf).await.unwrap();
|
||||||
|
stdout.flush().await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
match UnixStream::connect(&path).await {
|
||||||
|
Ok(mut con) => {
|
||||||
|
con.write_all(format!("{bytes}\n").as_bytes())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let mut con = BufReader::new(con);
|
||||||
|
let mut stdout = stdout();
|
||||||
|
match mode {
|
||||||
|
Mode::Server => unreachable!(),
|
||||||
|
Mode::Raw => loop {
|
||||||
|
let buf = con.fill_buf().await.unwrap();
|
||||||
|
if buf.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let len = buf.len();
|
||||||
|
buf_raw(buf, &mut stdout).await;
|
||||||
|
con.consume(len);
|
||||||
|
},
|
||||||
|
Mode::Utf8 => {
|
||||||
|
let mut waiting_buf = buf_utf8_waiting_buf();
|
||||||
|
loop {
|
||||||
|
let buf = con.fill_buf().await.unwrap();
|
||||||
|
if buf.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let len = buf.len();
|
||||||
|
buf_utf8(buf, &mut stdout, &mut waiting_buf).await;
|
||||||
|
con.consume(len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
if let Ok(buf) = tokio::fs::read(&path).await {
|
||||||
|
let mut buf = buf.as_slice();
|
||||||
|
if buf.len() > bytes {
|
||||||
|
buf = &buf[buf.len() - bytes..];
|
||||||
|
}
|
||||||
|
let mut stdout = stdout();
|
||||||
|
match mode {
|
||||||
|
Mode::Server => unreachable!(),
|
||||||
|
Mode::Raw => {
|
||||||
|
buf_raw(buf, &mut stdout).await;
|
||||||
|
}
|
||||||
|
Mode::Utf8 => {
|
||||||
|
buf_utf8(buf, &mut stdout, &mut buf_utf8_waiting_buf()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
eprintln!(
|
||||||
|
"[logbufpipe] couldn't connect to pipe {}: {e}",
|
||||||
|
path.display()
|
||||||
|
);
|
||||||
|
return ExitCode::FAILURE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Mode::Server => {
|
||||||
|
{
|
||||||
|
let _ = tokio::fs::remove_file(&path).await;
|
||||||
|
match UnixListener::bind(&path) {
|
||||||
|
Ok(listener) => {
|
||||||
|
let mut cleaning_listener =
|
||||||
|
CleaningUnixListener(Some(listener), path, Arc::clone(&data));
|
||||||
|
struct CleaningUnixListener(
|
||||||
|
Option<UnixListener>,
|
||||||
|
PathBuf,
|
||||||
|
Arc<Mutex<(VecDeque<u8>, Vec<UnixStream>)>>,
|
||||||
|
);
|
||||||
|
impl Drop for CleaningUnixListener {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
drop(self.0.take());
|
||||||
|
let _ = std::fs::remove_file(&self.1);
|
||||||
|
let _ = std::fs::write(
|
||||||
|
&self.1,
|
||||||
|
&*self.2.blocking_lock().0.make_contiguous(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
spawn(async move {
|
||||||
|
let listener = cleaning_listener.0.as_mut().unwrap();
|
||||||
|
loop {
|
||||||
|
match listener.accept().await {
|
||||||
|
Ok((mut client, _)) => {
|
||||||
|
let data = Arc::clone(&cleaning_listener.2);
|
||||||
|
spawn(async move {
|
||||||
|
'client_con: {
|
||||||
|
if let Ok(line) = BufReader::new(&mut client)
|
||||||
|
.lines()
|
||||||
|
.next_line()
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
let bytes = if let Some(line) = line {
|
||||||
|
if let Ok(bytes) =
|
||||||
|
line.trim().parse::<usize>()
|
||||||
|
{
|
||||||
|
bytes
|
||||||
|
} else {
|
||||||
|
break 'client_con;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
usize::MAX
|
||||||
|
};
|
||||||
|
let mut lock = data.lock().await;
|
||||||
|
let (bytes_a, bytes_b) = lock.0.as_slices();
|
||||||
|
let (bytes_a, bytes_b) = if bytes
|
||||||
|
< bytes_b.len()
|
||||||
|
{
|
||||||
|
(
|
||||||
|
[].as_slice(),
|
||||||
|
&bytes_b[bytes_b.len() - bytes..],
|
||||||
|
)
|
||||||
|
} else if bytes
|
||||||
|
< bytes_a.len() + bytes_b.len()
|
||||||
|
{
|
||||||
|
(
|
||||||
|
&bytes_a[bytes_a.len()
|
||||||
|
+ bytes_b.len()
|
||||||
|
- bytes..],
|
||||||
|
bytes_b,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
(bytes_a, bytes_b)
|
||||||
|
};
|
||||||
|
if client.write_all(bytes_a).await.is_ok()
|
||||||
|
&& client
|
||||||
|
.write_all(bytes_b)
|
||||||
|
.await
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
lock.1.push(client);
|
||||||
|
}
|
||||||
|
drop(lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("[logbufpipe] pipe broke: {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("[logbufpipe] couldn't create pipe {}: {e}", path.display());
|
||||||
|
return ExitCode::FAILURE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let mut stdin = BufReader::new(stdin());
|
||||||
|
let mut stdout = stdout();
|
||||||
|
loop {
|
||||||
|
if let Ok(buf) = stdin.fill_buf().await {
|
||||||
|
if buf.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let _ = stdout.write_all(&buf).await;
|
||||||
|
let _ = stdout.flush().await;
|
||||||
|
let mut lock = data.lock().await;
|
||||||
|
let mut rm = Vec::new();
|
||||||
|
for (i, client) in lock.1.iter_mut().enumerate() {
|
||||||
|
if client.write_all(buf).await.is_err() {
|
||||||
|
rm.push(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i in rm.into_iter().rev() {
|
||||||
|
lock.1.swap_remove(i);
|
||||||
|
}
|
||||||
|
let buffer = &mut lock.0;
|
||||||
|
let len = buf.len();
|
||||||
|
let max_len = bytes.saturating_sub(len);
|
||||||
|
if max_len > 0 {
|
||||||
|
let to_remove = buffer.len().saturating_sub(max_len);
|
||||||
|
if to_remove > 0 {
|
||||||
|
buffer.drain(0..to_remove);
|
||||||
|
}
|
||||||
|
buffer.extend(buf);
|
||||||
|
} else {
|
||||||
|
buffer.clear();
|
||||||
|
buffer.extend(&buf[len - bytes..]);
|
||||||
|
}
|
||||||
|
stdin.consume(len);
|
||||||
|
} else {
|
||||||
|
return ExitCode::FAILURE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
eprintln!(
|
||||||
|
"[logbufpipe] {} argument must be the pipe's path",
|
||||||
|
mode.arg2s(),
|
||||||
|
);
|
||||||
|
return ExitCode::FAILURE;
|
||||||
|
};
|
||||||
|
ExitCode::SUCCESS
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user