feature/aggregate-tick-event

This commit is contained in:
Thilo Behnke
2022-07-03 14:34:54 +02:00
parent 720bc4c153
commit b9810b2bfe
13 changed files with 229 additions and 89 deletions

View File

@@ -1,3 +1,4 @@
.idea
.DS_Store
node_modules
dist

View File

@@ -57,13 +57,13 @@
></ModeSelect>
</div>
{:else}
<SessionWrapper session={session} let:inputs={inputs} let:objects={objects} let:tick={tick} let:events={events}>
<SessionWrapper session={session} let:inputs={inputs} let:tick={tick} let:events={events}>
<div class="game-area">
<div class="game-area__session">
<SessionInfo session={session}></SessionInfo>
</div>
<div class="game-area__canvas">
<Canvas debug={debug} session={session} inputs={inputs} objects={objects} on:tick={event => tick(...event.detail)}>
<Canvas debug={debug} session={session} inputs={inputs} tick={tick} on:tick={event => tick(...event.detail)}>
<Fps></Fps>
</Canvas>
</div>

View File

@@ -1,24 +1,29 @@
<script lang="ts">
import {FieldWrapper} from "wasm-app";
import {createEventDispatcher, onMount, setContext} from "svelte";
import {drawObjects} from "../store/render";
import {engineCanvas, engineCtx, height, pixelRatio, props, renderContext, width} from "../store/engine";
import {
engineCanvas,
engineCtx,
GameFieldState,
height,
pixelRatio,
props,
renderContext,
width
} from "../store/engine";
import type {Input} from "../store/model/input";
import type {GameObject, Session} from "../store/model/session";
import {SessionType} from "../store/model/session";
export let inputs: Input[] = []
export let objects: GameObject[] = []
export let tick: GameFieldState = null;
export let session: Session;
export let handleError: (err: string) => void;
export let killLoopOnError = true;
export let debug = false;
const dispatch = createEventDispatcher();
let canvas: any;
let ctx: any;
let frame: number;
let listeners = [];
let renderOnly = false;
@@ -27,6 +32,10 @@
renderOnly = session.type === SessionType.PEER || session.type === SessionType.OBSERVER;
}
$: if(canvas && session && tick) {
render(tick)
}
onMount(() => {
ctx = canvas.getContext('2d');
engineCtx.set(ctx)
@@ -43,11 +52,6 @@
}
entity.ready = true;
});
return createLoop((elapsed, dt) => {
tick(dt);
render(objects, dt);
});
})
setContext(renderContext, {
@@ -61,27 +65,7 @@
}
});
function createLoop (fn) {
let elapsed = 0;
let lastTime = performance.now();
(function loop() {
frame = requestAnimationFrame(loop);
const beginTime = performance.now();
const dt = (beginTime - lastTime) / 1000;
lastTime = beginTime;
elapsed += dt;
fn(elapsed, dt);
})();
return () => {
cancelAnimationFrame(frame);
};
}
function tick(dt) {
dispatch('tick', [dt]);
}
function render(objects, dt) {
function render({objects, ts}: {objects, ts}) {
const [canvas_width, canvas_height] = [canvas.width, canvas.height];
ctx.clearRect(0, 0, canvas_width, canvas_height);
drawObjects(ctx, objects, [canvas_width, canvas_height], debug);
@@ -89,14 +73,10 @@
listeners.forEach(entity => {
try {
if (entity.mounted && entity.ready && entity.render) {
entity.render($props, dt);
entity.render($props, ts);
}
} catch (err) {
console.error(err);
if (killLoopOnError) {
cancelAnimationFrame(frame);
console.warn('Animation loop stopped due to an error');
}
handleError(err);
}
});
}

View File

@@ -0,0 +1,25 @@
<script lang="ts">
import {gameField} from "../store/engine";
import {networkTickEvents, sessionInputs} from "../store/session";
export let killLoopOnError = true;
let frame: number;
$: if (networkTickEvents && $networkTickEvents.hasNext) {
const tick = networkTickEvents.next();
if (tick != null) {
gameField.update(tick.objects);
}
}
function handleError(err) {
console.error(err);
if (killLoopOnError) {
cancelAnimationFrame(frame);
console.warn('Animation loop stopped due to an error');
}
}
</script>
<slot tick={$gameField} inputs={$sessionInputs} handleError={handleError}></slot>

View File

@@ -1,15 +1,13 @@
<script lang="ts">
import {sessionInputs} from "../store/session";
import type {LocalSession} from "../store/model/session";
import {gameField} from "../store/engine";
import TickWrapper from "./TickWrapper.svelte";
export let session: LocalSession;
const tick = (dt: number) => {
gameField.tick($sessionInputs, dt)
}
</script>
{#if session && sessionInputs}
<slot inputs={$sessionInputs} objects={$gameField.objects} tick={tick} events={[]}></slot>
<TickWrapper inputs={$sessionInputs} let:tick={tick} let:inputs={inputs} let:handleError={handleError}>
<slot inputs={inputs} tick={tick} handleError={handleError} events={[]}></slot>
</TickWrapper>
{/if}

View File

@@ -1,6 +1,6 @@
<script lang="ts">
import {networkEvents, networkMoveEvents, networkSessionStateEvents, sessionInputs} from "../store/session";
import {networkEvents, networkTickEvents, networkSessionStateEvents, sessionInputs} from "../store/session";
import type {NetworkSession} from "../store/model/session";
import {SessionState, SessionType} from "../store/model/session";
import CopyToClipboard from "./CopyToClipboard.svelte";
@@ -9,6 +9,8 @@
import type {Input} from "../store/model/input";
import {getPlayerKeyboardInputs} from "../store/input";
import {gameField} from "../store/engine";
import InstrumentedTickWrapper from "./InstrumentedTickWrapper.svelte";
import TickWrapper from "./TickWrapper.svelte";
export let session: NetworkSession;
@@ -28,25 +30,15 @@
$: if(session && session.type === SessionType.HOST && session.state === SessionState.RUNNING) {
console.debug("sending host snapshot")
networkEvents.produce({inputs: $relevantKeyboardEvents, session_id: session.session_id, objects: $gameField.objects, player_id: session.you.id, ts: $gameField.lastTick})
networkEvents.produce({inputs: $relevantKeyboardEvents, session_id: session.session_id, objects: $gameField.objects, player_id: session.you.id, ts: $gameField.ts})
}
$: if(session && session.type === SessionType.PEER && session.state === SessionState.RUNNING) {
console.debug("sending host snapshot")
networkEvents.produce({inputs: $relevantKeyboardEvents, session_id: session.session_id, player_id: session.you.id, ts: $gameField.lastTick})
networkEvents.produce({inputs: $relevantKeyboardEvents, session_id: session.session_id, player_id: session.you.id, ts: $gameField.ts})
}
$: console.debug($networkSessionStateEvents)
const tick = (dt: number) => {
if (session.type === SessionType.HOST) {
gameField.tick($sessionInputs, dt)
return;
}
// peer and observer directly override game field state
gameField.update($networkMoveEvents)
}
</script>
{#if !session}
@@ -58,7 +50,15 @@
{:else if session.state === SessionState.CLOSED}
<h3>game over!</h3>
{:else if session.state === SessionState.RUNNING}
<slot inputs={$sessionInputs} objects={$gameField.objects} tick={tick} events={$networkSessionStateEvents}></slot>
{#if session.type === SessionType.HOST}
<TickWrapper inputs={$sessionInputs} let:tick={tick} let:inputs={inputs} let:handleError={handleError}>
<slot inputs={inputs} tick={tick} events={$networkSessionStateEvents}></slot>
</TickWrapper>
{:else}
<InstrumentedTickWrapper inputs={$sessionInputs} let:tick={tick} let:inputs={inputs}>
<slot inputs={inputs} tick={tick} events={$networkSessionStateEvents}></slot>
</InstrumentedTickWrapper>
{/if}
{:else }
<h3>unknown game state</h3>
{/if}

View File

@@ -7,7 +7,6 @@
import api from "../api/session";
export let session: Session;
</script>
<div class="session-wrapper">

View File

@@ -0,0 +1,41 @@
<script lang="ts">
import {onMount} from "svelte";
import {gameField} from "../store/engine";
export let inputs;
export let killLoopOnError = true;
let frame: number;
onMount(() => {
return createLoop((elapsed, dt) => {
gameField.tick(inputs, dt);
});
})
function createLoop (fn) {
let elapsed = 0;
let lastTime = performance.now();
(function loop() {
frame = requestAnimationFrame(loop);
const beginTime = performance.now();
const dt = (beginTime - lastTime) / 1000;
lastTime = beginTime;
elapsed += dt;
fn(elapsed, dt);
})();
return () => {
cancelAnimationFrame(frame);
};
}
function handleError(err) {
console.error(err);
if (killLoopOnError) {
cancelAnimationFrame(frame);
console.warn('Animation loop stopped due to an error');
}
}
</script>
<slot tick={$gameField} inputs={inputs} handleError={handleError}></slot>

View File

@@ -60,12 +60,12 @@ function deriveObject (obj) {
}
export type GameFieldState = {
lastTick: number,
ts: number,
objects: GameObject[]
}
function createGameFieldStore(): Readable<GameFieldState> & {tick: (inputs: Input[], dt: number) => void, update: (objects: GameObject[]) => void} {
const {subscribe, set} = writable<GameFieldState>({lastTick: 0, objects: []});
const {subscribe, set} = writable<GameFieldState>({ts: 0, objects: []});
const field = FieldWrapper.new();
@@ -73,12 +73,12 @@ function createGameFieldStore(): Readable<GameFieldState> & {tick: (inputs: Inpu
field.tick(inputs, dt);
const objects = JSON.parse(field.objects());
const lastTick = Date.now();
set({objects, lastTick});
const ts = Date.now();
set({objects, ts});
}
function update(objects: GameObject[]) {
set({objects, lastTick: Date.now()})
set({objects, ts: Date.now()})
}
return {

View File

@@ -22,7 +22,11 @@ export type InputEventPayload = {
ts: number,
}
export type GameEvent = SessionEventPayload | NetworkSessionEventPayload | InputEventPayload;
export type TickEventPayload = {
session_id: string,
objects: GameObject[],
ts: number
}
export type SessionEvenWrapper = {
topic: 'session',
@@ -34,12 +38,17 @@ export type InputEventWrapper = {
event: InputEventPayload
}
export type TickEventWrapper = {
topic: 'tick',
event: TickEventPayload
}
export type MoveEventWrapper = {
topic: 'move',
event: GameObject
}
export type GameEventWrapper = SessionEvenWrapper | InputEventWrapper | MoveEventWrapper;
export type GameEventWrapper = SessionEvenWrapper | InputEventWrapper | MoveEventWrapper | TickEventWrapper;
export const isSessionEvent = (event: GameEventWrapper): event is SessionEvenWrapper => {
return event.topic === 'session';
@@ -52,3 +61,8 @@ export const isInputEvent = (event: GameEventWrapper): event is InputEventWrappe
export const isMoveEvent = (event: GameEventWrapper): event is MoveEventWrapper => {
return event.topic === 'move';
}
export const isTickEvent = (event: GameEventWrapper): event is TickEventWrapper => {
return event.topic === 'tick';
}

View File

@@ -3,13 +3,13 @@ import api from "../api/session";
import type {LocalSession, Message, NetworkSession, Session, SessionSnapshot} from "./model/session";
import {isLocalSession, isNetworkSession, MessageType, SessionState, SessionType} from "./model/session";
import type {NetworkStore} from "./network";
import type {GameEventWrapper, InputEventPayload, SessionEventPayload} from "./model/event";
import {isInputEvent, isMoveEvent, isSessionEvent} from "./model/event";
import type {GameEventWrapper, InputEventPayload, SessionEventPayload, TickEventPayload} from "./model/event";
import {isInputEvent, isMoveEvent, isSessionEvent, isTickEvent} from "./model/event";
import {getPlayerKeyboardInputs, playerKeyboardInputs} from "./input";
import type {Subscriber} from "svelte/types/runtime/store";
import {combined} from "./utils";
import type {Input} from "./model/input";
import {init} from "svelte/internal";
import {init, tick} from "svelte/internal";
const sessionStore = writable<Session>(null);
@@ -134,14 +134,56 @@ export const networkSessionStateEvents = readable<SessionEventPayload[]>([], set
}
});
export const networkMoveEvents = derived(networkEvents, $sessionEvents => {
const moveEvents = $sessionEvents.filter(isMoveEvent).map(({event}) => event);
if (!moveEvents.length) {
return [];
export type NetworkTickEventState = {
hasNext: boolean;
events: TickEventPayload[]
}
const createNetworkTickEventStore = function() {
const ticks = writable<NetworkTickEventState>({hasNext: false, events: []});
const unsubSessionEvents = networkEvents.subscribe($sessionEvents => {
const tickEvents = $sessionEvents.filter(isTickEvent).map(({event}) => event);
ticks.update(({events}) => {
const updatedEvents = [...events, ...tickEvents];
return {
hasNext: !!updatedEvents.length,
events: updatedEvents
}
})
})
function next(): TickEventPayload {
const events = get(ticks);
if (!events.hasNext) {
return null;
}
const nextEvent = events.events[events.events.length - 1]
ticks.update(({events}) => {
const updatedEvents = events.slice(0, -1);
return {
hasNext: !!updatedEvents.length,
events: updatedEvents
}
})
return nextEvent;
}
// TODO: How to know number of objects?
return moveEvents.slice(moveEvents.length - 7)
})
const customSubscribe = (run: Subscriber<NetworkTickEventState>, invalidate): Unsubscriber => {
const unsubscribe = ticks.subscribe(run, invalidate);
return () => {
unsubscribe();
unsubSessionEvents();
}
}
return {
next,
subscribe: customSubscribe
}
}
export const networkTickEvents = createNetworkTickEventStore();
const networkInputEvents = derived([networkEvents, sessionStore], ([$sessionEvents, $sessionStore]) => $sessionEvents.filter(wrapper => {
if (!isInputEvent(wrapper)) {

View File

@@ -41,6 +41,12 @@ pub struct MoveEventPayload {
pub ts: u128,
}
#[derive(Debug, Serialize)]
pub struct TickEvent {
pub tick: u128,
pub objects: Vec<MoveEventPayload>
}
#[derive(Debug, Serialize, Deserialize)]
pub struct InputEventPayload {
pub session_id: String,

View File

@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::fmt::{Debug, format};
use std::str::FromStr;
use std::sync::Arc;
@@ -6,14 +7,15 @@ use hyper_tungstenite::HyperWebsocket;
use tokio::sync::Mutex;
use async_trait::async_trait;
use futures::{SinkExt, StreamExt};
use futures::future::err;
use hyper_tungstenite::tungstenite::{Error, Message};
use log::{debug, error, info, trace};
use serde_json::json;
use tokio::time::sleep;
use serde::{Serialize, Deserialize};
use pong::event::event::EventWriter;
use pong::event::event::{EventWrapper, EventWriter};
use pong::game_field::Input;
use crate::event::{HeartBeatEventPayload, InputEventPayload, MoveEventPayload, SessionEvent, SessionEventListDTO, SessionEventPayload, SessionEventType};
use crate::event::{HeartBeatEventPayload, InputEventPayload, MoveEventPayload, SessionEvent, SessionEventListDTO, SessionEventPayload, SessionEventType, TickEvent};
use crate::actor::Player;
use crate::session::{Session, SessionState};
use crate::session_manager::{SessionManager, SessionWriter};
@@ -203,17 +205,48 @@ impl WebsocketHandler for DefaultWebsocketHandler {
error(&websocket_session_write_copy, &format!("Failed to read messages from kafka: {:?}", e));
continue;
}
let events = events.unwrap();
trace(&websocket_session_write_copy, &format!("read messages for websocket_session from consumer: {:?}", events));
let event_dtos = events.unwrap().into_iter().map(|e| {
info!("#### {}", e.event);
WebsocketEventDTO {
topic: e.topic,
event: e.event
}
}).collect::<Vec<WebsocketEventDTO>>();
if event_dtos.len() == 0 {
if events.len() == 0 {
trace(&websocket_session_write_copy, "no new messages from kafka.");
} else {
let (move_events, other_events): (Vec<EventWrapper>, Vec<EventWrapper>) = events.into_iter().partition(|e| &e.topic == "move");
let mut grouped_move_events = HashMap::<u128, Vec<MoveEventPayload>>::new();
for move_event in move_events {
let deserialized = serde_json::from_str::<MoveEventPayload>(&move_event.event);
if let Err(e) = deserialized {
error(&websocket_session_write_copy, &format!("Failed to deserialize move event {:?}: {:?}", move_event, e));
continue;
}
let deserialized = deserialized.unwrap();
if !grouped_move_events.contains_key(&deserialized.ts) {
grouped_move_events.insert(deserialized.ts.clone(), vec![]);
}
let existing = grouped_move_events.get_mut(&deserialized.ts).unwrap();
existing.push(deserialized);
}
let mut tick_event_dtos = grouped_move_events.into_iter()
.map(|e| TickEvent{tick: e.0, objects: e.1})
.map(|e| serde_json::to_string(&e).unwrap())
// TODO: This could e.g. be done with ksql when the move events are sent.
.map(|e| WebsocketEventDTO {topic: "tick".to_owned(), event: e})
.collect::<Vec<WebsocketEventDTO>>();
let mut other_event_dtos = other_events.into_iter().map(|e| {
WebsocketEventDTO {
topic: e.topic,
event: e.event
}
}).collect::<Vec<WebsocketEventDTO>>();
let mut event_dtos = vec![];
event_dtos.append(&mut other_event_dtos);
if websocket_session_write_copy.connection_type != WebSocketConnectionType::HOST {
event_dtos.append(&mut tick_event_dtos);
}
trace(&websocket_session_write_copy, &format!("{} new messages from kafka.", event_dtos.len()));
let json = serde_json::to_string(&event_dtos).unwrap();
trace(&websocket_session_write_copy, &format!("sending msg batch to client: {}", json));
@@ -230,6 +263,7 @@ impl WebsocketHandler for DefaultWebsocketHandler {
break;
}
}
// Avoid starvation of read thread (?)
// TODO: How to avoid this? This is very bad for performance.
sleep(Duration::from_millis(1)).await;