feature/watch-session

This commit is contained in:
Thilo Behnke
2022-07-03 15:03:34 +02:00
parent b9810b2bfe
commit f1f97ef243
11 changed files with 166 additions and 43 deletions

View File

@@ -1,6 +1,6 @@
import type {LocalSession, NetworkSession, Session} from "../store/model/session";
import type {LocalSession, NetworkSession} from "../store/model/session";
import {SessionState, SessionType} from "../store/model/session";
import type {NetworkSessionEventPayload, SessionEventPayload} from "../store/model/event";
import type {NetworkSessionEventPayload} from "../store/model/event";
async function createLocalSession(): Promise<LocalSession> {
await new Promise((res) => {
@@ -26,7 +26,11 @@ async function createNetworkSession(): Promise<NetworkSession> {
}
function createJoinLink(sessionId: string): string {
return `${window.location.origin}${window.location.pathname}?session_id=${sessionId}`;
return `${window.location.origin}${window.location.pathname}?join=${sessionId}`;
}
function createWatchLink(sessionId: string): string {
return `${window.location.origin}${window.location.pathname}?watch=${sessionId}`;
}
async function joinNetworkSession(sessionId): Promise<NetworkSession> {
@@ -71,7 +75,7 @@ async function sessionResponseHandler(response: Response): Promise<NetworkSessio
async function createEventWebsocket(session: NetworkSession): Promise<WebSocket> {
console.debug("creating ws for session: ", session)
const url = `/pong/ws?session_id=${session.session_id}&player_id=${session.you.id}&connection_type=${session.type.toLowerCase()}`;
const url = `/pong/ws?session_id=${session.session_id}&actor_id=${session.you.id}&connection_type=${session.type.toLowerCase()}`;
return createWebsocket(url);
}
@@ -108,5 +112,6 @@ export default {
joinNetworkSession,
watchNetworkSession,
createEventWebsocket,
createJoinLink
createJoinLink,
createWatchLink
}

View File

@@ -17,11 +17,12 @@
return;
}
const params = window.location.search.slice(1).split("&").map(p => p.split('=')).reduce((acc, [key, val]) => ({...acc, [key]: val}), {}) as any;
console.log(params)
if (!params.session_id) {
return;
if (params.join) {
joinSessionId = params.join;
}
if (params.watch) {
watchSessionId = params.watch;
}
joinSessionId = params.session_id;
})
const localSession = () => {

View File

@@ -15,6 +15,7 @@
export let session: NetworkSession;
let joinLink;
let watchLink;
let cachedSessionId;
let relevantKeyboardEvents: Readable<Input[]>;
@@ -24,6 +25,7 @@
cachedSessionId = session.session_id;
console.log("NetworkSessionWrapper ready, now setting up sessionEvents")
joinLink = api.createJoinLink(session.session_id);
watchLink = api.createWatchLink(session.session_id);
relevantKeyboardEvents = getPlayerKeyboardInputs(session.you.nr);
}
@@ -50,6 +52,7 @@
{:else if session.state === SessionState.CLOSED}
<h3>game over!</h3>
{:else if session.state === SessionState.RUNNING}
<CopyToClipboard text={watchLink}></CopyToClipboard>
{#if session.type === SessionType.HOST}
<TickWrapper inputs={$sessionInputs} let:tick={tick} let:inputs={inputs} let:handleError={handleError}>
<slot inputs={inputs} tick={tick} events={$networkSessionStateEvents}></slot>

View File

@@ -1,6 +1,6 @@
<script lang="ts">
import type {Session} from "../store/model/session";
import {isLocalSession, isNetworkSession} from "../store/model/session";
import {isLocalSession, isNetworkSession, isPlayer, SessionType} from "../store/model/session";
export let session: Session;
</script>
@@ -11,7 +11,11 @@
<span><b>State:</b> {session.state}</span>
{#if isNetworkSession(session)}
<span><b>Type:</b> {session.type}</span>
<span><b>You:</b> Player {session.you.nr} ({session.you.id})</span>
{#if isPlayer(session.you)}
<span><b>You:</b> Player {session.you.nr} ({session.you.id})</span>
{:else}
<span><b>You:</b> Observer ({session.you.id})</span>
{/if}
{/if}
</div>
{/if}

View File

@@ -8,8 +8,11 @@ export enum SessionType {
LOCAL = 'LOCAL', HOST = 'HOST', PEER = 'PEER', OBSERVER = 'OBSERVER'
}
export type Player = {
export type Actor = {
id: string,
}
export type Player = Actor & {
nr: number
}
@@ -25,8 +28,14 @@ export type GameObject = {
y: number,
}
export type Observer = {
id: string
export type Observer = Actor
export const isPlayer = (actor: Actor): actor is Player => {
return !!(actor as Player).nr
}
export const isObserver = (actor: Actor): actor is Observer => {
return !isPlayer(actor);
}
export type LocalSession = {
@@ -40,8 +49,9 @@ export type NetworkSession = {
type: SessionType.HOST | SessionType.PEER | SessionType.OBSERVER,
state: SessionState,
players: Player[],
you: Player
you: Actor
}
export type Session = LocalSession | NetworkSession;
export function isNetworkSession(session: Session): session is NetworkSession {

View File

@@ -37,4 +37,14 @@ impl Player {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Observer {
pub id: String,
pub ip: String,
}
impl Observer {
pub fn new(ip: String) -> Observer {
Observer {
ip,
id: Uuid::new_v4().to_string()
}
}
}

View File

@@ -1,7 +1,7 @@
use std::str::FromStr;
use serde::{Deserialize, Serialize};
use pong::game_field::Input;
use crate::actor::Player;
use crate::actor::{Actor, Player};
use crate::session::Session;
#[derive(Debug, Deserialize, Serialize)]
@@ -72,13 +72,14 @@ pub struct HeartBeatEventPayload {
pub enum SessionEvent {
Created(SessionEventPayload),
Joined(SessionEventPayload),
ObserverAdded(SessionEventPayload),
Closed(SessionEventPayload),
}
impl SessionEvent {
pub fn session_id(&self) -> &str {
return match self {
SessionEvent::Created(e) | SessionEvent::Joined(e) | SessionEvent::Closed(e) => e.session_id()
SessionEvent::Created(e) | SessionEvent::Joined(e) | SessionEvent::ObserverAdded(e) | SessionEvent::Closed(e) => e.session_id()
}
}
}
@@ -86,7 +87,7 @@ impl SessionEvent {
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct SessionEventPayload {
pub session: Session,
pub actor: Player,
pub actor: Actor,
pub reason: String,
}

View File

@@ -12,6 +12,7 @@ use hyper_tungstenite::HyperWebsocket;
use hyper_tungstenite::tungstenite::{Error};
use log::{debug, error, info};
use tokio::sync::Mutex;
use crate::actor::Actor;
use crate::request_handler::{DefaultRequestHandler, RequestHandler};
use crate::session_manager::{SessionManager};
@@ -84,7 +85,7 @@ async fn handle_potential_ws_upgrade(session_manager: Arc<Mutex<SessionManager>>
StatusCode::BAD_REQUEST,
);
}
if !params.contains_key("player_id") {
if !params.contains_key("actor_id") {
error!("Missing player id request param for websocket connection, don't upgrade connection to ws.");
return build_error_res(
"Missing request param: player_id",
@@ -100,7 +101,7 @@ async fn handle_potential_ws_upgrade(session_manager: Arc<Mutex<SessionManager>>
return res;
}
let request_session_id = *params.get("session_id").unwrap();
let request_player_id = *params.get("player_id").unwrap();
let request_actor_id = *params.get("actor_id").unwrap();
let request_player_ip = addr.ip().to_string();
let session = session_manager.lock().await.get_session(request_session_id);
if let None = session {
@@ -109,18 +110,6 @@ async fn handle_potential_ws_upgrade(session_manager: Arc<Mutex<SessionManager>>
return build_error_res(error.as_str(), StatusCode::NOT_FOUND);
}
let session = session.unwrap();
let matching_player = session.players.iter().find(|p| p.id == request_player_id);
if let None = matching_player {
let error = format!("Player is not registered in session: {}", request_player_id);
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::FORBIDDEN);
}
let matching_player = matching_player.unwrap();
if matching_player.ip != request_player_ip {
let error = format!("Player with wrong ip tried to join session: {} (expected) vs {} (actual)", matching_player.ip, request_player_ip);
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::FORBIDDEN);
}
let connection_type_raw = params.get("connection_type").unwrap();
let connection_type =
WebSocketConnectionType::from_str(connection_type_raw);
@@ -130,10 +119,37 @@ async fn handle_potential_ws_upgrade(session_manager: Arc<Mutex<SessionManager>>
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::BAD_REQUEST);
}
let connection_type = connection_type.unwrap();
let actor = match connection_type {
WebSocketConnectionType::OBSERVER => {
let matching_observer = session.observers.iter().find(|o| o.id == request_actor_id);
if let None = matching_observer {
let error = format!("Observer is not registered in session: {}", request_actor_id);
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::FORBIDDEN);
}
Actor::Observer(matching_observer.unwrap().clone())
},
_ => {
let matching_player = session.players.iter().find(|p| p.id == request_actor_id);
if let None = matching_player {
let error = format!("Player is not registered in session: {}", request_actor_id);
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::FORBIDDEN);
}
let matching_player = matching_player.unwrap();
if matching_player.ip != request_player_ip {
let error = format!("Player with wrong ip tried to join session: {} (expected) vs {} (actual)", matching_player.ip, request_player_ip);
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::FORBIDDEN);
}
Actor::Player(matching_player.clone())
}
};
let websocket_session = WebSocketSession {
session: session.clone(),
connection_type: connection_type.unwrap(),
player: matching_player.clone()
connection_type,
actor
};
debug!("websocket upgrade request is valid, will now upgrade to websocket: {:?}", req);

View File

@@ -8,7 +8,7 @@ use serde_json::json;
use tokio::sync::Mutex;
use serde::{Deserialize};
use crate::event::{SessionEvent, SessionEventPayload, SessionEventType};
use crate::actor::Player;
use crate::actor::{Actor, Observer, Player};
use crate::session_manager::SessionManager;
use crate::utils::http_utils::{build_error_res, build_success_res, get_query_params, read_json_body};
@@ -41,6 +41,7 @@ impl RequestHandler for DefaultRequestHandler {
handle_session_create(&self.session_manager, req, addr).await
}
(&Method::POST, "/join_session") => handle_session_join(&self.session_manager, req, addr).await,
(&Method::POST, "/watch_session") => handle_session_watch(&self.session_manager, req, addr).await,
_ => Ok(Response::new("unknown".into())),
}
}
@@ -117,6 +118,31 @@ async fn handle_session_join(
return build_success_res(&serialized.to_string());
}
async fn handle_session_watch(
session_manager: &Arc<Mutex<SessionManager>>,
mut req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<Body>, Infallible> {
info!("called watch_session");
debug!("req: {:?}", req);
let mut locked = session_manager.lock().await;
let body = read_json_body::<SessionJoinDto>(&mut req).await;
let observer = Observer::new(addr.ip().to_string());
let sesssion_add_observer_res = locked.watch_session(body.session_id, observer.clone()).await;
if let Err(e) = sesssion_add_observer_res {
error!("Failed to join session: {:?}", e);
return Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(e))
.unwrap());
}
let session_event = sesssion_add_observer_res.unwrap();
info!("observer {:?} successfully joined session: {:?}", observer, session_event);
let reason = format!("observer {:?} joined session", observer);
let serialized = json!(session_event);
return build_success_res(&serialized.to_string());
}
#[derive(Deserialize)]
struct SessionJoinDto {
pub session_id: String

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap;
use futures::future::err;
use log::{debug, error, info};
use serde::{Deserialize, Serialize};
@@ -9,7 +10,7 @@ use crate::kafka::{
KafkaSessionEventReaderImpl, KafkaSessionEventWriterImpl,
KafkaTopicManager,
};
use crate::actor::Player;
use crate::actor::{Actor, Observer, Player};
use crate::event::{SessionEvent, SessionEventPayload};
use crate::session::{Session, SessionState};
@@ -52,7 +53,7 @@ impl SessionManager {
self.sessions.push(session.clone());
let session_created = SessionEvent::Created(SessionEventPayload {
session: session.clone(),
actor: player,
actor: Actor::Player(player),
reason: format!("session created")
});
let write_res = self.write_to_producer(&session_created);
@@ -105,7 +106,7 @@ impl SessionManager {
let session_joined_event = SessionEvent::Joined(SessionEventPayload {
session: updated_session.clone(),
reason: "session joined".to_owned(),
actor: player
actor: Actor::Player(player)
});
{
let write_res =
@@ -121,6 +122,52 @@ impl SessionManager {
Ok(session_joined_event)
}
pub async fn watch_session(
&mut self,
session_id: String,
observer: Observer
) -> Result<SessionEvent, String> {
let updated_session = {
let session = self.sessions.iter_mut().find(|s| s.session_id == session_id);
if let None = session {
let error = format!("Can't watch session that does not exist: {}", session_id);
return Err(error);
}
let session = session.unwrap();
if session.state != SessionState::RUNNING {
let error = format!("Can't watch session that is not RUNNING: {}", session_id);
return Err(error);
}
if session.observers.contains(&observer) {
let error = format!("Can't add observer to session {} that is already registered as an observer: {:?}", session_id, observer);
return Err(error);
}
if session.observers.len() > 5 {
let error = format!("Can't have more than 5 observers in session: {}", session_id);
return Err(error);
}
session.observers.push(observer.clone());
session.clone()
};
let session_joined_event = SessionEvent::ObserverAdded(SessionEventPayload {
session: updated_session.clone(),
reason: "observer added".to_owned(),
actor: Actor::Observer(observer)
});
{
let write_res =
self.write_to_producer(&session_joined_event);
if let Err(e) = write_res {
eprintln!(
"Failed to write watch session event for {:?} to producer: {}",
updated_session, e
);
}
};
println!("sessions = {:?}", self.sessions);
Ok(session_joined_event)
}
fn write_to_producer(&mut self, session_event: &SessionEvent) -> Result<(), String>
{
let session_id = session_event.session_id();

View File

@@ -16,7 +16,7 @@ use serde::{Serialize, Deserialize};
use pong::event::event::{EventWrapper, EventWriter};
use pong::game_field::Input;
use crate::event::{HeartBeatEventPayload, InputEventPayload, MoveEventPayload, SessionEvent, SessionEventListDTO, SessionEventPayload, SessionEventType, TickEvent};
use crate::actor::Player;
use crate::actor::{Actor, Player};
use crate::session::{Session, SessionState};
use crate::session_manager::{SessionManager, SessionWriter};
use crate::utils::json_utils::unescape;
@@ -118,7 +118,7 @@ impl WebsocketHandler for DefaultWebsocketHandler {
}
let input_event = InputEventPayload {
inputs: payload.inputs,
player_id: websocket_session_read_copy.player.id.to_owned(),
player_id: websocket_session_read_copy.actor.id().to_owned(),
ts: payload.ts,
session_id: session_id.to_owned()
};
@@ -133,7 +133,7 @@ impl WebsocketHandler for DefaultWebsocketHandler {
trace(&websocket_session_read_copy, "received message is PEER snapshot");
let input_event = InputEventPayload {
inputs: payload.inputs,
player_id: websocket_session_read_copy.player.id.to_owned(),
player_id: websocket_session_read_copy.actor.id().to_owned(),
ts: payload.ts,
session_id: session_id.to_owned()
};
@@ -296,7 +296,7 @@ fn write_session_close_event(event_writer: &mut SessionWriter, websocket_session
let mut updated_session = websocket_session.session.clone();
updated_session.state = SessionState::CLOSED;
let session_closed_event = SessionEvent::Closed(SessionEventPayload {
actor: websocket_session.player.clone(),
actor: websocket_session.actor.clone(),
session: updated_session,
reason: format!("ws closed: {}", close_reason),
});
@@ -311,7 +311,7 @@ fn write_session_close_event(event_writer: &mut SessionWriter, websocket_session
pub struct WebSocketSession {
pub connection_type: WebSocketConnectionType,
pub session: Session,
pub player: Player
pub actor: Actor
}
#[derive(Debug, Clone, PartialEq)]