Files
karma/demo/prometheus.py
2021-05-11 15:36:23 +01:00

98 lines
2.4 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Mock Prometheus server for alery history queries
"""
import json
import sys
import random
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
def generateSeries():
series = []
now = time.time()
for i in range(24):
value = 0
if random.randint(0, 100) > 75:
value = random.randint(0, 10)
series.append([now, str(value)])
now = now - 3600
return series
def maybe_fail():
if random.randint(0, 100) > 98:
return True
return False
class server(BaseHTTPRequestHandler):
def _set_headers(self, code=200):
self.send_response(code)
self.send_header("Content-type", "application/json")
self.end_headers()
def _query_range(self):
self.wfile.write(
json.dumps(
{
"status": "success",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {},
"values": generateSeries(),
}
],
},
}
).encode("utf-8")
)
def _label_names(self):
self.wfile.write(
json.dumps(
{
"status": "success",
"data": [
"alertname",
"instance",
"cluster",
"region",
"job",
"severity",
"device",
],
}
).encode("utf-8")
)
def do_POST(self):
self.do_GET()
def do_GET(self):
if maybe_fail():
self._set_headers(code=500)
self.wfile.write(json.dumps({"status": "error"}).encode("utf-8"))
return
self._set_headers()
if self.path.startswith("/api/v1/labels"):
self._label_names()
else:
self._query_range()
if __name__ == "__main__":
server_address = ("", int(sys.argv[1]))
httpd = HTTPServer(server_address, server)
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
httpd.server_close()