1use std::{
2 sync::{Arc, atomic::AtomicBool},
3 time::Duration,
4};
5
6use super::state::SyncedServerState;
7use futures_util::{
8 SinkExt, StreamExt,
9 stream::{SplitSink, SplitStream},
10};
11use tokio::{spawn, sync::Mutex, task::AbortHandle, time::sleep};
12use tokio_tungstenite::{WebSocketStream, tungstenite::Message};
13
14#[cfg(test)]
15use crate::mock::TcpStream;
16#[cfg(not(test))]
17use tokio::net::TcpStream;
18
19use crate::{
20 lobby::state::{
21 PlayerJoinError, SharedLobbyState,
22 clients::{PlayerId, Spectator},
23 players::Player,
24 },
25 log,
26 messages::{
27 MessageTx, RxError, RxMessage,
28 character_chosen::CharacterChosen,
29 client_role::{ClientRole, PlayerRole},
30 connect_game::{ConnectGame, ConnectGameError},
31 connected::{Connected, ConnectedAsSpectator},
32 error::{ErrorInvalid, ErrorMessage, error_code},
33 hello_client::HelloClient,
34 reconnect::Reconnect,
35 text_message::TextMessage,
36 },
37};
38
39pub async fn handle(
40 con: TcpStream,
41 server_state: &SyncedServerState,
42 in_lobby: &mut Option<(SharedLobbyState, String, Option<PlayerId>)>,
43 abort_handle: AbortHandle,
44) -> Result<(), ConnectionError> {
45 let websocket = tokio_tungstenite::accept_async(con).await?;
46 let (ws_send, mut ws_recv) = websocket.split();
47 let ws_send = Arc::new(Mutex::new(ws_send));
48 let active = Arc::new(AtomicBool::new(true));
49
50 spawn({
52 let send = Arc::clone(&ws_send);
53 let active = Arc::clone(&active);
54 async move {
55 loop {
56 sleep(Duration::from_secs(25)).await;
57 if send
58 .lock()
59 .await
60 .send(Message::Ping(Default::default()))
61 .await
62 .is_err()
63 {
64 active.store(false, std::sync::atomic::Ordering::Relaxed);
65 break;
66 }
67 }
68 }
69 });
70
71 let con = Connection::Real {
72 send: Arc::clone(&ws_send),
73 active,
74 abort_handle,
75 };
76
77 'list_join_and_handle_lobby: loop {
79 let (lobby, name, is_player) =
80 pre_lobby(&mut ws_recv, &ws_send, server_state, &con).await?;
81 let mut pfx = log::pfx();
82 pfx.lobby(lobby.id());
83
84 *in_lobby = Some((lobby.clone(), name.clone(), is_player));
87
88 loop {
89 match recv_msg(&mut ws_recv, &ws_send).await? {
90 (Ok(RxMessage::TextMessage(TextMessage { message })), _) => {
91 let mut lobby_lock = lobby.lock().await;
92 lobby_lock.broadcast_chat_message(&name, &message).await;
93 }
94 (Ok(RxMessage::LeaveLobby), _) => {
95 let mut lobby_lock = lobby.lock().await;
96 if lobby_lock.client_leave(&name, is_player).await {
99 drop(lobby_lock);
101 let hello_client = HelloClient::new_from_server_state(server_state)
102 .await
103 .serialize();
104 con.send(&hello_client).await?;
105 continue 'list_join_and_handle_lobby;
106 } else {
107 log::warning!("client_leave failed! Invalid state?"; &pfx);
108 }
109 }
110 (Ok(msg), original_message) => {
111 let mut lobby_lock = lobby.lock().await;
112 if let Some(player_id) = is_player {
115 if lobby_lock.game_started() {
116 lobby_lock
118 .message_from(&player_id, msg, original_message)
119 .await;
120 } else {
121 match msg {
123 RxMessage::Ready => {
124 let player = lobby_lock
125 .clients
126 .players
127 .get_mut(&player_id)
128 .ok_or(ConnectionError::InternalStateError)?;
129 if !player.ready {
130 player.ready = true;
131 lobby_lock.broadcast_lobby_info().await;
132 lobby_lock.consider_starting_game().await;
133 }
134 }
135 RxMessage::CharacterChosen(CharacterChosen { character }) => {
136 let character_taken = character
137 .0
138 .is_some_and(|ch| lobby_lock.character_taken(ch));
139 let player = lobby_lock
140 .clients
141 .players
142 .get_mut(&player_id)
143 .ok_or(ConnectionError::InternalStateError)?;
144 if player.character != character.0 {
145 if character_taken {
146 drop(lobby_lock);
147 con.send(&ErrorMessage {
148 code: error_code::CHARACTER_TAKEN,
149 reason: "This character has already been picked by another player",
150 }.serialize()).await?;
151 } else {
152 player.character = character.0;
153 lobby_lock.broadcast_lobby_info().await;
154 lobby_lock.consider_starting_game().await;
158 }
159 }
160 }
161 _ => {
162 drop(lobby_lock);
163 con.send(
164 &ErrorInvalid {
165 original_message: &original_message,
166 }
167 .serialize(),
168 )
169 .await?;
170 }
171 }
172 }
173 } else {
174 drop(lobby_lock);
176 con.send(
177 &ErrorInvalid {
178 original_message: &original_message,
179 }
180 .serialize(),
181 )
182 .await?;
183 }
184 }
185 (Err(e), original_message) => {
186 log::debug!("Message validation error: {}", e; &pfx);
187 con.send(
188 &ErrorInvalid {
189 original_message: &original_message,
190 }
191 .serialize(),
192 )
193 .await?;
194 }
195 }
196 }
197 #[expect(unreachable_code)]
198 "should not go here, ever";
199 }
200}
201
202async fn pre_lobby(
203 ws_recv: &mut SplitStream<WebSocketStream<TcpStream>>,
204 ws_send: &Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>,
205 server_state: &SyncedServerState,
206 con: &Connection,
207) -> Result<(SharedLobbyState, String, Option<PlayerId>), ConnectionError> {
208 Ok(loop {
209 match recv_msg(ws_recv, ws_send).await? {
218 (Ok(RxMessage::HelloServer), _) => {
219 let hello_client = HelloClient::new_from_server_state(server_state)
221 .await
222 .serialize();
223 con.send(&hello_client).await?;
224 }
225 (
226 Ok(RxMessage::ConnectGame(ConnectGame {
227 name,
228 role,
229 lobby_id,
230 })),
231 _,
232 ) => {
233 let server_state_lock = server_state.lock().await;
234 let lobby = server_state_lock.lobby(&lobby_id);
235 drop(server_state_lock);
236 if let Some(lobby) = lobby {
237 match role {
238 ClientRole::Player | ClientRole::AI => {
239 let role = match role {
240 ClientRole::Player => PlayerRole::Player,
241 ClientRole::AI => PlayerRole::AI,
242 ClientRole::Spectator => unreachable!(),
243 };
244 let player = Player::new(name.clone(), role, con.clone());
245 let mut lobby_lock = lobby.lock().await;
246 let join_result = lobby_lock
247 .player_join(player, || async {
248 server_state
249 .lock()
250 .await
251 .gen_random_player_id_and_reconnect_token(lobby_id)
252 })
253 .await;
254 match join_result {
255 Ok((player_id, reconnect_token)) => {
256 let game_may_start =
257 lobby_lock.could_start_timer_for_game_start();
258 drop(lobby_lock);
259 if game_may_start {
260 let lobby = lobby.clone();
261 spawn(async move {
262 lobby.start_game_after_timeout(true).await;
263 });
264 }
265 let connected =
266 Connected::new(reconnect_token, player_id).serialize();
267 let send_connected_result = con.send(&connected).await;
268 lobby.lock().await.broadcast_lobby_info().await;
269 send_connected_result?;
270 break (lobby, name, Some(player_id));
271 }
272 Err(PlayerJoinError::GameAlreadyStarted) => {
273 let err = ErrorMessage {
274 code: error_code::GAME_ALREADY_STARTED,
275 reason: "This game has already started, you can only join as a spectator",
276 }.serialize();
277 drop(lobby_lock);
278 con.send(&err).await?;
279 }
280 Err(PlayerJoinError::LobbyFull) => {
281 let err = ErrorMessage {
282 code: error_code::LOBBY_FULL,
283 reason: "This lobby is already full",
284 }
285 .serialize();
286 drop(lobby_lock);
287 con.send(&err).await?;
288 }
289 Err(PlayerJoinError::NameInUse) => {
290 let err = ErrorMessage {
291 code: error_code::NAME_IN_USE,
292 reason: "this name is already used in this lobby",
293 }
294 .serialize();
295 drop(lobby_lock);
296 con.send(&err).await?;
297 }
298 }
299 }
300 ClientRole::Spectator => {
301 let mut lobby_lock = lobby.lock().await;
302 let join_result = lobby_lock
303 .spectator_join(Spectator::new(name.clone(), con.clone()));
304 drop(lobby_lock);
305 match join_result {
306 Ok(()) => {
307 let connected = ConnectedAsSpectator::new().serialize();
308 let send_connected_result = con.send(&connected).await;
309 lobby.lock().await.broadcast_lobby_info().await;
310 send_connected_result?;
311 break (lobby, name, None);
312 }
313 }
314 }
315 }
316 } else {
317 con.send(
318 &ErrorMessage {
319 code: error_code::NO_LOBBY_WITH_ID,
320 reason: "there is no lobby with this id",
321 }
322 .serialize(),
323 )
324 .await?;
325 }
326 }
327 (
328 Ok(RxMessage::Reconnect(Reconnect {
329 lobby_id,
330 player_id,
331 reconnect_token,
332 })),
333 _,
334 ) => {
335 let lobby = match {
336 let server_state_lock = server_state.lock().await;
337 server_state_lock.lobby(&lobby_id)
338 } {
339 Some(l) => l,
340 None => {
341 con.send(
342 &ErrorMessage {
343 code: error_code::NO_LOBBY_WITH_ID,
344 reason: "there is no lobby with this id",
345 }
346 .serialize(),
347 )
348 .await?;
349 continue;
350 }
351 };
352
353 let mut lobby_lock = lobby.lock().await;
354
355 if !lobby_lock.game_started() {
356 drop(lobby_lock);
357 con.send(
358 &ErrorMessage {
359 code: error_code::GAME_NOT_STARTED,
360 reason: "the game has not started yet",
361 }
362 .serialize(),
363 )
364 .await?;
365 continue;
366 }
367
368 if let Some(player) = lobby_lock.clients.players.get_mut(&player_id) {
369 if player.reconnect_token() == &reconnect_token {
370 let player_name = player.name().to_string();
371 log::info!("player '{}' reconnected", player_name; &log::pfx().lobby(lobby_id).player(player_id));
372 if let Some(old_conn) = player.con.replace(con.clone()) {
374 old_conn.terminate();
375 }
376
377 lobby_lock.broadcast_lobby_info().await;
378 lobby_lock.broadcast_gamestate().await;
379 drop(lobby_lock);
380 break (lobby, player_name, Some(player_id));
381 } else {
382 drop(lobby_lock);
383 con.send(
384 &ErrorMessage {
385 code: error_code::WRONG_RECONNECT_TOKEN,
386 reason: "this reconnect token is not valid",
387 }
388 .serialize(),
389 )
390 .await?;
391 }
392 } else {
393 drop(lobby_lock);
394 con.send(
395 &ErrorMessage {
396 code: error_code::NO_PLAYER_WITH_ID,
397 reason: "there is no player with this id",
398 }
399 .serialize(),
400 )
401 .await?;
402 }
403 }
404 (Err(RxError::ConnectGame(ConnectGameError::InvalidNameLength)), _) => {
405 con.send(
406 &ErrorMessage {
407 reason: &(ConnectGameError::InvalidNameLength).to_string(),
408 code: error_code::NAME_TOO_LONG,
409 }
410 .serialize(),
411 )
412 .await?;
413 }
414 (poss_e @ (Ok(_) | Err(RxError::InvalidMessage(_))), original_message) => {
415 if let Err(e) = poss_e {
416 log::debug!("Message validation error: {}", e; &log::pfx());
417 }
418 con.send(
419 &ErrorInvalid {
420 original_message: &original_message,
421 }
422 .serialize(),
423 )
424 .await?;
425 }
426 }
427 })
428}
429
430async fn recv_msg(
431 ws_recv: &mut SplitStream<WebSocketStream<TcpStream>>,
432 ws_send: &Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>,
433) -> tokio_tungstenite::tungstenite::Result<(Result<RxMessage, RxError>, String)> {
434 let msg = recv_text(ws_recv, ws_send).await?;
435 Ok((RxMessage::from_str(&msg), msg))
436}
437
438async fn recv_text(
439 ws_recv: &mut SplitStream<WebSocketStream<TcpStream>>,
440 ws_send: &Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>,
441) -> tokio_tungstenite::tungstenite::Result<String> {
442 while let Some(Ok(msg)) = ws_recv.next().await {
443 match msg {
444 Message::Text(msg) => return Ok(msg.to_string()),
445 Message::Ping(payload) => ws_send.lock().await.send(Message::Pong(payload)).await?,
446 Message::Binary(_) | Message::Frame(_) | Message::Pong(_) => {}
447 Message::Close(_) => {
448 let _ = ws_send.lock().await.close().await;
449 break;
450 }
451 }
452 }
453 Err(tokio_tungstenite::tungstenite::Error::ConnectionClosed)
454}
455
456#[derive(Debug)]
457pub enum ConnectionError {
458 ProtocolError(ProtocolError),
459 MalformedJson(serde_json::Error),
460 InternalStateError,
461 WsError(tokio_tungstenite::tungstenite::Error),
462}
463#[derive(Debug)]
464pub enum ProtocolError {
465 OnlyHelloServerOrConnectGameAllowedAtThisTime { incorrect_message_type: String },
466}
467impl From<ProtocolError> for ConnectionError {
468 fn from(value: ProtocolError) -> Self {
469 Self::ProtocolError(value)
470 }
471}
472impl From<serde_json::Error> for ConnectionError {
473 fn from(value: serde_json::Error) -> Self {
474 Self::MalformedJson(value)
475 }
476}
477impl From<tokio_tungstenite::tungstenite::Error> for ConnectionError {
478 fn from(value: tokio_tungstenite::tungstenite::Error) -> Self {
479 Self::WsError(value)
480 }
481}
482
483#[derive(Clone, Debug)]
484pub enum Connection {
485 Real {
486 send: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
487 active: Arc<AtomicBool>,
488 abort_handle: AbortHandle,
489 },
490 Fake {
491 send: tokio::sync::mpsc::UnboundedSender<String>,
492 },
493}
494impl Connection {
495 #[cfg(test)]
496 pub(crate) fn fake() -> (Self, tokio::sync::mpsc::UnboundedReceiver<String>) {
497 let (send, recv) = tokio::sync::mpsc::unbounded_channel();
498 (Self::Fake { send }, recv)
499 }
500 pub fn is_active(&self) -> bool {
501 match self {
502 Self::Real {
503 send: _,
504 active,
505 abort_handle: _,
506 } => active.load(std::sync::atomic::Ordering::Relaxed),
507 Self::Fake { send } => !send.is_closed(),
508 }
509 }
510 pub async fn send(&self, message: &str) -> Result<(), tokio_tungstenite::tungstenite::Error> {
511 match self {
512 Self::Real {
513 send,
514 active: _,
515 abort_handle: _,
516 } => send.lock().await.send(Message::text(message)).await,
517 Self::Fake { send } => send
518 .send(message.to_owned())
519 .map_err(|_| tokio_tungstenite::tungstenite::Error::ConnectionClosed),
520 }
521 }
522 pub fn terminate(&self) {
526 if let Self::Real {
527 abort_handle,
528 send: _,
529 active: _,
530 } = self
531 {
532 abort_handle.abort();
533 }
534 }
535}