use env_logger

This commit is contained in:
Thilo Behnke
2022-06-23 21:49:41 +02:00
parent 7d065530b8
commit bc144785ea
4 changed files with 41 additions and 20 deletions

View File

@@ -10,6 +10,7 @@ use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper_tungstenite::HyperWebsocket;
use hyper_tungstenite::tungstenite::{Error};
use log::{debug, error, info};
use tokio::sync::Mutex;
use crate::request_handler::{DefaultRequestHandler, RequestHandler};
@@ -61,7 +62,7 @@ impl HttpServer {
let host = (self.addr, self.port).into();
let server = Server::bind(&host).serve(make_svc);
println!("Listening on http://{}", host);
info!("running server on http://{}", host);
let graceful = server.with_graceful_shutdown(shutdown_signal());
graceful.await?;
Ok(())
@@ -70,28 +71,28 @@ impl HttpServer {
}
async fn handle_potential_ws_upgrade(session_manager: Arc<Mutex<SessionManager>>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Infallible> {
println!(
"Received request from {:?} to upgrade to websocket connection: {:?}",
debug!(
"received request from {:?} to upgrade to websocket connection: {:?}",
addr, req
);
let params = get_query_params(&req);
println!("Ws request params: {:?}", params);
debug!("ws request params: {:?}", params);
if !params.contains_key("session_id") {
eprintln!("Missing session id request param for websocket connection, don't upgrade connection to ws.");
error!("Missing session id request param for websocket connection, don't upgrade connection to ws.");
return build_error_res(
"Missing request param: session_id",
StatusCode::BAD_REQUEST,
);
}
if !params.contains_key("player_id") {
eprintln!("Missing player id request param for websocket connection, don't upgrade connection to ws.");
error!("Missing player id request param for websocket connection, don't upgrade connection to ws.");
return build_error_res(
"Missing request param: player_id",
StatusCode::BAD_REQUEST,
);
}
if !params.contains_key("connection_type") {
eprintln!("Missing connection type request param for websocket connection, don't upgrade connection to ws.");
error!("Missing connection type request param for websocket connection, don't upgrade connection to ws.");
let res = build_error_res(
"Missing request param: connection_type",
StatusCode::BAD_REQUEST,
@@ -104,20 +105,20 @@ async fn handle_potential_ws_upgrade(session_manager: Arc<Mutex<SessionManager>>
let session = session_manager.lock().await.get_session(request_session_id);
if let None = session {
let error = format!("Session does not exist: {}", request_session_id);
eprintln!("{}", error);
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::NOT_FOUND);
}
let session = session.unwrap();
let matching_player = session.players.iter().find(|p| p.id == request_player_id);
if let None = matching_player {
let error = format!("Player is not registered in session: {}", request_player_id);
eprintln!("{}", error);
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::FORBIDDEN);
}
let matching_player = matching_player.unwrap();
if matching_player.ip != request_player_ip {
let error = format!("Player with wrong ip tried to join session: {} (expected) vs {} (actual)", matching_player.ip, request_player_ip);
eprintln!("{}", error);
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::FORBIDDEN);
}
let connection_type_raw = params.get("connection_type").unwrap();
@@ -126,7 +127,7 @@ async fn handle_potential_ws_upgrade(session_manager: Arc<Mutex<SessionManager>>
if let Err(_) = connection_type {
let error =
format!("Invalid connection type: {}", connection_type_raw);
eprintln!("{}", error);
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::BAD_REQUEST);
}
let websocket_session = WebSocketSession {
@@ -134,7 +135,7 @@ async fn handle_potential_ws_upgrade(session_manager: Arc<Mutex<SessionManager>>
connection_type: connection_type.unwrap(),
player: matching_player.clone()
};
println!("Websocket upgrade request is valid, will now upgrade to websocket: {:?}", req);
debug!("websocket upgrade request is valid, will now upgrade to websocket: {:?}", req);
let (response, websocket) =
hyper_tungstenite::upgrade(req, None).unwrap();
@@ -145,10 +146,11 @@ async fn handle_potential_ws_upgrade(session_manager: Arc<Mutex<SessionManager>>
serve_websocket(websocket_session, websocket, session_manager)
.await
{
eprintln!("Error in websocket connection: {:?}", e);
error!("Error in websocket connection: {:?}", e);
}
});
debug!("websocket upgrade done.");
// Return the response so the spawned future can continue.
return Ok(response);
}
@@ -178,7 +180,8 @@ async fn handle_http_request(
async fn shutdown_signal() {
// Wait for the CTRL+C signal
tokio::signal::ctrl_c()
let shutdown_received = tokio::signal::ctrl_c()
.await
.expect("failed to install CTRL+C signal handler");
info!("received shutdown signal, shutting down now...");
}

View File

@@ -1,5 +1,6 @@
extern crate core;
use log::{debug, error, info, Level};
use crate::http::HttpServer;
mod hash;
@@ -15,17 +16,22 @@ mod session;
#[tokio::main]
pub async fn main() {
env_logger::init();
info!("preparing environment");
let kafka_host_env = std::env::var("KAFKA_HOST");
let kafka_host = match kafka_host_env {
Ok(val) => val,
Err(_) => "localhost:9093".to_owned(),
};
info!("KAFKA_HOST={}", kafka_host);
let kafka_partition_manager_host_env = std::env::var("KAFKA_TOPIC_MANAGER_HOST");
let kafka_topic_manager_host = match kafka_partition_manager_host_env {
Ok(val) => val,
Err(_) => "localhost:7243".to_owned(),
};
info!("KAFKA_TOPIC_MANAGER_HOST={}", kafka_topic_manager_host);
info!("booting up server");
HttpServer::new([0, 0, 0, 0], 4000, &kafka_host, &kafka_topic_manager_host)
.run()
.await

View File

@@ -3,6 +3,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use hyper::{Body, Method, Request, Response, StatusCode};
use async_trait::async_trait;
use log::{debug, error, info};
use serde_json::json;
use tokio::sync::Mutex;
use serde::{Deserialize};
@@ -33,7 +34,7 @@ impl DefaultRequestHandler {
#[async_trait]
impl RequestHandler for DefaultRequestHandler {
async fn handle(&self, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Infallible> {
println!("req to {} with method {}", req.uri().path(), req.method());
info!("called route {} {}", req.method(), req.uri());
match (req.method(), req.uri().path()) {
(&Method::GET, "/session") => handle_get_session(&self.session_manager, req).await,
(&Method::POST, "/create_session") => {
@@ -50,17 +51,21 @@ async fn handle_get_session(
session_manager: &Arc<Mutex<SessionManager>>,
req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
info!("called get_session");
let locked = session_manager.lock().await;
let query_params = get_query_params(&req);
let session_id = query_params.get("session_id");
if let None = session_id {
error!("session id was not provided");
return build_error_res("Please provide a valid session id", StatusCode::BAD_REQUEST);
}
let session_id = session_id.unwrap();
let session = locked.get_session(session_id);
if let None = session {
error!("session for session id {} does not exist", session_id);
return build_error_res("Unable to find session for given id", StatusCode::NOT_FOUND);
}
info!("successfully retrieved session for session id {}", session_id);
return build_success_res(&serde_json::to_string(&session.unwrap()).unwrap());
}
@@ -69,19 +74,23 @@ async fn handle_session_create(
req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<Body>, Infallible> {
println!("Called to create new session: {:?}", req);
info!("called create_session");
debug!("req: {:?}", req);
let mut locked = session_manager.lock().await;
let player = Player::new(1, addr.ip().to_string());
let session_create_res = locked.create_session(player.clone()).await;
if let Err(e) = session_create_res {
error!("failed to create session: {:?}", e);
return Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(e))
.unwrap());
}
let session = session_create_res.unwrap();
error!("session created by player {:?}: {:?}", player, session);
let reason = format!("player {:?} created session", player);
let session_created = SessionEvent::Created(SessionEventPayload {
session: session_create_res.unwrap(),
session,
actor: player,
reason
});
@@ -94,20 +103,21 @@ async fn handle_session_join(
mut req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<Body>, Infallible> {
println!("Received request to join session: {:?}", req);
info!("called join_session");
debug!("req: {:?}", req);
let mut locked = session_manager.lock().await;
let body = read_json_body::<SessionJoinDto>(&mut req).await;
let player = Player::new(2, addr.ip().to_string());
let session_join_res = locked.join_session(body.session_id, player.clone()).await;
if let Err(e) = session_join_res {
eprintln!("Failed to join session: {:?}", e);
error!("Failed to join session: {:?}", e);
return Ok(Response::builder()
.status(StatusCode::CONFLICT)
.body(Body::from(e))
.unwrap());
}
let session = session_join_res.unwrap();
println!("Successfully joined session: {:?}", session);
info!("player {:?} successfully joined session: {:?}", player, session);
let reason = format!("player {:?} joined session", player);
let session_joined = SessionEvent::Joined(SessionEventPayload {
actor: player,