From bc144785ea958cabaa895ee3873fa60b29baa2f3 Mon Sep 17 00:00:00 2001 From: Thilo Behnke Date: Thu, 23 Jun 2022 21:49:41 +0200 Subject: [PATCH] use env_logger --- server/Cargo.toml | 2 ++ server/src/http.rs | 31 +++++++++++++++++-------------- server/src/main.rs | 6 ++++++ server/src/request_handler.rs | 22 ++++++++++++++++------ 4 files changed, 41 insertions(+), 20 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index ed097a4..505173c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -18,6 +18,8 @@ hyper-tungstenite = "0.8.0" futures = { version = "0.3.12" } async-trait = "0.1.56" uuid = { version = "1.1.2", features = ["v4"] } +log = "0.4" +env_logger = "0.9.0" [dev-dependencies] rstest = "0.12.0" diff --git a/server/src/http.rs b/server/src/http.rs index 31a2b91..c80bdfc 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -10,6 +10,7 @@ use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper_tungstenite::HyperWebsocket; use hyper_tungstenite::tungstenite::{Error}; +use log::{debug, error, info}; use tokio::sync::Mutex; use crate::request_handler::{DefaultRequestHandler, RequestHandler}; @@ -61,7 +62,7 @@ impl HttpServer { let host = (self.addr, self.port).into(); let server = Server::bind(&host).serve(make_svc); - println!("Listening on http://{}", host); + info!("running server on http://{}", host); let graceful = server.with_graceful_shutdown(shutdown_signal()); graceful.await?; Ok(()) @@ -70,28 +71,28 @@ impl HttpServer { } async fn handle_potential_ws_upgrade(session_manager: Arc>, req: Request, addr: SocketAddr) -> Result, Infallible> { - println!( - "Received request from {:?} to upgrade to websocket connection: {:?}", + debug!( + "received request from {:?} to upgrade to websocket connection: {:?}", addr, req ); let params = get_query_params(&req); - println!("Ws request params: {:?}", params); + debug!("ws request params: {:?}", params); if !params.contains_key("session_id") { - eprintln!("Missing session id request param for websocket connection, don't upgrade connection to ws."); + error!("Missing session id request param for websocket connection, don't upgrade connection to ws."); return build_error_res( "Missing request param: session_id", StatusCode::BAD_REQUEST, ); } if !params.contains_key("player_id") { - eprintln!("Missing player id request param for websocket connection, don't upgrade connection to ws."); + error!("Missing player id request param for websocket connection, don't upgrade connection to ws."); return build_error_res( "Missing request param: player_id", StatusCode::BAD_REQUEST, ); } if !params.contains_key("connection_type") { - eprintln!("Missing connection type request param for websocket connection, don't upgrade connection to ws."); + error!("Missing connection type request param for websocket connection, don't upgrade connection to ws."); let res = build_error_res( "Missing request param: connection_type", StatusCode::BAD_REQUEST, @@ -104,20 +105,20 @@ async fn handle_potential_ws_upgrade(session_manager: Arc> let session = session_manager.lock().await.get_session(request_session_id); if let None = session { let error = format!("Session does not exist: {}", request_session_id); - eprintln!("{}", error); + error!("{}", error); return build_error_res(error.as_str(), StatusCode::NOT_FOUND); } let session = session.unwrap(); let matching_player = session.players.iter().find(|p| p.id == request_player_id); if let None = matching_player { let error = format!("Player is not registered in session: {}", request_player_id); - eprintln!("{}", error); + error!("{}", error); return build_error_res(error.as_str(), StatusCode::FORBIDDEN); } let matching_player = matching_player.unwrap(); if matching_player.ip != request_player_ip { let error = format!("Player with wrong ip tried to join session: {} (expected) vs {} (actual)", matching_player.ip, request_player_ip); - eprintln!("{}", error); + error!("{}", error); return build_error_res(error.as_str(), StatusCode::FORBIDDEN); } let connection_type_raw = params.get("connection_type").unwrap(); @@ -126,7 +127,7 @@ async fn handle_potential_ws_upgrade(session_manager: Arc> if let Err(_) = connection_type { let error = format!("Invalid connection type: {}", connection_type_raw); - eprintln!("{}", error); + error!("{}", error); return build_error_res(error.as_str(), StatusCode::BAD_REQUEST); } let websocket_session = WebSocketSession { @@ -134,7 +135,7 @@ async fn handle_potential_ws_upgrade(session_manager: Arc> connection_type: connection_type.unwrap(), player: matching_player.clone() }; - println!("Websocket upgrade request is valid, will now upgrade to websocket: {:?}", req); + debug!("websocket upgrade request is valid, will now upgrade to websocket: {:?}", req); let (response, websocket) = hyper_tungstenite::upgrade(req, None).unwrap(); @@ -145,10 +146,11 @@ async fn handle_potential_ws_upgrade(session_manager: Arc> serve_websocket(websocket_session, websocket, session_manager) .await { - eprintln!("Error in websocket connection: {:?}", e); + error!("Error in websocket connection: {:?}", e); } }); + debug!("websocket upgrade done."); // Return the response so the spawned future can continue. return Ok(response); } @@ -178,7 +180,8 @@ async fn handle_http_request( async fn shutdown_signal() { // Wait for the CTRL+C signal - tokio::signal::ctrl_c() + let shutdown_received = tokio::signal::ctrl_c() .await .expect("failed to install CTRL+C signal handler"); + info!("received shutdown signal, shutting down now..."); } diff --git a/server/src/main.rs b/server/src/main.rs index bf8f647..72603eb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,5 +1,6 @@ extern crate core; +use log::{debug, error, info, Level}; use crate::http::HttpServer; mod hash; @@ -15,17 +16,22 @@ mod session; #[tokio::main] pub async fn main() { + env_logger::init(); + info!("preparing environment"); let kafka_host_env = std::env::var("KAFKA_HOST"); let kafka_host = match kafka_host_env { Ok(val) => val, Err(_) => "localhost:9093".to_owned(), }; + info!("KAFKA_HOST={}", kafka_host); let kafka_partition_manager_host_env = std::env::var("KAFKA_TOPIC_MANAGER_HOST"); let kafka_topic_manager_host = match kafka_partition_manager_host_env { Ok(val) => val, Err(_) => "localhost:7243".to_owned(), }; + info!("KAFKA_TOPIC_MANAGER_HOST={}", kafka_topic_manager_host); + info!("booting up server"); HttpServer::new([0, 0, 0, 0], 4000, &kafka_host, &kafka_topic_manager_host) .run() .await diff --git a/server/src/request_handler.rs b/server/src/request_handler.rs index 9cebd9e..4c1f394 100644 --- a/server/src/request_handler.rs +++ b/server/src/request_handler.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use std::sync::Arc; use hyper::{Body, Method, Request, Response, StatusCode}; use async_trait::async_trait; +use log::{debug, error, info}; use serde_json::json; use tokio::sync::Mutex; use serde::{Deserialize}; @@ -33,7 +34,7 @@ impl DefaultRequestHandler { #[async_trait] impl RequestHandler for DefaultRequestHandler { async fn handle(&self, req: Request, addr: SocketAddr) -> Result, Infallible> { - println!("req to {} with method {}", req.uri().path(), req.method()); + info!("called route {} {}", req.method(), req.uri()); match (req.method(), req.uri().path()) { (&Method::GET, "/session") => handle_get_session(&self.session_manager, req).await, (&Method::POST, "/create_session") => { @@ -50,17 +51,21 @@ async fn handle_get_session( session_manager: &Arc>, req: Request, ) -> Result, Infallible> { + info!("called get_session"); let locked = session_manager.lock().await; let query_params = get_query_params(&req); let session_id = query_params.get("session_id"); if let None = session_id { + error!("session id was not provided"); return build_error_res("Please provide a valid session id", StatusCode::BAD_REQUEST); } let session_id = session_id.unwrap(); let session = locked.get_session(session_id); if let None = session { + error!("session for session id {} does not exist", session_id); return build_error_res("Unable to find session for given id", StatusCode::NOT_FOUND); } + info!("successfully retrieved session for session id {}", session_id); return build_success_res(&serde_json::to_string(&session.unwrap()).unwrap()); } @@ -69,19 +74,23 @@ async fn handle_session_create( req: Request, addr: SocketAddr, ) -> Result, Infallible> { - println!("Called to create new session: {:?}", req); + info!("called create_session"); + debug!("req: {:?}", req); let mut locked = session_manager.lock().await; let player = Player::new(1, addr.ip().to_string()); let session_create_res = locked.create_session(player.clone()).await; if let Err(e) = session_create_res { + error!("failed to create session: {:?}", e); return Ok(Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(Body::from(e)) .unwrap()); } + let session = session_create_res.unwrap(); + error!("session created by player {:?}: {:?}", player, session); let reason = format!("player {:?} created session", player); let session_created = SessionEvent::Created(SessionEventPayload { - session: session_create_res.unwrap(), + session, actor: player, reason }); @@ -94,20 +103,21 @@ async fn handle_session_join( mut req: Request, addr: SocketAddr, ) -> Result, Infallible> { - println!("Received request to join session: {:?}", req); + info!("called join_session"); + debug!("req: {:?}", req); let mut locked = session_manager.lock().await; let body = read_json_body::(&mut req).await; let player = Player::new(2, addr.ip().to_string()); let session_join_res = locked.join_session(body.session_id, player.clone()).await; if let Err(e) = session_join_res { - eprintln!("Failed to join session: {:?}", e); + error!("Failed to join session: {:?}", e); return Ok(Response::builder() .status(StatusCode::CONFLICT) .body(Body::from(e)) .unwrap()); } let session = session_join_res.unwrap(); - println!("Successfully joined session: {:?}", session); + info!("player {:?} successfully joined session: {:?}", player, session); let reason = format!("player {:?} joined session", player); let session_joined = SessionEvent::Joined(SessionEventPayload { actor: player,