refactoring/async-kafka-implementation

This commit is contained in:
Thilo Behnke
2022-07-10 19:17:45 +02:00
parent 3d8cdaba2c
commit d4767d2f6f
15 changed files with 316 additions and 353 deletions

View File

@@ -1,19 +1,27 @@
<script lang="ts">
import {gameField} from "../store/engine";
import {gameStateEvents, networkTickEvents, sessionInputs} from "../store/session";
import type {GameState} from "../store/model/session";
import {networkTickEvents, sessionInputs} from "../store/session";
import type {GameState, HostSessionSnapshot} from "../store/model/session";
import Input from "./Input.svelte";
export let killLoopOnError = true;
let frame: number;
let state: GameState;
$: state = $gameStateEvents;
let lastTick;
let inputs: Input[];
$: if (networkTickEvents && $networkTickEvents.hasNext) {
const tick = networkTickEvents.next();
const tick = networkTickEvents.next() as HostSessionSnapshot;
if (tick != null) {
gameField.update(tick.objects, state);
if (lastTick && lastTick.ts >= tick.ts) {
// TODO: How is this possible?
console.error(`!!!! Duplicated Tick ${tick.ts} (vs ${lastTick.ts}) !!!!`)
} else {
inputs = tick.inputs;
gameField.update(tick.objects, tick.state);
lastTick = tick;
}
}
}
@@ -27,4 +35,4 @@
}
</script>
<slot tick={$gameField} inputs={$sessionInputs} handleError={handleError}></slot>
<slot tick={$gameField} inputs={inputs} handleError={handleError}></slot>

View File

@@ -33,7 +33,7 @@
const state: GameState = $gameField.state;
networkEvents.produce({
state,
inputs: $relevantKeyboardEvents,
inputs: $sessionInputs,
session_id: session.session_id,
objects: $gameField.objects,
player_id: session.you.id,

View File

@@ -26,11 +26,13 @@
y={20}/>
{/if}
<Text
text={state.winner ? "player " + state.winner + " has won!" : ""}
{#if state?.winner}
<Text
text={"player " + state.winner + " has won!"}
fontSize=32
fontFamily='Courier New'
align='left'
baseline='top'
x={dimensions.width / 4}
y={dimensions.height / 2}/>
{/if}

View File

@@ -1,4 +1,12 @@
import type {GameObject, GameState, NetworkSession, Session} from "./session";
import type {
GameObject,
GameState,
HostSessionSnapshot,
NetworkSession,
PeerSessionSnapshot,
Session,
SessionSnapshot
} from "./session";
import type {Input} from "./input";
export type SessionEventPayload = {
@@ -22,12 +30,6 @@ export type InputEventPayload = {
ts: number,
}
export type TickEventPayload = {
session_id: string,
objects: GameObject[],
ts: number
}
export type StatusEventPayload = {
session_id: string,
state: GameState
@@ -45,7 +47,7 @@ export type InputEventWrapper = {
export type TickEventWrapper = {
topic: 'tick',
event: TickEventPayload
event: SessionSnapshot
}
export type MoveEventWrapper = {

View File

@@ -1,26 +1,27 @@
import {derived, get, Readable, readable, Unsubscriber, writable} from "svelte/store";
import api from "../api/session";
import type {GameState, LocalSession, Message, NetworkSession, Session, SessionSnapshot} from "./model/session";
import type {
HostSessionSnapshot,
LocalSession,
Message,
NetworkSession,
PeerSessionSnapshot,
Session,
SessionSnapshot
} from "./model/session";
import {isLocalSession, isNetworkSession, MessageType, SessionState, SessionType} from "./model/session";
import type {NetworkStore} from "./network";
import type {
GameEventWrapper,
InputEventPayload,
SessionEventPayload,
StatusEventPayload,
TickEventPayload
} from "./model/event";
import {isInputEvent, isMoveEvent, isSessionEvent, isStatusEvent, isTickEvent} from "./model/event";
import type {GameEventWrapper, InputEventPayload, SessionEventPayload} from "./model/event";
import {isSessionEvent, isTickEvent} from "./model/event";
import {getPlayerKeyboardInputs, playerKeyboardInputs} from "./input";
import type {Subscriber} from "svelte/types/runtime/store";
import {combined} from "./utils";
import type {Input} from "./model/input";
import {init, subscribe, tick} from "svelte/internal";
const sessionStore = writable<Session>(null);
function createNetworkEvents() {
const {subscribe, set, update} = writable<GameEventWrapper[]>([]);
const {subscribe, set} = writable<GameEventWrapper[]>([]);
const websocket = writable<WebSocket>(null);
const sessionId = writable<string>(null);
@@ -140,33 +141,9 @@ export const networkSessionStateEvents = readable<SessionEventPayload[]>([], set
}
});
export const gameStateEvents = (function() {
const lastEvent = writable<StatusEventPayload>(null);
const unsubNetworkEvents = networkEvents.subscribe($events => {
const events = $events.filter(isStatusEvent);
if (!events.length) {
return;
}
const latest = events[events.length - 1];
lastEvent.set(latest.event);
})
const customSubscribe = (run: Subscriber<StatusEventPayload>, invalidate): Unsubscriber => {
const unsub = lastEvent.subscribe(run, invalidate);
return () => {
unsub();
unsubNetworkEvents();
}
}
return {
subscribe: customSubscribe
}
}())
export type NetworkTickEventState = {
hasNext: boolean;
events: TickEventPayload[]
events: HostSessionSnapshot[] | PeerSessionSnapshot[]
}
const createNetworkTickEventStore = function() {
@@ -183,7 +160,7 @@ const createNetworkTickEventStore = function() {
})
})
function next(): TickEventPayload {
function next(): SessionSnapshot {
const events = get(ticks);
if (!events.hasNext) {
return null;
@@ -215,12 +192,10 @@ const createNetworkTickEventStore = function() {
export const networkTickEvents = createNetworkTickEventStore();
const networkInputEvents = derived([networkEvents, sessionStore], ([$sessionEvents, $sessionStore]) => $sessionEvents.filter(wrapper => {
if (!isInputEvent(wrapper)) {
return false;
}
return wrapper.event.player_id !== ($sessionStore as NetworkSession).you.id
}).map(({event}) => event as InputEventPayload));
const networkInputEvents = derived(networkTickEvents, ($networkTickEvents: NetworkTickEventState) => $networkTickEvents.events.map((tick: SessionSnapshot): InputEventPayload => {
const inputs = tick.inputs;
return {inputs, player_id: tick.player_id, ts: tick.ts, session_id: tick.session_id}
}));
const getPlayerNetworkInputEvents = (player_nr: number): Readable<Input[]> => derived(networkInputEvents, $networkInputEvents => {
const session = get(sessionStore);

View File

@@ -5,7 +5,6 @@ set -e
source .env
docker exec pong_server_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic session --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"
docker exec pong_server_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic move --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"
docker exec pong_server_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic status --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"
docker exec pong_server_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic host_tick --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"
docker exec pong_server_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic peer_tick --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"
docker exec pong_server_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic heart_beat --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"
docker exec pong_server_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic input --bootstrap-server "$KAFKA_HOST:$KAFKA_PORT"

View File

@@ -8,7 +8,7 @@ use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
const TOPICS: [&str; 5] = ["move", "status", "input", "heart_beat", "session"];
const TOPICS: [&str; 4] = ["host_tick", "peer_tick", "heart_beat", "session"];
#[tokio::main]
pub async fn main() {

View File

@@ -9,6 +9,7 @@ edition = "2021"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0.79" }
rand = "0.8.5"
async-trait = "0.1.56"
getrandom = { version = "0.2", features = ["js"] }

View File

@@ -1,4 +1,5 @@
pub mod event {
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::fs::OpenOptions;
@@ -11,13 +12,16 @@ pub mod event {
pub event: String,
}
pub trait EventWriterImpl: Send + Sync {
fn write(&mut self, events: Vec<EventWrapper>) -> Result<(), String>;
#[async_trait]
pub trait EventWriterImpl : Send {
async fn write(&mut self, events: Vec<EventWrapper>) -> Result<(), String>;
}
pub struct FileEventWriterImpl {}
#[async_trait]
impl EventWriterImpl for FileEventWriterImpl {
fn write(&mut self, events: Vec<EventWrapper>) -> Result<(), String> {
async fn write(&mut self, events: Vec<EventWrapper>) -> Result<(), String> {
let event_buffer = events.iter().fold(vec![], |mut acc, e| {
acc.push(e.event.as_bytes());
acc
@@ -39,8 +43,10 @@ pub mod event {
}
pub struct NoopEventWriterImpl {}
#[async_trait]
impl EventWriterImpl for NoopEventWriterImpl {
fn write(&mut self, _events: Vec<EventWrapper>) -> Result<(), String> {
async fn write(&mut self, _events: Vec<EventWrapper>) -> Result<(), String> {
Ok(())
}
}
@@ -66,17 +72,18 @@ pub mod event {
}
}
pub fn write(&mut self, event: EventWrapper) -> Result<(), String> {
self.write_all(vec![event])
pub async fn write(&mut self, event: EventWrapper) -> Result<(), String> {
self.write_all(vec![event]).await
}
pub fn write_all(&mut self, events: Vec<EventWrapper>) -> Result<(), String> {
self.writer_impl.write(events)
pub async fn write_all(&mut self, events: Vec<EventWrapper>) -> Result<(), String> {
self.writer_impl.write(events).await
}
}
pub trait EventReaderImpl: Send + Sync {
fn read(&mut self) -> Result<Vec<EventWrapper>, String>;
#[async_trait]
pub trait EventReaderImpl : Send {
async fn read(&mut self) -> Result<Vec<EventWrapper>, String>;
}
pub struct EventReader {
@@ -88,8 +95,8 @@ pub mod event {
EventReader { reader_impl }
}
pub fn read(&mut self) -> Result<Vec<EventWrapper>, String> {
self.reader_impl.read()
pub async fn read(&mut self) -> Result<Vec<EventWrapper>, String> {
self.reader_impl.read().await
}
}
}

View File

@@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
kafka = { version = "0.8.0" }
rskafka = { version = "0.2.0" }
hyper = {version = "0.14.18", features = ["full"]}
tokio = { version = "1", features = ["full"] }
tokio-stream = {version = "0.1" }

View File

@@ -26,6 +26,13 @@ pub enum PongEvent {
Session(String, SessionEvent),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct MoveEventBatchPayload {
pub session_id: String,
pub ts: u128,
pub objects: Vec<MoveEventPayload>
}
#[derive(Debug, Serialize, Deserialize)]
pub struct MoveEventPayload {
pub session_id: String,
@@ -119,20 +126,6 @@ impl FromStr for SessionEventType {
}
}
pub fn deserialize(event: &str) -> Option<PongEvent> {
let wrapper = serde_json::from_str::<PongEventWrapper>(event);
wrapper.ok().and_then(|w| {
match w.topic.as_str() {
"move" => serde_json::from_str::<MoveEventPayload>(&w.event).ok().map(|e| PongEvent::Move(w.session_id, e)),
"input" => serde_json::from_str::<InputEventPayload>(&w.event).ok().map(|e| PongEvent::Input(w.session_id, e)),
"status" => serde_json::from_str::<StatusEventPayload>(&w.event).ok().map(|e| PongEvent::Status(w.session_id, e)),
"heart_beat" => serde_json::from_str::<HeartBeatEventPayload>(&w.event).ok().map(|e| PongEvent::HeartBeat(w.session_id, e)),
"session" => serde_json::from_str::<SessionEvent>(&w.event).ok().map(|e| PongEvent::Session(w.session_id, e)),
_ => None
}
})
}
#[cfg(test)]
mod tests {
use crate::event::{SessionEvent, SessionEventPayload};

View File

@@ -1,193 +1,233 @@
use async_trait::async_trait;
use std::collections::{BTreeMap, HashMap};
use std::fs::read;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use futures::{StreamExt, TryFutureExt};
use futures::future::err;
use hyper::{Body, Client, Method, Request, Uri};
use kafka::client::ProduceMessage;
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage, MessageSet};
use kafka::producer::{Partitioner, Producer, Record, RequiredAcks, Topics};
use log::{debug, error, info, trace};
use rskafka::client::ClientBuilder;
use rskafka::client::consumer::{StartOffset, StreamConsumer, StreamConsumerBuilder};
use rskafka::client::partition::{Compression, PartitionClient};
use rskafka::record::Record;
use rskafka::time::OffsetDateTime;
use serde::Deserialize;
use pong::event::event::{EventWrapper, EventReaderImpl, EventWriterImpl};
use crate::session::Session;
pub struct KafkaSessionEventWriterImpl {
producer: Producer<SessionPartitioner>,
topics: Vec<String>,
partitions: Vec<i32>,
producers: HashMap<String, PartitionClient>
}
impl KafkaSessionEventWriterImpl {
pub fn new(host: &str) -> KafkaSessionEventWriterImpl {
pub async fn new(host: &str, topics: Vec<&str>, partition: &i32) -> KafkaSessionEventWriterImpl {
info!("Connecting session_writer producer to kafka host: {}", host);
let producer = Producer::from_hosts(vec![host.to_owned()])
.with_ack_timeout(Duration::from_secs(1))
.with_required_acks(RequiredAcks::One)
.with_partitioner(SessionPartitioner {})
.create();
if let Err(ref e) = producer {
error!("Failed to connect kafka producer: {:?}", e)
let owned_topics = topics.iter().map(|t| t.to_owned().to_owned()).collect();
let mut producers = HashMap::new();
for topic in topics {
let client = ClientBuilder::new(vec![host.to_owned()]).build().await.unwrap();
let producer = client.partition_client(topic.to_owned(), partition.clone()).await;
if let Err(ref e) = producer {
error!("Failed to connect kafka producer: {:?}", e)
}
let producer = producer.unwrap();
producers.insert(topic.to_owned(), producer);
}
let producer = producer.unwrap();
KafkaSessionEventWriterImpl { producer }
}
}
pub struct KafkaDefaultEventWriterImpl {
producer: Producer,
}
impl KafkaDefaultEventWriterImpl {
pub fn new(host: &str) -> KafkaDefaultEventWriterImpl {
info!("connecting default producer to kafka host: {}", host);
let producer = Producer::from_hosts(vec![host.to_owned()])
.with_ack_timeout(Duration::from_secs(1))
.with_required_acks(RequiredAcks::One)
.create();
if let Err(e) = producer {
error!("failed to connect producer to kafka host {}: {:?}", host, e);
panic!("kafka connection failed, no recovery possible.")
}
let producer = producer.unwrap();
KafkaDefaultEventWriterImpl { producer }
KafkaSessionEventWriterImpl { topics: owned_topics, partitions: vec![partition.clone()], producers }
}
}
#[async_trait]
impl EventWriterImpl for KafkaSessionEventWriterImpl {
fn write(&mut self, events: Vec<EventWrapper>) -> Result<(), String> {
write_events(events, &mut self.producer)
async fn write(&mut self, events: Vec<EventWrapper>) -> Result<(), String> {
let mut by_topic: HashMap<String, Vec<EventWrapper>> = HashMap::new();
for e in events {
match by_topic.get_mut(&e.topic) {
Some(events) => events.push(e),
None => {
let mut events = vec![];
let topic = e.topic.clone();
events.push(e);
by_topic.insert(topic, events);
}
}
}
for topic_events in by_topic {
let mut producer = self.producers.get_mut(&topic_events.0);
if let None = producer {
let available = self.producers.keys().collect::<Vec<&String>>();
return Err(format!("Could not find producer for topic: {}. Available topic producers: {:?}", &topic_events.0, available));
}
let producer = producer.unwrap();
let res = write_events(topic_events.1, producer).await;
if let Err(e) = res {
return Err(e);
}
}
Ok(())
}
}
impl EventWriterImpl for KafkaDefaultEventWriterImpl {
fn write(&mut self, events: Vec<EventWrapper>) -> Result<(), String> {
write_events(events, &mut self.producer)
}
}
fn write_events<T>(events: Vec<EventWrapper>, producer: &mut Producer<T>) -> Result<(), String> where T : Partitioner {
let mut records_without_key = vec![];
let mut records_with_key = vec![];
async fn write_events(events: Vec<EventWrapper>, producer: &mut PartitionClient) -> Result<(), String> {
let mut records = vec![];
for event in events.iter() {
match &event.key {
Some(key) => {
let record = Record::from_key_value(&event.topic, key.clone(), event.event.clone());
records_with_key.push(record);
let key = Some(key.clone().into_bytes());
let value = Some(event.event.clone().into_bytes());
let headers = BTreeMap::new();
let timestamp = OffsetDateTime::now_utc();
let record = Record { key, value, headers, timestamp };
records.push(record);
}
None => {
let record = Record::from_value(&event.topic, event.event.clone());
records_without_key.push(record);
let key = None;
let value = Some(event.event.clone().into_bytes());
let headers = BTreeMap::new();
let timestamp = OffsetDateTime::now_utc();
let record = Record { key, value, headers, timestamp };
records.push(record);
}
}
}
let res_with_key = match producer.send_all::<String, String>(&*records_with_key) {
let res = match producer.produce(records, Compression::NoCompression).await {
Ok(_) => Ok(()),
Err(e) => Err(format!("{}", e)),
Err(e) => Err(format!("{:?}", e)),
};
let res_without_key = match producer.send_all::<(), String>(&*records_without_key) {
Ok(_) => Ok(()),
Err(e) => Err(format!("{}", e)),
};
res_with_key.and(res_without_key)
res
}
pub struct KafkaEventReaderImpl {
consumer: Consumer,
topics: Vec<String>,
partitions: Vec<i32>
consumer: StreamConsumer,
topic: String,
partition: i32
}
impl KafkaEventReaderImpl {
pub fn for_partitions(
host: &str,
partitions: &[i32],
topics: &[&str],
) -> Result<KafkaEventReaderImpl, String> {
debug!("connecting partition specific consumer to kafka host {} with topics {:?} / partitions {:?}", host, topics, partitions);
let mut builder = Consumer::from_hosts(vec![host.to_owned()]);
let topics = topics.iter().map(|s| s.to_owned().to_owned()).collect::<Vec<String>>();
let partitions = partitions.iter().map(|i| *i).collect::<Vec<i32>>();
for topic in topics.iter() {
builder = builder.with_topic_partitions(topic.parse().unwrap(), &*partitions);
}
builder = builder
.with_fallback_offset(FetchOffset::Earliest)
.with_group("group".to_owned())
.with_offset_storage(GroupOffsetStorage::Kafka);
// TODO: Hotfix, but does this really work?
unsafe impl Send for KafkaEventReaderImpl {}
unsafe impl Sync for KafkaEventReaderImpl {}
let consumer = builder.create();
if let Err(e) = consumer {
impl KafkaEventReaderImpl {
pub async fn for_partition(
host: &str,
partition: &i32,
topic: &str,
) -> Result<KafkaEventReaderImpl, String> {
debug!("connecting partition specific consumer to kafka host {} with topic {:?} / partition {:?}", host, topic, partition);
let partition_client = ClientBuilder::new(vec![host.to_owned()]).build().await.unwrap()
.partition_client(topic.to_owned(), partition.clone()).await;
if let Err(e) = partition_client {
let error = format!("Failed to connect consumer: {:?}", e);
error!("{}", error);
return Err(error);
}
let consumer = consumer.unwrap();
debug!("successfully connected partition specific consumer to kafka host {} with topics {:?} / partitions {:?}", host, topics, partitions);
Ok(KafkaEventReaderImpl { consumer, topics, partitions })
let partition_client = Arc::new(partition_client.unwrap());
let consumer = StreamConsumerBuilder::new(
partition_client,
StartOffset::Earliest
).with_max_wait_ms(1).build();
debug!("successfully connected partition specific consumer to kafka host {} with topic {:?} / partition {:?}", host, topic, partition);
Ok(KafkaEventReaderImpl { consumer, topic: topic.to_owned(), partition: partition.clone() })
}
}
#[async_trait]
impl EventReaderImpl for KafkaEventReaderImpl {
fn read(&mut self) -> Result<Vec<EventWrapper>, String> {
self.consume()
async fn read(&mut self) -> Result<Vec<EventWrapper>, String> {
self.consume().await
}
}
impl KafkaEventReaderImpl {
fn consume(&mut self) -> Result<Vec<EventWrapper>, String> {
debug!("kafka consumer called to consume messages for {:?} / {:?}", self.topics, self.partitions);
let polled = self.consumer.poll().unwrap();
let message_sets: Vec<MessageSet<'_>> = polled.iter().collect();
let mut events = vec![];
for ms in message_sets {
let mut topic_event_count = 0;
let topic = ms.topic();
let partition = ms.partition();
trace!("querying kafka topic={} partition={}", topic, partition);
for m in ms.messages() {
let event = EventWrapper {
topic: String::from(topic),
key: Some(std::str::from_utf8(m.key).unwrap().parse().unwrap()),
event: std::str::from_utf8(m.value).unwrap().parse().unwrap(),
};
topic_event_count += 1;
events.push(event);
}
trace!(
"returned {:?} events for topic={} partition={}",
topic_event_count, topic, partition
);
self.consumer.consume_messageset(ms).unwrap();
async fn consume(&mut self) -> Result<Vec<EventWrapper>, String> {
debug!("kafka consumer called to consume messages for {:?} / {:?}", self.topic, self.partition);
// TODO: This is a problem:
// - The peer inputs are intermittent to the host, because sometimes it will timeout without receiving the peer's inputs
// - The peer will receive the host's inputs intermittent in the same way.
let next_res = tokio::time::timeout(Duration::from_millis(3), self.consumer.next()).await;
if let Err(e) = next_res {
info!("No record received in time after {}, timeout for {} / {}.", e, self.topic, self.partition);
return Ok(vec![]);
}
self.consumer.commit_consumed().unwrap();
trace!("kafka consumed {} messages for {:?} / {:?}", events.len(), self.topics, self.partitions);
let next_res = next_res.unwrap();
if let None = next_res {
debug!("No record retrieved for {} / {}", self.topic, self.partition);
return Err("No record.".to_owned());
}
let next_res = next_res.unwrap();
if let Err(e) = next_res {
let error = format!("Failed to extract record for {} / {}: {:?}", self.topic, self.partition, e);
error!("{}", error);
return Err(error);
}
let (record, _) = next_res.unwrap();
debug!("kafka consumer retrieved record: {:?}", record);
let key = match record.record.key {
Some(k) => Some(std::str::from_utf8(&*k).unwrap().to_owned()),
None => None
};
let event = match record.record.value {
Some(e) => std::str::from_utf8(&*e).unwrap().to_owned(),
None => panic!("event without payload!")
};
let event = EventWrapper {
topic: String::from(self.topic.clone()),
key,
event,
};
debug!("converted record to event: {:?}", event);
let events = vec![event];
Ok(events)
}
}
pub struct KafkaSessionEventReaderImpl {
inner: KafkaEventReaderImpl,
inner: HashMap<String, KafkaEventReaderImpl>
}
impl KafkaSessionEventReaderImpl {
pub fn new(
pub async fn new(
host: &str,
session: &Session,
topics: &[&str],
) -> Result<KafkaSessionEventReaderImpl, String> {
let partitions = [session.id as i32];
let reader = KafkaEventReaderImpl::for_partitions(host, &partitions, topics);
if let Err(_) = reader {
return Err("Failed to create kafka session event reader".to_string());
let mut reader_map = HashMap::new();
for topic in topics {
let reader = KafkaEventReaderImpl::for_partition(host, &i32::from(session.id), *topic).await;
if let Err(_) = reader {
return Err("Failed to create kafka session event reader".to_string());
}
let reader = reader.unwrap();
reader_map.insert(topic.to_owned().to_owned(), reader);
}
let reader = reader.unwrap();
Ok(KafkaSessionEventReaderImpl { inner: reader })
Ok(KafkaSessionEventReaderImpl { inner: reader_map })
}
}
#[async_trait]
impl EventReaderImpl for KafkaSessionEventReaderImpl {
fn read(&mut self) -> Result<Vec<EventWrapper>, String> {
self.inner.read()
async fn read(&mut self) -> Result<Vec<EventWrapper>, String> {
let mut events = vec![];
for topic_reader in self.inner.iter_mut() {
let topic_events = topic_reader.1.read().await;
if let Err(e) = topic_events {
let error = format!("Failed to consume events for topic {}: {}", topic_reader.0, e);
return Err(error);
}
let topic_events = topic_events.unwrap();
for event in topic_events {
events.push(event);
}
}
Ok(events)
}
}
@@ -260,18 +300,3 @@ impl KafkaTopicManager {
struct PartitionApiDTO {
data: u16,
}
pub struct SessionPartitioner {}
impl Partitioner for SessionPartitioner {
fn partition(&mut self, _topics: Topics, msg: &mut ProduceMessage) {
match msg.key {
Some(key) => {
let key = std::str::from_utf8(key).unwrap();
msg.partition = key.parse::<i32>().unwrap();
// println!("Overriding message partition with key: {}", msg.partition);
}
None => panic!("Producing message without key not allowed!"),
}
}
}

View File

@@ -88,7 +88,7 @@ async fn handle_session_create(
.unwrap());
}
let session_event = session_create_res.unwrap();
error!("session created: {:?}", session_event);
info!("session created: {:?}", session_event);
let serialized = json!(session_event);
return build_success_res(&serialized.to_string());
}

View File

@@ -17,7 +17,6 @@ use crate::session::{Session, SessionState};
pub struct SessionManager {
kafka_host: String,
sessions: Vec<Session>,
session_producers: HashMap<String, SessionWriter>,
topic_manager: KafkaTopicManager,
}
@@ -27,8 +26,7 @@ impl SessionManager {
SessionManager {
kafka_host: kafka_host.to_owned(),
sessions: vec![],
topic_manager: KafkaTopicManager::from(kafka_topic_manager_host),
session_producers: HashMap::new(),
topic_manager: KafkaTopicManager::from(kafka_topic_manager_host)
}
}
@@ -56,7 +54,7 @@ impl SessionManager {
actor: Actor::Player(player),
reason: format!("session created")
});
let write_res = self.write_to_producer(&session_created);
let write_res = self.write_to_producer(&session_created).await;
if let Err(e) = write_res {
let index = self.sessions.iter().position(|s| s.session_id == session_id);
if let Some(i) = index {
@@ -110,7 +108,7 @@ impl SessionManager {
});
{
let write_res =
self.write_to_producer(&session_joined_event);
self.write_to_producer(&session_joined_event).await;
if let Err(e) = write_res {
eprintln!(
"Failed to write session joined event for {:?} to producer: {}",
@@ -156,7 +154,7 @@ impl SessionManager {
});
{
let write_res =
self.write_to_producer(&session_joined_event);
self.write_to_producer(&session_joined_event).await;
if let Err(e) = write_res {
eprintln!(
"Failed to write watch session event for {:?} to producer: {}",
@@ -168,17 +166,10 @@ impl SessionManager {
Ok(session_joined_event)
}
fn write_to_producer(&mut self, session_event: &SessionEvent) -> Result<(), String>
async fn write_to_producer(&mut self, session_event: &SessionEvent) -> Result<(), String>
{
let session_id = session_event.session_id();
let session_producer = match self.session_producers.get_mut(session_id) {
Some(p) => p,
None => {
let session_writer = self.get_session_writer(session_id).expect("failed to create session writer to persist create event");
self.session_producers.insert(session_id.to_owned(), session_writer);
self.session_producers.get_mut(session_id).expect("failed to retrieve newly created session writer")
}
};
let session_writer = self.get_session_writer(session_id).await.expect("failed to create session writer to persist create event");
let json_event = serde_json::to_string(&session_event);
if let Err(e) = json_event {
let error = format!("failed to serialize session event: {}", e);
@@ -187,7 +178,8 @@ impl SessionManager {
}
let json_event = json_event.unwrap();
info!("preparing to write session event to kafka: {}", json_event);
let session_event_write = session_producer.write_to_session("session", vec![&json_event]);
let mut session_writer = self.get_session_writer(session_id).await.unwrap();
let session_event_write = session_writer.write_to_session("session", vec![&json_event]).await;
if let Err(e) = session_event_write {
let message = format!("Failed to write session event to kafka: {:?}", e);
println!("{}", e);
@@ -197,17 +189,17 @@ impl SessionManager {
return Ok(());
}
pub fn split(
pub async fn split(
&self,
session_id: &str,
read_topics: &[&str],
) -> Result<(SessionReader, SessionWriter), String> {
let reader = self.get_session_reader(session_id, read_topics);
let reader = self.get_session_reader(session_id, read_topics).await;
if let Err(e) = reader {
error!("Failed to create session reader for session {}: {:?}", session_id, e);
return Err("Failed to create session reader".to_string());
}
let writer = self.get_session_writer(session_id);
let writer = self.get_session_writer(session_id).await;
if let Err(e) = writer {
error!("Failed to create session writer for session {}: {:?}", session_id, e);
return Err("Failed to create session writer".to_string());
@@ -215,7 +207,7 @@ impl SessionManager {
return Ok((reader.unwrap(), writer.unwrap()));
}
pub fn get_session_reader(
pub async fn get_session_reader(
&self,
session_id: &str,
topics: &[&str],
@@ -225,7 +217,7 @@ impl SessionManager {
return Err(format!("Unable to find session with hash {}", session_id));
}
let session = session.unwrap();
let kafka_reader = KafkaSessionEventReaderImpl::new(&self.kafka_host, &session, topics);
let kafka_reader = KafkaSessionEventReaderImpl::new(&self.kafka_host, &session, topics).await;
if let Err(_) = kafka_reader {
return Err("Unable to create kafka reader.".to_string());
}
@@ -237,14 +229,15 @@ impl SessionManager {
})
}
pub fn get_session_writer(&self, session_id: &str) -> Result<SessionWriter, String> {
pub async fn get_session_writer(&self, session_id: &str) -> Result<SessionWriter, String> {
let session = self.find_session(&session_id);
if let None = session {
return Err(format!("Unable to find session with hash {}", session_id));
}
let session = session.unwrap();
let writer = KafkaSessionEventWriterImpl::new(&self.kafka_host, vec!["host_tick", "peer_tick", "session", "heart_beat"], &i32::from(session.id)).await;
let event_writer =
EventWriter::new(Box::new(KafkaSessionEventWriterImpl::new(&self.kafka_host)));
EventWriter::new(Box::new(writer));
Ok(SessionWriter {
writer: event_writer,
session,
@@ -265,7 +258,7 @@ pub struct SessionWriter {
}
impl SessionWriter {
pub fn write_to_session(&mut self, topic: &str, messages: Vec<&str>) -> Result<(), String> {
pub async fn write_to_session(&mut self, topic: &str, messages: Vec<&str>) -> Result<(), String> {
let events = messages.into_iter().map(|e| {
EventWrapper {
event: e.to_owned(),
@@ -273,7 +266,7 @@ impl SessionWriter {
topic: topic.to_owned(),
}
}).collect();
self.writer.write_all(events)
self.writer.write_all(events).await
}
}
@@ -284,7 +277,7 @@ pub struct SessionReader {
}
impl SessionReader {
pub fn read_from_session(&mut self) -> Result<Vec<EventWrapper>, String> {
self.reader.read()
pub async fn read_from_session(&mut self) -> Result<Vec<EventWrapper>, String> {
self.reader.read().await
}
}

View File

@@ -1,26 +1,24 @@
use std::collections::HashMap;
use std::fmt::{Debug, format};
use std::fmt::{Debug};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use hyper_tungstenite::HyperWebsocket;
use tokio::sync::Mutex;
use async_trait::async_trait;
use futures::{SinkExt, StreamExt};
use futures::future::err;
use hyper_tungstenite::HyperWebsocket;
use hyper_tungstenite::tungstenite::{Error, Message};
use log::{debug, error, info, trace};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::time::sleep;
use serde::{Serialize, Deserialize};
use tokio::sync::Mutex;
use tokio::task;
use pong::event::event::{EventWrapper, EventWriter};
use pong::event::event::{EventWriter};
use pong::game_field::{GameState, Input};
use crate::event::{HeartBeatEventPayload, InputEventPayload, MoveEventPayload, SessionEvent, SessionEventListDTO, SessionEventPayload, SessionEventType, StatusEventPayload, TickEvent};
use crate::actor::{Actor, Player};
use crate::actor::{Actor};
use crate::event::{HeartBeatEventPayload, MoveEventBatchPayload, MoveEventPayload, SessionEvent, SessionEventListDTO, SessionEventPayload, SessionEventType, StatusEventPayload, TickEvent};
use crate::session::{Session, SessionState};
use crate::session_manager::{SessionManager, SessionWriter};
use crate::utils::json_utils::unescape;
#[async_trait]
pub trait WebsocketHandler {
@@ -54,11 +52,13 @@ impl WebsocketHandler for DefaultWebsocketHandler {
let websocket = self.websocket.await?;
let (mut websocket_writer, mut websocket_reader) = websocket.split();
let session_manager = self.session_manager.lock().await;
let event_handler_pair = session_manager.split(
&self.websocket_session.session.session_id,
self.websocket_session.connection_type.get_topics(),
);
let event_handler_pair = async {
let session_manager = self.session_manager.lock().await;
session_manager.split(
&self.websocket_session.session.session_id,
self.websocket_session.connection_type.get_topics(),
).await
}.await;
if let Err(_) = event_handler_pair {
error(
&self.websocket_session,
@@ -107,51 +107,21 @@ impl WebsocketHandler for DefaultWebsocketHandler {
trace!("received message is snapshot");
let mut any_error = false;
match session_snapshot {
SessionSnapshot::Host(session_id, payload) => {
SessionSnapshot::Host(_, payload) => {
trace(&websocket_session_read_copy, "received message is HOST snapshot");
let move_events = payload.objects.iter().map(|o| {
o.to_move_event(&session_id, payload.ts)
}).collect();
debug(&websocket_session_read_copy, &format!("host: created move events from snapshot: {:?}", move_events));
let move_write_error = !write_events(move_events, "move", &mut event_writer);
if move_write_error {
debug(&websocket_session_read_copy, "host: move events write failed");
let write_res = write_events(vec![payload], "host_tick", &mut event_writer).await;
if !write_res {
error(&websocket_session_read_copy, "failed to write HOST tick");
}
let input_event = InputEventPayload {
inputs: payload.inputs,
player_id: websocket_session_read_copy.actor.id().to_owned(),
ts: payload.ts,
session_id: session_id.to_owned()
};
debug(&websocket_session_read_copy, &format!("host: created input event from snapshot: {:?}", input_event));
let input_write_error = !write_events(vec![input_event], "input", &mut event_writer);
if input_write_error {
debug(&websocket_session_read_copy, "host: input event write failed");
}
let status_event = StatusEventPayload {
score: payload.state.score,
winner: payload.state.winner,
session_id: session_id.to_owned()
};
let status_write_error = !write_events(vec![status_event], "status", &mut event_writer);
if status_write_error {
debug(&websocket_session_read_copy, "host: status event write failed");
}
any_error = move_write_error || input_write_error || input_write_error;
any_error = !write_res;
},
SessionSnapshot::Peer(session_id, payload) => {
trace(&websocket_session_read_copy, "received message is PEER snapshot");
let input_event = InputEventPayload {
inputs: payload.inputs,
player_id: websocket_session_read_copy.actor.id().to_owned(),
ts: payload.ts,
session_id: session_id.to_owned()
};
debug(&websocket_session_read_copy, &format!("peer: created input event from snapshot: {:?}", input_event));
any_error = !write_events(vec![input_event], "input", &mut event_writer);
if any_error {
debug(&websocket_session_read_copy, "peer: input event write failed");
let write_res = write_events(vec![payload], "peer_tick", &mut event_writer).await;
if !write_res {
error(&websocket_session_read_copy, &format!("failed to write PEER tick"));
}
any_error = !write_res;
},
SessionSnapshot::Observer(_, _) => {
// noop
@@ -170,7 +140,7 @@ impl WebsocketHandler for DefaultWebsocketHandler {
actor_id: heartbeat.player_id,
ts: heartbeat.ts
};
let res = write_events(vec![event], "heart_beat", &mut event_writer);
let res = write_events(vec![event], "heart_beat", &mut event_writer).await;
if !res {
error!("failed to persist heart beat.");
} else {
@@ -193,7 +163,7 @@ impl WebsocketHandler for DefaultWebsocketHandler {
};
let reason = format!("ws closed: {}", reason);
write_session_close_event(&mut event_writer, &websocket_session_read_copy, reason.as_str());
write_session_close_event(&mut event_writer, &websocket_session_read_copy, reason.as_str()).await;
break;
}
_ => {}
@@ -213,7 +183,7 @@ impl WebsocketHandler for DefaultWebsocketHandler {
loop {
trace(&websocket_session_write_copy, "reading messages from kafka");
// TODO: Should perform more filtering, e.g. inputs of player are not relevant.
let events = event_reader.read_from_session();
let events = event_reader.read_from_session().await;
if let Err(e) = events {
error(&websocket_session_write_copy, &format!("Failed to read messages from kafka: {:?}", e));
continue;
@@ -224,41 +194,29 @@ impl WebsocketHandler for DefaultWebsocketHandler {
if events.len() == 0 {
trace(&websocket_session_write_copy, "no new messages from kafka.");
} else {
let (move_events, other_events): (Vec<EventWrapper>, Vec<EventWrapper>) = events.into_iter().partition(|e| &e.topic == "move");
let mut session_events = events.iter().filter(|e| e.topic == "session")
.map(|e| WebsocketEventDTO {
topic: "session".to_owned(),
event: e.event.clone()
})
.collect();
let mut grouped_move_events = HashMap::<u128, Vec<MoveEventPayload>>::new();
for move_event in move_events {
let deserialized = serde_json::from_str::<MoveEventPayload>(&move_event.event);
if let Err(e) = deserialized {
error(&websocket_session_write_copy, &format!("Failed to deserialize move event {:?}: {:?}", move_event, e));
continue;
let mut tick_events = match websocket_session_write_copy.connection_type {
WebSocketConnectionType::HOST => {
events.iter().filter(|e| e.topic == "peer_tick")
.map(|e| WebsocketEventDTO {topic: "tick".to_owned(), event: e.event.clone()})
.collect()
},
_ => {
events.iter().filter(|e| e.topic == "host_tick")
.map(|e| WebsocketEventDTO {topic: "tick".to_owned(), event: e.event.clone()})
.collect()
}
let deserialized = deserialized.unwrap();
if !grouped_move_events.contains_key(&deserialized.ts) {
grouped_move_events.insert(deserialized.ts.clone(), vec![]);
}
let existing = grouped_move_events.get_mut(&deserialized.ts).unwrap();
existing.push(deserialized);
}
let mut tick_event_dtos = grouped_move_events.into_iter()
.map(|e| TickEvent{tick: e.0, objects: e.1})
.map(|e| serde_json::to_string(&e).unwrap())
// TODO: This could e.g. be done with ksql when the move events are sent.
.map(|e| WebsocketEventDTO {topic: "tick".to_owned(), event: e})
.collect::<Vec<WebsocketEventDTO>>();
let mut other_event_dtos = other_events.into_iter().map(|e| {
WebsocketEventDTO {
topic: e.topic,
event: e.event
}
}).collect::<Vec<WebsocketEventDTO>>();
};
let mut event_dtos = vec![];
event_dtos.append(&mut other_event_dtos);
if websocket_session_write_copy.connection_type != WebSocketConnectionType::HOST {
event_dtos.append(&mut tick_event_dtos);
}
event_dtos.append(&mut session_events);
event_dtos.append(&mut tick_events);
trace(&websocket_session_write_copy, &format!("{} new messages from kafka.", event_dtos.len()));
let json = serde_json::to_string(&event_dtos).unwrap();
@@ -303,7 +261,7 @@ fn error(websocket_session: &WebSocketSession, msg: &str) {
error!("[{}] {}", websocket_session.session.session_id, msg)
}
fn write_session_close_event(event_writer: &mut SessionWriter, websocket_session: &WebSocketSession, close_reason: &str) {
async fn write_session_close_event(event_writer: &mut SessionWriter, websocket_session: &WebSocketSession, close_reason: &str) {
let mut updated_session = websocket_session.session.clone();
updated_session.state = SessionState::CLOSED;
let session_closed_event = SessionEvent::Closed(SessionEventPayload {
@@ -312,7 +270,7 @@ fn write_session_close_event(event_writer: &mut SessionWriter, websocket_session
reason: format!("ws closed: {}", close_reason),
});
let msg = json!(session_closed_event).to_string();
let session_event_write_res = event_writer.write_to_session("session", vec![&msg]);
let session_event_write_res = event_writer.write_to_session("session", vec![&msg]).await;
if let Err(e) = session_event_write_res {
eprintln!("Failed to write session closed event: {0}", e)
}
@@ -348,9 +306,9 @@ impl FromStr for WebSocketConnectionType {
impl WebSocketConnectionType {
pub fn get_topics(&self) -> &[&str] {
match self {
WebSocketConnectionType::HOST => &["input", "session"],
WebSocketConnectionType::HOST => &["peer_tick", "session"],
WebSocketConnectionType::PEER | WebSocketConnectionType::OBSERVER => {
&["move", "input", "status", "session"]
&["host_tick", "session"]
}
}
}
@@ -430,7 +388,7 @@ impl SessionSnapshot {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Serialize, Debug)]
struct HostSessionSnapshotDTO {
pub session_id: String,
pub inputs: Vec<Input>,
@@ -440,7 +398,7 @@ struct HostSessionSnapshotDTO {
pub ts: u128
}
#[derive(Deserialize)]
#[derive(Deserialize, Serialize, Debug)]
struct PeerSessionSnapshotDTO {
pub session_id: String,
pub inputs: Vec<Input>,
@@ -455,7 +413,7 @@ struct ObserverSessionSnapshotDTO {
pub ts: u128
}
#[derive(Deserialize)]
#[derive(Deserialize, Serialize, Debug)]
struct GameObjectStateDTO {
pub id: String,
pub orientation_x: f64,
@@ -486,7 +444,7 @@ impl GameObjectStateDTO {
}
}
fn write_events<T>(events: Vec<T>, topic: &str, event_writer: &mut SessionWriter) -> bool where T : Serialize + Debug {
async fn write_events<T>(events: Vec<T>, topic: &str, event_writer: &mut SessionWriter) -> bool where T : Serialize + Debug {
if events.len() == 0 {
debug!("called to write 0 events - noop");
return true;
@@ -505,7 +463,7 @@ fn write_events<T>(events: Vec<T>, topic: &str, event_writer: &mut SessionWriter
}
let to_send = to_send.iter().map(|e| e.as_str()).collect();
let write_res = event_writer.write_to_session(topic, to_send);
let write_res = event_writer.write_to_session(topic, to_send).await;
if let Err(e) = write_res {
error!("Failed to write at least one event to topic {}: {:?}", topic, e);
any_error = true;