From 2da13da6f7cc6f1a4ee2668596a7838d4b27b2ee Mon Sep 17 00:00:00 2001
From: Alfonso Acosta
Date: Mon, 11 Apr 2016 01:56:29 +0000
Subject: [PATCH] ebpf plugin to gather http-request rate per PID

---
 examples/plugins/ebpf/Dockerfile       |  12 ++
 examples/plugins/ebpf/Makefile         |  22 +++
 examples/plugins/ebpf/http-requests.c  |  64 ++++++++
 examples/plugins/ebpf/http-requests.py | 218 +++++++++++++++++++++++++
 4 files changed, 316 insertions(+)
 create mode 100644 examples/plugins/ebpf/Dockerfile
 create mode 100644 examples/plugins/ebpf/Makefile
 create mode 100644 examples/plugins/ebpf/http-requests.c
 create mode 100755 examples/plugins/ebpf/http-requests.py

diff --git a/examples/plugins/ebpf/Dockerfile b/examples/plugins/ebpf/Dockerfile
new file mode 100644
index 000000000..29e184dd1
--- /dev/null
+++ b/examples/plugins/ebpf/Dockerfile
@@ -0,0 +1,12 @@
+FROM ubuntu:wily
+MAINTAINER Weaveworks Inc
+LABEL works.weave.role=system
+
+# Install BCC
+RUN apt-key adv --keyserver keyserver.ubuntu.com --recv-keys D4284CDD
+RUN echo "deb http://52.8.15.63/apt trusty main" | tee /etc/apt/sources.list.d/iovisor.list
+RUN apt-get update && apt-get install -y libbcc libbcc-examples python-bcc
+
+# Add our plugin
+ADD ./http-requests.c ./http-requests.py /usr/bin/
+ENTRYPOINT ["/usr/bin/http-requests.py"]
diff --git a/examples/plugins/ebpf/Makefile b/examples/plugins/ebpf/Makefile
new file mode 100644
index 000000000..6f9bff932
--- /dev/null
+++ b/examples/plugins/ebpf/Makefile
@@ -0,0 +1,22 @@
+.PHONY: run clean
+
+IMAGE=weavescope-http-requests-plugin
+UPTODATE=.$(IMAGE).uptodate
+
+run: $(UPTODATE)
+	docker run --rm -it \
+		--privileged --net=host \
+		-v /lib/modules:/lib/modules \
+		-v /usr/src:/usr/src \
+		-v /sys/kernel/debug/:/sys/kernel/debug/ \
+		-v /var/run/scope/plugins:/var/run/scope/plugins \
+		--name $(IMAGE) \
+		$(IMAGE)
+
+$(UPTODATE): Dockerfile http-requests.c http-requests.py
+	docker build -t $(IMAGE) .
+	touch $@
+
+clean:
+	- rm -rf $(UPTODATE)
+	- docker rmi $(IMAGE)
diff --git a/examples/plugins/ebpf/http-requests.c b/examples/plugins/ebpf/http-requests.c
new file mode 100644
index 000000000..00bc73b56
--- /dev/null
+++ b/examples/plugins/ebpf/http-requests.c
@@ -0,0 +1,64 @@
+#include <uapi/linux/ptrace.h>
+#include <linux/skbuff.h>
+
+struct received_http_requests_key_t {
+  u32 pid;
+};
+BPF_HASH(received_http_requests, struct received_http_requests_key_t, u64);
+
+
+/*
+  skb_copy_datagram_iter() (Kernels >= 3.19) is in charge of copying socket
+  buffers from kernel to userspace.
+
+  skb_copy_datagram_iter() has an associated tracepoint
+  (trace_skb_copy_datagram_iovec), which would be more stable than a kprobe but
+  it lacks the offset argument.
+ */
+int kprobe__skb_copy_datagram_iter(struct pt_regs *ctx, const struct sk_buff *skb, int offset, void *unused_iovec, int len)
+{
+
+  /* Inspect the beginning of socket buffers copied to user-space to determine if they
+     correspond to http requests.
+
+     Caveats:
+
+     Requests may not appear at the beginning of a packet due to:
+       * Persistent connections.
+       * Packet fragmentation.
+
+     We could inspect the full packet but:
+       * It's very inefficient.
+       * Examining the non-linear (paginated) area of a socket buffer would be
+         really tricky from ebpf.
+  */
+
+  /* TODO: exit early if it's not TCP */
+
+  /* Inline implementation of skb_headlen() */
+  unsigned int head_len = skb->len - skb->data_len;
+
+  /* http://stackoverflow.com/questions/25047905/http-request-minimum-size-in-bytes
+     minimum length of http request is always greater than 7 bytes
+
+     The check avoids computing head_len - offset: head_len is unsigned, so
+     the difference would wrap around whenever offset exceeds head_len.
+  */
+  if (offset < 0 || head_len < (unsigned int)offset + 7) {
+    return 0;
+  }
+
+  u8 data[4] = {};
+  bpf_probe_read(&data, sizeof(data), skb->data + offset);
+
+  /* TODO: support other methods and optimize lookups */
+  if ((data[0] == 'G') && (data[1] == 'E') && (data[2] == 'T')) {
+    /* Record request */
+    struct received_http_requests_key_t key = {};
+    key.pid = bpf_get_current_pid_tgid() >> 32;
+    received_http_requests.increment(key);
+  }
+
+
+  return 0;
+}
diff --git a/examples/plugins/ebpf/http-requests.py b/examples/plugins/ebpf/http-requests.py
new file mode 100755
index 000000000..d68558724
--- /dev/null
+++ b/examples/plugins/ebpf/http-requests.py
@@ -0,0 +1,218 @@
+#!/usr/bin/env python
+"""Scope plugin reporting the rate of received HTTP requests per process.
+
+The eBPF probe in http-requests.c counts received HTTP GET requests per PID;
+this script turns those counters into per-second rates and serves them as
+JSON over a UNIX socket.
+"""
+import bcc
+
+import time
+import datetime
+import os
+import signal
+import errno
+import json
+import urlparse
+import threading
+import socket
+import BaseHTTPServer
+import SocketServer
+import sys
+
+EBPF_FILE = "http-requests.c"
+EBPF_TABLE_NAME = "received_http_requests"
+PLUGIN_UNIX_SOCK = "/var/run/scope/plugins/http_requests.sock"
+
+class KernelInspector(threading.Thread):
+    """Background thread deriving per-PID request rates from the eBPF table."""
+
+    def __init__(self):
+        super(KernelInspector, self).__init__()
+        self.bpf = bcc.BPF(EBPF_FILE)
+        self.http_rate_per_pid = dict()
+        self.lock = threading.Lock()
+
+    def update_http_rate_per_pid(self, last_req_count_snapshot):
+        """Publish the requests seen per PID since the previous snapshot.
+
+        Takes the request counts (keyed by PID) from the previous call and
+        returns the current ones, to be passed to the next call.
+        """
+        new_req_count_snapshot = dict()
+        new_http_rate_per_pid = dict()
+        req_count_table = self.bpf.get_table(EBPF_TABLE_NAME)
+        for key, value in req_count_table.iteritems():
+            request_delta = value.value
+            if key.pid in last_req_count_snapshot:
+                request_delta -= last_req_count_snapshot[key.pid]
+            if request_delta > 0:
+                new_http_rate_per_pid[key.pid] = request_delta
+
+            new_req_count_snapshot[key.pid] = value.value
+
+        with self.lock:
+            self.http_rate_per_pid = new_http_rate_per_pid
+
+        return new_req_count_snapshot
+
+    def on_http_rate_per_pid(self, f):
+        """Return f(rates), evaluated while holding the lock on the rates."""
+        # The context manager releases the lock even if f raises; a leaked
+        # lock would block the update thread and every later report.
+        with self.lock:
+            return f(self.http_rate_per_pid)
+
+    def run(self):
+        # Compute request rates based on the requests counts from the last
+        # second. It would be simpler to clear the table and wait one second,
+        # but clear() is expensive (each entry is individually cleared with a
+        # system call) and less robust (clearing contends with the increments
+        # done by the kernel probe).
+        # FIXME: we need a mechanism to garbage-collect old processes, either
+        #        here or on the probe. Some options are clearing the table once
+        #        in a while (not ideal for the reasons above) or adding another
+        #        probe to remove processes from the table when they die (this
+        #        will probably require keeping track of tasks and not
+        #        just processes)
+        req_count_snapshot = dict()
+        while True:
+            time.sleep(1)
+            req_count_snapshot = self.update_http_rate_per_pid(req_count_snapshot)
+
+
+class PluginRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+    """Serves the plugin handshake ('/') and the metrics report ('/report')."""
+
+    protocol_version = 'HTTP/1.1'
+
+    def __init__(self, *args, **kwargs):
+        self.request_log = ''
+        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
+
+    def do_GET(self):
+        # A handler serves every request of a keep-alive connection, so drop
+        # the log details left by the previous request.
+        self.request_log = ''
+        path = urlparse.urlparse(self.path)[2].lower()
+        if path == '/':
+            self.do_handshake()
+        elif path == '/report':
+            self.do_report()
+        else:
+            self.send_response(404)
+            self.send_header('Content-length', 0)
+            self.end_headers()
+
+    def get_process_nodes(self, http_rate_per_pid):
+        """Build the report's process nodes from a {pid: rate} dict."""
+        # Get current timestamp in RFC3339
+        date = datetime.datetime.utcnow()
+        date = date.isoformat('T') + 'Z'
+        process_nodes = dict()
+        for pid, http_rate in http_rate_per_pid.iteritems():
+            # print "\t%-10s %s" % (pid , http_rate)
+            node_key = "%s;%d" % (self.server.hostname, pid)
+            process_nodes[node_key] = {
+                'metrics': {
+                    'http_requests_per_second': {
+                        'samples': [{
+                            'date': date,
+                            'value': float(http_rate),
+                        }]
+                    }
+                }
+            }
+        return process_nodes
+
+    def do_report(self):
+        kernel_inspector = self.server.kernel_inspector
+        process_nodes = kernel_inspector.on_http_rate_per_pid(self.get_process_nodes)
+        report = {
+            'Process': {
+                'nodes': process_nodes,
+                'metric_templates': {
+                    'http_requests_per_second': {
+                        'id': 'http_requests_per_second',
+                        'label': 'HTTP Req/Second',
+                        'priority': 0.1,
+                    }
+                }
+            }
+        }
+        body = json.dumps(report)
+        self.request_log = "resp_size=%d, resp_entry_count=%d" % (len(body), len(process_nodes))
+        self.respond(body)
+
+    def do_handshake(self):
+        spec = {
+            'name': 'http-requests',
+            'description': 'Adds http request metrics to processes',
+            'interfaces': ['reporter'],
+            'api_version': '1',
+        }
+        self.respond(json.dumps(spec))
+
+    def respond(self, body):
+        """Send body as a 200 application/json response."""
+        self.send_response(200)
+        self.send_header('Content-type', 'application/json')
+        self.send_header('Content-length', len(body))
+        self.end_headers()
+        self.wfile.write(body)
+
+    def log_request(self, code='-', size='-'):
+        request_log = ''
+        if self.request_log:
+            request_log = ' (%s)' % self.request_log
+        self.log_message('"%s" %s %s%s',
+                         self.requestline, str(code), str(size), request_log)
+
+
+class PluginServer(SocketServer.ThreadingUnixStreamServer):
+    """Threaded HTTP server listening on the plugin's UNIX socket."""
+
+    daemon_threads = True
+
+    def __init__(self, socket_file, kernel_inspector):
+        mkdir_p(os.path.dirname(socket_file))
+        self.socket_file = socket_file
+        self.delete_socket_file()
+        self.kernel_inspector = kernel_inspector
+        self.hostname = socket.gethostname()
+        SocketServer.UnixStreamServer.__init__(self, socket_file, PluginRequestHandler)
+
+    def finish_request(self, request, _):
+        # Make the logger happy by providing a phony client_address
+        self.RequestHandlerClass(request, '-', self)
+
+    def delete_socket_file(self):
+        if os.path.exists(self.socket_file):
+            os.remove(self.socket_file)
+
+
+def mkdir_p(path):
+    """Create path and any missing parents; an existing directory is fine."""
+    try:
+        os.makedirs(path)
+    except OSError as exc:
+        if exc.errno == errno.EEXIST and os.path.isdir(path):
+            pass
+        else:
+            raise
+
+
+if __name__ == '__main__':
+    kernel_inspector = KernelInspector()
+    kernel_inspector.setDaemon(True)
+    kernel_inspector.start()
+    plugin_server = PluginServer(PLUGIN_UNIX_SOCK, kernel_inspector)
+    def sig_handler(b, a):
+        plugin_server.delete_socket_file()
+        sys.exit(0)
+    signal.signal(signal.SIGTERM, sig_handler)
+    signal.signal(signal.SIGINT, sig_handler)
+    try:
+        plugin_server.serve_forever()
+    finally:
+        plugin_server.delete_socket_file()