team04_server/server/
connection.rs

1use std::{
2    sync::{Arc, atomic::AtomicBool},
3    time::Duration,
4};
5
6use super::state::SyncedServerState;
7use futures_util::{
8    SinkExt, StreamExt,
9    stream::{SplitSink, SplitStream},
10};
11use tokio::{spawn, sync::Mutex, task::AbortHandle, time::sleep};
12use tokio_tungstenite::{WebSocketStream, tungstenite::Message};
13
14#[cfg(test)]
15use crate::mock::TcpStream;
16#[cfg(not(test))]
17use tokio::net::TcpStream;
18
19use crate::{
20    lobby::state::{
21        PlayerJoinError, SharedLobbyState,
22        clients::{PlayerId, Spectator},
23        players::Player,
24    },
25    log,
26    messages::{
27        MessageTx, RxError, RxMessage,
28        character_chosen::CharacterChosen,
29        client_role::{ClientRole, PlayerRole},
30        connect_game::{ConnectGame, ConnectGameError},
31        connected::{Connected, ConnectedAsSpectator},
32        error::{ErrorInvalid, ErrorMessage, error_code},
33        hello_client::HelloClient,
34        reconnect::Reconnect,
35        text_message::TextMessage,
36    },
37};
38
39pub async fn handle(
40    con: TcpStream,
41    server_state: &SyncedServerState,
42    in_lobby: &mut Option<(SharedLobbyState, String, Option<PlayerId>)>,
43    abort_handle: AbortHandle,
44) -> Result<(), ConnectionError> {
45    let websocket = tokio_tungstenite::accept_async(con).await?;
46    let (ws_send, mut ws_recv) = websocket.split();
47    let ws_send = Arc::new(Mutex::new(ws_send));
48    let active = Arc::new(AtomicBool::new(true));
49
50    // send periodic pings to keep the connection alive and to check if the client has disconnected
51    spawn({
52        let send = Arc::clone(&ws_send);
53        let active = Arc::clone(&active);
54        async move {
55            loop {
56                sleep(Duration::from_secs(25)).await;
57                if send
58                    .lock()
59                    .await
60                    .send(Message::Ping(Default::default()))
61                    .await
62                    .is_err()
63                {
64                    active.store(false, std::sync::atomic::Ordering::Relaxed);
65                    break;
66                }
67            }
68        }
69    });
70
71    let con = Connection::Real {
72        send: Arc::clone(&ws_send),
73        active,
74        abort_handle,
75    };
76
77    // client connection can transition from in-lobby and not-in-lobby, so we loop
78    'list_join_and_handle_lobby: loop {
79        let (lobby, name, is_player) =
80            pre_lobby(&mut ws_recv, &ws_send, server_state, &con).await?;
81        let mut pfx = log::pfx();
82        pfx.lobby(lobby.id());
83
84        // client is now in a lobby
85
86        *in_lobby = Some((lobby.clone(), name.clone(), is_player));
87
88        loop {
89            match recv_msg(&mut ws_recv, &ws_send).await? {
90                (Ok(RxMessage::TextMessage(TextMessage { message })), _) => {
91                    let mut lobby_lock = lobby.lock().await;
92                    lobby_lock.broadcast_chat_message(&name, &message).await;
93                }
94                (Ok(RxMessage::LeaveLobby), _) => {
95                    let mut lobby_lock = lobby.lock().await;
96                    // this message is only valid if the game has not yet started or the client is a spectator,
97                    // this check is implemented in `client_leave` tho
98                    if lobby_lock.client_leave(&name, is_player).await {
99                        // update client by sending a HELLO_CLIENT unprompted
100                        drop(lobby_lock);
101                        let hello_client = HelloClient::new_from_server_state(server_state)
102                            .await
103                            .serialize();
104                        con.send(&hello_client).await?;
105                        continue 'list_join_and_handle_lobby;
106                    } else {
107                        log::warning!("client_leave failed! Invalid state?"; &pfx);
108                    }
109                }
110                (Ok(msg), original_message) => {
111                    let mut lobby_lock = lobby.lock().await;
112                    // was not a chat message, if client is a player (not spectator), see if it's either READY or CHARACTER_CHOSEN,
113                    // otherwise respond with the error as spectators can only chat in this phase.
114                    if let Some(player_id) = is_player {
115                        if lobby_lock.game_started() {
116                            // is handled outside of the server module
117                            lobby_lock
118                                .message_from(&player_id, msg, original_message)
119                                .await;
120                        } else {
121                            // handle non-spectator messages too
122                            match msg {
123                                RxMessage::Ready => {
124                                    let player = lobby_lock
125                                        .clients
126                                        .players
127                                        .get_mut(&player_id)
128                                        .ok_or(ConnectionError::InternalStateError)?;
129                                    if !player.ready {
130                                        player.ready = true;
131                                        lobby_lock.broadcast_lobby_info().await;
132                                        lobby_lock.consider_starting_game().await;
133                                    }
134                                }
135                                RxMessage::CharacterChosen(CharacterChosen { character }) => {
136                                    let character_taken = character
137                                        .0
138                                        .is_some_and(|ch| lobby_lock.character_taken(ch));
139                                    let player = lobby_lock
140                                        .clients
141                                        .players
142                                        .get_mut(&player_id)
143                                        .ok_or(ConnectionError::InternalStateError)?;
144                                    if player.character != character.0 {
145                                        if character_taken {
146                                            drop(lobby_lock);
147                                            con.send(&ErrorMessage {
148                                                code: error_code::CHARACTER_TAKEN,
149                                                reason: "This character has already been picked by another player",
150                                            }.serialize()).await?;
151                                        } else {
152                                            player.character = character.0;
153                                            lobby_lock.broadcast_lobby_info().await;
154                                            // choosing a character *shouldn't* influence the game starting,
155                                            // but in case this changes in the future, just to be sure,
156                                            // we re-check the game start conditions anyway.
157                                            lobby_lock.consider_starting_game().await;
158                                        }
159                                    }
160                                }
161                                _ => {
162                                    drop(lobby_lock);
163                                    con.send(
164                                        &ErrorInvalid {
165                                            original_message: &original_message,
166                                        }
167                                        .serialize(),
168                                    )
169                                    .await?;
170                                }
171                            }
172                        }
173                    } else {
174                        // client is spectator, respond with an error
175                        drop(lobby_lock);
176                        con.send(
177                            &ErrorInvalid {
178                                original_message: &original_message,
179                            }
180                            .serialize(),
181                        )
182                        .await?;
183                    }
184                }
185                (Err(e), original_message) => {
186                    log::debug!("Message validation error: {}", e; &pfx);
187                    con.send(
188                        &ErrorInvalid {
189                            original_message: &original_message,
190                        }
191                        .serialize(),
192                    )
193                    .await?;
194                }
195            }
196        }
197        #[expect(unreachable_code)]
198        "should not go here, ever";
199    }
200}
201
202async fn pre_lobby(
203    ws_recv: &mut SplitStream<WebSocketStream<TcpStream>>,
204    ws_send: &Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>,
205    server_state: &SyncedServerState,
206    con: &Connection,
207) -> Result<(SharedLobbyState, String, Option<PlayerId>), ConnectionError> {
208    Ok(loop {
209        // handle HELLO_SERVER and CONNECT_GAME until a CONNECT_GAME is successful
210
211        // respond to HELLO_SERVER messages from the client.
212        // since UIs may want to refresh this information, this is allowed to happen repeatedly (or not at all).
213        // since "not at all, once, or repeatedly" includes "once", this will not break standard-compliant clients
214        // (although it is more lenient, which may be a problem if strictness is enforced in the standard).
215
216        // await a HELLO_SERVER message from the client
217        match recv_msg(ws_recv, ws_send).await? {
218            (Ok(RxMessage::HelloServer), _) => {
219                // respond with HELLO_CLIENT
220                let hello_client = HelloClient::new_from_server_state(server_state)
221                    .await
222                    .serialize();
223                con.send(&hello_client).await?;
224            }
225            (
226                Ok(RxMessage::ConnectGame(ConnectGame {
227                    name,
228                    role,
229                    lobby_id,
230                })),
231                _,
232            ) => {
233                let server_state_lock = server_state.lock().await;
234                let lobby = server_state_lock.lobby(&lobby_id);
235                drop(server_state_lock);
236                if let Some(lobby) = lobby {
237                    match role {
238                        ClientRole::Player | ClientRole::AI => {
239                            let role = match role {
240                                ClientRole::Player => PlayerRole::Player,
241                                ClientRole::AI => PlayerRole::AI,
242                                ClientRole::Spectator => unreachable!(),
243                            };
244                            let player = Player::new(name.clone(), role, con.clone());
245                            let mut lobby_lock = lobby.lock().await;
246                            let join_result = lobby_lock
247                                .player_join(player, || async {
248                                    server_state
249                                        .lock()
250                                        .await
251                                        .gen_random_player_id_and_reconnect_token(lobby_id)
252                                })
253                                .await;
254                            match join_result {
255                                Ok((player_id, reconnect_token)) => {
256                                    let game_may_start =
257                                        lobby_lock.could_start_timer_for_game_start();
258                                    drop(lobby_lock);
259                                    if game_may_start {
260                                        let lobby = lobby.clone();
261                                        spawn(async move {
262                                            lobby.start_game_after_timeout(true).await;
263                                        });
264                                    }
265                                    let connected =
266                                        Connected::new(reconnect_token, player_id).serialize();
267                                    let send_connected_result = con.send(&connected).await;
268                                    lobby.lock().await.broadcast_lobby_info().await;
269                                    send_connected_result?;
270                                    break (lobby, name, Some(player_id));
271                                }
272                                Err(PlayerJoinError::GameAlreadyStarted) => {
273                                    let err = ErrorMessage {
274                                        code: error_code::GAME_ALREADY_STARTED,
275                                        reason: "This game has already started, you can only join as a spectator",
276                                    }.serialize();
277                                    drop(lobby_lock);
278                                    con.send(&err).await?;
279                                }
280                                Err(PlayerJoinError::LobbyFull) => {
281                                    let err = ErrorMessage {
282                                        code: error_code::LOBBY_FULL,
283                                        reason: "This lobby is already full",
284                                    }
285                                    .serialize();
286                                    drop(lobby_lock);
287                                    con.send(&err).await?;
288                                }
289                                Err(PlayerJoinError::NameInUse) => {
290                                    let err = ErrorMessage {
291                                        code: error_code::NAME_IN_USE,
292                                        reason: "this name is already used in this lobby",
293                                    }
294                                    .serialize();
295                                    drop(lobby_lock);
296                                    con.send(&err).await?;
297                                }
298                            }
299                        }
300                        ClientRole::Spectator => {
301                            let mut lobby_lock = lobby.lock().await;
302                            let join_result = lobby_lock
303                                .spectator_join(Spectator::new(name.clone(), con.clone()));
304                            drop(lobby_lock);
305                            match join_result {
306                                Ok(()) => {
307                                    let connected = ConnectedAsSpectator::new().serialize();
308                                    let send_connected_result = con.send(&connected).await;
309                                    lobby.lock().await.broadcast_lobby_info().await;
310                                    send_connected_result?;
311                                    break (lobby, name, None);
312                                }
313                            }
314                        }
315                    }
316                } else {
317                    con.send(
318                        &ErrorMessage {
319                            code: error_code::NO_LOBBY_WITH_ID,
320                            reason: "there is no lobby with this id",
321                        }
322                        .serialize(),
323                    )
324                    .await?;
325                }
326            }
327            (
328                Ok(RxMessage::Reconnect(Reconnect {
329                    lobby_id,
330                    player_id,
331                    reconnect_token,
332                })),
333                _,
334            ) => {
335                let lobby = match {
336                    let server_state_lock = server_state.lock().await;
337                    server_state_lock.lobby(&lobby_id)
338                } {
339                    Some(l) => l,
340                    None => {
341                        con.send(
342                            &ErrorMessage {
343                                code: error_code::NO_LOBBY_WITH_ID,
344                                reason: "there is no lobby with this id",
345                            }
346                            .serialize(),
347                        )
348                        .await?;
349                        continue;
350                    }
351                };
352
353                let mut lobby_lock = lobby.lock().await;
354
355                if !lobby_lock.game_started() {
356                    drop(lobby_lock);
357                    con.send(
358                        &ErrorMessage {
359                            code: error_code::GAME_NOT_STARTED,
360                            reason: "the game has not started yet",
361                        }
362                        .serialize(),
363                    )
364                    .await?;
365                    continue;
366                }
367
368                if let Some(player) = lobby_lock.clients.players.get_mut(&player_id) {
369                    if player.reconnect_token() == &reconnect_token {
370                        let player_name = player.name().to_string();
371                        log::info!("player '{}' reconnected", player_name; &log::pfx().lobby(lobby_id).player(player_id));
372                        // We're just gonna replace the connection, it's gonna be fineā„¢
373                        if let Some(old_conn) = player.con.replace(con.clone()) {
374                            old_conn.terminate();
375                        }
376
377                        lobby_lock.broadcast_lobby_info().await;
378                        lobby_lock.broadcast_gamestate().await;
379                        drop(lobby_lock);
380                        break (lobby, player_name, Some(player_id));
381                    } else {
382                        drop(lobby_lock);
383                        con.send(
384                            &ErrorMessage {
385                                code: error_code::WRONG_RECONNECT_TOKEN,
386                                reason: "this reconnect token is not valid",
387                            }
388                            .serialize(),
389                        )
390                        .await?;
391                    }
392                } else {
393                    drop(lobby_lock);
394                    con.send(
395                        &ErrorMessage {
396                            code: error_code::NO_PLAYER_WITH_ID,
397                            reason: "there is no player with this id",
398                        }
399                        .serialize(),
400                    )
401                    .await?;
402                }
403            }
404            (Err(RxError::ConnectGame(ConnectGameError::InvalidNameLength)), _) => {
405                con.send(
406                    &ErrorMessage {
407                        reason: &(ConnectGameError::InvalidNameLength).to_string(),
408                        code: error_code::NAME_TOO_LONG,
409                    }
410                    .serialize(),
411                )
412                .await?;
413            }
414            (poss_e @ (Ok(_) | Err(RxError::InvalidMessage(_))), original_message) => {
415                if let Err(e) = poss_e {
416                    log::debug!("Message validation error: {}", e; &log::pfx());
417                }
418                con.send(
419                    &ErrorInvalid {
420                        original_message: &original_message,
421                    }
422                    .serialize(),
423                )
424                .await?;
425            }
426        }
427    })
428}
429
430async fn recv_msg(
431    ws_recv: &mut SplitStream<WebSocketStream<TcpStream>>,
432    ws_send: &Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>,
433) -> tokio_tungstenite::tungstenite::Result<(Result<RxMessage, RxError>, String)> {
434    let msg = recv_text(ws_recv, ws_send).await?;
435    Ok((RxMessage::from_str(&msg), msg))
436}
437
438async fn recv_text(
439    ws_recv: &mut SplitStream<WebSocketStream<TcpStream>>,
440    ws_send: &Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>,
441) -> tokio_tungstenite::tungstenite::Result<String> {
442    while let Some(Ok(msg)) = ws_recv.next().await {
443        match msg {
444            Message::Text(msg) => return Ok(msg.to_string()),
445            Message::Ping(payload) => ws_send.lock().await.send(Message::Pong(payload)).await?,
446            Message::Binary(_) | Message::Frame(_) | Message::Pong(_) => {}
447            Message::Close(_) => {
448                let _ = ws_send.lock().await.close().await;
449                break;
450            }
451        }
452    }
453    Err(tokio_tungstenite::tungstenite::Error::ConnectionClosed)
454}
455
456#[derive(Debug)]
457pub enum ConnectionError {
458    ProtocolError(ProtocolError),
459    MalformedJson(serde_json::Error),
460    InternalStateError,
461    WsError(tokio_tungstenite::tungstenite::Error),
462}
463#[derive(Debug)]
464pub enum ProtocolError {
465    OnlyHelloServerOrConnectGameAllowedAtThisTime { incorrect_message_type: String },
466}
467impl From<ProtocolError> for ConnectionError {
468    fn from(value: ProtocolError) -> Self {
469        Self::ProtocolError(value)
470    }
471}
472impl From<serde_json::Error> for ConnectionError {
473    fn from(value: serde_json::Error) -> Self {
474        Self::MalformedJson(value)
475    }
476}
477impl From<tokio_tungstenite::tungstenite::Error> for ConnectionError {
478    fn from(value: tokio_tungstenite::tungstenite::Error) -> Self {
479        Self::WsError(value)
480    }
481}
482
483#[derive(Clone, Debug)]
484pub enum Connection {
485    Real {
486        send: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
487        active: Arc<AtomicBool>,
488        abort_handle: AbortHandle,
489    },
490    Fake {
491        send: tokio::sync::mpsc::UnboundedSender<String>,
492    },
493}
494impl Connection {
495    #[cfg(test)]
496    pub(crate) fn fake() -> (Self, tokio::sync::mpsc::UnboundedReceiver<String>) {
497        let (send, recv) = tokio::sync::mpsc::unbounded_channel();
498        (Self::Fake { send }, recv)
499    }
500    pub fn is_active(&self) -> bool {
501        match self {
502            Self::Real {
503                send: _,
504                active,
505                abort_handle: _,
506            } => active.load(std::sync::atomic::Ordering::Relaxed),
507            Self::Fake { send } => !send.is_closed(),
508        }
509    }
510    pub async fn send(&self, message: &str) -> Result<(), tokio_tungstenite::tungstenite::Error> {
511        match self {
512            Self::Real {
513                send,
514                active: _,
515                abort_handle: _,
516            } => send.lock().await.send(Message::text(message)).await,
517            Self::Fake { send } => send
518                .send(message.to_owned())
519                .map_err(|_| tokio_tungstenite::tungstenite::Error::ConnectionClosed),
520        }
521    }
522    /// Signals to the task serving this connection that the connection should not
523    /// be handled further, for example, on invalid messages, or when a reconnect
524    /// replaces a connection.
525    pub fn terminate(&self) {
526        if let Self::Real {
527            abort_handle,
528            send: _,
529            active: _,
530        } = self
531        {
532            abort_handle.abort();
533        }
534    }
535}