Merge pull request #1277 from weaveworks/ebpf-plugin-improvements

eBPF plugin improvements
This commit is contained in:
Alfonso Acosta
2016-04-13 15:34:38 +01:00
2 changed files with 155 additions and 52 deletions

View File

@@ -1,25 +1,26 @@
#include <uapi/linux/ptrace.h>
#include <linux/skbuff.h>
#include <net/sock.h>
struct received_http_requests_key_t {
u32 pid;
};
BPF_HASH(received_http_requests, struct received_http_requests_key_t, u64);
/* Table from (Task group id|Task id) to (Number of received http requests).
We need to gather requests per task and not only per task group (i.e. userspace pid)
so that entries can be cleared up independently when a task exits.
This implies that userspace needs to do the per-process aggregation.
*/
BPF_HASH(received_http_requests, u64, u64);
/*
skb_copy_datagram_iter() (Kernels >= 3.19) is in charge of copying socket
buffers from kernel to userspace.
/* skb_copy_datagram_iter() (Kernels >= 3.19) is in charge of copying socket
buffers from kernel to userspace.
skb_copy_datagram_iter() has an associated tracepoint
(trace_skb_copy_datagram_iovec), which would be more stable than a kprobe but
it lacks the offset argument.
skb_copy_datagram_iter() has an associated tracepoint
(trace_skb_copy_datagram_iovec), which would be more stable than a kprobe but
it lacks the offset argument.
*/
int kprobe__skb_copy_datagram_iter(struct pt_regs *ctx, const struct sk_buff *skb, int offset, void *unused_iovec, int len)
{
/* Inspect the beginning of socket buffers copied to user-space to determine if they
correspond to http requests.
/* Inspect the beginning of socket buffers copied to user-space to determine
if they correspond to http requests.
Caveats:
@@ -30,32 +31,132 @@ int kprobe__skb_copy_datagram_iter(struct pt_regs *ctx, const struct sk_buff *sk
We could inspect the full packet but:
* It's very inefficient.
* Examining the non-linear (paginated) area of a socket buffer would be
really tricky from ebpf.
*/
/* TODO: exit early if it's not TCP */
/* Inline implementation of skb_headlen() */
unsigned int head_len = skb->len - skb->data_len;
/* http://stackoverflow.com/questions/25047905/http-request-minimum-size-in-bytes
minimum length of http request is always greater than 7 bytes
really tricky from ebpf.
*/
if (head_len - offset < 7) {
/* Verify it's a TCP socket
TODO: is it worth caching it in a socket table?
*/
struct sock *sk = skb->sk;
unsigned short skc_family = sk->__sk_common.skc_family;
switch (skc_family) {
case PF_INET:
case PF_INET6:
case PF_UNIX:
break;
default:
return 0;
}
/* The socket type and protocol are not directly addressable since they are
bitfields. We access them by assuming sk_write_queue is immediately before
them (admittedly pretty hacky).
*/
unsigned int flags = 0;
size_t flags_offset = offsetof(typeof(struct sock), sk_write_queue) + sizeof(sk->sk_write_queue);
bpf_probe_read(&flags, sizeof(flags), ((u8*)sk) + flags_offset);
u16 sk_type = flags >> 16;
if (sk_type != SOCK_STREAM) {
return 0;
}
u8 sk_protocol = flags >> 8 & 0xFF;
/* The protocol is unset (IPPROTO_IP) in Unix sockets */
if ( (sk_protocol != IPPROTO_TCP) && ((skc_family == PF_UNIX) && (sk_protocol != IPPROTO_IP)) ) {
return 0;
}
u8 data[4] = {};
bpf_probe_read(&data, sizeof(data), skb->data + offset);
/* TODO: support other methods and optimize lookups */
if ((data[0] == 'G') && (data[1] == 'E') && (data[2] == 'T') && (data[3] == ' ')) {
/* Record request */
struct received_http_requests_key_t key = {};
key.pid = bpf_get_current_pid_tgid() >> 32;
received_http_requests.increment(key);
/* Inline implementation of skb_headlen() */
unsigned int head_len = skb->len - skb->data_len;
/* http://stackoverflow.com/questions/25047905/http-request-minimum-size-in-bytes
minimum length of http request is always greater than 7 bytes
*/
unsigned int available_data = head_len - offset;
if (available_data < 7) {
return 0;
}
/* Check if buffer begins with a method name followed by a space.
To avoid false positives it would be good to do a deeper inspection
(i.e. fully ensure a 'Method SP Request-URI SP HTTP-Version CRLF'
structure) but loops are not allowed in ebpf, making variable-size-data
parsers infeasible.
*/
u8 data[8] = {};
if (available_data >= 8) {
/* We have confirmed having access to 7 bytes, but need 8 bytes to check the
space after OPTIONS. bpf_probe_read() requires its second argument to be
an immediate, so we obtain the data in this unsexy way.
*/
bpf_probe_read(&data, 8, skb->data + offset);
} else {
bpf_probe_read(&data, 7, skb->data + offset);
}
switch (data[0]) {
/* DELETE */
case 'D':
if ((data[1] != 'E') || (data[2] != 'L') || (data[3] != 'E') || (data[4] != 'T') || (data[5] != 'E') || (data[6] != ' ')) {
return 0;
}
break;
/* GET */
case 'G':
if ((data[1] != 'E') || (data[2] != 'T') || (data[3] != ' ')) {
return 0;
}
break;
/* HEAD */
case 'H':
if ((data[1] != 'E') || (data[2] != 'A') || (data[3] != 'D') || (data[4] != ' ')) {
return 0;
}
break;
/* OPTIONS */
case 'O':
if (available_data < 8 || (data[1] != 'P') || (data[2] != 'T') || (data[3] != 'I') || (data[4] != 'O') || (data[5] != 'N') || (data[6] != 'S') || (data[7] != ' ')) {
return 0;
}
break;
/* PATCH/POST/PUT */
case 'P':
switch (data[1]) {
case 'A':
if ((data[2] != 'T') || (data[3] != 'C') || (data[4] != 'H') || (data[5] != ' ')) {
return 0;
}
break;
case 'O':
if ((data[2] != 'S') || (data[3] != 'T') || (data[4] != ' ')) {
return 0;
}
break;
case 'U':
if ((data[2] != 'T') || (data[3] != ' ')) {
return 0;
}
break;
}
break;
default:
return 0;
}
/* Finally, bump the request counter for current task */
u64 pid_tgid = bpf_get_current_pid_tgid();
received_http_requests.increment(pid_tgid);
return 0;
}
/* Clear out request count entries of tasks on exit.

   Attached as a kprobe to the kernel's do_exit(), so it fires once for
   every terminating task. Deleting the dying task's entry keeps the
   received_http_requests map from growing without bound as processes
   come and go (userspace aggregates per-task counts into per-process
   counts, so dropping a single task's key is safe). */
int kprobe__do_exit(struct pt_regs *ctx) {
/* Full 64-bit (tgid << 32 | pid) identifier of the current task —
   the same key used when incrementing the counter in the receive probe. */
u64 pid_tgid = bpf_get_current_pid_tgid();
received_http_requests.delete(&pid_tgid);
return 0;
}

View File

@@ -2,6 +2,7 @@
import bcc
import time
import collections
import datetime
import os
import signal
@@ -26,17 +27,24 @@ class KernelInspector(threading.Thread):
self.lock = threading.Lock()
def update_http_rate_per_pid(self, last_req_count_snapshot):
new_req_count_snapshot = dict()
new_http_rate_per_pid = dict()
# Aggregate the kernel's per-task http request counts into userland's
# per-process counts
req_count_table = self.bpf.get_table(EBPF_TABLE_NAME)
for key, value in req_count_table.iteritems():
request_delta = value.value
if key.pid in last_req_count_snapshot:
request_delta -= last_req_count_snapshot[key.pid]
if request_delta > 0:
new_http_rate_per_pid[key.pid] = request_delta
new_req_count_snapshot = collections.defaultdict(int)
for pid_tgid, req_count in req_count_table.iteritems():
# Note that the kernel's tgid maps into userland's pid
# (not to be confused by the kernel's pid, which is
# the unique identifier of a kernel task)
pid = pid_tgid.value >> 32
new_req_count_snapshot[pid] += req_count.value
new_req_count_snapshot[key.pid] = value.value
# Compute request rate
new_http_rate_per_pid = dict()
for pid, req_count in new_req_count_snapshot.iteritems():
request_delta = req_count
if pid in last_req_count_snapshot:
request_delta -= last_req_count_snapshot[pid]
new_http_rate_per_pid[pid] = request_delta
self.lock.acquire()
self.http_rate_per_pid = new_http_rate_per_pid
@@ -52,17 +60,11 @@ class KernelInspector(threading.Thread):
def run(self):
# Compute request rates based on the requests counts from the last
# second. It would be simpler to clear the table, wait one second
# second. It would be simpler to clear the table, wait one second but
# clear() is expensive (each entry is individually cleared with a system
# call) and less robust (clearing contends with the increments done by
# the kernel probe).
# FIXME: we need a mechanism to garbage-collect old processes, either
# here or on the probe. Some options are clearing the table once
# in a while (not ideal for the reasons above) or adding another
# probe to remove processes from the table when they die (this
# will probably require keeping track of tasks and not
# just processes)
req_count_snapshot = dict()
# call) and less robust (it contends with the increments done by the
# kernel probe).
req_count_snapshot = collections.defaultdict(int)
while True:
time.sleep(1)
req_count_snapshot = self.update_http_rate_per_pid(req_count_snapshot)