init (tcp, udp, ws)

This commit is contained in:
Mark
2026-04-17 01:31:23 +02:00
parent b959097944
commit 83dee38e78
8 changed files with 1735 additions and 202 deletions

786
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,9 +1,11 @@
[package] [package]
name = "wstcp" name = "wsudp"
version = "0.1.0" version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
futures-util = "0.3.31" futures-util = "0.3.32"
tokio = { version = "1.45.1", features = ["macros", "net", "rt", "time"] } rustls = { version = "0.23.38", default-features = false, features = ["ring"] }
tokio-tungstenite = "0.26.2" tokio = { version = "1.52.0", features = ["macros", "net", "rt", "time"] }
tokio-tungstenite = { version = "0.29.0", features = ["rustls-tls-native-roots"] }
warp = { version = "0.4.2", features = ["websocket", "server"] }

149
src/args.rs Normal file
View File

@@ -0,0 +1,149 @@
use crate::routes::Prot;
pub struct Args {
pub verbosity: u8,
pub binds: Vec<Bind>,
}
pub struct Ref {
pub spec: String,
}
pub struct Bind {
pub spec: String,
pub id: Option<(Prot, usize)>,
pub prot: String,
pub arg: String,
pub wires: Vec<Result<Wire, Ref>>,
}
pub struct Wire {
pub spec: String,
pub prot: String,
pub arg: String,
}
pub fn args() -> Result<Args, String> {
let mut binds = Vec::new();
let mut verbosity = 0u8;
let mut args = std::env::args().skip(1);
while let Some(arg) = args.next() {
#[allow(irrefutable_let_patterns)]
if let arg = arg.trim_start()
&& arg.starts_with('-')
{
match arg {
"-h" | "--help" => {
return Err("
Arguments: [option1] [option2] [...] [bind1] [wire0] [wire1] [...] [bind2] [wire3] [...] [...]
Options:
`-h, --help` Print this text and exit.
`-v, --verbose` Output more stuff. Can be used multiple times.
`--verbosity <n>` Same as specifying `-v` n times.
Binds and Wires (non-option arguments):
Binds are specified as `<protocol>@<arguments>` or `<name>=<protocol>@<arguments>`.
Wires can be `<name>`, `<protocol>:<arguments>` or `<name>=<protocol>:<arguments>`.
Wires belong to the last bind before the wire. Just `<name>` means reusing another bind or wire.
For each bind, `wsudp` will accept (but not receive data from) connections.
Data received on a connection will be sent to its wires if at least one is present.
Data received from the first wire, if present, will be sent to the connection.
Protocols:
`tcp` - bind/wire for TCP.
note: accepts addresses as `<ipv4>:<port>`, `[<ipv6>]:<port>` or `<hostname>:<port>`.
bind: binds to the given address (e.g. `[::1]:8001`) and accepts connections.
wire: connects to the given address if a process is accepting connections on it.
`udp` - bind/wire for UDP.
note: accepts addresses as `<ipv4>:<port>`, `[<ipv6>]:<port>`, or `<hostname>:<port>`.
unlike most protocols here, udp may drop packets. non-udp programs generally
don't handle this well and may fail in some way when it happens.
tcp->udp->tcp can break things, udp->tcp->udp works but is slower.
bind: binds to the given address (e.g. `[::1]:8001`) and receives packets.
since UDP doesn't have connections, messages from the same source address
are considered being from the same connection. if a source is idle for
too long and then sends messages again, this may be considered a new connection.
wire: sends/receives messages to/from the specified address.
`ws` - bind/wire for websocket and wire for wss
note: accepts `<ipv4>:<port>`, `[<ipv6>]:<port>`, optionally with a `/<path>` suffix.
bind: binds to the given address and accepts websocket connections on any path.
wire: connects to the given address and path. prefix the address with `wss://` to use tls.
bind doesn't (yet) support dns lookups, but wire does. default ports are 80/443.
Examples:
Start a websocket server and connect clients to a udp socket:
`wsudp ws@[::1]:8001 udp:[::1]:8002`
Start a tcp server and connect clients to a websocket server,
while also sending client messages to a udp socket:
`wsudp tcp@[::1]:8001 ws:[::1]:80/api udp:127.0.0.1:8002`
Start a tcp and udp server and connect clients to a websocket server:
`wsudp tcp@[::1]:8001 web udp@[::1]:8002 web=ws:localhost:8000/socket`
"
.to_owned());
}
"-v" | "--verbose" => verbosity += 1,
"--verbosity" => {
verbosity = verbosity.saturating_add(
args.next()
.expect("Missing arguments after `--verbosity`")
.trim()
.parse()
.expect("Invalid data specified for `--verbosity`"),
)
}
_ => return Err(format!("Unknown argument: `{arg}`")),
}
} else if let Some(i) = arg.find([':', '@']) {
let is_bind = &arg[i..=i] == "@";
if binds.is_empty() && !is_bind {
return Err(format!(
"Found `:` instead of `@` when specifying a bind: `{arg}`"
));
}
let (spec, prot) = if let Some(j) = arg[..i].rfind('=') {
(&arg[..j], &arg[j + 1..i])
} else {
("", &arg[..i])
};
let arg = &arg[i + 1..];
if is_bind {
binds.push(Bind {
spec: spec.to_owned(),
id: None,
prot: prot.to_owned(),
arg: arg.to_owned(),
wires: Vec::new(),
});
} else {
binds
.last_mut()
.expect("there's a check to prevent this")
.wires
.push(Ok(Wire {
spec: spec.to_owned(),
prot: prot.to_owned(),
arg: arg.to_owned(),
}));
}
} else if let Some(bind) = binds.last_mut() {
bind.wires.push(Err(Ref {
spec: arg.to_owned(),
}));
} else {
return Err(format!("Found no `@` when specifying a bind: `{arg}`"));
}
}
Ok(Args { verbosity, binds })
}

View File

@@ -1,64 +1,178 @@
use std::time::Duration; #![allow(dead_code)]
mod args;
mod routes;
mod tcp;
mod udp;
mod ws;
use futures_util::{SinkExt, StreamExt}; use std::{collections::HashMap, process::ExitCode};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, use crate::routes::Routes;
net::{TcpListener, TcpStream},
spawn, // can contain most udp packets
time::sleep, const BUF_SIZE: usize = 2048;
};
use tokio_tungstenite::tungstenite::Message; // TODO:
//
// - websocket: request path (prefix) matching,
// - websocket: reuse one server for multiple routes
// - udp, ...
trait Wire: Sized {
type R: Rx;
type T: Tx;
async fn parse(arg: &str) -> Result<Self, String>;
/// Open a new connection. For most protocols, this assumes
/// that some endpoint exists to which we can try to connect.
/// (Unless the implementor also implements `Bind`, for this,
/// read the following sections and the doc of `Bind::wait()`)
///
/// For `Bind` implementors, this either has priority over `Bind::wait()` or must return `Err(_)`.
/// NOTE: The order in which `Wire::open()` and `Bind::wait()` are called
/// is not known, as this depends on runtime circumstances.
async fn open(&self) -> Result<(Self::R, Self::T), String>;
}
trait Bind: Wire {
type R: Rx;
type T: Tx;
/// This will be called repeatedly to accept incoming connections.
///
/// The implementor may simply return `Err(_)` from `Wire::open()`.
/// If not, the implementor must support the following:
/// If `Wire::open()` is called `n` times after `Bind::wait()`,
/// the next `n` incoming connections (in order) should be returned
/// from the calls to `Wire::open()` instead of `Bind::wait()`
async fn wait(&self) -> Result<(<Self as Bind>::R, <Self as Bind>::T), String>;
}
trait Rx {
type T<'a>: RxT;
async fn rx<'a>(&mut self, buf: &'a mut [u8]) -> Result<Self::T<'a>, String>;
}
trait Tx {
async fn tx(&mut self, buf: &[u8]) -> Result<(), String>;
}
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
async fn main() { async fn main() -> ExitCode {
let addr = std::env::var("TCPWS_ADDR").expect("expected env var TCPWS_ADDR"); let mut args = match args::args() {
let listener = Ok(args) => args,
TcpListener::bind(std::env::var("TCPWS_BIND").expect("expected env var TCPWS_BIND")) Err(msg) => {
.await eprintln!("{msg}");
.unwrap(); return ExitCode::FAILURE;
loop {
if let Ok((ws, _)) = listener.accept().await {
if let Ok(con) = TcpStream::connect(&addr).await {
spawn(async move {
let (mut con_rx, mut con_tx) = con.into_split();
let ws = tokio_tungstenite::accept_async(ws).await.unwrap();
let (mut ws_tx, mut ws_rx) = ws.split();
spawn(async move {
loop {
match ws_rx.next().await {
Some(Ok(Message::Binary(data))) => {
con_tx.write_all(&data).await.unwrap();
} }
Some(Ok(Message::Text(data))) => { };
con_tx.write_all(data.as_bytes()).await.unwrap();
let mut routes = Routes::new();
routes.config.verbosity = args.verbosity;
if routes.config.verbosity >= 2 {
eprintln!("[main] preparing...");
} }
Some(Ok(
Message::Ping(_) | Message::Pong(_) | Message::Frame(_), let mut specs = HashMap::new();
)) => {} for bind in &mut args.binds {
None | Some(Err(_)) | Some(Ok(Message::Close(_))) => { match routes.bind(&bind.prot, &bind.arg).await {
con_tx.shutdown().await.ok(); Ok(id) => bind.id = Some(id),
break; Err(msg) => {
eprintln!("{msg}");
return ExitCode::FAILURE;
} }
} }
} }
}); for bind in &args.binds {
let mut buf = [0u8; 256]; if !bind.spec.is_empty() && specs.insert(bind.spec.as_str(), bind.id.unwrap()).is_some() {
loop { eprintln!("Duplicate definition for `{}`", bind.spec);
match con_rx.read(&mut buf).await { return ExitCode::FAILURE;
Err(_) | Ok(0) => {
ws_tx.close().await.ok();
break;
}
Ok(n) => ws_tx
.send(Message::binary(buf[0..n].to_vec()))
.await
.unwrap(),
} }
} }
}); for (bind, wire) in args.binds.iter().flat_map(|bind| {
let id = bind.id.expect("set in a previous loop");
bind.wires
.iter()
.filter_map(|w| w.as_ref().ok())
.map(move |w| (id, w))
}) {
match routes.wire(&wire.prot, &wire.arg).await {
Ok(id) => {
if !wire.spec.is_empty() && specs.insert(wire.spec.as_str(), id).is_some() {
eprintln!("Duplicate definition for `{}`", wire.spec);
return ExitCode::FAILURE;
}
match routes.pipe(bind, id) {
Ok(()) => {}
Err(msg) => {
eprintln!("{msg}");
return ExitCode::FAILURE;
}
}
}
Err(msg) => {
eprintln!("{msg}");
return ExitCode::FAILURE;
}
}
}
for (bind, wire) in args.binds.iter().flat_map(|bind| {
let id = bind.id.expect("set in a previous loop");
bind.wires
.iter()
.filter_map(|w| w.as_ref().err())
.map(move |w| (id, w))
}) {
if let Some(wire) = specs.get(&wire.spec.as_str()) {
match routes.pipe(bind, *wire) {
Ok(()) => {}
Err(msg) => {
eprintln!("{msg}");
return ExitCode::FAILURE;
}
} }
} else { } else {
sleep(Duration::from_millis(100)).await; if wire.spec.is_empty() {
eprintln!("Empty string `` is not a valid name");
} else {
eprintln!("No definition for `{}`", wire.spec);
} }
return ExitCode::FAILURE;
}
}
if routes.config.verbosity >= 1 {
eprintln!("[main] running.");
}
for task in routes.execute().await {
if let Err(e) = task.await {
eprintln!("[ERR] {e}");
}
}
eprintln!("[main] goodbye");
ExitCode::SUCCESS
}
trait RxT {
fn buf(&self) -> &[u8];
}
impl RxT for Vec<u8> {
fn buf(&self) -> &[u8] {
&self[..]
}
}
impl RxT for &[u8] {
fn buf(&self) -> &[u8] {
self
}
}
impl<T: Rx> Rx for &mut T {
type T<'a> = T::T<'a>;
async fn rx<'a>(&mut self, buf: &'a mut [u8]) -> Result<Self::T<'a>, String> {
(*self).rx(buf).await
}
}
impl<T: Tx> Tx for &mut T {
async fn tx(&mut self, buf: &[u8]) -> Result<(), String> {
(*self).tx(buf).await
} }
} }

332
src/routes.rs Normal file
View File

@@ -0,0 +1,332 @@
use std::sync::Arc;
use tokio::task::spawn;
#[derive(Clone, Copy, Default)]
pub struct Config {
pub verbosity: u8,
}
#[derive(Default)]
pub struct Routes {
pub config: Config,
binds: Binds,
wires: Wires,
state: State,
}
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct Prot(ProtType, bool);
macro_rules! protocols {
($($p:ident:$t:ident),*) => {
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum ProtType {
$($t,)*
}
pub enum ProtRx {
$($t($crate::$p::Rx),)*
}
pub enum ProtTx {
$($t($crate::$p::Tx),)*
}
#[derive(Default)]
struct Binds {
$($p: Bind<$crate::$p::Bind>,)*
}
#[derive(Default)]
struct Wires {
$($p: Wire<$crate::$p::Wire>,)*
}
#[derive(Default)]
struct State {
$($p: $crate::$p::State,)*
}
use $crate::{Rx, Tx};
impl Tx for ProtTx {
async fn tx(&mut self, buf: &[u8]) -> Result<(), String> {
match self {
$(
ProtTx::$t($crate::$p::Tx::Wait(tx)) => tx.tx(buf).await,
ProtTx::$t($crate::$p::Tx::Bind(tx)) => tx.tx(buf).await,
ProtTx::$t($crate::$p::Tx::Wire(tx)) => tx.tx(buf).await,
)*
}
}
}
async fn prot_fwd(r: &mut ProtRx, tx: &mut impl Tx) -> Result<bool, String> {
use $crate::{BUF_SIZE, RxT};
match r {
$(
ProtRx::$t($crate::$p::Rx::Wait(r)) => {
let mut buf = [0u8; BUF_SIZE];
let buf = r.rx(&mut buf).await?;
let buf = buf.buf();
if buf.is_empty() {
return Ok(false);
}
tx.tx(buf).await?;
}
ProtRx::$t($crate::$p::Rx::Bind(r)) => {
let mut buf = [0u8; BUF_SIZE];
let buf = r.rx(&mut buf).await?;
let buf = buf.buf();
if buf.is_empty() {
return Ok(false);
}
tx.tx(buf).await?;
}
ProtRx::$t($crate::$p::Rx::Wire(r)) => {
let mut buf = [0u8; BUF_SIZE];
let buf = r.rx(&mut buf).await?;
let buf = buf.buf();
if buf.is_empty() {
return Ok(false);
}
tx.tx(buf).await?;
}
,)*
}
Ok(true)
}
async fn prot_drop(r: &mut ProtRx) -> Result<bool, String> {
use $crate::{BUF_SIZE, RxT};
match r {
$(
ProtRx::$t($crate::$p::Rx::Wait(r)) => {
r.rx(&mut [0u8; BUF_SIZE]).await.map(|buf| !buf.buf().is_empty())
}
ProtRx::$t($crate::$p::Rx::Bind(r)) => {
r.rx(&mut [0u8; BUF_SIZE]).await.map(|buf| !buf.buf().is_empty())
}
ProtRx::$t($crate::$p::Rx::Wire(r)) => {
r.rx(&mut [0u8; BUF_SIZE]).await.map(|buf| !buf.buf().is_empty())
}
)*
}
}
impl Routes {
pub fn has(&self, id: (Prot, usize)) -> bool {
match (id.0.wire(), id.0.prot()) {
$((true, ProtType::$t) => self.wires.$p.get(id.1).is_some(),
(false, ProtType::$t) => self.binds.$p.get(id.1).is_some(),)*
}
}
pub async fn bind(
&mut self,
prot: impl AsRef<str>,
arg: impl AsRef<str>,
) -> Result<(Prot, usize), String> {
use $crate::Wire;
match prot.as_ref().to_lowercase().as_str() {
$(stringify!($p) => {
let bind = crate::$p::Bind::parse(arg.as_ref()).await?;
Ok((ProtType::$t.bind(), self.binds.$p.add(bind)))
},)*
_ => return Err(format!("unknown/unsupported protocol: `{}`", prot.as_ref())),
}
}
pub async fn wire(
&mut self,
prot: impl AsRef<str>,
arg: impl AsRef<str>,
) -> Result<(Prot, usize), String> {
use $crate::Wire;
match prot.as_ref().to_lowercase().as_str() {
$(stringify!($p) => {
let wire = crate::$p::Wire::parse(arg.as_ref()).await?;
Ok((ProtType::$t.wire(), self.wires.$p.add(wire)))
},)*
_ => return Err(format!("unknown/unsupported protocol: `{}`", prot.as_ref())),
}
}
pub fn pipe(&mut self, bind: (Prot, usize), wire: (Prot, usize)) -> Result<(), String> {
if !bind.0.bind() {
Err("can't pipe data from a wire (:), you need to use a bind (@).")?;
}
if !self.has(wire) {
Err("internal error: bad wire index")?;
}
match bind.0.prot() {
$(ProtType::$t => self.binds
.$p
.pipe(bind.1, wire)
.ok_or("internal error: bad bind index")?,)*
}
Ok(())
}
pub async fn execute(self) -> Vec<tokio::task::JoinHandle<()>> {
use $crate::{Bind, Rx, Tx, RxT, BUF_SIZE};
let mut tasks = Vec::new();
let config = self.config;
let routes = Arc::new(self);
$({
for i in 0..routes.binds.$p.0.len() {
let routes = Arc::clone(&routes);
tasks.push(spawn(async move {
let (bind, _) = &routes.binds.$p.0[i];
while let Ok((mut rx, mut tx)) = bind.wait().await {
if config.verbosity >= 2 {
eprintln!("[binds] new connection.");
}
let routes = Arc::clone(&routes);
spawn(async move {
let (_, wires) = &routes.binds.$p.0[i];
// connect all wires
let mut rxs = Vec::new();
let mut txs = Vec::new();
if config.verbosity >= 3 {
eprintln!("[wires] connecting...");
}
for (j, wire) in wires.iter().enumerate() {
match routes.wire_open(*wire).await {
Ok((r, t)) => {
rxs.push(r);
txs.push(t);
}
Err(e) => if config.verbosity >= if j == 0 { 1 } else { 3 } {
eprintln!("[wires] failed to connect: {e}");
}
}
}
if config.verbosity >= 2 {
eprintln!("[wires] connected to {}/{} wires.", txs.len(), wires.len());
}
// forward data to all wires
spawn(async move {
let mut buf = [0u8; BUF_SIZE];
let mut any_ok = 1;
while any_ok > 0 {
any_ok = 0;
let buf = rx.rx(&mut buf).await?;
let buf = buf.buf();
if buf.is_empty() {
break;
}
for t in &mut txs {
if t.tx(buf).await.is_ok() {
any_ok += 1;
}
}
if config.verbosity >= 4 {
eprintln!("[wires] forwarded {} bytes through {}/{} wires.", buf.len(), any_ok, txs.len());
}
}
Ok::<(), String>(())
});
// get the first wire
rxs.reverse();
let r = rxs.pop();
// drop data from other wires
spawn(async move {
let mut any_ok = true;
while any_ok {
any_ok = false;
for r in &mut rxs {
if let Ok(true) = prot_drop(r).await {
any_ok = true;
}
}
}
});
// forward data from first wire
if let Some(mut r) = r {
while let Ok(true) = prot_fwd(&mut r, &mut tx).await {
if config.verbosity >= 4 {
eprintln!("[wires] forwarded response.");
}
}
}
});
}
}));
}
})*
tasks
}
async fn wire_open(&self, id: (Prot, usize)) -> Result<(ProtRx, ProtTx), String> {
use $crate::Wire;
Ok(match (id.0.wire(), id.0.prot()) {
$((true, ProtType::$t) => {
let (r, t) = self.wires.$p.get(id.1).unwrap().open().await?;
(ProtRx::$t($crate::$p::Rx::Wire(r)), ProtTx::$t($crate::$p::Tx::Wire(t)))
}
(false, ProtType::$t) => {
let (r, t) = self.binds.$p.get(id.1).unwrap().open().await?;
(ProtRx::$t($crate::$p::Rx::Bind(r)), ProtTx::$t($crate::$p::Tx::Bind(t)))
}
)*})
}
}
};
}
protocols!(tcp:Tcp, udp:Udp, ws:Ws);
struct Bind<T>(Vec<(T, Vec<(Prot, usize)>)>);
struct Wire<T>(Vec<(T,)>);
impl Routes {
pub fn new() -> Self {
Self::default()
}
}
impl<T> Bind<T> {
fn get(&self, id: usize) -> Option<&T> {
self.0.get(id).map(|(t, _)| t)
}
fn get_mut(&mut self, id: usize) -> Option<&mut T> {
self.0.get_mut(id).map(|(t, _)| t)
}
fn add(&mut self, bind: T) -> usize {
self.0.push((bind, Vec::new()));
self.0.len() - 1
}
fn pipe(&mut self, id: usize, wire: (Prot, usize)) -> Option<()> {
let (_, pipes) = self.0.get_mut(id)?;
pipes.push(wire);
Some(())
}
}
impl<T> Wire<T> {
fn get(&self, id: usize) -> Option<&T> {
self.0.get(id).map(|(t,)| t)
}
fn get_mut(&mut self, id: usize) -> Option<&mut T> {
self.0.get_mut(id).map(|(t,)| t)
}
fn add(&mut self, wire: T) -> usize {
self.0.push((wire,));
self.0.len() - 1
}
}
impl ProtType {
fn bind(self) -> Prot {
Prot(self, false)
}
fn wire(self) -> Prot {
Prot(self, true)
}
}
impl Prot {
fn prot(&self) -> ProtType {
self.0
}
fn bind(&self) -> bool {
!self.1
}
fn wire(&self) -> bool {
self.1
}
}
impl<T> Default for Bind<T> {
fn default() -> Self {
Self(Vec::new())
}
}
impl<T> Default for Wire<T> {
fn default() -> Self {
Self(Vec::new())
}
}

89
src/tcp.rs Normal file
View File

@@ -0,0 +1,89 @@
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{
TcpListener, TcpStream,
tcp::{OwnedReadHalf, OwnedWriteHalf},
},
};
#[derive(Default)]
pub struct State;
pub struct Bind(TcpListener);
pub struct Wire(String);
pub enum Rx {
Wait(Rx1),
Bind(Rx1),
Wire(Rx1),
}
pub enum Tx {
Wait(Tx1),
Bind(Tx1),
Wire(Tx1),
}
pub struct Rx1(OwnedReadHalf);
pub struct Tx1(OwnedWriteHalf);
impl crate::Bind for Bind {
type R = Rx1;
type T = Tx1;
async fn wait(&self) -> Result<(<Self as crate::Bind>::R, <Self as crate::Bind>::T), String> {
let (con, _) = self
.0
.accept()
.await
.map_err(|e| format!("[tcp] accepting a new connection failed: {e}"))?;
let (rx, tx) = con.into_split();
Ok((Rx1(rx), Tx1(tx)))
}
}
impl crate::Wire for Bind {
type R = Rx1;
type T = Tx1;
async fn parse(arg: &str) -> Result<Self, String> {
let addr = arg;
Ok(Self(TcpListener::bind(addr).await.map_err(|e| {
format!("[tcp] binding to `{addr}` failed: {e}")
})?))
}
async fn open(&self) -> Result<(Self::R, Self::T), String> {
Err("using binds as wires is not (yet) supported for tcp".to_owned())
}
}
impl crate::Wire for Wire {
type R = Rx1;
type T = Tx1;
async fn parse(arg: &str) -> Result<Self, String> {
Ok(Self(arg.to_owned()))
}
async fn open(&self) -> Result<(Self::R, Self::T), String> {
let (rx, tx) = TcpStream::connect(&self.0)
.await
.map_err(|e| format!("[tcp] connecting to `{}` failed: {e}", &self.0))?
.into_split();
Ok((Rx1(rx), Tx1(tx)))
}
}
impl crate::Rx for Rx1 {
type T<'a> = &'a [u8];
async fn rx<'a>(&mut self, buf: &'a mut [u8]) -> Result<Self::T<'a>, String> {
let i = self
.0
.read(buf)
.await
.map_err(|e| format!("[tcp] reading bytes failed: {e}"))?;
Ok(&buf[..i])
}
}
impl crate::Tx for Tx1 {
async fn tx(&mut self, buf: &[u8]) -> Result<(), String> {
self.0
.write_all(buf)
.await
.map_err(|e| format!("[tcp] writing bytes failed: {e}"))
}
}

192
src/udp.rs Normal file
View File

@@ -0,0 +1,192 @@
use std::{
collections::{HashMap, VecDeque},
net::SocketAddr,
sync::Arc,
};
use tokio::{
net::UdpSocket,
spawn,
sync::{
Mutex, Notify,
mpsc::{Receiver, Sender},
},
time::Instant,
};
use crate::BUF_SIZE;
#[derive(Default)]
pub struct State;
pub struct Bind(
Arc<UdpSocket>,
#[allow(clippy::type_complexity)]
Arc<
Mutex<(
VecDeque<(SocketAddr, Receiver<Vec<u8>>)>,
HashMap<SocketAddr, (Sender<Vec<u8>>, u8, Instant)>,
)>,
>,
Arc<Notify>,
);
pub struct Wire(String);
pub enum Rx {
Wait(Rx1),
Bind(Rx3),
Wire(Rx3),
}
pub enum Tx {
Wait(Tx1),
Bind(Tx3),
Wire(Tx3),
}
pub struct Rx1(Receiver<Vec<u8>>);
pub struct Tx1(Arc<UdpSocket>, SocketAddr);
pub struct Rx3(Arc<UdpSocket>);
pub struct Tx3(Arc<UdpSocket>);
impl crate::Bind for Bind {
type R = Rx1;
type T = Tx1;
async fn wait(&self) -> Result<(<Self as crate::Bind>::R, <Self as crate::Bind>::T), String> {
let (addr, recv) = loop {
self.2.notified().await;
let mut lock = self.1.lock().await;
if let Some((addr, recv)) = lock.0.pop_front() {
if !lock.0.is_empty() {
self.2.notify_one();
}
break (addr, recv);
}
};
Ok((Rx1(recv), Tx1(Arc::clone(&self.0), addr)))
}
}
impl crate::Wire for Bind {
type R = Rx3;
type T = Tx3;
async fn parse(arg: &str) -> Result<Self, String> {
let addr = arg;
let socket = Arc::new(
UdpSocket::bind(addr)
.await
.map_err(|e| format!("[udp] binding socket to `{addr}` failed: {e}"))?,
);
let data = Arc::new(Mutex::new(Default::default()));
let notify = Arc::new(Notify::new());
let out = Self(Arc::clone(&socket), Arc::clone(&data), Arc::clone(&notify));
spawn(async move {
let mut i = 0u32;
loop {
i += 1;
let cleanup = i > 10000;
if cleanup {
i = 0;
}
let mut buf = [0u8; BUF_SIZE];
if let Ok((i, addr)) = socket.recv_from(&mut buf).await {
let mut lock = data.lock().await;
let (queue, map) = &mut *lock;
let mut now = None;
if cleanup {
let now = *now.get_or_insert_with(Instant::now);
map.retain(|_, (_, _, last_active)| (now - *last_active).as_secs() < 1000);
}
map.entry(addr)
.and_modify(|(send, counter, time)| {
if send.try_send(Vec::from(&buf[..i])).is_ok() {
*counter += 1;
if cleanup || *counter > 100 {
*counter = 0;
*time = *now.get_or_insert_with(Instant::now);
}
}
})
.or_insert_with(|| {
let (send, recv) = tokio::sync::mpsc::channel(4096);
queue.push_back((addr, recv));
notify.notify_one();
send.try_send(Vec::from(&buf[..i])).ok();
(send, 0, Instant::now())
});
} else {
break;
};
}
});
Ok(out)
}
async fn open(&self) -> Result<(Self::R, Self::T), String> {
Err("using binds as wires is not (yet) supported for udp".to_owned())
}
}
impl crate::Wire for Wire {
type R = Rx3;
type T = Tx3;
async fn parse(arg: &str) -> Result<Self, String> {
Ok(Self(arg.to_owned()))
}
async fn open(&self) -> Result<(Self::R, Self::T), String> {
let socket = UdpSocket::bind("[::]:0")
.await
.map_err(|e| format!("[udp] binding wire failed: {e}"))?;
socket
.connect(&self.0)
.await
.map_err(|e| format!("[udp] connecting to `{}` failed: {e}", &self.0))?;
let socket = Arc::new(socket);
Ok((Rx3(Arc::clone(&socket)), Tx3(socket)))
}
}
impl crate::Rx for Rx1 {
type T<'a> = Vec<u8>;
async fn rx<'a>(&mut self, _buf: &'a mut [u8]) -> Result<Self::T<'a>, String> {
self.0
.recv()
.await
.ok_or_else(|| "[udp] receiving message failed, channel closed".to_owned())
}
}
impl crate::Tx for Tx1 {
async fn tx(&mut self, buf: &[u8]) -> Result<(), String> {
let mut n = 0;
while n < buf.len() {
n += self
.0
.send_to(&buf[n..], &self.1)
.await
.map_err(|e| format!("[udp] sending message failed: {e}"))?;
}
Ok(())
}
}
impl crate::Rx for Rx3 {
type T<'a> = &'a [u8];
async fn rx<'a>(&mut self, buf: &'a mut [u8]) -> Result<Self::T<'a>, String> {
let i = self
.0
.recv(buf)
.await
.map_err(|e| format!("[udp] receiving message failed: {e}"))?;
Ok(&buf[..i])
}
}
impl crate::Tx for Tx3 {
async fn tx(&mut self, buf: &[u8]) -> Result<(), String> {
let mut n = 0;
while n < buf.len() {
n += self
.0
.send(&buf[n..])
.await
.map_err(|e| format!("[udp] sending message failed: {e}"))?;
}
Ok(())
}
}

151
src/ws.rs Normal file
View File

@@ -0,0 +1,151 @@
use std::net::SocketAddr;
use futures_util::{
SinkExt, StreamExt,
stream::{SplitSink, SplitStream},
};
use tokio::{
net::TcpStream,
spawn,
sync::{Mutex, mpsc::Receiver},
};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, tungstenite::Message};
use warp::{
Filter,
filters::ws::{Message as WsMessage, WebSocket},
};
#[derive(Default)]
pub struct State;
pub struct Bind(Mutex<Receiver<WebSocket>>);
pub struct Wire(String);
pub enum Rx {
Wait(Rx1),
Bind(Rx1),
Wire(Rx3),
}
pub enum Tx {
Wait(Tx1),
Bind(Tx1),
Wire(Tx3),
}
pub struct Rx1(SplitStream<WebSocket>);
pub struct Tx1(SplitSink<WebSocket, WsMessage>);
pub struct Rx3(SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>);
pub struct Tx3(SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>);
impl crate::Bind for Bind {
type R = Rx1;
type T = Tx1;
async fn wait(&self) -> Result<(<Self as crate::Bind>::R, <Self as crate::Bind>::T), String> {
let con = self
.0
.lock()
.await
.recv()
.await
.ok_or("[ws] no new connections (sender gone)")?;
let (tx, rx) = con.split();
Ok((Rx1(rx), Tx1(tx)))
}
}
impl crate::Wire for Bind {
type R = Rx1;
type T = Tx1;
async fn parse(arg: &str) -> Result<Self, String> {
let (addr, _path) = arg.split_once('/').unwrap_or((arg, ""));
let (send, recv) = tokio::sync::mpsc::channel(32);
let routes = warp::ws().map(move |ws: warp::ws::Ws| {
let send = send.clone();
ws.on_upgrade(async move |websocket| {
send.send(websocket).await.ok();
})
});
let addr = addr
.parse::<SocketAddr>()
.map_err(|e| format!("[ws] couldn't parse address `{addr}`: {e}"))?;
spawn(warp::serve(routes).run(addr));
Ok(Self(Mutex::new(recv)))
}
async fn open(&self) -> Result<(Self::R, Self::T), String> {
Err("using binds as wires is not (yet) supported for tcp".to_owned())
}
}
impl crate::Wire for Wire {
type R = Rx3;
type T = Tx3;
async fn parse(arg: &str) -> Result<Self, String> {
Ok(Self(arg.to_owned()))
}
async fn open(&self) -> Result<(Self::R, Self::T), String> {
let arg = self.0.as_str();
let pre = if arg.starts_with("ws://") || arg.starts_with("wss://") {
""
} else {
"ws://"
};
let (ws, _) = tokio_tungstenite::connect_async(format!("{pre}{arg}"))
.await
.map_err(|e| format!("[ws] connecting to `{pre}{arg}` failed: {e}"))?;
let (tx, rx) = ws.split();
Ok((Rx3(rx), Tx3(tx)))
}
}
impl crate::Rx for Rx1 {
type T<'a> = Vec<u8>;
async fn rx<'a>(&mut self, _buf: &'a mut [u8]) -> Result<Self::T<'a>, String> {
loop {
break match self.0.next().await {
None => Ok(Vec::new()),
Some(Ok(message)) => {
let bytes = message.into_bytes().to_vec();
if bytes.is_empty() {
continue;
}
Ok(bytes)
}
Some(Err(e)) => Err(format!("[ws] receiving message failed: {e}")),
};
}
}
}
impl crate::Tx for Tx1 {
async fn tx(&mut self, buf: &[u8]) -> Result<(), String> {
self.0
.send(WsMessage::binary(buf.to_vec()))
.await
.map_err(|e| format!("[ws] writing bytes failed: {e}"))
}
}
impl crate::Rx for Rx3 {
type T<'a> = Vec<u8>;
async fn rx<'a>(&mut self, _buf: &'a mut [u8]) -> Result<Self::T<'a>, String> {
loop {
break match self.0.next().await {
None => Ok(Vec::new()),
Some(Ok(message)) => {
let bytes = message.into_data().to_vec();
if bytes.is_empty() {
continue;
}
Ok(bytes)
}
Some(Err(e)) => Err(format!("[ws] receiving message failed: {e}")),
};
}
}
}
impl crate::Tx for Tx3 {
async fn tx(&mut self, buf: &[u8]) -> Result<(), String> {
self.0
.send(Message::binary(buf.to_vec()))
.await
.map_err(|e| format!("[ws] writing bytes failed: {e}"))
}
}