refactoring/message-deserialization (#3)

This commit is contained in:
thilo-behnke
2022-06-30 00:02:51 +02:00
committed by Thilo Behnke
parent e0924a96e2
commit c1dacccbe7
56 changed files with 15811 additions and 490 deletions

4
client/svelte-client/.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
.DS_Store
node_modules
dist
pong

View File

@@ -0,0 +1 @@
{"recommendations": ["svelte.svelte-vscode"]}

View File

@@ -0,0 +1,64 @@
# svelte app
This is a project template for [Svelte](https://svelte.dev) apps. It lives at https://github.com/sveltejs/template-webpack.
To create a new project based on this template using [degit](https://github.com/Rich-Harris/degit):
```bash
npx degit sveltejs/template-webpack svelte-app
cd svelte-app
```
*Note that you will need to have [Node.js](https://nodejs.org) installed.*
## Get started
Install the dependencies...
```bash
cd svelte-app
npm install
```
...then start webpack:
```bash
npm run dev
```
Navigate to [localhost:8080](http://localhost:8080). You should see your app running. Edit a component file in `src`, save it, and the page should reload with your changes.
## Deploying to the web
### With [now](https://zeit.co/now)
Install `now` if you haven't already:
```bash
npm install -g now
```
Then, from within your project folder:
```bash
now
```
As an alternative, use the [Now desktop client](https://zeit.co/download) and simply drag the unzipped project folder to the taskbar icon.
### With [surge](https://surge.sh/)
Install `surge` if you haven't already:
```bash
npm install -g surge
```
Then, from within your project folder:
```bash
npm run build
surge public
```

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.1 KiB

View File

@@ -0,0 +1,17 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset='utf-8'>
<meta name='viewport' content='width=device-width,initial-scale=1'>
<title>Svelte app</title>
<link rel='icon' type='image/png' href='favicon.png'>
<link rel='stylesheet' href='main.css'>
<script defer src='main.js'></script>
</head>
<body>
</body>
</html>

13062
client/svelte-client/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,34 @@
{
"name": "svelte-app",
"version": "1.0.0",
"devDependencies": {
"@tsconfig/svelte": "^1.0.10",
"@types/node": "^14.11.1",
"copy-webpack-plugin": "^5.0.0",
"cross-env": "^7.0.3",
"css-loader": "^5.0.1",
"mini-css-extract-plugin": "^1.3.4",
"svelte": "^3.31.2",
"svelte-check": "^1.0.46",
"svelte-loader": "^3.0.0",
"svelte-preprocess": "^4.3.0",
"ts-loader": "^8.0.4",
"tslib": "^2.0.1",
"typescript": "^4.0.3",
"webpack": "^5.16.0",
"webpack-cli": "^4.4.0",
"webpack-dev-server": "^3.11.2"
},
"dependencies": {
"svelte-loading-spinners": "^0.1.7",
"wasm-app": "file:../wasm/pkg"
},
"bin": {
"create-wasm-app": ".bin/create-wasm-app.js"
},
"scripts": {
"build": "cross-env NODE_ENV=production webpack",
"dev": "webpack serve --content-base public",
"validate": "svelte-check"
}
}

View File

@@ -0,0 +1,144 @@
<script lang="ts">
import Canvas from "./components/Canvas.svelte";
import Fps from "./components/Fps.svelte";
import Input from "./components/Input.svelte";
import {localSession, networkSession, SessionStore} from "./store/session";
import ModeSelect from "./components/ModeSelect.svelte";
import GameSettings from "./components/GameSettings.svelte";
import SessionWrapper from "./components/SessionWrapper.svelte";
import Error from "./components/Error.svelte";
import SessionInfo from "./components/SessionInfo.svelte";
import type {Readable} from "svelte/store";
import {SessionType} from "./store/model/session";
import EvenTicker from "./components/EvenTicker.svelte";
let sessionStore: Readable<SessionStore>;
let debug = false;
$: loading = $sessionStore?.loading;
$: error = $sessionStore?.error;
$: session = $sessionStore?.session;
function createLocalSession() {
sessionStore = localSession();
}
function createSession() {
sessionStore = networkSession(SessionType.HOST)
}
function joinSession(sessionId) {
sessionStore = networkSession(SessionType.PEER, sessionId);
}
function watchSession(sessionId) {
sessionStore = networkSession(SessionType.OBSERVER, sessionId);
}
function toggleDebug() {
debug = !debug;
}
</script>
<main>
<h1>Welcome to WASM-Pong!</h1>
{#key error?.at}
<Error error={error?.value} duration={5_000}></Error>
{/key}
{#if !session}
<div class="mode-select">
<ModeSelect
isLoading={loading}
on:local-create={() => createLocalSession()}
on:session-create={() => createSession()}
on:session-join={({detail: sessionId}) => joinSession(sessionId)}
on:session-watch={({detail: sessionId}) => watchSession(sessionId)}
on:debug-toggle={() => toggleDebug()}
></ModeSelect>
</div>
{:else}
<SessionWrapper session={session} let:inputs={inputs} let:objects={objects} let:tick={tick} let:events={events}>
<div class="game-area">
<div class="game-area__session">
<SessionInfo session={session}></SessionInfo>
</div>
<div class="game-area__canvas">
<Canvas debug={debug} session={session} inputs={inputs} objects={objects} on:tick={event => tick(...event.detail)}>
<Fps></Fps>
</Canvas>
</div>
<div class="game-area__hud">
<GameSettings on:debug-toggle={() => toggleDebug()}></GameSettings>
<Input inputs={inputs}></Input>
</div>
<div class="game-area__events">
<EvenTicker events={events}></EvenTicker>
</div>
</div>
</SessionWrapper>
{/if}
</main>
<style>
main {
display: flex;
flex-flow: column;
justify-content: start;
align-items: center;
width: 600px;
margin: auto;
}
.mode-select {
display: flex;
flex-flow: column nowrap;
align-items: center;
width: 100%;
}
.game-area {
display: grid;
grid-template-areas:
"session session"
"game hud"
"events events";
grid-template-rows: min-content 1fr;
grid-template-columns: 1fr min-content;
grid-row-gap: 1rem;
grid-column-gap: 1rem;
}
.game-area__session {
grid-area: session;
display: flex;
justify-content: center;
border: 1px solid #ff3e00;
padding: 0.6rem;
}
.game-area__canvas {
grid-area: game;
}
.game-area__hud {
grid-area: hud;
display: grid;
grid-template-rows: max-content 1fr;
border: 1px solid #ff3e00;
padding: 0.4rem;
}
.game-area__events {
grid-area: events;
display: grid;
border: 1px solid #ff3e00;
padding: 0.4rem;
height: 300px;
}
@media (min-width: 640px) {
main {
max-width: none;
}
}
</style>

View File

@@ -0,0 +1,112 @@
import type {LocalSession, NetworkSession, Session} from "../store/model/session";
import {SessionState, SessionType} from "../store/model/session";
import type {NetworkSessionEventPayload, SessionEventPayload} from "../store/model/event";
async function createLocalSession(): Promise<LocalSession> {
await new Promise((res) => {
setTimeout(() => {
res(null)
}, 2_000)
});
return {
session_id: "local_session",
type: SessionType.LOCAL,
state: SessionState.RUNNING
}
}
async function createNetworkSession(): Promise<NetworkSession> {
return fetch("/pong/api/create_session", {method: 'POST', headers: [['Content-Type', 'application/json']]})
.then(sessionResponseHandler)
.then(session => ({...session, type: SessionType.HOST}) as NetworkSession)
.catch(err => {
console.error(`Failed to create session: ${err}`);
throw(err);
});
}
function createJoinLink(sessionId: string): string {
return `${window.location.origin}${window.location.pathname}?session_id=${sessionId}`;
}
async function joinNetworkSession(sessionId): Promise<NetworkSession> {
return fetch("/pong/api/join_session", {
method: 'POST',
body: JSON.stringify({session_id: sessionId}),
headers: [['Content-Type', 'application/json']]
})
.then(sessionResponseHandler)
.then(session => ({...session, type: SessionType.PEER}) as NetworkSession)
.catch(err => {
console.error(`Failed to create session: ${err}`);
throw(err);
});
}
async function watchNetworkSession(sessionId): Promise<NetworkSession> {
return fetch("/pong/api/watch_session", {
method: 'POST',
body: JSON.stringify({session_id: sessionId}),
headers: [['Content-Type', 'application/json']]
})
.then(sessionResponseHandler)
.then(session => ({...session, type: SessionType.OBSERVER} as NetworkSession))
.catch(err => {
console.error(`Failed to create session: ${err}`);
throw(err);
});
}
async function sessionResponseHandler(response: Response): Promise<NetworkSession> {
if (!response.ok) {
return response.text().then(text => {
return Promise.reject(`${response.status}: ${text}`)
});
}
return response.json().then(({data}) => {
console.debug(`session action result: ${JSON.stringify(data)}`)
return data;
}).then((event: NetworkSessionEventPayload) => ({you: event.actor, ...event.session}));
}
async function createEventWebsocket(session: NetworkSession): Promise<WebSocket> {
console.debug("creating ws for session: ", session)
const url = `/pong/ws?session_id=${session.session_id}&player_id=${session.you.id}&connection_type=${session.type.toLowerCase()}`;
return createWebsocket(url);
}
async function createWebsocket(path: string): Promise<WebSocket> {
return new Promise((res, rej) => {
const baseUrl = location.host.split(':')[0];
const websocket = new WebSocket(`ws://${baseUrl}/${path}`);
console.debug("ws initialized, not yet ready: ", websocket)
waitForWebsocket(websocket, 10, () => {
return res(websocket)
}, () => rej())
})
}
const waitForWebsocket = (websocket, retries, success, fail) => {
if (retries <= 0) {
console.error("ws not established successfully")
return
}
if (websocket.readyState !== 1) {
console.debug("ws not yet ready, sleep and check again in 100ms")
setTimeout(() => {
waitForWebsocket(websocket, retries - 1, success, fail)
}, 100)
} else {
console.debug("ws ready!")
success()
}
}
export default {
createLocalSession,
createNetworkSession,
joinNetworkSession,
watchNetworkSession,
createEventWebsocket,
createJoinLink
}

View File

@@ -0,0 +1,121 @@
<script lang="ts">
import {FieldWrapper} from "wasm-app";
import {createEventDispatcher, onMount, setContext} from "svelte";
import {drawObjects} from "../store/render";
import {engineCanvas, engineCtx, height, pixelRatio, props, renderContext, width} from "../store/engine";
import type {Input} from "../store/model/input";
import type {GameObject, Session} from "../store/model/session";
import {SessionType} from "../store/model/session";
export let inputs: Input[] = []
export let objects: GameObject[] = []
export let session: Session;
export let killLoopOnError = true;
export let debug = false;
const dispatch = createEventDispatcher();
let canvas: any;
let ctx: any;
let frame: number;
let listeners = [];
let renderOnly = false;
$: if(session) {
renderOnly = session.type === SessionType.PEER || session.type === SessionType.OBSERVER;
}
onMount(() => {
ctx = canvas.getContext('2d');
engineCtx.set(ctx)
engineCanvas.set(engineCanvas);
width.set(canvas.width);
height.set(canvas.height);
// field.set_dimensions(canvas.width, canvas.height);
// setup entities
listeners.forEach(async entity => {
if (entity.setup) {
let p = entity.setup($props);
if (p && p.then) await p;
}
entity.ready = true;
});
return createLoop((elapsed, dt) => {
tick(dt);
render(objects, dt);
});
})
setContext(renderContext, {
add (fn) {
this.remove(fn);
listeners.push(fn);
},
remove (fn) {
const idx = listeners.indexOf(fn);
if (idx >= 0) listeners.splice(idx, 1);
}
});
function createLoop (fn) {
let elapsed = 0;
let lastTime = performance.now();
(function loop() {
frame = requestAnimationFrame(loop);
const beginTime = performance.now();
const dt = (beginTime - lastTime) / 1000;
lastTime = beginTime;
elapsed += dt;
fn(elapsed, dt);
})();
return () => {
cancelAnimationFrame(frame);
};
}
function tick(dt) {
dispatch('tick', [dt]);
}
function render(objects, dt) {
const [canvas_width, canvas_height] = [canvas.width, canvas.height];
ctx.clearRect(0, 0, canvas_width, canvas_height);
drawObjects(ctx, objects, [canvas_width, canvas_height], debug);
listeners.forEach(entity => {
try {
if (entity.mounted && entity.ready && entity.render) {
entity.render($props, dt);
}
} catch (err) {
console.error(err);
if (killLoopOnError) {
cancelAnimationFrame(frame);
console.warn('Animation loop stopped due to an error');
}
}
});
}
function handleResize () {
// TODO: Resolution scaling needs to be implemented in wasm module.
// width.set(window.innerWidth);
// height.set(window.innerHeight);
pixelRatio.set(window.devicePixelRatio);
}
</script>
<canvas
bind:this={canvas}
width={$width * $pixelRatio}
height={$height * $pixelRatio}
style="width: {$width}px; height: {$height}px;"
></canvas>
<svelte:window on:resize|passive={handleResize}/>
<slot></slot>
<style>
</style>

View File

@@ -0,0 +1,43 @@
<script lang="ts">
export let text: string;
let copied = false;
const copyToClipboard = () => {
navigator.clipboard.writeText(text);
copied = true;
}
</script>
{#if text}
<div class="copy-to-clipboard">
<input type="text" bind:value={text} disabled/>
<button on:click={() => copyToClipboard()}>Copy{#if copied} <span class="copied-icon">&#10003;</span>{/if}</button>
</div>
{/if}
<style>
.copy-to-clipboard {
display: flex;
flex-flow: row nowrap;
justify-content: center;
width: 100%;
}
.copy-to-clipboard > input {
flex-grow: 1;
}
.copy-to-clipboard > button {
margin-left: 0.4rem;
display: flex;
align-items: center;
}
.copied-icon {
font-size: 1.5rem;
color: #ff3e00;
}
</style>

View File

@@ -0,0 +1,24 @@
<script lang="ts">
import {timer} from "../store/utils";
export let error: string;
export let duration: number;
let durationTimer;
$: if (error) {
durationTimer = timer(duration || 2_000)
}
</script>
{#if error && $durationTimer > 0}
<div class="error">
{error}
</div>
{/if}
<style>
.error {
border: 0.1rem solid #ff3e00;
padding: 1rem;
}
</style>

View File

@@ -0,0 +1,25 @@
<script lang="ts">
import type {GameEventWrapper} from "../store/model/event";
export let events: GameEventWrapper[];
</script>
<div class="event-ticker">
{#if events.length }
{#each events as event }
<div>{JSON.stringify(event)}</div>
{/each}
{:else}
No events available.
{/if}
</div>
<style>
.event-ticker {
width: 100%;
overflow-y: scroll;
font-size: 0.8rem;
}
</style>

View File

@@ -0,0 +1,32 @@
<script>
import Text from "./Text.svelte";
import {renderable} from "../store/engine";
let text = '';
let elapsed = 0;
let frames = 0;
let prevTime = performance.now();
renderable((state, dt) => {
let time = performance.now();
frames++;
if ( time >= prevTime + 1000 ) {
const fps = ((frames * 1000) / (time - prevTime));
text = `${fps.toFixed(1)} FPS`;
prevTime = time;
frames = 0;
}
});
</script>
<Text
{text}
fontSize=12
fontFamily='Courier New'
align='left'
baseline='top'
x={20}
y={20} />
<!-- The following allows this component to nest children -->
<slot></slot>

View File

@@ -0,0 +1,20 @@
<script lang="ts">
import {createEventDispatcher} from "svelte";
const dispatch = createEventDispatcher();
const toggleDebug = () => {
dispatch("debug-toggle")
}
</script>
<div class="game-settings">
<button on:click={() => toggleDebug()}>Toggle Debug</button>
</div>
<style>
.game-settings {
display: flex;
flex-flow: column nowrap;
align-items: center;
}
</style>

View File

@@ -0,0 +1,54 @@
<script lang="ts">
import type {Input} from "../store/session";
const availableInputs: string[] = ["UP", "DOWN"]
export let inputs: Input[] = [];
$: player1Inputs = inputs.filter(({player}) => player === 1).map(({input}) => input)
$: player2Inputs = inputs.filter(({player}) => player === 2).map(({input}) => input)
</script>
<div class="game_inputs">
<h4>Player 1</h4>
{#each availableInputs as input}
<div class="game_input game_input--{player1Inputs.includes(input) ? 'active' : 'inactive'}">
{input}
</div>
{/each}
<h4>Player 2</h4>
{#each availableInputs as input}
<div class="game_input game_input--{player2Inputs.includes(input) ? 'active' : 'inactive'}">
{input}
</div>
{/each}
</div>
<style>
.game_inputs {
display: flex;
flex-flow: column;
justify-content: center;
}
.game_input {
background-color: #eee;
border-radius: 4px;
font-size: 1em;
padding: 0.2em 0.5em;
border-top: 5px solid rgba(255, 255, 255, 0.5);
border-left: 5px solid rgba(255, 255, 255, 0.5);
border-right: 5px solid rgba(0, 0, 0, 0.2);
border-bottom: 5px solid rgba(0, 0, 0, 0.2);
color: #555;
}
.game_input--inactive {
color: black;
}
.game_input--active {
color: white;
background-color: #ff3e00;
}
</style>

View File

@@ -0,0 +1,15 @@
<script lang="ts">
import {sessionInputs} from "../store/session";
import type {LocalSession} from "../store/model/session";
import {gameField} from "../store/engine";
export let session: LocalSession;
const tick = (dt: number) => {
gameField.tick($sessionInputs, dt)
}
</script>
{#if session && sessionInputs}
<slot inputs={$sessionInputs} objects={$gameField.objects} tick={tick} events={[]}></slot>
{/if}

View File

@@ -0,0 +1,98 @@
<script lang="ts">
import {createEventDispatcher, getContext, onMount} from "svelte";
import {Shadow} from 'svelte-loading-spinners'
import session from "../api/session";
export let isLoading = false;
const dispatch = createEventDispatcher();
let joinSessionId = '';
let watchSessionId = '';
$: disableControls = isLoading;
onMount(() => {
if (!window.location.search.startsWith("?")) {
return;
}
const params = window.location.search.slice(1).split("&").map(p => p.split('=')).reduce((acc, [key, val]) => ({...acc, [key]: val}), {}) as any;
console.log(params)
if (!params.session_id) {
return;
}
joinSessionId = params.session_id;
})
const localSession = () => {
dispatch("local-create")
}
const createSession = () => {
dispatch("session-create")
}
const joinSession = () => {
if (!joinSessionId) {
return
}
dispatch("session-join", joinSessionId)
}
const watchSession = () => {
if (!watchSessionId) {
return
}
dispatch("session-watch", watchSessionId)
}
</script>
<div class="game-mode-select">
{#if isLoading}
<h3 style="text-align: center">Loading...</h3>
<div class="game-mode-select__loading">
<Shadow size="20" unit="px" color="#FF3E00" duration="1s"></Shadow>
</div>
{:else}
<h3 style="text-align: center">Please select a game mode</h3>
{/if}
<button disabled={disableControls} on:click={() => localSession()}>Create Local Game</button>
<hr/>
<button disabled={disableControls} on:click={() => createSession()}>Create Online Game</button>
<div class="game-mode-select__group">
<input bind:value={joinSessionId} placeholder="session id"/>
<button disabled={!joinSessionId || disableControls} on:click={() => joinSession()}>Join Online Game</button>
</div>
<div class="game-mode-select__group ">
<input bind:value={watchSessionId} placeholder="session id"/>
<button disabled={!watchSessionId || disableControls} on:click={() => watchSession()}>Watch Online Game</button>
</div>
</div>
<style>
.game-mode-select {
display: grid;
min-width: 20%;
}
.game-mode-select > hr {
width: 100%;
margin-bottom: 20px;
}
.game-mode-select__loading {
display: flex;
justify-content: center;
padding: 20px;
}
.game-mode-select__group {
display: grid;
grid-template-columns: 1fr 200px;
grid-column-gap: 10px;
}
.game-mode-select__group > input {
}
</style>

View File

@@ -0,0 +1,66 @@
<script lang="ts">
import {networkEvents, networkMoveEvents, networkSessionStateEvents, sessionInputs} from "../store/session";
import type {NetworkSession} from "../store/model/session";
import {SessionState, SessionType} from "../store/model/session";
import CopyToClipboard from "./CopyToClipboard.svelte";
import api from "../api/session";
import type {Readable} from "svelte/store";
import type {Input} from "../store/model/input";
import {getPlayerKeyboardInputs} from "../store/input";
import {gameField} from "../store/engine";
export let session: NetworkSession;
let joinLink;
let cachedSessionId;
let relevantKeyboardEvents: Readable<Input[]>;
// TODO: objects must come from events for peer and observer
$: if(!cachedSessionId && session) {
cachedSessionId = session.session_id;
console.log("NetworkSessionWrapper ready, now setting up sessionEvents")
joinLink = api.createJoinLink(session.session_id);
relevantKeyboardEvents = getPlayerKeyboardInputs(session.you.nr);
}
$: if(session && session.type === SessionType.HOST && session.state === SessionState.RUNNING) {
console.debug("sending host snapshot")
networkEvents.produce({inputs: $relevantKeyboardEvents, session_id: session.session_id, objects: $gameField.objects, player_id: session.you.id, ts: $gameField.lastTick})
}
$: if(session && session.type === SessionType.PEER && session.state === SessionState.RUNNING) {
console.debug("sending host snapshot")
networkEvents.produce({inputs: $relevantKeyboardEvents, session_id: session.session_id, player_id: session.you.id, ts: $gameField.lastTick})
}
$: console.debug($networkSessionStateEvents)
const tick = (dt: number) => {
if (session.type === SessionType.HOST) {
gameField.tick($sessionInputs, dt)
return;
}
// peer and observer directly override game field state
gameField.update($networkMoveEvents)
}
</script>
{#if !session}
<h3>no session</h3>
{:else}
{#if session.state === SessionState.PENDING}
<h3>waiting for other player...</h3>
<CopyToClipboard text={joinLink}></CopyToClipboard>
{:else if session.state === SessionState.CLOSED}
<h3>game over!</h3>
{:else if session.state === SessionState.RUNNING}
<slot inputs={$sessionInputs} objects={$gameField.objects} tick={tick} events={$networkSessionStateEvents}></slot>
{:else }
<h3>unknown game state</h3>
{/if}
{/if}

View File

@@ -0,0 +1,29 @@
<script lang="ts">
import type {Session} from "../store/model/session";
import {isLocalSession, isNetworkSession} from "../store/model/session";
export let session: Session;
</script>
{#if session}
<div class="session-info">
<span><b>Id:</b> {session.session_id}</span>
<span><b>State:</b> {session.state}</span>
{#if isNetworkSession(session)}
<span><b>Type:</b> {session.type}</span>
<span><b>You:</b> Player {session.you.nr} ({session.you.id})</span>
{/if}
</div>
{/if}
<style>
.session-info {
display: flex;
flex-flow: row wrap;
font-size: 0.9rem;
}
.session-info > span + span {
margin-left: 1rem;
}
</style>

View File

@@ -0,0 +1,34 @@
<script lang="ts">
import LocalSessionWrapper from "./LocalSessionWrapper.svelte";
import NetworkSessionWrapper from "./NetworkSessionWrapper.svelte";
import type {Session} from "../store/model/session";
import {SessionState, SessionType} from "../store/model/session";
import CopyToClipboard from "./CopyToClipboard.svelte";
import api from "../api/session";
export let session: Session;
</script>
<div class="session-wrapper">
{#if !session}
<h1>no session</h1>
{:else if session.type === SessionType.LOCAL}
<LocalSessionWrapper session={session} let:inputs={inputs} let:objects={objects} let:tick={tick} let:events={events}>
<slot inputs={inputs} objects={objects} tick={tick} events={events}></slot>
</LocalSessionWrapper>
{:else}
<NetworkSessionWrapper session={session} let:inputs={inputs} let:objects={objects} let:tick={tick} let:events={events}>
<slot inputs={inputs} objects={objects} tick={tick} events={events}></slot>
</NetworkSessionWrapper>
{/if}
</div>
<style>
.session-wrapper {
min-width: 20%;
display: flex;
flex-flow: column nowrap;
align-items: center;
}
</style>

View File

@@ -0,0 +1,28 @@
<script>
import {renderable} from "../store/engine";
export let color = 'hsl(0, 0%, 100%)';
export let align = 'center';
export let baseline = 'middle';
export let text = '';
export let x = 0;
export let y = 0;
export let fontSize = 16;
export let fontFamily = '-apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica';
renderable(props => {
const { engineCtx: ctx } = props;
if (text) {
ctx.fillStyle = 'black';
ctx.font = `${fontSize}px ${fontFamily}`;
ctx.textAlign = align;
ctx.textBaseline = baseline;
ctx.fillText(text, x, y);
}
});
</script>
<!-- The following allows this component to nest children -->
<slot></slot>

View File

@@ -0,0 +1,84 @@
html, body {
position: relative;
width: 100%;
height: 100%;
}
body {
color: #333;
margin: 0;
padding: 8px;
box-sizing: border-box;
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Oxygen-Sans, Ubuntu, Cantarell, "Helvetica Neue", sans-serif;
}
a {
color: rgb(0,100,200);
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
a:visited {
color: rgb(0,80,160);
}
label {
display: block;
}
input, button, select, textarea {
font-family: inherit;
font-size: inherit;
-webkit-padding: 0.4em 0;
padding: 0.4em;
margin: 0 0 0.5em 0;
box-sizing: border-box;
border: 1px solid #ccc;
border-radius: 2px;
}
input:disabled {
color: #ccc;
}
button {
color: #333;
background-color: #f4f4f4;
outline: none;
}
button:disabled {
color: #999;
}
button:not(:disabled):active {
background-color: #ddd;
}
button:focus {
border-color: #666;
}
h1 {
color: #ff3e00;
text-transform: uppercase;
font-size: 2em;
font-weight: 100;
}
h3 {
color: #ff3e00;
text-transform: uppercase;
font-size: 1.3em;
font-weight: 100;
}
h4 {
color: #ff3e00;
text-transform: uppercase;
font-size: 1.0em;
font-weight: 300;
}

View File

@@ -0,0 +1,9 @@
import './global.css';
import App from './App.svelte';
const app = new App({
target: document.body
});
export default app;

View File

@@ -0,0 +1,91 @@
import {FieldWrapper} from "wasm-app";
import {derived, Readable, Writable, writable} from "svelte/store";
import {getContext, onMount} from "svelte";
import type {GameObject} from "./model/session";
import type {Input} from "./model/input";
import type {Subscriber} from "svelte/types/runtime/store";
export const engineCanvas = writable();
export const engineCtx = writable();
export const width = writable(800);
export const height = writable(600);
export const pixelRatio = writable(window.devicePixelRatio);
// A more convenient store for grabbing all store props
export const props = deriveObject({
width,
height,
pixelRatio,
engineCanvas,
engineCtx
});
export const renderContext = Symbol();
// https://svelte.dev/repl/79f4f3e0296a403ea988f74d332a7a4a?version=3.12.1
export const renderable = (render) => {
const api: any = getContext(renderContext);
const element = {
ready: false,
mounted: false,
render: null,
setup: null
};
if (typeof render === 'function') element.render = render;
else if (render) {
if (render.render) element.render = render.render;
if (render.setup) element.setup = render.setup;
}
api.add(element);
onMount(() => {
element.mounted = true;
return () => {
api.remove(element);
element.mounted = false;
};
});
}
function deriveObject (obj) {
const keys = Object.keys(obj);
const list = keys.map(key => {
return obj[key];
});
return derived(list, (array) => {
return array.reduce((dict, value, i) => {
dict[keys[i]] = value;
return dict;
}, {});
});
}
export type GameFieldState = {
lastTick: number,
objects: GameObject[]
}
function createGameFieldStore(): Readable<GameFieldState> & {tick: (inputs: Input[], dt: number) => void, update: (objects: GameObject[]) => void} {
const {subscribe, set} = writable<GameFieldState>({lastTick: 0, objects: []});
const field = FieldWrapper.new();
function tick(inputs: Input[], dt: number) {
field.tick(inputs, dt);
const objects = JSON.parse(field.objects());
const lastTick = Date.now();
set({objects, lastTick});
}
function update(objects: GameObject[]) {
set({objects, lastTick: Date.now()})
}
return {
subscribe,
tick,
update
}
}
export const gameField = createGameFieldStore();

View File

@@ -0,0 +1,26 @@
import {derived, readable} from "svelte/store";
import {keysPressed} from "./io";
import type {Input} from "./model/input";
export const playerKeyboardInputs = derived(keysPressed, $keysPressed => {
return $keysPressed.map((key): Input => {
switch (key.toLowerCase()) {
case 'w':
return {input: 'UP', obj_id: 0, player: 1};
case 's':
return {input: 'DOWN', obj_id: 0, player: 1}
case 'arrowup':
return {input: 'UP', obj_id: 1, player: 2}
case 'arrowdown':
return {input: 'DOWN', obj_id: 1, player: 2}
default:
return null
}
}).filter(it => !!it);
})
export const getPlayerKeyboardInputs = (player_nr: number) => readable<Input[]>([], set => {
return playerKeyboardInputs.subscribe(inputs => {
set(inputs.filter(({player}) => player === player_nr));
})
})

View File

@@ -0,0 +1,29 @@
import type {Readable} from "svelte/store";
import {readable} from "svelte/store";
export const keysPressed: Readable<string[]> = readable([], function(set) {
let keys = [];
const onKeydown = ({key}) => {
if (keys.includes(key)) {
return;
}
keys = [...keys, key];
set(keys);
}
const onKeyup = ({key}) => {
if (!keys.includes(key)) {
return;
}
keys = keys.filter(k => k !== key);
set(keys);
}
document.addEventListener('keydown', onKeydown);
document.addEventListener('keyup', onKeyup);
return () => {
document.removeEventListener('keydown', onKeydown);
document.removeEventListener('keyup', onKeyup);
}
});

View File

@@ -0,0 +1,54 @@
import type {GameObject, NetworkSession, Session} from "./session";
import type {Input} from "./input";
export type SessionEventPayload = {
actor: { id: string },
event_type: string,
reason: string,
session: Session
}
export type NetworkSessionEventPayload = {
actor: { id: string },
event_type: string,
reason: string,
session: NetworkSession
}
export type InputEventPayload = {
session_id: string,
inputs: Input[],
player_id: string,
ts: number,
}
export type GameEvent = SessionEventPayload | NetworkSessionEventPayload | InputEventPayload;
export type SessionEvenWrapper = {
topic: 'session',
event: SessionEventPayload
}
export type InputEventWrapper = {
topic: 'input',
event: InputEventPayload
}
export type MoveEventWrapper = {
topic: 'move',
event: GameObject
}
export type GameEventWrapper = SessionEvenWrapper | InputEventWrapper | MoveEventWrapper;
export const isSessionEvent = (event: GameEventWrapper): event is SessionEvenWrapper => {
return event.topic === 'session';
}
export const isInputEvent = (event: GameEventWrapper): event is InputEventWrapper => {
return event.topic === 'input';
}
export const isMoveEvent = (event: GameEventWrapper): event is MoveEventWrapper => {
return event.topic === 'move';
}

View File

@@ -0,0 +1,6 @@
export type Input = {
input: 'UP' | 'DOWN',
obj_id: number,
player: number
}

View File

@@ -0,0 +1,88 @@
import type {Input} from "./input";
export enum SessionState {
PENDING = 'PENDING', RUNNING = 'RUNNING', CLOSED = 'CLOSED'
}
export enum SessionType {
LOCAL = 'LOCAL', HOST = 'HOST', PEER = 'PEER', OBSERVER = 'OBSERVER'
}
export type Player = {
id: string,
nr: number
}
export type GameObject = {
id: number,
orientation_x: number,
orientation_y: number,
shape_param_1: number,
shape_param_2: number,
vel_x: number,
vel_y: number,
x: number,
y: number,
}
export type Observer = {
id: string
}
export type LocalSession = {
session_id: string,
state: SessionState,
type: SessionType.LOCAL
}
export type NetworkSession = {
session_id: string,
type: SessionType.HOST | SessionType.PEER | SessionType.OBSERVER,
state: SessionState,
players: Player[],
you: Player
}
export type Session = LocalSession | NetworkSession;
export function isNetworkSession(session: Session): session is NetworkSession {
return !isLocalSession(session)
}
export function isLocalSession(session: Session): session is LocalSession {
return !!session.type && session.type === SessionType.LOCAL
}
export type HostSessionSnapshot = {
session_id: string,
inputs: Input[],
objects: GameObject[],
player_id: string,
ts: number
}
export type PeerSessionSnapshot = {
session_id: string,
inputs: Input[],
player_id: string,
ts: number
}
export type SessionSnapshot = HostSessionSnapshot | PeerSessionSnapshot;
export type Heartbeat = {
session_id: string,
player_id: string,
ts: number
}
export enum MessageType {
Snapshot = "SessionSnapshot", Heartbeat = "HeartBeat"
}
export type Message = {
msg_type: MessageType.Snapshot,
payload: SessionSnapshot
} | {
msg_type: MessageType.Heartbeat,
payload: Heartbeat
}

View File

@@ -0,0 +1,15 @@
import {writable} from "svelte/store";
export type NetworkStore = {
loading: boolean,
error?: {
value: string,
at: number
}
}
const initialValue = () => ({
loading: false,
});
export const network = writable<NetworkStore>(initialValue())

View File

@@ -0,0 +1,42 @@
import main from "../main";
const mainColor = '#ff3e00';
export const drawObjects = (ctx: CanvasRenderingContext2D, objects, [width, height], debug = false) => {
objects.forEach(obj => {
ctx.beginPath();
ctx.strokeStyle = mainColor;
ctx.lineWidth = 2;
const obj_y = height - obj.y;
const orientation_y = obj.orientation_y * -1;
const vel_y = obj.vel_y * -1;
// rect
if (obj.shape_param_2) {
ctx.moveTo(obj.x, obj_y);
ctx.rect(obj.x - obj.shape_param_1 / 2, obj_y - obj.shape_param_2 / 2, obj.shape_param_1, obj.shape_param_2);
}
// circle
else {
ctx.arc(obj.x, obj_y, obj.shape_param_1, 0, 2 * Math.PI);
}
ctx.stroke();
if (debug) {
// velocity
drawLine(ctx, obj.x, obj_y, obj.x + obj.vel_x * 20, obj_y + vel_y * 20, 'red')
// orientation
drawLine(ctx, obj.x, obj_y, obj.x + obj.orientation_x * 20, obj_y + orientation_y * 20, 'blue')
ctx.fillText(`[x: ${obj.x}, y: ${obj_y}]`, obj.x + 10, obj_y)
}
})
}
const drawLine = (ctx, from_x, from_y, to_x, to_y, color) => {
ctx.beginPath();
ctx.moveTo(from_x, from_y);
ctx.strokeStyle = color;
ctx.lineTo(to_x, to_y);
ctx.stroke();
}

View File

@@ -0,0 +1,246 @@
import {derived, get, Readable, readable, Unsubscriber, writable} from "svelte/store";
import api from "../api/session";
import type {LocalSession, Message, NetworkSession, Session, SessionSnapshot} from "./model/session";
import {isLocalSession, isNetworkSession, MessageType, SessionState, SessionType} from "./model/session";
import type {NetworkStore} from "./network";
import type {GameEventWrapper, InputEventPayload, SessionEventPayload} from "./model/event";
import {isInputEvent, isMoveEvent, isSessionEvent} from "./model/event";
import {getPlayerKeyboardInputs, playerKeyboardInputs} from "./input";
import type {Subscriber} from "svelte/types/runtime/store";
import {combined} from "./utils";
import type {Input} from "./model/input";
import {init} from "svelte/internal";
const sessionStore = writable<Session>(null);
function createNetworkEvents() {
const {subscribe, set, update} = writable<GameEventWrapper[]>([]);
const websocket = writable<WebSocket>(null);
const sessionId = writable<string>(null);
const playerId = writable<string>(null);
const lastSnapshot = writable<SessionSnapshot>(null);
const unsubscribeSession = sessionStore.subscribe(session => {
if (!session || isLocalSession(session)) {
return;
}
if (get(sessionId) === session.session_id) {
return;
}
sessionId.set(session.session_id);
playerId.set(session.you.id);
console.log("creating ws to receive/send websocket events for session: ", JSON.stringify(session))
api.createEventWebsocket(session).then(ws => {
console.log("ws successfully established: ", ws)
ws.onopen = () => {
console.debug("ws successfully opened")
}
ws.onmessage = event => {
console.debug("Received event: ", event)
let events = JSON.parse(event.data);
// TODO: Hotfix, would be better to have clean serialization in the backend...
events = events.map(({event, ...rest}) => ({...rest, event: JSON.parse(event)}))
console.debug("Parsed events: ", events)
set(events);
}
ws.onerror = err => {
console.error("ws error: ", err)
}
ws.onclose = event => {
console.error("ws closed: ", event)
}
websocket.set(ws);
});
})
const interval = setInterval(() => {
const cachedSessionId = get(sessionId);
if (!cachedSessionId) {
return;
}
const last = get(lastSnapshot);
const now = Date.now();
if (last && now - last.ts < 1_000) {
return
}
console.debug("sending heartbeat")
const heartbeat: Message = {msg_type: MessageType.Heartbeat, payload: {session_id: cachedSessionId, player_id: get(playerId), ts: now}};
sendMessage(heartbeat);
}, 1_000)
function sendMessage(message: Message) {
const ws = get(websocket);
if (!ws) {
return;
}
console.debug("producing message to ws: ", message);
// TODO: Hotfix, double serialize to ease deserialization on server.
ws.send(JSON.stringify({msg_type: message.msg_type, payload: JSON.stringify(message.payload)}));
}
function produce(snapshot: SessionSnapshot) {
lastSnapshot.set(snapshot);
sendMessage({msg_type: MessageType.Snapshot, payload: snapshot});
}
const customSubscribe = (run: Subscriber<GameEventWrapper[]>, invalidate): Unsubscriber => {
const unsubscribe = subscribe(run, invalidate);
return () => {
unsubscribeSession();
clearInterval(interval);
unsubscribe();
}
}
return {
subscribe: customSubscribe,
produce
}
}
export type NetworkEventStore = Readable<GameEventWrapper[]> & {
produce: (snapshot: SessionSnapshot) => void
}
export const networkEvents: NetworkEventStore = createNetworkEvents();
export const networkSessionStateEvents = readable<SessionEventPayload[]>([], set => {
const cache = writable<SessionEventPayload[]>([]);
const unsub = networkEvents.subscribe(events => {
const sessionEvents = events.filter(isSessionEvent).map(({event}) => event);
if (!sessionEvents.length) {
return [];
}
cache.set([...get(cache), ...sessionEvents]);
set(get(cache))
const latestSessionEvent = sessionEvents[sessionEvents.length - 1] as SessionEventPayload;
const currentSession = get(sessionStore) as NetworkSession;
const session: Session = {
...(latestSessionEvent.session as NetworkSession),
you: currentSession.you,
type: currentSession.type
}
console.debug("updating current session: ", session)
sessionStore.set(session);
})
return () => {
unsub();
}
});
export const networkMoveEvents = derived(networkEvents, $sessionEvents => {
const moveEvents = $sessionEvents.filter(isMoveEvent).map(({event}) => event);
if (!moveEvents.length) {
return [];
}
// TODO: How to know number of objects?
return moveEvents.slice(moveEvents.length - 7)
})
const networkInputEvents = derived([networkEvents, sessionStore], ([$sessionEvents, $sessionStore]) => $sessionEvents.filter(wrapper => {
if (!isInputEvent(wrapper)) {
return false;
}
return wrapper.event.player_id !== ($sessionStore as NetworkSession).you.id
}).map(({event}) => event as InputEventPayload));
const getPlayerNetworkInputEvents = (player_nr: number): Readable<Input[]> => derived(networkInputEvents, $networkInputEvents => {
const session = get(sessionStore);
if (!isNetworkSession(session)) {
return [] as Input[];
}
const player = session.players.find(({nr}) => player_nr === nr);
if (!player) {
return [] as Input[];
}
const inputEvents = $networkInputEvents.filter(({player_id}) => player.id === player_id);
if (!inputEvents.length) {
return [] as Input[];
}
return inputEvents[inputEvents.length - 1].inputs
});
export const sessionInputs = readable([], function (setInputs) {
setInputs([]);
const unsubscribe = sessionStore.subscribe(session => {
return getInputStore(session).subscribe(input => {
setInputs(input);
});
});
return () => {
unsubscribe();
}
})
const getInputStore = (session: Session): Readable<Input[]> => {
if (isLocalSession(session)) {
return playerKeyboardInputs;
}
const sessionType = session.type;
if (sessionType === SessionType.HOST) {
return combined(getPlayerKeyboardInputs(1), getPlayerNetworkInputEvents(2));
}
if (sessionType === SessionType.PEER) {
return combined(getPlayerNetworkInputEvents(1), getPlayerKeyboardInputs(2));
}
if (sessionType === SessionType.OBSERVER) {
return combined(getPlayerNetworkInputEvents(1), getPlayerNetworkInputEvents(2));
}
throw new Error(`unknown session type ${session.type}`)
}
export const localSession = () => readable<SessionStore>(null, function (set) {
const session: LocalSession = {session_id: "local", type: SessionType.LOCAL, state: SessionState.RUNNING};
set({loading: true});
setTimeout(() => {
set({loading: false, session});
sessionStore.set(session);
}, 2_000);
})
export type SessionStore = NetworkStore & {
session?: Session
}
export const networkSession = (type: SessionType.HOST | SessionType.PEER | SessionType.OBSERVER, sessionId?: string) => readable<SessionStore>(null, function (set) {
function sessionCreator(fn) {
set({loading: true});
fn().then(session => {
set({loading: false, session});
sessionStore.set(session);
}).catch(e => {
set({loading: false, error: {value: e, at: performance.now()}});
sessionStore.set(null);
})
}
const unsubscribe = sessionStore.subscribe(session => {
set({loading: false, session})
})
switch (type) {
case SessionType.HOST:
sessionCreator(() => api.createNetworkSession());
break;
case SessionType.PEER:
sessionCreator(() => api.joinNetworkSession(sessionId));
break;
case SessionType.OBSERVER:
sessionCreator(() => api.watchNetworkSession(sessionId));
break;
default:
throw new Error("Unable to handle session type: " + type)
}
return () => {
unsubscribe();
}
})

View File

@@ -0,0 +1,36 @@
import {get, Readable, readable, Unsubscriber, writable} from "svelte/store";
import type {Subscriber} from "svelte/types/runtime/store";
export const timer = ms => readable(ms, function(set) {
setTimeout(() => {
set(0)
}, ms)
})
export function combined<T>(store1: Readable<T[]>, store2: Readable<T[]>): Readable<T[]> {
const val1 = writable<T[]>([]);
const val2 = writable<T[]>([]);
const {set, subscribe} = writable<T[]>();
const unsub1 = store1.subscribe(val => {
val1.set(val);
set([...val, ...get(val2)]);
})
const unsub2 = store2.subscribe(val => {
val2.set(val);
set([...get(val1), ...val]);
})
const customSubscribe = (run: Subscriber<T[]>, invalidate): Unsubscriber => {
const unsub = subscribe(run, invalidate);
return () => {
unsub();
unsub1();
unsub2();
}
}
return {
subscribe: customSubscribe
}
}

View File

@@ -0,0 +1,5 @@
{
"extends": "@tsconfig/svelte/tsconfig.json",
"include": ["src/**/*", "src/node_modules/**/*"],
"exclude": ["node_modules/*", "__sapper__/*", "static/*"],
}

View File

@@ -0,0 +1,83 @@
const MiniCssExtractPlugin = require('mini-css-extract-plugin');
const CopyWebpackPlugin = require("copy-webpack-plugin");
const path = require('path');
const sveltePreprocess = require('svelte-preprocess');
const mode = process.env.NODE_ENV || 'development';
const prod = mode === 'production';
module.exports = {
entry: './src/main.ts',
resolve: {
alias: {
svelte: path.dirname(require.resolve('svelte/package.json'))
},
extensions: ['.mjs', '.js', '.ts', '.svelte'],
mainFields: ['svelte', 'browser', 'module', 'main']
},
output: {
path: path.join(__dirname, '/dist'),
filename: '[name].js',
chunkFilename: '[name].[id].js'
},
module: {
rules: [
{
test: /\.ts$/,
loader: 'ts-loader',
exclude: /node_modules/
},
{
test: /\.svelte$/,
use: {
loader: 'svelte-loader',
options: {
compilerOptions: {
dev: !prod
},
emitCss: prod,
hotReload: !prod,
preprocess: sveltePreprocess({ sourceMap: !prod })
}
}
},
{
test: /\.css$/,
use: [
MiniCssExtractPlugin.loader,
'css-loader'
]
},
{
// required to prevent errors from Svelte on Webpack 5+
test: /node_modules\/svelte\/.*\.mjs$/,
resolve: {
fullySpecified: false
}
}
]
},
mode,
plugins: [
new MiniCssExtractPlugin({
filename: '[name].css'
}),
new CopyWebpackPlugin(['index.html'])
],
devtool: prod ? false : 'source-map',
devServer: {
hot: true,
publicPath: '/pong/web/',
openPage: 'pong/web/',
open: true,
proxy: {
'/pong/api': {
target: 'http://localhost:4000',
pathRewrite: { '^/pong/api': '' }
}
}
},
experiments: {
asyncWebAssembly: true
}
};

View File

@@ -38,7 +38,7 @@ pub struct GameObjectDTO {
pub vel_x: f64,
pub vel_y: f64,
pub shape_param_1: u16,
pub shape_param_2: u16,
pub shape_param_2: u16
}
impl GameObjectDTO {
@@ -147,6 +147,12 @@ impl FieldWrapper {
let json = json!(objs);
serde_json::to_string(&json).unwrap()
}
pub fn set_dimensions(&mut self, width_js: JsValue, height_js: JsValue) {
let width = width_js.as_f64().unwrap();
let height = height_js.as_f64().unwrap();
self.field.set_dimensions(width as u16, height as u16);
}
}
#[derive(Clone)]

View File

@@ -7,4 +7,5 @@ source .env
docker exec pong_server_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic session --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"
docker exec pong_server_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic move --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"
docker exec pong_server_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic status --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"
docker exec pong_server_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic heart_beat --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"
docker exec pong_server_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic input --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"

View File

@@ -8,7 +8,7 @@ use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
const TOPICS: [&str; 4] = ["move", "status", "input", "session"];
const TOPICS: [&str; 5] = ["move", "status", "input", "heart_beat", "session"];
#[tokio::main]
pub async fn main() {

View File

@@ -8,6 +8,9 @@ edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0.79" }
rand = "0.8.5"
getrandom = { version = "0.2", features = ["js"] }
[dev-dependencies]
rstest = "0.12.0"

View File

@@ -5,19 +5,23 @@ pub mod event {
use std::io::Write;
#[derive(Debug, Deserialize, Serialize)]
pub struct Event {
pub struct EventWrapper {
pub topic: String,
pub key: Option<String>,
pub msg: String,
pub event: String,
}
pub trait EventWriterImpl: Send + Sync {
fn write(&mut self, event: Event) -> Result<(), String>;
fn write(&mut self, events: Vec<EventWrapper>) -> Result<(), String>;
}
pub struct FileEventWriterImpl {}
impl EventWriterImpl for FileEventWriterImpl {
fn write(&mut self, event: Event) -> Result<(), String> {
fn write(&mut self, events: Vec<EventWrapper>) -> Result<(), String> {
let event_buffer = events.iter().fold(vec![], |mut acc, e| {
acc.push(e.event.as_bytes());
acc
}).concat();
let options = OpenOptions::new()
.read(true)
.create(true)
@@ -27,7 +31,7 @@ pub mod event {
return Err(format!("{}", e));
}
let mut file = options.unwrap();
match file.write(event.msg.as_bytes()) {
match file.write(&*event_buffer) {
Ok(_) => Ok(()),
Err(e) => Err(format!("{}", e)),
}
@@ -36,7 +40,7 @@ pub mod event {
pub struct NoopEventWriterImpl {}
impl EventWriterImpl for NoopEventWriterImpl {
fn write(&mut self, _event: Event) -> Result<(), String> {
fn write(&mut self, _events: Vec<EventWrapper>) -> Result<(), String> {
Ok(())
}
}
@@ -62,13 +66,17 @@ pub mod event {
}
}
pub fn write(&mut self, event: Event) -> Result<(), String> {
self.writer_impl.write(event)
pub fn write(&mut self, event: EventWrapper) -> Result<(), String> {
self.write_all(vec![event])
}
pub fn write_all(&mut self, events: Vec<EventWrapper>) -> Result<(), String> {
self.writer_impl.write(events)
}
}
pub trait EventReaderImpl: Send + Sync {
fn read(&mut self) -> Result<Vec<Event>, String>;
fn read(&mut self) -> Result<Vec<EventWrapper>, String>;
}
pub struct EventReader {
@@ -80,7 +88,7 @@ pub mod event {
EventReader { reader_impl }
}
pub fn read(&mut self) -> Result<Vec<Event>, String> {
pub fn read(&mut self) -> Result<Vec<EventWrapper>, String> {
self.reader_impl.read()
}
}

View File

@@ -1,5 +1,6 @@
use std::cell::RefCell;
use std::rc::Rc;
use serde::{Deserialize, Serialize};
use crate::collision::collision::{
CollisionRegistry, Collisions,
@@ -18,13 +19,13 @@ use crate::pong::pong_events::{
};
use crate::utils::utils::{DefaultLoggerFactory, Logger, LoggerFactory, NoopLogger};
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum InputType {
UP,
DOWN,
}
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct Input {
pub input: InputType,
pub obj_id: u16,
@@ -130,7 +131,12 @@ impl Field {
continue;
}
if *obj_mut.vel() == Vector::zero() {
obj_mut.vel_mut().add(&Vector::new(-300., 0.))
let go_right = rand::random::<bool>();
let start_vel_x = match go_right {
true => 500.,
false => -500.
};
obj_mut.vel_mut().add(&Vector::new(start_vel_x, 0.))
}
}
@@ -218,6 +224,11 @@ impl Field {
pub fn objs(&self) -> Vec<&Rc<RefCell<Box<dyn GameObject>>>> {
self.objs.iter().collect()
}
pub fn set_dimensions(&mut self, width: u16, height: u16) {
self.width = width;
self.height = height;
}
}
impl DefaultGameObject {
@@ -232,7 +243,7 @@ impl DefaultGameObject {
},
Vector::new(0., 1.),
(field.width as f64) / 25.,
(field.height as f64) / 5.,
(field.height as f64) / 4.,
))),
Box::new(DefaultPhysicsComp::new(Vector::zero(), true)),
))

View File

@@ -158,7 +158,7 @@ pub mod pong_collisions {
}
pub mod pong_events {
use crate::event::event::{Event, EventWriter};
use crate::event::event::{EventWrapper, EventWriter};
use crate::geom::vector::Vector;
use serde::Serialize;
@@ -185,14 +185,16 @@ pub mod pong_events {
impl PongEventWriter for DefaultPongEventWriter {
fn write(&mut self, event: PongEventType) -> Result<(), String> {
let out_event = match event {
PongEventType::GameObjUpdate(ref update) => Event {
topic: String::from("obj_update"),
key: Some(update.obj_id.clone().to_string()),
msg: serde_json::to_string(&event).unwrap(),
},
};
self.writer.write(out_event)
// TODO: Fix
// let out_event = match event {
// PongEventType::GameObjUpdate(ref update) => Event {
// topic: String::from("obj_update"),
// key: Some(update.obj_id.clone().to_string()),
// msg: serde_json::to_string(&event).unwrap(),
// },
// };
// self.writer.write(out_event)
Ok(())
}
}

View File

@@ -17,6 +17,9 @@ pong = { path = "../pong", version = "0.1.0" }
hyper-tungstenite = "0.8.0"
futures = { version = "0.3.12" }
async-trait = "0.1.56"
uuid = { version = "1.1.2", features = ["v4"] }
log = "0.4"
env_logger = "0.9.0"
[dev-dependencies]
rstest = "0.12.0"

40
server/src/actor.rs Normal file
View File

@@ -0,0 +1,40 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "actor_type")]
pub enum Actor {
Player(Player),
Observer(Observer)
}
impl Actor {
pub fn id(&self) -> &str {
match self {
Actor::Player(p) => &p.id,
Actor::Observer(o) => &o.id,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Player {
pub id: String,
pub ip: String,
pub nr: u8
}
impl Player {
pub fn new(nr: u8, ip: String) -> Player {
Player {
ip,
id: Uuid::new_v4().to_string(),
nr
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Observer {
pub id: String,
}

View File

@@ -1,45 +1,164 @@
use std::str::FromStr;
use serde::{Deserialize, Serialize};
use crate::player::Player;
use pong::game_field::Input;
use crate::actor::Player;
use crate::session::Session;
#[derive(Debug, Deserialize, Serialize)]
pub struct SessionEventListDTO {
pub session_id: String,
pub events: Vec<SessionEventWriteDTO>,
pub events: Vec<PongEventWrapper>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct SessionEventWriteDTO {
#[derive(Debug, Serialize, Deserialize)]
pub struct PongEventWrapper {
pub session_id: String,
pub topic: String,
pub msg: String,
pub event: String,
}
#[derive(Debug, Serialize)]
pub struct SessionClosedDto {
#[derive(Debug, Serialize, Deserialize)]
pub enum PongEvent {
Move(String, MoveEventPayload),
Input(String, InputEventPayload),
Status(String, StatusEventPayload),
HeartBeat(String, HeartBeatEventPayload),
Session(String, SessionEvent),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct MoveEventPayload {
pub session_id: String,
pub id: i32,
pub orientation_x: f64,
pub orientation_y: f64,
pub shape_param_1: f64,
pub shape_param_2: f64,
pub vel_x: f64,
pub vel_y: f64,
pub x: f64,
pub y: f64,
pub ts: u128,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct InputEventPayload {
pub session_id: String,
pub inputs: Vec<Input>,
pub player_id: String,
pub ts: u128,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StatusEventPayload {
// TODO
}
#[derive(Debug, Serialize, Deserialize)]
pub struct HeartBeatEventPayload {
pub actor_id: String,
pub session_id: String,
pub ts: u128
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "event_type")]
pub enum SessionEvent {
Created(SessionEventPayload),
Joined(SessionEventPayload),
Closed(SessionEventPayload),
}
impl SessionEvent {
pub fn session_id(&self) -> &str {
return match self {
SessionEvent::Created(e) | SessionEvent::Joined(e) | SessionEvent::Closed(e) => e.session_id()
}
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct SessionEventPayload {
pub session: Session,
pub actor: Player,
pub reason: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SessionReadDTO {
pub session_id: String,
impl SessionEventPayload {
pub fn session_id(&self) -> &str {
return &self.session.session_id;
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SessionJoinDto {
pub session_id: String,
pub enum SessionEventType {
Created,
Joined,
Closed,
}
#[derive(Debug, Serialize)]
pub struct SessionJoinedDto {
pub session: Session,
pub player: Player,
impl FromStr for SessionEventType {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"created" => Ok(SessionEventType::Created),
"joined" => Ok(SessionEventType::Joined),
"closed" => Ok(SessionEventType::Closed),
_ => Err(())
}
}
}
#[derive(Debug, Serialize)]
pub struct SessionCreatedDto {
pub session: Session,
pub player: Player,
pub fn deserialize(event: &str) -> Option<PongEvent> {
let wrapper = serde_json::from_str::<PongEventWrapper>(event);
wrapper.ok().and_then(|w| {
match w.topic.as_str() {
"move" => serde_json::from_str::<MoveEventPayload>(&w.event).ok().map(|e| PongEvent::Move(w.session_id, e)),
"input" => serde_json::from_str::<InputEventPayload>(&w.event).ok().map(|e| PongEvent::Input(w.session_id, e)),
"status" => serde_json::from_str::<StatusEventPayload>(&w.event).ok().map(|e| PongEvent::Status(w.session_id, e)),
"heart_beat" => serde_json::from_str::<HeartBeatEventPayload>(&w.event).ok().map(|e| PongEvent::HeartBeat(w.session_id, e)),
"session" => serde_json::from_str::<SessionEvent>(&w.event).ok().map(|e| PongEvent::Session(w.session_id, e)),
_ => None
}
})
}
#[cfg(test)]
mod tests {
use crate::event::{SessionEvent, SessionEventPayload};
use crate::actor::{Observer, Player};
use crate::session::{Session, SessionState};
const SESSION_EVENT_JSON: &str = "{\"event_type\":\"Created\",\"session\":{\"id\":1,\"hash\":\"abc\",\"state\":\"PENDING\",\"players\":[{\"id\":\"player_1\"}],\"observers\":[{\"id\":\"observer_1\"}]},\"actor\":{\"id\":\"player_1\"},\"reason\":\"some reason\"}";
#[test]
pub fn should_serialize_correctly() {
let res = serde_json::to_string(&get_session_event());
assert_eq!(res.is_ok(), true);
let res = res.unwrap();
assert_eq!(res, SESSION_EVENT_JSON);
}
#[test]
pub fn should_deserialize_correctly() {
let res = serde_json::from_str::<SessionEvent>(&SESSION_EVENT_JSON);
assert_eq!(res.is_ok(), true);
let res = res.unwrap();
assert_eq!(res, get_session_event());
}
fn get_session_event() -> SessionEvent {
SessionEvent::Created(SessionEventPayload {
session: Session {
id: 1,
session_id: "abc".to_owned(),
state: SessionState::PENDING,
players: vec![Player { id: "player_1".to_owned(), nr: 1, ip: "127.0.0.1".to_owned() }],
observers: vec![Observer {id: "observer_1".to_owned()}]
},
actor: Player { id: "player_1".to_owned(), nr: 1, ip: "127.0.0.1".to_owned() },
reason: "some reason".to_owned(),
})
}
}

View File

@@ -4,17 +4,19 @@ use std::str::FromStr;
use std::sync::Arc;
use futures::{stream::StreamExt};
use futures::future::err;
use hyper::{Body, Request, Response, Server, StatusCode};
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper_tungstenite::HyperWebsocket;
use hyper_tungstenite::tungstenite::{Error};
use log::{debug, error, info};
use tokio::sync::Mutex;
use crate::request_handler::{DefaultRequestHandler, RequestHandler};
use crate::session_manager::{SessionManager};
use crate::utils::http_utils::{build_error_res, get_query_params, read_json_body};
use crate::websocket::{DefaultWebsocketHandler, WebSocketConnectionType, WebsocketHandler, WebSocketSession};
use crate::websocket_handler::{DefaultWebsocketHandler, WebSocketConnectionType, WebsocketHandler, WebSocketSession};
pub struct HttpServer {
addr: [u8; 4],
@@ -49,7 +51,7 @@ impl HttpServer {
let session_manager = Arc::clone(&session_manager);
async move {
if hyper_tungstenite::is_upgrade_request(&req) {
return handle_potential_ws_upgrade(session_manager, req).await;
return handle_potential_ws_upgrade(session_manager, req, addr).await;
}
return handle_http_request(session_manager, req, addr).await;
@@ -60,7 +62,7 @@ impl HttpServer {
let host = (self.addr, self.port).into();
let server = Server::bind(&host).serve(make_svc);
println!("Listening on http://{}", host);
info!("running server on http://{}", host);
let graceful = server.with_graceful_shutdown(shutdown_signal());
graceful.await?;
Ok(())
@@ -68,50 +70,72 @@ impl HttpServer {
}
async fn handle_potential_ws_upgrade(session_manager: Arc<Mutex<SessionManager>>, req: Request<Body>) -> Result<Response<Body>, Infallible> {
println!(
"Received request to upgrade to websocket connection: {:?}",
req
async fn handle_potential_ws_upgrade(session_manager: Arc<Mutex<SessionManager>>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Infallible> {
debug!(
"received request from {:?} to upgrade to websocket connection: {:?}",
addr, req
);
let params = get_query_params(&req);
println!("Ws request params: {:?}", params);
debug!("ws request params: {:?}", params);
if !params.contains_key("session_id") {
eprintln!("Missing session id request param for websocket connection, don't upgrade connection to ws.");
error!("Missing session id request param for websocket connection, don't upgrade connection to ws.");
return build_error_res(
"Missing request param: session_id",
StatusCode::BAD_REQUEST,
);
}
if !params.contains_key("player_id") {
error!("Missing player id request param for websocket connection, don't upgrade connection to ws.");
return build_error_res(
"Missing request param: player_id",
StatusCode::BAD_REQUEST,
);
}
if !params.contains_key("connection_type") {
eprintln!("Missing connection type request param for websocket connection, don't upgrade connection to ws.");
error!("Missing connection type request param for websocket connection, don't upgrade connection to ws.");
let res = build_error_res(
"Missing request param: connection_type",
StatusCode::BAD_REQUEST,
);
return res;
}
let session_id = params.get("session_id").unwrap();
let request_session_id = *params.get("session_id").unwrap();
let request_player_id = *params.get("player_id").unwrap();
let request_player_ip = addr.ip().to_string();
let session = session_manager.lock().await.get_session(request_session_id);
if let None = session {
let error = format!("Session does not exist: {}", request_session_id);
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::NOT_FOUND);
}
let session = session.unwrap();
let matching_player = session.players.iter().find(|p| p.id == request_player_id);
if let None = matching_player {
let error = format!("Player is not registered in session: {}", request_player_id);
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::FORBIDDEN);
}
let matching_player = matching_player.unwrap();
if matching_player.ip != request_player_ip {
let error = format!("Player with wrong ip tried to join session: {} (expected) vs {} (actual)", matching_player.ip, request_player_ip);
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::FORBIDDEN);
}
let connection_type_raw = params.get("connection_type").unwrap();
let connection_type =
WebSocketConnectionType::from_str(connection_type_raw);
if let Err(_) = connection_type {
let error =
format!("Invalid connection type: {}", connection_type_raw);
eprintln!("{}", error);
error!("{}", error);
return build_error_res(error.as_str(), StatusCode::BAD_REQUEST);
}
let session = session_manager.lock().await.get_session(session_id);
if let None = session {
let error = format!("Session does not exist: {}", session_id);
eprintln!("{}", error);
return build_error_res(error.as_str(), StatusCode::NOT_FOUND);
}
let session = session.unwrap();
let websocket_session = WebSocketSession {
session: session.clone(),
connection_type: connection_type.unwrap(),
player: matching_player.clone()
};
println!("Websocket upgrade request is valid, will now upgrade to websocket: {:?}", req);
debug!("websocket upgrade request is valid, will now upgrade to websocket: {:?}", req);
let (response, websocket) =
hyper_tungstenite::upgrade(req, None).unwrap();
@@ -122,10 +146,11 @@ async fn handle_potential_ws_upgrade(session_manager: Arc<Mutex<SessionManager>>
serve_websocket(websocket_session, websocket, session_manager)
.await
{
eprintln!("Error in websocket connection: {:?}", e);
error!("Error in websocket connection: {:?}", e);
}
});
debug!("websocket upgrade done.");
// Return the response so the spawned future can continue.
return Ok(response);
}
@@ -158,4 +183,5 @@ async fn shutdown_signal() {
tokio::signal::ctrl_c()
.await
.expect("failed to install CTRL+C signal handler");
info!("received shutdown signal, shutting down now...");
}

View File

@@ -1,27 +1,33 @@
use std::str::FromStr;
use std::time::Duration;
use futures::future::err;
use hyper::{Body, Client, Method, Request, Uri};
use kafka::client::ProduceMessage;
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage, MessageSet};
use kafka::producer::{Partitioner, Producer, Record, RequiredAcks, Topics};
use log::{debug, error, info, trace};
use serde::Deserialize;
use pong::event::event::{Event, EventReaderImpl, EventWriterImpl};
use pong::event::event::{EventWrapper, EventReaderImpl, EventWriterImpl};
use crate::session::Session;
pub struct KafkaSessionEventWriterImpl {
producer: Producer<SessionPartitioner>,
}
impl KafkaSessionEventWriterImpl {
pub fn new(host: &str) -> KafkaSessionEventWriterImpl {
println!("Connecting session_writer producer to kafka host: {}", host);
info!("Connecting session_writer producer to kafka host: {}", host);
let producer = Producer::from_hosts(vec![host.to_owned()])
.with_ack_timeout(Duration::from_secs(1))
.with_required_acks(RequiredAcks::One)
.with_partitioner(SessionPartitioner {})
.create()
.unwrap();
.create();
if let Err(ref e) = producer {
error!("Failed to connect kafka producer: {:?}", e)
}
let producer = producer.unwrap();
KafkaSessionEventWriterImpl { producer }
}
}
@@ -29,95 +35,80 @@ impl KafkaSessionEventWriterImpl {
pub struct KafkaDefaultEventWriterImpl {
producer: Producer,
}
impl KafkaDefaultEventWriterImpl {
pub fn new(host: &str) -> KafkaDefaultEventWriterImpl {
println!("Connecting default producer to kafka host: {}", host);
info!("connecting default producer to kafka host: {}", host);
let producer = Producer::from_hosts(vec![host.to_owned()])
.with_ack_timeout(Duration::from_secs(1))
.with_required_acks(RequiredAcks::One)
.create()
.unwrap();
.create();
if let Err(e) = producer {
error!("failed to connect producer to kafka host {}: {:?}", host, e);
panic!("kafka connection failed, no recovery possible.")
}
let producer = producer.unwrap();
KafkaDefaultEventWriterImpl { producer }
}
}
impl EventWriterImpl for KafkaSessionEventWriterImpl {
fn write(&mut self, event: Event) -> Result<(), String> {
match event.key {
Some(key) => {
let record = Record::from_key_value(event.topic.as_str(), key, event.msg.as_str());
match self.producer.send(&record) {
Ok(()) => Ok(()),
Err(e) => Err(format!("{}", e)),
}
}
None => {
let record = Record::from_value(event.topic.as_str(), event.msg.as_str());
match self.producer.send(&record) {
Ok(()) => Ok(()),
Err(e) => Err(format!("{}", e)),
}
}
}
fn write(&mut self, events: Vec<EventWrapper>) -> Result<(), String> {
write_events(events, &mut self.producer)
}
}
impl EventWriterImpl for KafkaDefaultEventWriterImpl {
fn write(&mut self, event: Event) -> Result<(), String> {
match event.key {
fn write(&mut self, events: Vec<EventWrapper>) -> Result<(), String> {
write_events(events, &mut self.producer)
}
}
fn write_events<T>(events: Vec<EventWrapper>, producer: &mut Producer<T>) -> Result<(), String> where T : Partitioner {
let mut records_without_key = vec![];
let mut records_with_key = vec![];
for event in events.iter() {
match &event.key {
Some(key) => {
let record = Record::from_key_value(event.topic.as_str(), key, event.msg.as_str());
match self.producer.send(&record) {
Ok(()) => Ok(()),
Err(e) => Err(format!("{}", e)),
}
let record = Record::from_key_value(&event.topic, key.clone(), event.event.clone());
records_with_key.push(record);
}
None => {
let record = Record::from_value(event.topic.as_str(), event.msg.as_str());
match self.producer.send(&record) {
Ok(()) => Ok(()),
Err(e) => Err(format!("{}", e)),
}
let record = Record::from_value(&event.topic, event.event.clone());
records_without_key.push(record);
}
}
}
let res_with_key = match producer.send_all::<String, String>(&*records_with_key) {
Ok(_) => Ok(()),
Err(e) => Err(format!("{}", e)),
};
let res_without_key = match producer.send_all::<(), String>(&*records_without_key) {
Ok(_) => Ok(()),
Err(e) => Err(format!("{}", e)),
};
res_with_key.and(res_without_key)
}
pub struct KafkaEventReaderImpl {
consumer: Consumer,
topics: Vec<String>,
partitions: Vec<i32>
}
impl KafkaEventReaderImpl {
pub fn default() -> KafkaEventReaderImpl {
KafkaEventReaderImpl::new("localhost:9093")
}
pub fn from(host: &str) -> KafkaEventReaderImpl {
KafkaEventReaderImpl::new(host)
}
pub fn new(host: &str) -> KafkaEventReaderImpl {
println!("Connecting consumer to kafka host: {}", host);
let consumer = Consumer::from_hosts(vec![host.to_owned()])
.with_topic("move".to_owned())
.with_topic("status".to_owned())
.with_topic("input".to_owned())
.with_fallback_offset(FetchOffset::Earliest)
.with_group("group".to_owned())
.with_offset_storage(GroupOffsetStorage::Kafka)
.create()
.unwrap();
KafkaEventReaderImpl { consumer }
}
pub fn for_partitions(
host: &str,
partitions: &[i32],
topics: &[&str],
) -> Result<KafkaEventReaderImpl, String> {
println!("Connecting partition specific consumer to kafka host {} with topics {:?} / partitions {:?}", host, topics, partitions);
debug!("connecting partition specific consumer to kafka host {} with topics {:?} / partitions {:?}", host, topics, partitions);
let mut builder = Consumer::from_hosts(vec![host.to_owned()]);
let topics = topics.iter().map(|s| s.to_owned().to_owned()).collect::<Vec<String>>();
let partitions = partitions.iter().map(|i| *i).collect::<Vec<i32>>();
for topic in topics.iter() {
builder = builder.with_topic_partitions(topic.parse().unwrap(), partitions);
builder = builder.with_topic_partitions(topic.parse().unwrap(), &*partitions);
}
builder = builder
.with_fallback_offset(FetchOffset::Earliest)
@@ -126,24 +117,25 @@ impl KafkaEventReaderImpl {
let consumer = builder.create();
if let Err(e) = consumer {
eprintln!("Failed to connect consumer: {:?}", e);
return Err("Failed to connect consumer".to_string());
let error = format!("Failed to connect consumer: {:?}", e);
error!("{}", error);
return Err(error);
}
let consumer = consumer.unwrap();
Ok(KafkaEventReaderImpl { consumer })
debug!("successfully connected partition specific consumer to kafka host {} with topics {:?} / partitions {:?}", host, topics, partitions);
Ok(KafkaEventReaderImpl { consumer, topics, partitions })
}
}
impl EventReaderImpl for KafkaEventReaderImpl {
fn read(&mut self) -> Result<Vec<Event>, String> {
fn read(&mut self) -> Result<Vec<EventWrapper>, String> {
self.consume()
}
}
impl KafkaEventReaderImpl {
fn consume(&mut self) -> Result<Vec<Event>, String> {
// TODO: How to best filter messages by key (= game session id?)
// E.g. https://docs.rs/kafka/latest/kafka/producer/struct.DefaultPartitioner.html - is it possible to read from partition by retrieving the hash of the key?
// Does it even make sense to hash the key if it already is a hash? Custom partitioner?
fn consume(&mut self) -> Result<Vec<EventWrapper>, String> {
debug!("kafka consumer called to consume messages for {:?} / {:?}", self.topics, self.partitions);
let polled = self.consumer.poll().unwrap();
let message_sets: Vec<MessageSet<'_>> = polled.iter().collect();
let mut events = vec![];
@@ -151,23 +143,24 @@ impl KafkaEventReaderImpl {
let mut topic_event_count = 0;
let topic = ms.topic();
let partition = ms.partition();
println!("querying topic={} partition={}", topic, partition);
trace!("querying kafka topic={} partition={}", topic, partition);
for m in ms.messages() {
let event = Event {
let event = EventWrapper {
topic: String::from(topic),
key: Some(std::str::from_utf8(m.key).unwrap().parse().unwrap()),
msg: std::str::from_utf8(m.value).unwrap().parse().unwrap(),
event: std::str::from_utf8(m.value).unwrap().parse().unwrap(),
};
topic_event_count += 1;
events.push(event);
}
println!(
trace!(
"returned {:?} events for topic={} partition={}",
topic_event_count, topic, partition
);
self.consumer.consume_messageset(ms).unwrap();
}
self.consumer.commit_consumed().unwrap();
trace!("kafka consumed {} messages for {:?} / {:?}", events.len(), self.topics, self.partitions);
Ok(events)
}
}
@@ -193,7 +186,7 @@ impl KafkaSessionEventReaderImpl {
}
impl EventReaderImpl for KafkaSessionEventReaderImpl {
fn read(&mut self) -> Result<Vec<Event>, String> {
fn read(&mut self) -> Result<Vec<EventWrapper>, String> {
self.inner.read()
}
}
@@ -202,13 +195,8 @@ impl EventReaderImpl for KafkaSessionEventReaderImpl {
pub struct KafkaTopicManager {
partition_management_endpoint: String,
}
impl KafkaTopicManager {
pub fn default() -> KafkaTopicManager {
KafkaTopicManager {
partition_management_endpoint: "http://localhost:7243/add_partition".to_owned(),
}
}
impl KafkaTopicManager {
pub fn from(topic_manager_host: &str) -> KafkaTopicManager {
KafkaTopicManager {
partition_management_endpoint: format!("http://{}/add_partition", topic_manager_host)
@@ -217,6 +205,7 @@ impl KafkaTopicManager {
}
pub async fn add_partition(&self) -> Result<u16, String> {
debug!("called to create new partition");
let client = Client::new();
let request = Request::builder()
.method(Method::POST)
@@ -225,33 +214,33 @@ impl KafkaTopicManager {
.unwrap();
let res = client.request(request).await;
if let Err(e) = res {
let error = format!("Failed to add partition: {:?}", e);
println!("{}", error);
let error = format!("failed to add partition: {:?}", e);
error!("{}", error);
return Err(error);
}
let status = res.as_ref().unwrap().status();
let bytes = hyper::body::to_bytes(res.unwrap()).await;
if let Err(e) = bytes {
let error = format!("Failed to read bytes from response: {:?}", e);
let error = format!("failed to read bytes from response: {:?}", e);
println!("{}", error);
return Err(error);
}
let bytes = bytes.unwrap().to_vec();
let res_str = std::str::from_utf8(&*bytes);
if let Err(e) = res_str {
let error = format!("Failed to deserialize bytes to string: {:?}", e);
let error = format!("failed to deserialize bytes to string: {:?}", e);
println!("{}", error);
return Err(error);
}
if status != 200 {
let error = format!("Failed to add partition: {}", res_str.unwrap());
let error = format!("failed to add partition: {}", res_str.unwrap());
println!("{}", error);
return Err(error);
}
let json = serde_json::from_str::<PartitionApiDTO>(res_str.unwrap());
if let Err(e) = json {
let error = format!(
"Failed to convert string {} to json: {:?}",
"failed to convert string {} to json: {:?}",
res_str.unwrap(),
e
);
@@ -259,8 +248,8 @@ impl KafkaTopicManager {
return Err(error);
}
let updated_partition_count = json.unwrap().data;
println!(
"Successfully created partition: {}",
debug!(
"successfully created partition: {}",
updated_partition_count
);
Ok(updated_partition_count)

View File

@@ -1,5 +1,6 @@
extern crate core;
use log::{debug, error, info, Level};
use crate::http::HttpServer;
mod hash;
@@ -7,25 +8,30 @@ pub mod http;
pub mod kafka;
mod session_manager;
pub mod utils;
mod websocket;
mod websocket_handler;
mod request_handler;
mod event;
mod player;
mod actor;
mod session;
#[tokio::main]
pub async fn main() {
env_logger::init();
info!("preparing environment");
let kafka_host_env = std::env::var("KAFKA_HOST");
let kafka_host = match kafka_host_env {
Ok(val) => val,
Err(_) => "localhost:9093".to_owned(),
};
info!("KAFKA_HOST={}", kafka_host);
let kafka_partition_manager_host_env = std::env::var("KAFKA_TOPIC_MANAGER_HOST");
let kafka_topic_manager_host = match kafka_partition_manager_host_env {
Ok(val) => val,
Err(_) => "localhost:7243".to_owned(),
};
info!("KAFKA_TOPIC_MANAGER_HOST={}", kafka_topic_manager_host);
info!("booting up server");
HttpServer::new([0, 0, 0, 0], 4000, &kafka_host, &kafka_topic_manager_host)
.run()
.await

View File

@@ -1,6 +0,0 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Player {
pub id: String,
}

View File

@@ -3,10 +3,12 @@ use std::net::SocketAddr;
use std::sync::Arc;
use hyper::{Body, Method, Request, Response, StatusCode};
use async_trait::async_trait;
use log::{debug, error, info};
use serde_json::json;
use tokio::sync::Mutex;
use crate::event::{SessionCreatedDto, SessionJoinDto, SessionJoinedDto};
use crate::player::Player;
use serde::{Deserialize};
use crate::event::{SessionEvent, SessionEventPayload, SessionEventType};
use crate::actor::Player;
use crate::session_manager::SessionManager;
use crate::utils::http_utils::{build_error_res, build_success_res, get_query_params, read_json_body};
@@ -32,7 +34,7 @@ impl DefaultRequestHandler {
#[async_trait]
impl RequestHandler for DefaultRequestHandler {
async fn handle(&self, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Infallible> {
println!("req to {} with method {}", req.uri().path(), req.method());
info!("called route {} {}", req.method(), req.uri());
match (req.method(), req.uri().path()) {
(&Method::GET, "/session") => handle_get_session(&self.session_manager, req).await,
(&Method::POST, "/create_session") => {
@@ -49,17 +51,21 @@ async fn handle_get_session(
session_manager: &Arc<Mutex<SessionManager>>,
req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
info!("called get_session");
let locked = session_manager.lock().await;
let query_params = get_query_params(&req);
let session_id = query_params.get("session_id");
if let None = session_id {
error!("session id was not provided");
return build_error_res("Please provide a valid session id", StatusCode::BAD_REQUEST);
}
let session_id = session_id.unwrap();
let session = locked.get_session(session_id);
if let None = session {
error!("session for session id {} does not exist", session_id);
return build_error_res("Unable to find session for given id", StatusCode::NOT_FOUND);
}
info!("successfully retrieved session for session id {}", session_id);
return build_success_res(&serde_json::to_string(&session.unwrap()).unwrap());
}
@@ -68,23 +74,21 @@ async fn handle_session_create(
req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<Body>, Infallible> {
println!("Called to create new session: {:?}", req);
info!("called create_session");
debug!("req: {:?}", req);
let mut locked = session_manager.lock().await;
let player = Player {
id: addr.to_string(),
};
let player = Player::new(1, addr.ip().to_string());
let session_create_res = locked.create_session(player.clone()).await;
if let Err(e) = session_create_res {
error!("failed to create session: {:?}", e);
return Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(e))
.unwrap());
}
let session_created = SessionCreatedDto {
session: session_create_res.unwrap(),
player,
};
let serialized = json!(session_created);
let session_event = session_create_res.unwrap();
error!("session created: {:?}", session_event);
let serialized = json!(session_event);
return build_success_res(&serialized.to_string());
}
@@ -93,23 +97,27 @@ async fn handle_session_join(
mut req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<Body>, Infallible> {
println!("Received request to join session: {:?}", req);
info!("called join_session");
debug!("req: {:?}", req);
let mut locked = session_manager.lock().await;
let body = read_json_body::<SessionJoinDto>(&mut req).await;
let player = Player {
id: addr.to_string(),
};
let player = Player::new(2, addr.ip().to_string());
let session_join_res = locked.join_session(body.session_id, player.clone()).await;
if let Err(e) = session_join_res {
eprintln!("Failed to join session: {:?}", e);
error!("Failed to join session: {:?}", e);
return Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.status(StatusCode::CONFLICT)
.body(Body::from(e))
.unwrap());
}
let session = session_join_res.unwrap();
println!("Successfully joined session: {:?}", session);
let session_joined = SessionJoinedDto { session, player };
let serialized = json!(session_joined);
let session_event = session_join_res.unwrap();
info!("player {:?} successfully joined session: {:?}", player, session_event);
let reason = format!("player {:?} joined session", player);
let serialized = json!(session_event);
return build_success_res(&serialized.to_string());
}
#[derive(Deserialize)]
struct SessionJoinDto {
pub session_id: String
}

View File

@@ -1,12 +1,13 @@
use serde::{Serialize, Deserialize};
use crate::player::Player;
use crate::actor::{Actor, Observer, Player};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Session {
pub id: u16,
pub hash: String,
pub id: u16, // internal id
pub session_id: String,
pub state: SessionState,
pub players: Vec<Player>,
pub observers: Vec<Observer>
}
impl Session {
@@ -14,8 +15,9 @@ impl Session {
Session {
players: vec![player],
id,
hash,
session_id: hash,
state: SessionState::PENDING,
observers: vec![]
}
}
@@ -35,7 +37,7 @@ impl Session {
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum SessionState {
PENDING, // 1 player is missing
RUNNING, // game is playing
CLOSED, // game is over
RUNNING, // store is playing
CLOSED, // store is over
}

View File

@@ -1,19 +1,22 @@
use std::collections::HashMap;
use log::{debug, error, info};
use serde::{Deserialize, Serialize};
use pong::event::event::{Event, EventReader, EventWriter};
use pong::event::event::{EventWrapper, EventReader, EventWriter};
use crate::hash::Hasher;
use crate::kafka::{
KafkaDefaultEventWriterImpl, KafkaSessionEventReaderImpl, KafkaSessionEventWriterImpl,
KafkaSessionEventReaderImpl, KafkaSessionEventWriterImpl,
KafkaTopicManager,
};
use crate::player::Player;
use crate::actor::Player;
use crate::event::{SessionEvent, SessionEventPayload};
use crate::session::{Session, SessionState};
pub struct SessionManager {
kafka_host: String,
sessions: Vec<Session>,
session_producer: EventWriter,
session_producers: HashMap<String, SessionWriter>,
topic_manager: KafkaTopicManager,
}
@@ -24,47 +27,57 @@ impl SessionManager {
kafka_host: kafka_host.to_owned(),
sessions: vec![],
topic_manager: KafkaTopicManager::from(kafka_topic_manager_host),
session_producer: EventWriter::new(Box::new(KafkaDefaultEventWriterImpl::new(
kafka_host,
))),
session_producers: HashMap::new(),
}
}
pub fn get_session(&self, session_id: &str) -> Option<Session> {
self.sessions
.iter()
.find(|s| s.hash == session_id)
.find(|s| s.session_id == session_id)
.map_or_else(|| None, |s| Some(s.clone()))
}
pub async fn create_session(&mut self, player: Player) -> Result<Session, String> {
pub async fn create_session(&mut self, player: Player) -> Result<SessionEvent, String> {
info!("called to create new session by player {:?}", player);
let add_partition_res = self.topic_manager.add_partition().await;
if let Err(e) = add_partition_res {
println!("Failed to create partition: {}", e);
error!("failed to create partition: {}", e);
return Err(e);
}
let session_id = add_partition_res.unwrap();
let session_hash = Hasher::hash(session_id);
let session = Session::new(session_id, session_hash, player.clone());
println!("Successfully created session: {:?}", session);
let write_res = self.write_to_producer(session_created(session.clone(), player.clone()));
let session_partition_id = add_partition_res.unwrap();
let session_id = Hasher::hash(session_partition_id);
let session = Session::new(session_partition_id, session_id.clone(), player.clone());
info!("successfully created session: {:?}", session);
self.sessions.push(session.clone());
let session_created = SessionEvent::Created(SessionEventPayload {
session: session.clone(),
actor: player,
reason: format!("session created")
});
let write_res = self.write_to_producer(&session_created);
if let Err(e) = write_res {
eprintln!(
"Failed to write session created event for {:?} to producer: {}",
let index = self.sessions.iter().position(|s| s.session_id == session_id);
if let Some(i) = index {
debug!("session create event could not be persisted - remove session from cache.");
self.sessions.remove(i);
}
error!(
"failed to write session created event for {:?} to producer: {}",
session, e
);
}
self.sessions.push(session.clone());
Ok(session)
info!("successfully persisted create session event.");
Ok(session_created)
}
pub async fn join_session(
&mut self,
session_id: String,
player: Player,
) -> Result<Session, String> {
) -> Result<SessionEvent, String> {
let updated_session = {
let session = self.sessions.iter_mut().find(|s| s.hash == session_id);
let session = self.sessions.iter_mut().find(|s| s.session_id == session_id);
if let None = session {
let error = format!("Can't join session that does not exist: {}", session_id);
return Err(error);
@@ -89,9 +102,14 @@ impl SessionManager {
session.state = SessionState::RUNNING;
session.clone()
};
let session_joined_event = SessionEvent::Joined(SessionEventPayload {
session: updated_session.clone(),
reason: "session joined".to_owned(),
actor: player
});
{
let write_res =
self.write_to_producer(session_joined(updated_session.clone(), player.clone()));
self.write_to_producer(&session_joined_event);
if let Err(e) = write_res {
eprintln!(
"Failed to write session joined event for {:?} to producer: {}",
@@ -100,25 +118,35 @@ impl SessionManager {
}
};
println!("sessions = {:?}", self.sessions);
Ok(updated_session.clone())
Ok(session_joined_event)
}
fn write_to_producer<T>(&mut self, session_event: T) -> Result<(), String>
where
T: Serialize,
fn write_to_producer(&mut self, session_event: &SessionEvent) -> Result<(), String>
{
let json_event = serde_json::to_string(&session_event).unwrap();
let session_event_write = self.session_producer.write(Event {
topic: "session".to_owned(),
key: None,
msg: json_event,
});
let session_id = session_event.session_id();
let session_producer = match self.session_producers.get_mut(session_id) {
Some(p) => p,
None => {
let session_writer = self.get_session_writer(session_id).expect("failed to create session writer to persist create event");
self.session_producers.insert(session_id.to_owned(), session_writer);
self.session_producers.get_mut(session_id).expect("failed to retrieve newly created session writer")
}
};
let json_event = serde_json::to_string(&session_event);
if let Err(e) = json_event {
let error = format!("failed to serialize session event: {}", e);
error!("{}", error);
return Err(error);
}
let json_event = json_event.unwrap();
info!("preparing to write session event to kafka: {}", json_event);
let session_event_write = session_producer.write_to_session("session", vec![&json_event]);
if let Err(e) = session_event_write {
let message = format!("Failed to write session create event to kafka: {:?}", e);
let message = format!("Failed to write session event to kafka: {:?}", e);
println!("{}", e);
return Err(message.to_owned());
}
println!("Successfully produced session event.");
info!("successfully produced session event: {:?}", json_event);
return Ok(());
}
@@ -129,12 +157,12 @@ impl SessionManager {
) -> Result<(SessionReader, SessionWriter), String> {
let reader = self.get_session_reader(session_id, read_topics);
if let Err(e) = reader {
println!("Failed to create session reader: {:?}", e);
error!("Failed to create session reader for session {}: {:?}", session_id, e);
return Err("Failed to create session reader".to_string());
}
let writer = self.get_session_writer(session_id);
if let Err(e) = writer {
println!("Failed to create session writer: {:?}", e);
error!("Failed to create session writer for session {}: {:?}", session_id, e);
return Err("Failed to create session writer".to_string());
}
return Ok((reader.unwrap(), writer.unwrap()));
@@ -179,7 +207,7 @@ impl SessionManager {
fn find_session(&self, session_id: &str) -> Option<Session> {
self.sessions
.iter()
.find(|s| session_id == s.hash)
.find(|s| session_id == s.session_id)
.map(|s| s.clone())
}
}
@@ -190,13 +218,15 @@ pub struct SessionWriter {
}
impl SessionWriter {
pub fn write_to_session(&mut self, topic: &str, msg: &str) -> Result<(), String> {
let event = Event {
msg: msg.to_owned(),
key: Some(self.session.id.to_string()),
topic: topic.to_owned(),
};
self.writer.write(event)
pub fn write_to_session(&mut self, topic: &str, messages: Vec<&str>) -> Result<(), String> {
let events = messages.into_iter().map(|e| {
EventWrapper {
event: e.to_owned(),
key: Some(self.session.id.to_string()),
topic: topic.to_owned(),
}
}).collect();
self.writer.write_all(events)
}
}
@@ -207,55 +237,7 @@ pub struct SessionReader {
}
impl SessionReader {
pub fn read_from_session(&mut self) -> Result<Vec<Event>, String> {
pub fn read_from_session(&mut self) -> Result<Vec<EventWrapper>, String> {
self.reader.read()
}
}
#[derive(Deserialize, Serialize)]
struct SessionCreatedEvent {
event_type: SessionEventType,
session: Session,
player: Player,
}
impl SessionCreatedEvent {
pub fn new(session: Session, player: Player) -> SessionCreatedEvent {
SessionCreatedEvent {
event_type: SessionEventType::CREATED,
session,
player,
}
}
}
#[derive(Deserialize, Serialize)]
struct SessionJoinedEvent {
event_type: SessionEventType,
session: Session,
player: Player,
}
impl SessionJoinedEvent {
pub fn new(session: Session, player: Player) -> SessionJoinedEvent {
SessionJoinedEvent {
event_type: SessionEventType::JOINED,
session,
player,
}
}
}
fn session_created(session: Session, player: Player) -> SessionCreatedEvent {
SessionCreatedEvent::new(session, player)
}
fn session_joined(session: Session, player: Player) -> SessionJoinedEvent {
SessionJoinedEvent::new(session, player)
}
#[derive(Deserialize, Serialize)]
enum SessionEventType {
CREATED,
JOINED,
}

View File

@@ -21,11 +21,16 @@ pub mod http_utils {
pub async fn read_json_body<T>(req: &mut Request<Body>) -> T
where
T: DeserializeOwned,
{
let body_str = read_json_body_raw(req).await;
serde_json::from_str::<T>(&body_str).unwrap()
}
pub async fn read_json_body_raw(req: &mut Request<Body>) -> String
{
let body = req.body_mut();
let bytes = body::to_bytes(body).await.unwrap();
let body_str = std::str::from_utf8(&*bytes).unwrap();
serde_json::from_str::<T>(body_str).unwrap()
std::str::from_utf8(&*bytes).unwrap().to_owned()
}
pub fn build_success_res(value: &str) -> Result<Response<Body>, Infallible> {
@@ -91,3 +96,9 @@ pub mod time_utils {
return since_the_epoch.as_millis();
}
}
pub mod json_utils {
pub fn unescape(json: &str) -> String {
return json.replace("\\\"", "\"")
}
}

View File

@@ -1,205 +0,0 @@
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use hyper_tungstenite::HyperWebsocket;
use tokio::sync::Mutex;
use async_trait::async_trait;
use futures::{SinkExt, StreamExt};
use hyper_tungstenite::tungstenite::{Error, Message};
use serde_json::json;
use tokio::time::sleep;
use serde::{Serialize, Deserialize};
use crate::event::{SessionClosedDto, SessionEventListDTO};
use crate::session::Session;
use crate::session_manager::{SessionManager};
#[async_trait]
pub trait WebsocketHandler {
async fn serve(self) -> Result<(), Error> ;
}
pub struct DefaultWebsocketHandler {
websocket_session: WebSocketSession,
websocket: HyperWebsocket,
session_manager: Arc<Mutex<SessionManager>>,
}
impl DefaultWebsocketHandler {
pub fn new(
websocket_session: WebSocketSession,
websocket: HyperWebsocket,
session_manager: Arc<Mutex<SessionManager>>,
) -> DefaultWebsocketHandler {
DefaultWebsocketHandler {
websocket_session, websocket, session_manager
}
}
}
#[async_trait]
impl WebsocketHandler for DefaultWebsocketHandler {
async fn serve(self) -> Result<(), Error> {
let websocket = self.websocket.await?;
let (mut websocket_writer, mut websocket_reader) = websocket.split();
let session_manager = self.session_manager.lock().await;
let event_handler_pair = session_manager.split(
&self.websocket_session.session.hash,
self.websocket_session.connection_type.get_topics(),
);
if let Err(_) = event_handler_pair {
eprintln!(
"Failed to create event reader/writer pair session: {:?}",
self.websocket_session
);
return Err(Error::ConnectionClosed); // TODO: Use proper error for this case to close the connection
}
let (mut event_reader, mut event_writer) = event_handler_pair.unwrap();
let websocket_session_read_copy = self.websocket_session.clone();
tokio::spawn(async move {
println!(
"Ready to read messages from ws connection: {:?}",
websocket_session_read_copy
);
while let Some(message) = websocket_reader.next().await {
match message.unwrap() {
Message::Text(msg) => {
let events = serde_json::from_str::<SessionEventListDTO>(&msg);
println!("Received ws message to persist events to kafka");
if let Err(e) = events {
eprintln!("Failed to deserialize ws message to event {}: {}", msg, e);
continue;
}
let event_wrapper = events.unwrap();
if event_wrapper.session_id != websocket_session_read_copy.session.hash {
eprintln!("Websocket has session {:?} but was asked to write to session {} - skip.", websocket_session_read_copy, event_wrapper.session_id);
continue;
}
let mut any_error = false;
let event_count = event_wrapper.events.len();
for event in event_wrapper.events {
let write_res = event_writer.write_to_session(&event.topic, &event.msg);
if let Err(e) = write_res {
any_error = true;
eprintln!("Failed to write event {:?}: {}", event, e);
}
}
if any_error {
eprintln!(
"Failed to write at least one message for session {}",
event_wrapper.session_id
);
} else {
println!(
"Successfully wrote {} messages to kafka for session {:?}",
event_count, websocket_session_read_copy
)
}
}
Message::Close(msg) => {
// No need to send a reply: tungstenite takes care of this for you.
if let Some(msg) = &msg {
println!(
"Received close message with code {} and message: {}",
msg.code, msg.reason
);
} else {
println!("Received close message");
}
let session_closed_event = SessionClosedDto {
session: websocket_session_read_copy.session.clone(),
reason: "ws closed".to_owned(),
};
let msg = json!(session_closed_event).to_string();
let session_event_write_res = event_writer.write_to_session("session", &msg);
if let Err(e) = session_event_write_res {
eprintln!("Failed to write session closed event: {0}", e)
}
break;
}
_ => {}
}
}
println!("!!!! Exit websocket receiver !!!!")
});
let websocket_session_write_copy = self.websocket_session.clone();
tokio::spawn(async move {
println!(
"Ready to read messages from kafka: {:?}",
websocket_session_write_copy
);
loop {
println!("Reading messages from kafka.");
let messages = event_reader.read_from_session();
if let Err(_) = messages {
eprintln!(
"Failed to read messages from kafka for session: {:?}",
websocket_session_write_copy
);
continue;
}
// println!("Read messages for websocket_session {:?} from consumer: {:?}", websocket_session_write_copy, messages);
let messages = messages.unwrap();
if messages.len() == 0 {
println!("No new messages from kafka.");
} else {
println!("{} new messages from kafka.", messages.len());
let json = serde_json::to_string(&messages).unwrap();
let message = Message::from(json);
println!("Sending kafka messages through websocket.");
let send_res = websocket_writer.send(message).await;
if let Err(e) = send_res {
eprintln!(
"Failed to send message to websocket for session {:?}: {:?}",
websocket_session_write_copy, e
);
break;
}
}
// Avoid starvation of read thread (?)
// TODO: How to avoid this? This is very bad for performance.
sleep(Duration::from_millis(1)).await;
}
});
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct WebSocketSession {
pub connection_type: WebSocketConnectionType,
pub session: Session,
}
#[derive(Debug, Clone, PartialEq)]
pub enum WebSocketConnectionType {
HOST,
PEER,
OBSERVER,
}
impl FromStr for WebSocketConnectionType {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"host" => Ok(WebSocketConnectionType::HOST),
"peer" => Ok(WebSocketConnectionType::PEER),
"observer" => Ok(WebSocketConnectionType::OBSERVER),
_ => Err(()),
}
}
}
impl WebSocketConnectionType {
pub fn get_topics(&self) -> &[&str] {
match self {
WebSocketConnectionType::HOST => &["input", "session"],
WebSocketConnectionType::PEER | WebSocketConnectionType::OBSERVER => {
&["move", "input", "status", "session"]
}
}
}
}

View File

@@ -0,0 +1,474 @@
use std::fmt::{Debug, format};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use hyper_tungstenite::HyperWebsocket;
use tokio::sync::Mutex;
use async_trait::async_trait;
use futures::{SinkExt, StreamExt};
use hyper_tungstenite::tungstenite::{Error, Message};
use log::{debug, error, info, trace};
use serde_json::json;
use tokio::time::sleep;
use serde::{Serialize, Deserialize};
use pong::event::event::EventWriter;
use pong::game_field::Input;
use crate::event::{HeartBeatEventPayload, InputEventPayload, MoveEventPayload, SessionEvent, SessionEventListDTO, SessionEventPayload, SessionEventType};
use crate::actor::Player;
use crate::session::{Session, SessionState};
use crate::session_manager::{SessionManager, SessionWriter};
use crate::utils::json_utils::unescape;
#[async_trait]
pub trait WebsocketHandler {
async fn serve(self) -> Result<(), Error> ;
}
pub struct DefaultWebsocketHandler {
websocket_session: WebSocketSession,
websocket: HyperWebsocket,
session_manager: Arc<Mutex<SessionManager>>,
}
impl DefaultWebsocketHandler {
pub fn new(
websocket_session: WebSocketSession,
websocket: HyperWebsocket,
session_manager: Arc<Mutex<SessionManager>>,
) -> DefaultWebsocketHandler {
DefaultWebsocketHandler {
websocket_session, websocket, session_manager
}
}
}
#[async_trait]
impl WebsocketHandler for DefaultWebsocketHandler {
async fn serve(self) -> Result<(), Error> {
info(&self.websocket_session, "serving new websocket connection");
let websocket = self.websocket.await?;
let (mut websocket_writer, mut websocket_reader) = websocket.split();
let session_manager = self.session_manager.lock().await;
let event_handler_pair = session_manager.split(
&self.websocket_session.session.session_id,
self.websocket_session.connection_type.get_topics(),
);
if let Err(_) = event_handler_pair {
error(
&self.websocket_session,
&format!(
"failed to create event reader/writer pair session: {:?}",
self.websocket_session
)
);
return Err(Error::ConnectionClosed); // TODO: Use proper error for this case to close the connection
}
let (mut event_reader, mut event_writer) = event_handler_pair.unwrap();
let websocket_session_read_copy = self.websocket_session.clone();
tokio::spawn(async move {
info(
&websocket_session_read_copy,
"ready to read messages from ws connection",
);
while let Some(message) = websocket_reader.next().await {
if let Err(e) = message {
error(&websocket_session_read_copy, &format!("ws message read failed for session: {:?}", e));
let reason = format!("ws closed: {:?}", e);
write_session_close_event(&mut event_writer, &websocket_session_read_copy, reason.as_str());
break;
}
let message = message.unwrap();
trace(&websocket_session_read_copy, &format!("read new message from websocket: {:?}", message));
match message {
Message::Text(msg) => {
let ws_message = deserialize_ws_event(&msg, &websocket_session_read_copy.connection_type);
trace(&websocket_session_read_copy, "received ws event to persist to kafka");
if let Err(e) = ws_message {
error(&websocket_session_read_copy, &format!("Failed to deserialize ws message to event: {:?}", e));
continue;
}
let ws_message = ws_message.unwrap();
{
let session_id = ws_message.session_id();
if session_id != websocket_session_read_copy.session.session_id {
error(&websocket_session_read_copy, &format!("websocket was asked to write to other session {} - skip.", session_id));
continue;
}
}
match ws_message {
WebsocketEvent::Snapshot(_, session_snapshot) => {
trace!("received message is snapshot");
let mut any_error = false;
match session_snapshot {
SessionSnapshot::Host(session_id, payload) => {
trace(&websocket_session_read_copy, "received message is HOST snapshot");
let move_events = payload.objects.iter().map(|o| {
o.to_move_event(&session_id, payload.ts)
}).collect();
debug(&websocket_session_read_copy, &format!("host: created move events from snapshot: {:?}", move_events));
any_error = !write_events(move_events, "move", &mut event_writer) || any_error;
if any_error {
debug(&websocket_session_read_copy, "host: move events write failed");
}
let input_event = InputEventPayload {
inputs: payload.inputs,
player_id: websocket_session_read_copy.player.id.to_owned(),
ts: payload.ts,
session_id: session_id.to_owned()
};
debug(&websocket_session_read_copy, &format!("host: created input event from snapshot: {:?}", input_event));
any_error = !write_events(vec![input_event], "input", &mut event_writer) || any_error;
if any_error {
debug(&websocket_session_read_copy, "peer: input event write failed");
}
// TODO: Status events.
},
SessionSnapshot::Peer(session_id, payload) => {
trace(&websocket_session_read_copy, "received message is PEER snapshot");
let input_event = InputEventPayload {
inputs: payload.inputs,
player_id: websocket_session_read_copy.player.id.to_owned(),
ts: payload.ts,
session_id: session_id.to_owned()
};
debug(&websocket_session_read_copy, &format!("peer: created input event from snapshot: {:?}", input_event));
any_error = !write_events(vec![input_event], "input", &mut event_writer) || any_error;
if any_error {
debug(&websocket_session_read_copy, "peer: input event write failed");
}
},
SessionSnapshot::Observer(_, _) => {
// noop
}
}
if any_error {
error(&websocket_session_read_copy, "at least one event write operation failed");
} else {
debug(&websocket_session_read_copy, "successfully persisted session snapshot");
}
},
WebsocketEvent::HeartBeat(session_id, heartbeat) => {
trace(&websocket_session_read_copy, "received message is heartbeat");
let event = HeartBeatEventPayload {
session_id: session_id.clone(),
actor_id: heartbeat.player_id,
ts: heartbeat.ts
};
let res = write_events(vec![event], "heart_beat", &mut event_writer);
if !res {
error!("failed to persist heart beat.");
} else {
debug(&websocket_session_read_copy, "successfully persisted heartbeat");
}
}
}
}
Message::Close(msg) => {
info(&websocket_session_read_copy, "ws session closed");
// No need to send a reply: tungstenite takes care of this for you.
let reason = if let Some(msg) = &msg {
debug!(
"Received close message with code {} and message: {}",
msg.code, msg.reason
);
format!("{}: {}", msg.code, msg.reason)
} else {
"unknown".to_owned()
};
let reason = format!("ws closed: {}", reason);
write_session_close_event(&mut event_writer, &websocket_session_read_copy, reason.as_str());
break;
}
_ => {}
}
}
info!("ws receiver terminated")
});
let websocket_session_write_copy = self.websocket_session.clone();
tokio::spawn(async move {
debug(
&websocket_session_write_copy,
"ready to read messages from kafka"
);
loop {
trace(&websocket_session_write_copy, "reading messages from kafka");
// TODO: Should perform more filtering, e.g. inputs of player are not relevant.
let events = event_reader.read_from_session();
if let Err(e) = events {
error(&websocket_session_write_copy, &format!("Failed to read messages from kafka: {:?}", e));
continue;
}
trace(&websocket_session_write_copy, &format!("read messages for websocket_session from consumer: {:?}", events));
let event_dtos = events.unwrap().into_iter().map(|e| {
info!("#### {}", e.event);
WebsocketEventDTO {
topic: e.topic,
event: e.event
}
}).collect::<Vec<WebsocketEventDTO>>();
if event_dtos.len() == 0 {
trace(&websocket_session_write_copy, "no new messages from kafka.");
} else {
trace(&websocket_session_write_copy, &format!("{} new messages from kafka.", event_dtos.len()));
let json = serde_json::to_string(&event_dtos).unwrap();
trace(&websocket_session_write_copy, &format!("sending msg batch to client: {}", json));
let message = Message::from(json);
trace(&websocket_session_write_copy, "sending kafka messages through websocket.");
let send_res = websocket_writer.send(message).await;
if let Err(e) = send_res {
error(
&websocket_session_write_copy,
&format!(
"Failed to send message to websocket: {:?}", e
)
);
break;
}
}
// Avoid starvation of read thread (?)
// TODO: How to avoid this? This is very bad for performance.
sleep(Duration::from_millis(1)).await;
trace(&websocket_session_write_copy, "kafka read done, back to sleep")
}
});
Ok(())
}
}
// TODO: doable in macro?
fn trace(websocket_session: &WebSocketSession, msg: &str) {
trace!("{} {}", websocket_session.session.session_id, msg)
}
fn debug(websocket_session: &WebSocketSession, msg: &str) {
debug!("[{}] {}", websocket_session.session.session_id, msg)
}
fn info(websocket_session: &WebSocketSession, msg: &str) {
info!("[{}] {}", websocket_session.session.session_id, msg)
}
fn error(websocket_session: &WebSocketSession, msg: &str) {
error!("[{}] {}", websocket_session.session.session_id, msg)
}
fn write_session_close_event(event_writer: &mut SessionWriter, websocket_session: &WebSocketSession, close_reason: &str) {
let mut updated_session = websocket_session.session.clone();
updated_session.state = SessionState::CLOSED;
let session_closed_event = SessionEvent::Closed(SessionEventPayload {
actor: websocket_session.player.clone(),
session: updated_session,
reason: format!("ws closed: {}", close_reason),
});
let msg = json!(session_closed_event).to_string();
let session_event_write_res = event_writer.write_to_session("session", vec![&msg]);
if let Err(e) = session_event_write_res {
eprintln!("Failed to write session closed event: {0}", e)
}
}
#[derive(Debug, Clone)]
pub struct WebSocketSession {
pub connection_type: WebSocketConnectionType,
pub session: Session,
pub player: Player
}
#[derive(Debug, Clone, PartialEq)]
pub enum WebSocketConnectionType {
HOST,
PEER,
OBSERVER,
}
impl FromStr for WebSocketConnectionType {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"host" => Ok(WebSocketConnectionType::HOST),
"peer" => Ok(WebSocketConnectionType::PEER),
"observer" => Ok(WebSocketConnectionType::OBSERVER),
_ => Err(()),
}
}
}
impl WebSocketConnectionType {
pub fn get_topics(&self) -> &[&str] {
match self {
WebSocketConnectionType::HOST => &["input", "session"],
WebSocketConnectionType::PEER | WebSocketConnectionType::OBSERVER => {
&["move", "input", "status", "session"]
}
}
}
}
fn deserialize_ws_event(message: &str, connection_type: &WebSocketConnectionType) -> Result<WebsocketEvent, String> {
let deserialized = serde_json::from_str::<WebsocketMessageWrapper>(message);
if let Err(e) = deserialized {
let err = format!("Failed to deserialize ws message {}: {:?}", message, e);
eprintln!("{}", err);
return Err(err);
}
let deserialized = deserialized.unwrap();
match deserialized.msg_type {
WebsocketEventType::SessionSnapshot => {
deserialize_event_snapshot(&deserialized.payload, connection_type).map(|s| WebsocketEvent::Snapshot(s.session_id().to_owned(), s))
},
WebsocketEventType::HeartBeat => {
serde_json::from_str::<HeartBeatMessage>(&deserialized.payload).map_err(|e| e.to_string()).map(|h| WebsocketEvent::HeartBeat(h.session_id.clone(), h))
}
}
}
fn deserialize_event_snapshot(serialized_snapshot: &str, connection_type: &WebSocketConnectionType) -> Result<SessionSnapshot, String> {
match connection_type {
WebSocketConnectionType::HOST => serde_json::from_str::<HostSessionSnapshotDTO>(serialized_snapshot).map_err(|e| e.to_string()).map(|s| SessionSnapshot::Host(s.session_id.to_owned(), s)),
WebSocketConnectionType::PEER => serde_json::from_str::<PeerSessionSnapshotDTO>(serialized_snapshot).map_err(|e| e.to_string()).map(|s| SessionSnapshot::Peer(s.session_id.to_owned(), s)),
WebSocketConnectionType::OBSERVER => serde_json::from_str::<ObserverSessionSnapshotDTO>(serialized_snapshot).map_err(|e| e.to_string()).map(|s| SessionSnapshot::Observer(s.session_id.to_owned(), s)),
}
}
enum WebsocketEvent {
Snapshot(String, SessionSnapshot),
HeartBeat(String, HeartBeatMessage)
}
impl WebsocketEvent {
pub fn session_id(&self) -> &str {
match self {
WebsocketEvent::HeartBeat(s, _) => &s,
WebsocketEvent::Snapshot(s, _) => &s,
}
}
}
#[derive(Deserialize)]
struct WebsocketMessageWrapper {
pub msg_type: WebsocketEventType,
pub payload: String
}
#[derive(Deserialize)]
enum WebsocketEventType {
HeartBeat, SessionSnapshot
}
#[derive(Deserialize)]
struct HeartBeatMessage {
pub player_id: String,
pub session_id: String,
pub ts: u128
}
enum SessionSnapshot {
Host(String, HostSessionSnapshotDTO),
Peer(String, PeerSessionSnapshotDTO),
Observer(String, ObserverSessionSnapshotDTO)
}
impl SessionSnapshot {
pub fn session_id(&self) -> &str {
match self {
SessionSnapshot::Host(id, _) => id,
SessionSnapshot::Peer(id, _) => id,
SessionSnapshot::Observer(id, _) => id
}
}
}
#[derive(Deserialize)]
struct HostSessionSnapshotDTO {
pub session_id: String,
pub inputs: Vec<Input>,
pub objects: Vec<GameObjectStateDTO>,
pub player_id: String,
pub ts: u128
}
#[derive(Deserialize)]
struct PeerSessionSnapshotDTO {
pub session_id: String,
pub inputs: Vec<Input>,
pub player_id: String,
pub ts: u128
}
#[derive(Deserialize)]
struct ObserverSessionSnapshotDTO {
pub session_id: String,
pub player: String,
pub ts: u128
}
#[derive(Deserialize)]
struct GameObjectStateDTO {
pub id: i32,
pub orientation_x: f64,
pub orientation_y: f64,
pub shape_param_1: f64,
pub shape_param_2: f64,
pub vel_x: f64,
pub vel_y: f64,
pub x: f64,
pub y: f64,
}
impl GameObjectStateDTO {
pub fn to_move_event(&self, session_id: &str, ts: u128) -> MoveEventPayload {
MoveEventPayload {
session_id: session_id.to_owned(),
ts,
id: self.id,
x: self.x,
y: self.y,
orientation_x: self.orientation_x,
orientation_y: self.orientation_y,
vel_x: self.vel_x,
vel_y: self.vel_y,
shape_param_1: self.shape_param_1,
shape_param_2: self.shape_param_2,
}
}
}
fn write_events<T>(events: Vec<T>, topic: &str, event_writer: &mut SessionWriter) -> bool where T : Serialize + Debug {
if events.len() == 0 {
debug!("called to write 0 events - noop");
return true;
}
let mut any_error = false;
let mut to_send = vec![];
for event in events {
let serialized = serde_json::to_string(&event);
if let Err(e) = serialized {
error!("Failed to serialize event {:?} in topic {}: {:?}", event, topic, e);
any_error = true;
continue;
}
let serialized = serialized.unwrap();
to_send.push(serialized);
}
let to_send = to_send.iter().map(|e| e.as_str()).collect();
let write_res = event_writer.write_to_session(topic, to_send);
if let Err(e) = write_res {
error!("Failed to write at least one event to topic {}: {:?}", topic, e);
any_error = true;
}
return !any_error;
}
#[derive(Debug, Serialize, Deserialize)]
struct WebsocketEventDTO {
pub topic: String,
pub event: String
}