mirror of
https://github.com/thilo-behnke/wasm-pong.git
synced 2026-02-14 14:39:51 +00:00
refactoring/server
This commit is contained in:
@@ -3,7 +3,7 @@ mod utils;
|
|||||||
use pong::game_field::{Field, Input, InputType};
|
use pong::game_field::{Field, Input, InputType};
|
||||||
use pong::game_object::game_object::{GameObject};
|
use pong::game_object::game_object::{GameObject};
|
||||||
use pong::geom::shape::ShapeType;
|
use pong::geom::shape::ShapeType;
|
||||||
use pong::pong::pong_events::DefaultPongEventWriter;
|
use pong::pong::pong_events::{NoopPongEventWriter};
|
||||||
use pong::utils::utils::{DefaultLoggerFactory, Logger};
|
use pong::utils::utils::{DefaultLoggerFactory, Logger};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
@@ -113,7 +113,7 @@ impl FieldWrapper {
|
|||||||
pub fn new() -> FieldWrapper {
|
pub fn new() -> FieldWrapper {
|
||||||
let field = Field::new(
|
let field = Field::new(
|
||||||
DefaultLoggerFactory::new(Box::new(WasmLogger::root())),
|
DefaultLoggerFactory::new(Box::new(WasmLogger::root())),
|
||||||
DefaultPongEventWriter::new(),
|
NoopPongEventWriter::new(),
|
||||||
);
|
);
|
||||||
FieldWrapper { field }
|
FieldWrapper { field }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -170,11 +170,11 @@ impl Field {
|
|||||||
|
|
||||||
let collision_handler = &self.collision_handler;
|
let collision_handler = &self.collision_handler;
|
||||||
let registered_collisions = collisions.get_collisions();
|
let registered_collisions = collisions.get_collisions();
|
||||||
self.logger.log(&*format!(
|
// self.logger.log(&*format!(
|
||||||
"Found {} collisions: {:?}",
|
// "Found {} collisions: {:?}",
|
||||||
registered_collisions.len(),
|
// registered_collisions.len(),
|
||||||
registered_collisions
|
// registered_collisions
|
||||||
));
|
// ));
|
||||||
for collision in registered_collisions.iter() {
|
for collision in registered_collisions.iter() {
|
||||||
let objs = &self.objs;
|
let objs = &self.objs;
|
||||||
let obj_a = objs
|
let obj_a = objs
|
||||||
|
|||||||
@@ -12,5 +12,9 @@ echo "Start docker containers."
|
|||||||
docker-compose down
|
docker-compose down
|
||||||
docker-compose up -d --build --force-recreate kafka zookeeper nginx
|
docker-compose up -d --build --force-recreate kafka zookeeper nginx
|
||||||
|
|
||||||
|
echo "Remove temporary local dependencies from components."
|
||||||
|
rm -rf ./client/wasm/pong
|
||||||
|
rm -rf ./server/pong
|
||||||
|
|
||||||
echo "Initialize kafka."
|
echo "Initialize kafka."
|
||||||
./init-kafka.sh
|
./init-kafka.sh
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ md5 = { version = "0.7.0" }
|
|||||||
pong = { path = "../pong", version = "0.1.0" }
|
pong = { path = "../pong", version = "0.1.0" }
|
||||||
hyper-tungstenite = "0.8.0"
|
hyper-tungstenite = "0.8.0"
|
||||||
futures = { version = "0.3.12" }
|
futures = { version = "0.3.12" }
|
||||||
|
async-trait = "0.1.56"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
rstest = "0.12.0"
|
rstest = "0.12.0"
|
||||||
|
|||||||
45
server/src/event.rs
Normal file
45
server/src/event.rs
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use crate::player::Player;
|
||||||
|
use crate::session::Session;
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
|
pub struct SessionEventListDTO {
|
||||||
|
pub session_id: String,
|
||||||
|
pub events: Vec<SessionEventWriteDTO>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
|
pub struct SessionEventWriteDTO {
|
||||||
|
pub session_id: String,
|
||||||
|
pub topic: String,
|
||||||
|
pub msg: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct SessionClosedDto {
|
||||||
|
pub session: Session,
|
||||||
|
pub reason: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct SessionReadDTO {
|
||||||
|
pub session_id: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct SessionJoinDto {
|
||||||
|
pub session_id: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct SessionJoinedDto {
|
||||||
|
pub session: Session,
|
||||||
|
pub player: Player,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct SessionCreatedDto {
|
||||||
|
pub session: Session,
|
||||||
|
pub player: Player,
|
||||||
|
}
|
||||||
|
|
||||||
@@ -2,28 +2,26 @@ use std::convert::Infallible;
|
|||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use futures::{sink::SinkExt, stream::StreamExt};
|
use futures::{stream::StreamExt};
|
||||||
|
use hyper::{Body, Request, Response, Server, StatusCode};
|
||||||
use hyper::server::conn::AddrStream;
|
use hyper::server::conn::AddrStream;
|
||||||
use hyper::service::{make_service_fn, service_fn};
|
use hyper::service::{make_service_fn, service_fn};
|
||||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
|
||||||
use hyper_tungstenite::tungstenite::{Error, Message};
|
|
||||||
use hyper_tungstenite::HyperWebsocket;
|
use hyper_tungstenite::HyperWebsocket;
|
||||||
use serde::{Deserialize, Serialize};
|
use hyper_tungstenite::tungstenite::{Error};
|
||||||
use serde_json::json;
|
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tokio::time::sleep;
|
|
||||||
|
|
||||||
use crate::player::Player;
|
use crate::request_handler::{DefaultRequestHandler, RequestHandler};
|
||||||
use crate::session::{Session, SessionManager};
|
use crate::session_manager::{SessionManager};
|
||||||
use crate::utils::http_utils::{get_query_params, read_json_body};
|
use crate::utils::http_utils::{build_error_res, get_query_params, read_json_body};
|
||||||
|
use crate::websocket::{DefaultWebsocketHandler, WebSocketConnectionType, WebsocketHandler, WebSocketSession};
|
||||||
|
|
||||||
pub struct HttpServer {
|
pub struct HttpServer {
|
||||||
addr: [u8; 4],
|
addr: [u8; 4],
|
||||||
port: u16,
|
port: u16,
|
||||||
session_manager: Arc<Mutex<SessionManager>>,
|
session_manager: Arc<Mutex<SessionManager>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HttpServer {
|
impl HttpServer {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
addr: [u8; 4],
|
addr: [u8; 4],
|
||||||
@@ -51,68 +49,10 @@ impl HttpServer {
|
|||||||
let session_manager = Arc::clone(&session_manager);
|
let session_manager = Arc::clone(&session_manager);
|
||||||
async move {
|
async move {
|
||||||
if hyper_tungstenite::is_upgrade_request(&req) {
|
if hyper_tungstenite::is_upgrade_request(&req) {
|
||||||
println!(
|
return handle_potential_ws_upgrade(session_manager, req).await;
|
||||||
"Received request to upgrade to websocket connection: {:?}",
|
|
||||||
req
|
|
||||||
);
|
|
||||||
let params = get_query_params(&req);
|
|
||||||
println!("Ws request params: {:?}", params);
|
|
||||||
if !params.contains_key("session_id") {
|
|
||||||
eprintln!("Missing session id request param for websocket connection, don't upgrade connection to ws.");
|
|
||||||
return build_error_res(
|
|
||||||
"Missing request param: session_id",
|
|
||||||
StatusCode::BAD_REQUEST,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
if !params.contains_key("connection_type") {
|
|
||||||
eprintln!("Missing connection type request param for websocket connection, don't upgrade connection to ws.");
|
|
||||||
let res = build_error_res(
|
|
||||||
"Missing request param: connection_type",
|
|
||||||
StatusCode::BAD_REQUEST,
|
|
||||||
);
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
let session_id = params.get("session_id").unwrap();
|
|
||||||
let connection_type_raw = params.get("connection_type").unwrap();
|
|
||||||
let connection_type =
|
|
||||||
WebSocketConnectionType::from_str(connection_type_raw);
|
|
||||||
if let Err(_) = connection_type {
|
|
||||||
let error =
|
|
||||||
format!("Invalid connection type: {}", connection_type_raw);
|
|
||||||
eprintln!("{}", error);
|
|
||||||
return build_error_res(error.as_str(), StatusCode::BAD_REQUEST);
|
|
||||||
}
|
|
||||||
let session = session_manager.lock().await.get_session(session_id);
|
|
||||||
if let None = session {
|
|
||||||
let error = format!("Session does not exist: {}", session_id);
|
|
||||||
eprintln!("{}", error);
|
|
||||||
return build_error_res(error.as_str(), StatusCode::NOT_FOUND);
|
|
||||||
}
|
|
||||||
let session = session.unwrap();
|
|
||||||
let websocket_session = WebSocketSession {
|
|
||||||
session: session.clone(),
|
|
||||||
connection_type: connection_type.unwrap(),
|
|
||||||
};
|
|
||||||
println!("Websocket upgrade request is valid, will now upgrade to websocket: {:?}", req);
|
|
||||||
|
|
||||||
let (response, websocket) =
|
|
||||||
hyper_tungstenite::upgrade(req, None).unwrap();
|
|
||||||
|
|
||||||
// Spawn a task to handle the websocket connection.
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) =
|
|
||||||
serve_websocket(websocket_session, websocket, session_manager)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
eprintln!("Error in websocket connection: {:?}", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Return the response so the spawned future can continue.
|
|
||||||
return Ok(response);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return handle_request(&session_manager, req, addr).await;
|
return handle_http_request(session_manager, req, addr).await;
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
@@ -125,6 +65,69 @@ impl HttpServer {
|
|||||||
graceful.await?;
|
graceful.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_potential_ws_upgrade(session_manager: Arc<Mutex<SessionManager>>, req: Request<Body>) -> Result<Response<Body>, Infallible> {
|
||||||
|
println!(
|
||||||
|
"Received request to upgrade to websocket connection: {:?}",
|
||||||
|
req
|
||||||
|
);
|
||||||
|
let params = get_query_params(&req);
|
||||||
|
println!("Ws request params: {:?}", params);
|
||||||
|
if !params.contains_key("session_id") {
|
||||||
|
eprintln!("Missing session id request param for websocket connection, don't upgrade connection to ws.");
|
||||||
|
return build_error_res(
|
||||||
|
"Missing request param: session_id",
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if !params.contains_key("connection_type") {
|
||||||
|
eprintln!("Missing connection type request param for websocket connection, don't upgrade connection to ws.");
|
||||||
|
let res = build_error_res(
|
||||||
|
"Missing request param: connection_type",
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
let session_id = params.get("session_id").unwrap();
|
||||||
|
let connection_type_raw = params.get("connection_type").unwrap();
|
||||||
|
let connection_type =
|
||||||
|
WebSocketConnectionType::from_str(connection_type_raw);
|
||||||
|
if let Err(_) = connection_type {
|
||||||
|
let error =
|
||||||
|
format!("Invalid connection type: {}", connection_type_raw);
|
||||||
|
eprintln!("{}", error);
|
||||||
|
return build_error_res(error.as_str(), StatusCode::BAD_REQUEST);
|
||||||
|
}
|
||||||
|
let session = session_manager.lock().await.get_session(session_id);
|
||||||
|
if let None = session {
|
||||||
|
let error = format!("Session does not exist: {}", session_id);
|
||||||
|
eprintln!("{}", error);
|
||||||
|
return build_error_res(error.as_str(), StatusCode::NOT_FOUND);
|
||||||
|
}
|
||||||
|
let session = session.unwrap();
|
||||||
|
let websocket_session = WebSocketSession {
|
||||||
|
session: session.clone(),
|
||||||
|
connection_type: connection_type.unwrap(),
|
||||||
|
};
|
||||||
|
println!("Websocket upgrade request is valid, will now upgrade to websocket: {:?}", req);
|
||||||
|
|
||||||
|
let (response, websocket) =
|
||||||
|
hyper_tungstenite::upgrade(req, None).unwrap();
|
||||||
|
|
||||||
|
// Spawn a task to handle the websocket connection.
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) =
|
||||||
|
serve_websocket(websocket_session, websocket, session_manager)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
eprintln!("Error in websocket connection: {:?}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Return the response so the spawned future can continue.
|
||||||
|
return Ok(response);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle a websocket connection.
|
/// Handle a websocket connection.
|
||||||
@@ -133,299 +136,21 @@ async fn serve_websocket(
|
|||||||
websocket: HyperWebsocket,
|
websocket: HyperWebsocket,
|
||||||
session_manager: Arc<Mutex<SessionManager>>,
|
session_manager: Arc<Mutex<SessionManager>>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let websocket = websocket.await?;
|
let handler = DefaultWebsocketHandler::new(
|
||||||
let (mut websocket_writer, mut websocket_reader) = websocket.split();
|
websocket_session, websocket, session_manager,
|
||||||
|
|
||||||
let session_manager = session_manager.lock().await;
|
|
||||||
let event_handler_pair = session_manager.split(
|
|
||||||
&websocket_session.session.hash,
|
|
||||||
websocket_session.connection_type.get_topics(),
|
|
||||||
);
|
);
|
||||||
if let Err(_) = event_handler_pair {
|
handler.serve().await
|
||||||
eprintln!(
|
|
||||||
"Failed to create event reader/writer pair session: {:?}",
|
|
||||||
websocket_session
|
|
||||||
);
|
|
||||||
return Err(Error::ConnectionClosed); // TODO: Use proper error for this case to close the connection
|
|
||||||
}
|
|
||||||
|
|
||||||
let (mut event_reader, mut event_writer) = event_handler_pair.unwrap();
|
|
||||||
let websocket_session_read_copy = websocket_session.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
println!(
|
|
||||||
"Ready to read messages from ws connection: {:?}",
|
|
||||||
websocket_session_read_copy
|
|
||||||
);
|
|
||||||
while let Some(message) = websocket_reader.next().await {
|
|
||||||
match message.unwrap() {
|
|
||||||
Message::Text(msg) => {
|
|
||||||
let events = serde_json::from_str::<SessionEventListDTO>(&msg);
|
|
||||||
println!("Received ws message to persist events to kafka");
|
|
||||||
if let Err(e) = events {
|
|
||||||
eprintln!("Failed to deserialize ws message to event {}: {}", msg, e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let event_wrapper = events.unwrap();
|
|
||||||
if event_wrapper.session_id != websocket_session_read_copy.session.hash {
|
|
||||||
eprintln!("Websocket has session {:?} but was asked to write to session {} - skip.", websocket_session_read_copy, event_wrapper.session_id);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let mut any_error = false;
|
|
||||||
let event_count = event_wrapper.events.len();
|
|
||||||
for event in event_wrapper.events {
|
|
||||||
let write_res = event_writer.write_to_session(&event.topic, &event.msg);
|
|
||||||
if let Err(e) = write_res {
|
|
||||||
any_error = true;
|
|
||||||
eprintln!("Failed to write event {:?}: {}", event, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if any_error {
|
|
||||||
eprintln!(
|
|
||||||
"Failed to write at least one message for session {}",
|
|
||||||
event_wrapper.session_id
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
println!(
|
|
||||||
"Successfully wrote {} messages to kafka for session {:?}",
|
|
||||||
event_count, websocket_session_read_copy
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Message::Close(msg) => {
|
|
||||||
// No need to send a reply: tungstenite takes care of this for you.
|
|
||||||
if let Some(msg) = &msg {
|
|
||||||
println!(
|
|
||||||
"Received close message with code {} and message: {}",
|
|
||||||
msg.code, msg.reason
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
println!("Received close message");
|
|
||||||
}
|
|
||||||
|
|
||||||
let session_closed_event = SessionClosedDto {
|
|
||||||
session: websocket_session_read_copy.session.clone(),
|
|
||||||
reason: "ws closed".to_owned(),
|
|
||||||
};
|
|
||||||
let msg = json!(session_closed_event).to_string();
|
|
||||||
let session_event_write_res = event_writer.write_to_session("session", &msg);
|
|
||||||
if let Err(e) = session_event_write_res {
|
|
||||||
eprintln!("Failed to write session closed event: {0}", e)
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
println!("!!!! Exit websocket receiver !!!!")
|
|
||||||
});
|
|
||||||
let websocket_session_write_copy = websocket_session.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
println!(
|
|
||||||
"Ready to read messages from kafka: {:?}",
|
|
||||||
websocket_session_write_copy
|
|
||||||
);
|
|
||||||
loop {
|
|
||||||
println!("Reading messages from kafka.");
|
|
||||||
let messages = event_reader.read_from_session();
|
|
||||||
if let Err(_) = messages {
|
|
||||||
eprintln!(
|
|
||||||
"Failed to read messages from kafka for session: {:?}",
|
|
||||||
websocket_session_write_copy
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// println!("Read messages for websocket_session {:?} from consumer: {:?}", websocket_session_write_copy, messages);
|
|
||||||
let messages = messages.unwrap();
|
|
||||||
if messages.len() == 0 {
|
|
||||||
println!("No new messages from kafka.");
|
|
||||||
} else {
|
|
||||||
println!("{} new messages from kafka.", messages.len());
|
|
||||||
let json = serde_json::to_string(&messages).unwrap();
|
|
||||||
let message = Message::from(json);
|
|
||||||
println!("Sending kafka messages through websocket.");
|
|
||||||
let send_res = websocket_writer.send(message).await;
|
|
||||||
if let Err(e) = send_res {
|
|
||||||
eprintln!(
|
|
||||||
"Failed to send message to websocket for session {:?}: {:?}",
|
|
||||||
websocket_session_write_copy, e
|
|
||||||
);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Avoid starvation of read thread (?)
|
|
||||||
// TODO: How to avoid this? This is very bad for performance.
|
|
||||||
sleep(Duration::from_millis(1)).await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: How to handle event writes/reads? This must be a websocket, but how to implement in hyper (if possible)?
|
async fn handle_http_request(
|
||||||
// https://github.com/de-vri-es/hyper-tungstenite-rs
|
session_manager: Arc<Mutex<SessionManager>>,
|
||||||
async fn handle_request(
|
|
||||||
session_manager: &Arc<Mutex<SessionManager>>,
|
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
) -> Result<Response<Body>, Infallible> {
|
) -> Result<Response<Body>, Infallible> {
|
||||||
println!("req to {} with method {}", req.uri().path(), req.method());
|
let handler = DefaultRequestHandler::new(
|
||||||
match (req.method(), req.uri().path()) {
|
session_manager
|
||||||
(&Method::GET, "/session") => handle_get_session(session_manager, req).await,
|
|
||||||
(&Method::POST, "/create_session") => {
|
|
||||||
handle_session_create(session_manager, req, addr).await
|
|
||||||
}
|
|
||||||
(&Method::POST, "/join_session") => handle_session_join(session_manager, req, addr).await,
|
|
||||||
(&Method::POST, "/write") => handle_event_write(session_manager, req).await,
|
|
||||||
(&Method::POST, "/read") => handle_event_read(session_manager, req).await,
|
|
||||||
_ => Ok(Response::new("unknown".into())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_get_session(
|
|
||||||
session_manager: &Arc<Mutex<SessionManager>>,
|
|
||||||
req: Request<Body>,
|
|
||||||
) -> Result<Response<Body>, Infallible> {
|
|
||||||
let locked = session_manager.lock().await;
|
|
||||||
let query_params = get_query_params(&req);
|
|
||||||
let session_id = query_params.get("session_id");
|
|
||||||
if let None = session_id {
|
|
||||||
return build_error_res("Please provide a valid session id", StatusCode::BAD_REQUEST);
|
|
||||||
}
|
|
||||||
let session_id = session_id.unwrap();
|
|
||||||
let session = locked.get_session(session_id);
|
|
||||||
if let None = session {
|
|
||||||
return build_error_res("Unable to find session for given id", StatusCode::NOT_FOUND);
|
|
||||||
}
|
|
||||||
return build_success_res(&serde_json::to_string(&session.unwrap()).unwrap());
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_session_create(
|
|
||||||
session_manager: &Arc<Mutex<SessionManager>>,
|
|
||||||
req: Request<Body>,
|
|
||||||
addr: SocketAddr,
|
|
||||||
) -> Result<Response<Body>, Infallible> {
|
|
||||||
println!("Called to create new session: {:?}", req);
|
|
||||||
let mut locked = session_manager.lock().await;
|
|
||||||
let player = Player {
|
|
||||||
id: addr.to_string(),
|
|
||||||
};
|
|
||||||
let session_create_res = locked.create_session(player.clone()).await;
|
|
||||||
if let Err(e) = session_create_res {
|
|
||||||
return Ok(Response::builder()
|
|
||||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
|
||||||
.body(Body::from(e))
|
|
||||||
.unwrap());
|
|
||||||
}
|
|
||||||
let session_created = SessionCreatedDto {
|
|
||||||
session: session_create_res.unwrap(),
|
|
||||||
player,
|
|
||||||
};
|
|
||||||
let serialized = json!(session_created);
|
|
||||||
return build_success_res(&serialized.to_string());
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_session_join(
|
|
||||||
session_manager: &Arc<Mutex<SessionManager>>,
|
|
||||||
mut req: Request<Body>,
|
|
||||||
addr: SocketAddr,
|
|
||||||
) -> Result<Response<Body>, Infallible> {
|
|
||||||
println!("Received request to join session: {:?}", req);
|
|
||||||
let mut locked = session_manager.lock().await;
|
|
||||||
let body = read_json_body::<SessionJoinDto>(&mut req).await;
|
|
||||||
let player = Player {
|
|
||||||
id: addr.to_string(),
|
|
||||||
};
|
|
||||||
let session_join_res = locked.join_session(body.session_id, player.clone()).await;
|
|
||||||
if let Err(e) = session_join_res {
|
|
||||||
eprintln!("Failed to join session: {:?}", e);
|
|
||||||
return Ok(Response::builder()
|
|
||||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
|
||||||
.body(Body::from(e))
|
|
||||||
.unwrap());
|
|
||||||
}
|
|
||||||
let session = session_join_res.unwrap();
|
|
||||||
println!("Successfully joined session: {:?}", session);
|
|
||||||
let session_joined = SessionJoinedDto { session, player };
|
|
||||||
let serialized = json!(session_joined);
|
|
||||||
return build_success_res(&serialized.to_string());
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_event_write(
|
|
||||||
session_manager: &Arc<Mutex<SessionManager>>,
|
|
||||||
mut req: Request<Body>,
|
|
||||||
) -> Result<Response<Body>, Infallible> {
|
|
||||||
let locked = session_manager.lock().await;
|
|
||||||
let event = read_json_body::<SessionEventWriteDTO>(&mut req).await;
|
|
||||||
let writer = locked.get_session_writer(&event.session_id);
|
|
||||||
if let Err(e) = writer {
|
|
||||||
let err = format!("Failed to write event: {}", e);
|
|
||||||
println!("{}", err);
|
|
||||||
let mut res = Response::new(Body::from(err));
|
|
||||||
*res.status_mut() = StatusCode::NOT_FOUND;
|
|
||||||
return Ok(res);
|
|
||||||
}
|
|
||||||
let mut writer = writer.unwrap();
|
|
||||||
println!("Writing session event to kafka: {:?}", event);
|
|
||||||
let write_res = writer.write_to_session(&event.topic, &event.msg);
|
|
||||||
if let Err(e) = write_res {
|
|
||||||
let err = format!("Failed to write event: {}", e);
|
|
||||||
println!("{}", err);
|
|
||||||
let mut res = Response::new(Body::from(err));
|
|
||||||
*res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
|
||||||
return Ok(res);
|
|
||||||
}
|
|
||||||
println!("Successfully wrote event to kafka.");
|
|
||||||
build_success_res(&serde_json::to_string(&event).unwrap())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_event_read(
|
|
||||||
session_manager: &Arc<Mutex<SessionManager>>,
|
|
||||||
mut req: Request<Body>,
|
|
||||||
) -> Result<Response<Body>, Infallible> {
|
|
||||||
let locked = session_manager.lock().await;
|
|
||||||
let read_payload = read_json_body::<SessionReadDTO>(&mut req).await;
|
|
||||||
let reader = locked.get_session_reader(
|
|
||||||
&read_payload.session_id,
|
|
||||||
&["move", "status", "input", "session"],
|
|
||||||
);
|
);
|
||||||
if let Err(e) = reader {
|
handler.handle(req, addr).await
|
||||||
let err = format!("Failed to read events: {}", e);
|
|
||||||
println!("{}", err);
|
|
||||||
let mut res = Response::new(Body::from(err));
|
|
||||||
*res.status_mut() = StatusCode::NOT_FOUND;
|
|
||||||
return Ok(res);
|
|
||||||
}
|
|
||||||
let mut reader = reader.unwrap();
|
|
||||||
println!(
|
|
||||||
"Reading session events from kafka for session: {}",
|
|
||||||
read_payload.session_id
|
|
||||||
);
|
|
||||||
let events = reader.read_from_session();
|
|
||||||
if let Err(e) = events {
|
|
||||||
let err = format!("Failed to read events: {}", e);
|
|
||||||
println!("{}", err);
|
|
||||||
let mut res = Response::new(Body::from(err));
|
|
||||||
*res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
|
||||||
return Ok(res);
|
|
||||||
}
|
|
||||||
println!("Successfully read session events from kafka.");
|
|
||||||
let json = serde_json::to_string(&events.unwrap()).unwrap();
|
|
||||||
build_success_res(&json)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn build_success_res(value: &str) -> Result<Response<Body>, Infallible> {
|
|
||||||
let json = format!("{{\"data\": {}}}", value);
|
|
||||||
let mut res = Response::new(Body::from(json));
|
|
||||||
let headers = res.headers_mut();
|
|
||||||
headers.insert("Content-Type", "application/json".parse().unwrap());
|
|
||||||
headers.insert("Access-Control-Allow-Origin", "*".parse().unwrap());
|
|
||||||
Ok(res)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn build_error_res(error: &str, status: StatusCode) -> Result<Response<Body>, Infallible> {
|
|
||||||
let json = format!("{{\"error\": \"{}\"}}", error);
|
|
||||||
let mut res = Response::new(Body::from(json));
|
|
||||||
*res.status_mut() = status;
|
|
||||||
return Ok(res);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn shutdown_signal() {
|
async fn shutdown_signal() {
|
||||||
@@ -434,81 +159,3 @@ async fn shutdown_signal() {
|
|||||||
.await
|
.await
|
||||||
.expect("failed to install CTRL+C signal handler");
|
.expect("failed to install CTRL+C signal handler");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize, Serialize)]
|
|
||||||
struct SessionEventListDTO {
|
|
||||||
session_id: String,
|
|
||||||
events: Vec<SessionEventWriteDTO>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Deserialize, Serialize)]
|
|
||||||
struct SessionEventWriteDTO {
|
|
||||||
session_id: String,
|
|
||||||
topic: String,
|
|
||||||
msg: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
struct SessionReadDTO {
|
|
||||||
session_id: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
struct SessionJoinDto {
|
|
||||||
session_id: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
|
||||||
struct SessionJoinedDto {
|
|
||||||
session: Session,
|
|
||||||
player: Player,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
|
||||||
struct SessionCreatedDto {
|
|
||||||
session: Session,
|
|
||||||
player: Player,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
|
||||||
struct SessionClosedDto {
|
|
||||||
session: Session,
|
|
||||||
reason: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq)]
|
|
||||||
enum WebSocketConnectionType {
|
|
||||||
HOST,
|
|
||||||
PEER,
|
|
||||||
OBSERVER,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FromStr for WebSocketConnectionType {
|
|
||||||
type Err = ();
|
|
||||||
|
|
||||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
|
||||||
match s.to_lowercase().as_str() {
|
|
||||||
"host" => Ok(WebSocketConnectionType::HOST),
|
|
||||||
"peer" => Ok(WebSocketConnectionType::PEER),
|
|
||||||
"observer" => Ok(WebSocketConnectionType::OBSERVER),
|
|
||||||
_ => Err(()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
struct WebSocketSession {
|
|
||||||
pub connection_type: WebSocketConnectionType,
|
|
||||||
pub session: Session,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl WebSocketConnectionType {
|
|
||||||
pub fn get_topics(&self) -> &[&str] {
|
|
||||||
match self {
|
|
||||||
WebSocketConnectionType::HOST => &["input", "session"],
|
|
||||||
WebSocketConnectionType::PEER | WebSocketConnectionType::OBSERVER => {
|
|
||||||
&["move", "input", "status", "session"]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ use kafka::producer::{Partitioner, Producer, Record, RequiredAcks, Topics};
|
|||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
|
||||||
use pong::event::event::{Event, EventReaderImpl, EventWriterImpl};
|
use pong::event::event::{Event, EventReaderImpl, EventWriterImpl};
|
||||||
|
|
||||||
use crate::session::Session;
|
use crate::session::Session;
|
||||||
|
|
||||||
pub struct KafkaSessionEventWriterImpl {
|
pub struct KafkaSessionEventWriterImpl {
|
||||||
|
|||||||
@@ -5,9 +5,13 @@ use crate::http::HttpServer;
|
|||||||
mod hash;
|
mod hash;
|
||||||
pub mod http;
|
pub mod http;
|
||||||
pub mod kafka;
|
pub mod kafka;
|
||||||
|
mod session_manager;
|
||||||
|
pub mod utils;
|
||||||
|
mod websocket;
|
||||||
|
mod request_handler;
|
||||||
|
mod event;
|
||||||
mod player;
|
mod player;
|
||||||
mod session;
|
mod session;
|
||||||
pub mod utils;
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
pub async fn main() {
|
pub async fn main() {
|
||||||
|
|||||||
115
server/src/request_handler.rs
Normal file
115
server/src/request_handler.rs
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
use std::convert::Infallible;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use hyper::{Body, Method, Request, Response, StatusCode};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use serde_json::json;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use crate::event::{SessionCreatedDto, SessionJoinDto, SessionJoinedDto};
|
||||||
|
use crate::player::Player;
|
||||||
|
use crate::session_manager::SessionManager;
|
||||||
|
use crate::utils::http_utils::{build_error_res, build_success_res, get_query_params, read_json_body};
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait RequestHandler {
|
||||||
|
async fn handle(&self, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Infallible>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DefaultRequestHandler {
|
||||||
|
session_manager: Arc<Mutex<SessionManager>>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DefaultRequestHandler {
|
||||||
|
pub fn new(
|
||||||
|
session_manager: Arc<Mutex<SessionManager>>
|
||||||
|
) -> DefaultRequestHandler {
|
||||||
|
DefaultRequestHandler {
|
||||||
|
session_manager
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RequestHandler for DefaultRequestHandler {
|
||||||
|
async fn handle(&self, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Infallible> {
|
||||||
|
println!("req to {} with method {}", req.uri().path(), req.method());
|
||||||
|
match (req.method(), req.uri().path()) {
|
||||||
|
(&Method::GET, "/session") => handle_get_session(&self.session_manager, req).await,
|
||||||
|
(&Method::POST, "/create_session") => {
|
||||||
|
handle_session_create(&self.session_manager, req, addr).await
|
||||||
|
}
|
||||||
|
(&Method::POST, "/join_session") => handle_session_join(&self.session_manager, req, addr).await,
|
||||||
|
_ => Ok(Response::new("unknown".into())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_get_session(
|
||||||
|
session_manager: &Arc<Mutex<SessionManager>>,
|
||||||
|
req: Request<Body>,
|
||||||
|
) -> Result<Response<Body>, Infallible> {
|
||||||
|
let locked = session_manager.lock().await;
|
||||||
|
let query_params = get_query_params(&req);
|
||||||
|
let session_id = query_params.get("session_id");
|
||||||
|
if let None = session_id {
|
||||||
|
return build_error_res("Please provide a valid session id", StatusCode::BAD_REQUEST);
|
||||||
|
}
|
||||||
|
let session_id = session_id.unwrap();
|
||||||
|
let session = locked.get_session(session_id);
|
||||||
|
if let None = session {
|
||||||
|
return build_error_res("Unable to find session for given id", StatusCode::NOT_FOUND);
|
||||||
|
}
|
||||||
|
return build_success_res(&serde_json::to_string(&session.unwrap()).unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_session_create(
|
||||||
|
session_manager: &Arc<Mutex<SessionManager>>,
|
||||||
|
req: Request<Body>,
|
||||||
|
addr: SocketAddr,
|
||||||
|
) -> Result<Response<Body>, Infallible> {
|
||||||
|
println!("Called to create new session: {:?}", req);
|
||||||
|
let mut locked = session_manager.lock().await;
|
||||||
|
let player = Player {
|
||||||
|
id: addr.to_string(),
|
||||||
|
};
|
||||||
|
let session_create_res = locked.create_session(player.clone()).await;
|
||||||
|
if let Err(e) = session_create_res {
|
||||||
|
return Ok(Response::builder()
|
||||||
|
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
|
.body(Body::from(e))
|
||||||
|
.unwrap());
|
||||||
|
}
|
||||||
|
let session_created = SessionCreatedDto {
|
||||||
|
session: session_create_res.unwrap(),
|
||||||
|
player,
|
||||||
|
};
|
||||||
|
let serialized = json!(session_created);
|
||||||
|
return build_success_res(&serialized.to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_session_join(
|
||||||
|
session_manager: &Arc<Mutex<SessionManager>>,
|
||||||
|
mut req: Request<Body>,
|
||||||
|
addr: SocketAddr,
|
||||||
|
) -> Result<Response<Body>, Infallible> {
|
||||||
|
println!("Received request to join session: {:?}", req);
|
||||||
|
let mut locked = session_manager.lock().await;
|
||||||
|
let body = read_json_body::<SessionJoinDto>(&mut req).await;
|
||||||
|
let player = Player {
|
||||||
|
id: addr.to_string(),
|
||||||
|
};
|
||||||
|
let session_join_res = locked.join_session(body.session_id, player.clone()).await;
|
||||||
|
if let Err(e) = session_join_res {
|
||||||
|
eprintln!("Failed to join session: {:?}", e);
|
||||||
|
return Ok(Response::builder()
|
||||||
|
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
|
.body(Body::from(e))
|
||||||
|
.unwrap());
|
||||||
|
}
|
||||||
|
let session = session_join_res.unwrap();
|
||||||
|
println!("Successfully joined session: {:?}", session);
|
||||||
|
let session_joined = SessionJoinedDto { session, player };
|
||||||
|
let serialized = json!(session_joined);
|
||||||
|
return build_success_res(&serialized.to_string());
|
||||||
|
}
|
||||||
@@ -1,194 +1,12 @@
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Serialize, Deserialize};
|
||||||
|
|
||||||
use pong::event::event::{Event, EventReader, EventWriter};
|
|
||||||
|
|
||||||
use crate::hash::Hasher;
|
|
||||||
use crate::kafka::{
|
|
||||||
KafkaDefaultEventWriterImpl, KafkaSessionEventReaderImpl, KafkaSessionEventWriterImpl,
|
|
||||||
KafkaTopicManager,
|
|
||||||
};
|
|
||||||
use crate::player::Player;
|
use crate::player::Player;
|
||||||
|
|
||||||
pub struct SessionManager {
|
|
||||||
kafka_host: String,
|
|
||||||
sessions: Vec<Session>,
|
|
||||||
session_producer: EventWriter,
|
|
||||||
topic_manager: KafkaTopicManager,
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: On startup read the session events from kafka to restore the session id <-> hash mappings.
|
|
||||||
impl SessionManager {
|
|
||||||
pub fn new(kafka_host: &str, kafka_topic_manager_host: &str) -> SessionManager {
|
|
||||||
SessionManager {
|
|
||||||
kafka_host: kafka_host.to_owned(),
|
|
||||||
sessions: vec![],
|
|
||||||
topic_manager: KafkaTopicManager::from(kafka_topic_manager_host),
|
|
||||||
session_producer: EventWriter::new(Box::new(KafkaDefaultEventWriterImpl::new(
|
|
||||||
kafka_host,
|
|
||||||
))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_session(&self, session_id: &str) -> Option<Session> {
|
|
||||||
self.sessions
|
|
||||||
.iter()
|
|
||||||
.find(|s| s.hash == session_id)
|
|
||||||
.map_or_else(|| None, |s| Some(s.clone()))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn create_session(&mut self, player: Player) -> Result<Session, String> {
|
|
||||||
let add_partition_res = self.topic_manager.add_partition().await;
|
|
||||||
if let Err(e) = add_partition_res {
|
|
||||||
println!("Failed to create partition: {}", e);
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
let session_id = add_partition_res.unwrap();
|
|
||||||
let session_hash = Hasher::hash(session_id);
|
|
||||||
let session = Session::new(session_id, session_hash, player.clone());
|
|
||||||
println!("Successfully created session: {:?}", session);
|
|
||||||
let write_res = self.write_to_producer(session_created(session.clone(), player.clone()));
|
|
||||||
if let Err(e) = write_res {
|
|
||||||
eprintln!(
|
|
||||||
"Failed to write session created event for {:?} to producer: {}",
|
|
||||||
session, e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
self.sessions.push(session.clone());
|
|
||||||
Ok(session)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn join_session(
|
|
||||||
&mut self,
|
|
||||||
session_id: String,
|
|
||||||
player: Player,
|
|
||||||
) -> Result<Session, String> {
|
|
||||||
let updated_session = {
|
|
||||||
let session = self.sessions.iter_mut().find(|s| s.hash == session_id);
|
|
||||||
if let None = session {
|
|
||||||
let error = format!("Can't join session that does not exist: {}", session_id);
|
|
||||||
return Err(error);
|
|
||||||
}
|
|
||||||
let mut session = session.unwrap();
|
|
||||||
if session.state != SessionState::PENDING {
|
|
||||||
let error = format!("Can't join session that is not PENDING: {}", session_id);
|
|
||||||
return Err(error);
|
|
||||||
}
|
|
||||||
if session.players.len() > 1 {
|
|
||||||
let error = format!("Can't join session with more than 1 player: {}", session_id);
|
|
||||||
return Err(error);
|
|
||||||
}
|
|
||||||
if session.players[0] == player {
|
|
||||||
let error = format!(
|
|
||||||
"Can't join session, because player {:?} is already in session: {}",
|
|
||||||
player, session_id
|
|
||||||
);
|
|
||||||
return Err(error);
|
|
||||||
}
|
|
||||||
session.players.push(player.clone());
|
|
||||||
session.state = SessionState::RUNNING;
|
|
||||||
session.clone()
|
|
||||||
};
|
|
||||||
{
|
|
||||||
let write_res =
|
|
||||||
self.write_to_producer(session_joined(updated_session.clone(), player.clone()));
|
|
||||||
if let Err(e) = write_res {
|
|
||||||
eprintln!(
|
|
||||||
"Failed to write session joined event for {:?} to producer: {}",
|
|
||||||
updated_session, e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
println!("sessions = {:?}", self.sessions);
|
|
||||||
Ok(updated_session.clone())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn write_to_producer<T>(&mut self, session_event: T) -> Result<(), String>
|
|
||||||
where
|
|
||||||
T: Serialize,
|
|
||||||
{
|
|
||||||
let json_event = serde_json::to_string(&session_event).unwrap();
|
|
||||||
let session_event_write = self.session_producer.write(Event {
|
|
||||||
topic: "session".to_owned(),
|
|
||||||
key: None,
|
|
||||||
msg: json_event,
|
|
||||||
});
|
|
||||||
if let Err(e) = session_event_write {
|
|
||||||
let message = format!("Failed to write session create event to kafka: {:?}", e);
|
|
||||||
println!("{}", e);
|
|
||||||
return Err(message.to_owned());
|
|
||||||
}
|
|
||||||
println!("Successfully produced session event.");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn split(
|
|
||||||
&self,
|
|
||||||
session_id: &str,
|
|
||||||
read_topics: &[&str],
|
|
||||||
) -> Result<(SessionReader, SessionWriter), String> {
|
|
||||||
let reader = self.get_session_reader(session_id, read_topics);
|
|
||||||
if let Err(e) = reader {
|
|
||||||
println!("Failed to create session reader: {:?}", e);
|
|
||||||
return Err("Failed to create session reader".to_string());
|
|
||||||
}
|
|
||||||
let writer = self.get_session_writer(session_id);
|
|
||||||
if let Err(e) = writer {
|
|
||||||
println!("Failed to create session writer: {:?}", e);
|
|
||||||
return Err("Failed to create session writer".to_string());
|
|
||||||
}
|
|
||||||
return Ok((reader.unwrap(), writer.unwrap()));
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_session_reader(
|
|
||||||
&self,
|
|
||||||
session_id: &str,
|
|
||||||
topics: &[&str],
|
|
||||||
) -> Result<SessionReader, String> {
|
|
||||||
let session = self.find_session(&session_id);
|
|
||||||
if let None = session {
|
|
||||||
return Err(format!("Unable to find session with hash {}", session_id));
|
|
||||||
}
|
|
||||||
let session = session.unwrap();
|
|
||||||
let kafka_reader = KafkaSessionEventReaderImpl::new(&self.kafka_host, &session, topics);
|
|
||||||
if let Err(_) = kafka_reader {
|
|
||||||
return Err("Unable to create kafka reader.".to_string());
|
|
||||||
}
|
|
||||||
let kafka_reader = kafka_reader.unwrap();
|
|
||||||
let event_reader = EventReader::new(Box::new(kafka_reader));
|
|
||||||
Ok(SessionReader {
|
|
||||||
reader: event_reader,
|
|
||||||
session,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_session_writer(&self, session_id: &str) -> Result<SessionWriter, String> {
|
|
||||||
let session = self.find_session(&session_id);
|
|
||||||
if let None = session {
|
|
||||||
return Err(format!("Unable to find session with hash {}", session_id));
|
|
||||||
}
|
|
||||||
let session = session.unwrap();
|
|
||||||
let event_writer =
|
|
||||||
EventWriter::new(Box::new(KafkaSessionEventWriterImpl::new(&self.kafka_host)));
|
|
||||||
Ok(SessionWriter {
|
|
||||||
writer: event_writer,
|
|
||||||
session,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn find_session(&self, session_id: &str) -> Option<Session> {
|
|
||||||
self.sessions
|
|
||||||
.iter()
|
|
||||||
.find(|s| session_id == s.hash)
|
|
||||||
.map(|s| s.clone())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct Session {
|
pub struct Session {
|
||||||
pub id: u16,
|
pub id: u16,
|
||||||
pub hash: String,
|
pub hash: String,
|
||||||
pub state: SessionState,
|
pub state: SessionState,
|
||||||
players: Vec<Player>,
|
pub players: Vec<Player>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Session {
|
impl Session {
|
||||||
@@ -221,78 +39,3 @@ pub enum SessionState {
|
|||||||
CLOSED, // game is over
|
CLOSED, // game is over
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct SessionWriter {
|
|
||||||
session: Session,
|
|
||||||
writer: EventWriter,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SessionWriter {
|
|
||||||
pub fn write_to_session(&mut self, topic: &str, msg: &str) -> Result<(), String> {
|
|
||||||
let event = Event {
|
|
||||||
msg: msg.to_owned(),
|
|
||||||
key: Some(self.session.id.to_string()),
|
|
||||||
topic: topic.to_owned(),
|
|
||||||
};
|
|
||||||
self.writer.write(event)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct SessionReader {
|
|
||||||
#[allow(dead_code)]
|
|
||||||
session: Session,
|
|
||||||
reader: EventReader,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SessionReader {
|
|
||||||
pub fn read_from_session(&mut self) -> Result<Vec<Event>, String> {
|
|
||||||
self.reader.read()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
|
||||||
struct SessionCreatedEvent {
|
|
||||||
event_type: SessionEventType,
|
|
||||||
session: Session,
|
|
||||||
player: Player,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SessionCreatedEvent {
|
|
||||||
pub fn new(session: Session, player: Player) -> SessionCreatedEvent {
|
|
||||||
SessionCreatedEvent {
|
|
||||||
event_type: SessionEventType::CREATED,
|
|
||||||
session,
|
|
||||||
player,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
|
||||||
struct SessionJoinedEvent {
|
|
||||||
event_type: SessionEventType,
|
|
||||||
session: Session,
|
|
||||||
player: Player,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SessionJoinedEvent {
|
|
||||||
pub fn new(session: Session, player: Player) -> SessionJoinedEvent {
|
|
||||||
SessionJoinedEvent {
|
|
||||||
event_type: SessionEventType::JOINED,
|
|
||||||
session,
|
|
||||||
player,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn session_created(session: Session, player: Player) -> SessionCreatedEvent {
|
|
||||||
SessionCreatedEvent::new(session, player)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn session_joined(session: Session, player: Player) -> SessionJoinedEvent {
|
|
||||||
SessionJoinedEvent::new(session, player)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
|
||||||
enum SessionEventType {
|
|
||||||
CREATED,
|
|
||||||
JOINED,
|
|
||||||
}
|
|
||||||
|
|||||||
261
server/src/session_manager.rs
Normal file
261
server/src/session_manager.rs
Normal file
@@ -0,0 +1,261 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use pong::event::event::{Event, EventReader, EventWriter};
|
||||||
|
|
||||||
|
use crate::hash::Hasher;
|
||||||
|
use crate::kafka::{
|
||||||
|
KafkaDefaultEventWriterImpl, KafkaSessionEventReaderImpl, KafkaSessionEventWriterImpl,
|
||||||
|
KafkaTopicManager,
|
||||||
|
};
|
||||||
|
use crate::player::Player;
|
||||||
|
use crate::session::{Session, SessionState};
|
||||||
|
|
||||||
|
pub struct SessionManager {
|
||||||
|
kafka_host: String,
|
||||||
|
sessions: Vec<Session>,
|
||||||
|
session_producer: EventWriter,
|
||||||
|
topic_manager: KafkaTopicManager,
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: On startup read the session events from kafka to restore the session id <-> hash mappings.
|
||||||
|
impl SessionManager {
|
||||||
|
pub fn new(kafka_host: &str, kafka_topic_manager_host: &str) -> SessionManager {
|
||||||
|
SessionManager {
|
||||||
|
kafka_host: kafka_host.to_owned(),
|
||||||
|
sessions: vec![],
|
||||||
|
topic_manager: KafkaTopicManager::from(kafka_topic_manager_host),
|
||||||
|
session_producer: EventWriter::new(Box::new(KafkaDefaultEventWriterImpl::new(
|
||||||
|
kafka_host,
|
||||||
|
))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_session(&self, session_id: &str) -> Option<Session> {
|
||||||
|
self.sessions
|
||||||
|
.iter()
|
||||||
|
.find(|s| s.hash == session_id)
|
||||||
|
.map_or_else(|| None, |s| Some(s.clone()))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create_session(&mut self, player: Player) -> Result<Session, String> {
|
||||||
|
let add_partition_res = self.topic_manager.add_partition().await;
|
||||||
|
if let Err(e) = add_partition_res {
|
||||||
|
println!("Failed to create partition: {}", e);
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
let session_id = add_partition_res.unwrap();
|
||||||
|
let session_hash = Hasher::hash(session_id);
|
||||||
|
let session = Session::new(session_id, session_hash, player.clone());
|
||||||
|
println!("Successfully created session: {:?}", session);
|
||||||
|
let write_res = self.write_to_producer(session_created(session.clone(), player.clone()));
|
||||||
|
if let Err(e) = write_res {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to write session created event for {:?} to producer: {}",
|
||||||
|
session, e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
self.sessions.push(session.clone());
|
||||||
|
Ok(session)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn join_session(
|
||||||
|
&mut self,
|
||||||
|
session_id: String,
|
||||||
|
player: Player,
|
||||||
|
) -> Result<Session, String> {
|
||||||
|
let updated_session = {
|
||||||
|
let session = self.sessions.iter_mut().find(|s| s.hash == session_id);
|
||||||
|
if let None = session {
|
||||||
|
let error = format!("Can't join session that does not exist: {}", session_id);
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
let mut session = session.unwrap();
|
||||||
|
if session.state != SessionState::PENDING {
|
||||||
|
let error = format!("Can't join session that is not PENDING: {}", session_id);
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
if session.players.len() > 1 {
|
||||||
|
let error = format!("Can't join session with more than 1 player: {}", session_id);
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
if session.players[0] == player {
|
||||||
|
let error = format!(
|
||||||
|
"Can't join session, because player {:?} is already in session: {}",
|
||||||
|
player, session_id
|
||||||
|
);
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
session.players.push(player.clone());
|
||||||
|
session.state = SessionState::RUNNING;
|
||||||
|
session.clone()
|
||||||
|
};
|
||||||
|
{
|
||||||
|
let write_res =
|
||||||
|
self.write_to_producer(session_joined(updated_session.clone(), player.clone()));
|
||||||
|
if let Err(e) = write_res {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to write session joined event for {:?} to producer: {}",
|
||||||
|
updated_session, e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
println!("sessions = {:?}", self.sessions);
|
||||||
|
Ok(updated_session.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_to_producer<T>(&mut self, session_event: T) -> Result<(), String>
|
||||||
|
where
|
||||||
|
T: Serialize,
|
||||||
|
{
|
||||||
|
let json_event = serde_json::to_string(&session_event).unwrap();
|
||||||
|
let session_event_write = self.session_producer.write(Event {
|
||||||
|
topic: "session".to_owned(),
|
||||||
|
key: None,
|
||||||
|
msg: json_event,
|
||||||
|
});
|
||||||
|
if let Err(e) = session_event_write {
|
||||||
|
let message = format!("Failed to write session create event to kafka: {:?}", e);
|
||||||
|
println!("{}", e);
|
||||||
|
return Err(message.to_owned());
|
||||||
|
}
|
||||||
|
println!("Successfully produced session event.");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn split(
|
||||||
|
&self,
|
||||||
|
session_id: &str,
|
||||||
|
read_topics: &[&str],
|
||||||
|
) -> Result<(SessionReader, SessionWriter), String> {
|
||||||
|
let reader = self.get_session_reader(session_id, read_topics);
|
||||||
|
if let Err(e) = reader {
|
||||||
|
println!("Failed to create session reader: {:?}", e);
|
||||||
|
return Err("Failed to create session reader".to_string());
|
||||||
|
}
|
||||||
|
let writer = self.get_session_writer(session_id);
|
||||||
|
if let Err(e) = writer {
|
||||||
|
println!("Failed to create session writer: {:?}", e);
|
||||||
|
return Err("Failed to create session writer".to_string());
|
||||||
|
}
|
||||||
|
return Ok((reader.unwrap(), writer.unwrap()));
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_session_reader(
|
||||||
|
&self,
|
||||||
|
session_id: &str,
|
||||||
|
topics: &[&str],
|
||||||
|
) -> Result<SessionReader, String> {
|
||||||
|
let session = self.find_session(&session_id);
|
||||||
|
if let None = session {
|
||||||
|
return Err(format!("Unable to find session with hash {}", session_id));
|
||||||
|
}
|
||||||
|
let session = session.unwrap();
|
||||||
|
let kafka_reader = KafkaSessionEventReaderImpl::new(&self.kafka_host, &session, topics);
|
||||||
|
if let Err(_) = kafka_reader {
|
||||||
|
return Err("Unable to create kafka reader.".to_string());
|
||||||
|
}
|
||||||
|
let kafka_reader = kafka_reader.unwrap();
|
||||||
|
let event_reader = EventReader::new(Box::new(kafka_reader));
|
||||||
|
Ok(SessionReader {
|
||||||
|
reader: event_reader,
|
||||||
|
session,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_session_writer(&self, session_id: &str) -> Result<SessionWriter, String> {
|
||||||
|
let session = self.find_session(&session_id);
|
||||||
|
if let None = session {
|
||||||
|
return Err(format!("Unable to find session with hash {}", session_id));
|
||||||
|
}
|
||||||
|
let session = session.unwrap();
|
||||||
|
let event_writer =
|
||||||
|
EventWriter::new(Box::new(KafkaSessionEventWriterImpl::new(&self.kafka_host)));
|
||||||
|
Ok(SessionWriter {
|
||||||
|
writer: event_writer,
|
||||||
|
session,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn find_session(&self, session_id: &str) -> Option<Session> {
|
||||||
|
self.sessions
|
||||||
|
.iter()
|
||||||
|
.find(|s| session_id == s.hash)
|
||||||
|
.map(|s| s.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SessionWriter {
|
||||||
|
session: Session,
|
||||||
|
writer: EventWriter,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SessionWriter {
|
||||||
|
pub fn write_to_session(&mut self, topic: &str, msg: &str) -> Result<(), String> {
|
||||||
|
let event = Event {
|
||||||
|
msg: msg.to_owned(),
|
||||||
|
key: Some(self.session.id.to_string()),
|
||||||
|
topic: topic.to_owned(),
|
||||||
|
};
|
||||||
|
self.writer.write(event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SessionReader {
|
||||||
|
#[allow(dead_code)]
|
||||||
|
session: Session,
|
||||||
|
reader: EventReader,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SessionReader {
|
||||||
|
pub fn read_from_session(&mut self) -> Result<Vec<Event>, String> {
|
||||||
|
self.reader.read()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize)]
|
||||||
|
struct SessionCreatedEvent {
|
||||||
|
event_type: SessionEventType,
|
||||||
|
session: Session,
|
||||||
|
player: Player,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SessionCreatedEvent {
|
||||||
|
pub fn new(session: Session, player: Player) -> SessionCreatedEvent {
|
||||||
|
SessionCreatedEvent {
|
||||||
|
event_type: SessionEventType::CREATED,
|
||||||
|
session,
|
||||||
|
player,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize)]
|
||||||
|
struct SessionJoinedEvent {
|
||||||
|
event_type: SessionEventType,
|
||||||
|
session: Session,
|
||||||
|
player: Player,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SessionJoinedEvent {
|
||||||
|
pub fn new(session: Session, player: Player) -> SessionJoinedEvent {
|
||||||
|
SessionJoinedEvent {
|
||||||
|
event_type: SessionEventType::JOINED,
|
||||||
|
session,
|
||||||
|
player,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn session_created(session: Session, player: Player) -> SessionCreatedEvent {
|
||||||
|
SessionCreatedEvent::new(session, player)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn session_joined(session: Session, player: Player) -> SessionJoinedEvent {
|
||||||
|
SessionJoinedEvent::new(session, player)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize)]
|
||||||
|
enum SessionEventType {
|
||||||
|
CREATED,
|
||||||
|
JOINED,
|
||||||
|
}
|
||||||
@@ -1,7 +1,8 @@
|
|||||||
pub mod http_utils {
|
pub mod http_utils {
|
||||||
use hyper::{body, Body, Request};
|
use hyper::{body, Body, Request, Response, StatusCode};
|
||||||
use serde::de::DeserializeOwned;
|
use serde::de::DeserializeOwned;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::convert::Infallible;
|
||||||
|
|
||||||
pub fn get_query_params(req: &Request<Body>) -> HashMap<&str, &str> {
|
pub fn get_query_params(req: &Request<Body>) -> HashMap<&str, &str> {
|
||||||
let uri = req.uri();
|
let uri = req.uri();
|
||||||
@@ -26,6 +27,22 @@ pub mod http_utils {
|
|||||||
let body_str = std::str::from_utf8(&*bytes).unwrap();
|
let body_str = std::str::from_utf8(&*bytes).unwrap();
|
||||||
serde_json::from_str::<T>(body_str).unwrap()
|
serde_json::from_str::<T>(body_str).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn build_success_res(value: &str) -> Result<Response<Body>, Infallible> {
|
||||||
|
let json = format!("{{\"data\": {}}}", value);
|
||||||
|
let mut res = Response::new(Body::from(json));
|
||||||
|
let headers = res.headers_mut();
|
||||||
|
headers.insert("Content-Type", "application/json".parse().unwrap());
|
||||||
|
headers.insert("Access-Control-Allow-Origin", "*".parse().unwrap());
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn build_error_res(error: &str, status: StatusCode) -> Result<Response<Body>, Infallible> {
|
||||||
|
let json = format!("{{\"error\": \"{}\"}}", error);
|
||||||
|
let mut res = Response::new(Body::from(json));
|
||||||
|
*res.status_mut() = status;
|
||||||
|
return Ok(res);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
205
server/src/websocket.rs
Normal file
205
server/src/websocket.rs
Normal file
@@ -0,0 +1,205 @@
|
|||||||
|
use std::str::FromStr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
use hyper_tungstenite::HyperWebsocket;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
|
use hyper_tungstenite::tungstenite::{Error, Message};
|
||||||
|
use serde_json::json;
|
||||||
|
use tokio::time::sleep;
|
||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
use crate::event::{SessionClosedDto, SessionEventListDTO};
|
||||||
|
use crate::session::Session;
|
||||||
|
use crate::session_manager::{SessionManager};
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait WebsocketHandler {
|
||||||
|
async fn serve(self) -> Result<(), Error> ;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DefaultWebsocketHandler {
|
||||||
|
websocket_session: WebSocketSession,
|
||||||
|
websocket: HyperWebsocket,
|
||||||
|
session_manager: Arc<Mutex<SessionManager>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DefaultWebsocketHandler {
|
||||||
|
pub fn new(
|
||||||
|
websocket_session: WebSocketSession,
|
||||||
|
websocket: HyperWebsocket,
|
||||||
|
session_manager: Arc<Mutex<SessionManager>>,
|
||||||
|
) -> DefaultWebsocketHandler {
|
||||||
|
DefaultWebsocketHandler {
|
||||||
|
websocket_session, websocket, session_manager
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl WebsocketHandler for DefaultWebsocketHandler {
|
||||||
|
async fn serve(self) -> Result<(), Error> {
|
||||||
|
let websocket = self.websocket.await?;
|
||||||
|
let (mut websocket_writer, mut websocket_reader) = websocket.split();
|
||||||
|
|
||||||
|
let session_manager = self.session_manager.lock().await;
|
||||||
|
let event_handler_pair = session_manager.split(
|
||||||
|
&self.websocket_session.session.hash,
|
||||||
|
self.websocket_session.connection_type.get_topics(),
|
||||||
|
);
|
||||||
|
if let Err(_) = event_handler_pair {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to create event reader/writer pair session: {:?}",
|
||||||
|
self.websocket_session
|
||||||
|
);
|
||||||
|
return Err(Error::ConnectionClosed); // TODO: Use proper error for this case to close the connection
|
||||||
|
}
|
||||||
|
|
||||||
|
let (mut event_reader, mut event_writer) = event_handler_pair.unwrap();
|
||||||
|
let websocket_session_read_copy = self.websocket_session.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
println!(
|
||||||
|
"Ready to read messages from ws connection: {:?}",
|
||||||
|
websocket_session_read_copy
|
||||||
|
);
|
||||||
|
while let Some(message) = websocket_reader.next().await {
|
||||||
|
match message.unwrap() {
|
||||||
|
Message::Text(msg) => {
|
||||||
|
let events = serde_json::from_str::<SessionEventListDTO>(&msg);
|
||||||
|
println!("Received ws message to persist events to kafka");
|
||||||
|
if let Err(e) = events {
|
||||||
|
eprintln!("Failed to deserialize ws message to event {}: {}", msg, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let event_wrapper = events.unwrap();
|
||||||
|
if event_wrapper.session_id != websocket_session_read_copy.session.hash {
|
||||||
|
eprintln!("Websocket has session {:?} but was asked to write to session {} - skip.", websocket_session_read_copy, event_wrapper.session_id);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let mut any_error = false;
|
||||||
|
let event_count = event_wrapper.events.len();
|
||||||
|
for event in event_wrapper.events {
|
||||||
|
let write_res = event_writer.write_to_session(&event.topic, &event.msg);
|
||||||
|
if let Err(e) = write_res {
|
||||||
|
any_error = true;
|
||||||
|
eprintln!("Failed to write event {:?}: {}", event, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if any_error {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to write at least one message for session {}",
|
||||||
|
event_wrapper.session_id
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
println!(
|
||||||
|
"Successfully wrote {} messages to kafka for session {:?}",
|
||||||
|
event_count, websocket_session_read_copy
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Message::Close(msg) => {
|
||||||
|
// No need to send a reply: tungstenite takes care of this for you.
|
||||||
|
if let Some(msg) = &msg {
|
||||||
|
println!(
|
||||||
|
"Received close message with code {} and message: {}",
|
||||||
|
msg.code, msg.reason
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
println!("Received close message");
|
||||||
|
}
|
||||||
|
|
||||||
|
let session_closed_event = SessionClosedDto {
|
||||||
|
session: websocket_session_read_copy.session.clone(),
|
||||||
|
reason: "ws closed".to_owned(),
|
||||||
|
};
|
||||||
|
let msg = json!(session_closed_event).to_string();
|
||||||
|
let session_event_write_res = event_writer.write_to_session("session", &msg);
|
||||||
|
if let Err(e) = session_event_write_res {
|
||||||
|
eprintln!("Failed to write session closed event: {0}", e)
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
println!("!!!! Exit websocket receiver !!!!")
|
||||||
|
});
|
||||||
|
let websocket_session_write_copy = self.websocket_session.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
println!(
|
||||||
|
"Ready to read messages from kafka: {:?}",
|
||||||
|
websocket_session_write_copy
|
||||||
|
);
|
||||||
|
loop {
|
||||||
|
println!("Reading messages from kafka.");
|
||||||
|
let messages = event_reader.read_from_session();
|
||||||
|
if let Err(_) = messages {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to read messages from kafka for session: {:?}",
|
||||||
|
websocket_session_write_copy
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// println!("Read messages for websocket_session {:?} from consumer: {:?}", websocket_session_write_copy, messages);
|
||||||
|
let messages = messages.unwrap();
|
||||||
|
if messages.len() == 0 {
|
||||||
|
println!("No new messages from kafka.");
|
||||||
|
} else {
|
||||||
|
println!("{} new messages from kafka.", messages.len());
|
||||||
|
let json = serde_json::to_string(&messages).unwrap();
|
||||||
|
let message = Message::from(json);
|
||||||
|
println!("Sending kafka messages through websocket.");
|
||||||
|
let send_res = websocket_writer.send(message).await;
|
||||||
|
if let Err(e) = send_res {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to send message to websocket for session {:?}: {:?}",
|
||||||
|
websocket_session_write_copy, e
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Avoid starvation of read thread (?)
|
||||||
|
// TODO: How to avoid this? This is very bad for performance.
|
||||||
|
sleep(Duration::from_millis(1)).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct WebSocketSession {
|
||||||
|
pub connection_type: WebSocketConnectionType,
|
||||||
|
pub session: Session,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq)]
|
||||||
|
pub enum WebSocketConnectionType {
|
||||||
|
HOST,
|
||||||
|
PEER,
|
||||||
|
OBSERVER,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromStr for WebSocketConnectionType {
|
||||||
|
type Err = ();
|
||||||
|
|
||||||
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||||
|
match s.to_lowercase().as_str() {
|
||||||
|
"host" => Ok(WebSocketConnectionType::HOST),
|
||||||
|
"peer" => Ok(WebSocketConnectionType::PEER),
|
||||||
|
"observer" => Ok(WebSocketConnectionType::OBSERVER),
|
||||||
|
_ => Err(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WebSocketConnectionType {
|
||||||
|
pub fn get_topics(&self) -> &[&str] {
|
||||||
|
match self {
|
||||||
|
WebSocketConnectionType::HOST => &["input", "session"],
|
||||||
|
WebSocketConnectionType::PEER | WebSocketConnectionType::OBSERVER => {
|
||||||
|
&["move", "input", "status", "session"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,14 +0,0 @@
|
|||||||
pub mod tests {
|
|
||||||
use rstest::rstest;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
#[rstest]
|
|
||||||
#[case(
|
|
||||||
"test=abc",
|
|
||||||
HashMap::from([("test", "abc")])
|
|
||||||
)]
|
|
||||||
fn get_query_params_tests(#[case] query_str: &str, #[case] expected: HashMap<&str, &str>) {
|
|
||||||
let res = get_query_params(query_str);
|
|
||||||
assert_eq!(res, expected)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1 +1 @@
|
|||||||
|
fn main() {}
|
||||||
|
|||||||
Reference in New Issue
Block a user