more logging

This commit is contained in:
Thilo Behnke
2022-06-23 21:52:18 +02:00
parent bc144785ea
commit 726e18ba46
2 changed files with 9 additions and 4 deletions

View File

@@ -180,7 +180,7 @@ async fn handle_http_request(
async fn shutdown_signal() {
// Wait for the CTRL+C signal
let shutdown_received = tokio::signal::ctrl_c()
tokio::signal::ctrl_c()
.await
.expect("failed to install CTRL+C signal handler");
info!("received shutdown signal, shutting down now...");

View File

@@ -5,6 +5,7 @@ use hyper::{Body, Client, Method, Request, Uri};
use kafka::client::ProduceMessage;
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage, MessageSet};
use kafka::producer::{Partitioner, Producer, Record, RequiredAcks, Topics};
use log::{error, info};
use serde::Deserialize;
use pong::event::event::{Event, EventReaderImpl, EventWriterImpl};
@@ -33,12 +34,16 @@ pub struct KafkaDefaultEventWriterImpl {
impl KafkaDefaultEventWriterImpl {
pub fn new(host: &str) -> KafkaDefaultEventWriterImpl {
println!("Connecting default producer to kafka host: {}", host);
info!("connecting default producer to kafka host: {}", host);
let producer = Producer::from_hosts(vec![host.to_owned()])
.with_ack_timeout(Duration::from_secs(1))
.with_required_acks(RequiredAcks::One)
.create()
.unwrap();
.create();
if let Err(e) = producer {
error!("failed to connect producer to kafka host {}: {:?}", host, e);
panic!("kafka connection failed, no recovery possible.")
}
let producer = producer.unwrap();
KafkaDefaultEventWriterImpl { producer }
}
}