Merge pull request #1906 from kinvolk/alessandro/plugins-doc

Move plugins to the new organization
This commit is contained in:
Alfonso Acosta
2016-11-14 16:37:46 +01:00
committed by GitHub
21 changed files with 87 additions and 1944 deletions

View File

@@ -51,8 +51,6 @@ test:
- test -z "$SECRET_PASSWORD" || (cd $SRCDIR/integration; eval $(./gce.sh hosts); ./run_all.sh):
parallel: true
timeout: 300
- cd $SRCDIR/examples/plugins/traffic-control && make .traffic-control.uptodate && docker tag weaveworks/scope-traffic-control-plugin weaveworks/scope-traffic-control-plugin:$(../../../tools/image-tag):
parallel: true
post:
- test -z "$SECRET_PASSWORD" || (cd $SRCDIR/integration; ./gce.sh destroy):
parallel: true
@@ -72,14 +70,10 @@ deployment:
docker login -e $DOCKER_EMAIL -u $DOCKER_USER -p $DOCKER_PASS &&
(test "${DOCKER_ORGANIZATION:-$DOCKER_USER}" == "weaveworks" || (
docker tag weaveworks/scope:latest ${DOCKER_ORGANIZATION:-$DOCKER_USER}/scope:latest &&
docker tag weaveworks/scope:$(./tools/image-tag) ${DOCKER_ORGANIZATION:-$DOCKER_USER}/scope:$(./tools/image-tag) &&
docker tag weaveworks/scope-traffic-control-plugin:latest ${DOCKER_ORGANIZATION:-$DOCKER_USER}/scope-traffic-control-plugin:latest &&
docker tag weaveworks/scope-traffic-control-plugin:$(./tools/image-tag) ${DOCKER_ORGANIZATION:-$DOCKER_USER}/scope-traffic-control-plugin:$(./tools/image-tag)
docker tag weaveworks/scope:$(./tools/image-tag) ${DOCKER_ORGANIZATION:-$DOCKER_USER}/scope:$(./tools/image-tag)
)) &&
docker push ${DOCKER_ORGANIZATION:-$DOCKER_USER}/scope &&
docker push ${DOCKER_ORGANIZATION:-$DOCKER_USER}/scope:$(./tools/image-tag) &&
docker push ${DOCKER_ORGANIZATION:-$DOCKER_USER}/scope-traffic-control-plugin &&
docker push ${DOCKER_ORGANIZATION:-$DOCKER_USER}/scope-traffic-control-plugin:$(./tools/image-tag) &&
(test -z "${UI_BUCKET_KEY_ID}" || (
make ui-upload
)) &&

View File

@@ -1,75 +1,66 @@
# Scope Probe Plugins
Scope probe plugins let you insert your own custom metrics into Scope
and get them displayed in the UI.
Scope probe plugins let you insert your own custom data and controls into Scope and display them in the UI.
The list of the current running plugins is displayed next to the label `PLUGINS` in the bottom right of the UI.
<img src="../../imgs/plugin.png" width="800" alt="Scope Probe plugin screenshot" align="center">
You can find some examples at the [the example
plugins](https://github.com/weaveworks/scope/tree/master/examples/plugins)
directory. We currently provide two examples:
* A [Python
plugin](https://github.com/weaveworks/scope/tree/master/examples/plugins/http-requests)
using [bcc](http://iovisor.github.io/bcc/) to extract incoming HTTP
request rates per process, without any application-level
instrumentation requirements and negligible performance toll
(metrics are obtained in-kernel without any packet copying to
userspace). **Note:** This plugin needs a [recent kernel version
with ebpf
support](https://github.com/iovisor/bcc/blob/master/INSTALL.md#kernel-configuration). It
will not compile on current [dlite](https://github.com/nlf/dlite)
and boot2docker hosts.
* A [Go
plugin](https://github.com/weaveworks/scope/tree/master/examples/plugins/iowait),
using [iostat](https://en.wikipedia.org/wiki/Iostat) to provide
host-level CPU IO wait or idle metrics.
## Official Plugins
The example plugins can be run by calling `make` in their directory.
This will build the plugin, and immediately run it in the foreground.
To run the plugin in the background, see the `Makefile` for examples
of the `docker run ...` command.
Official Weave Scope plugins can be found at [Weaveworks Plugins](https://github.com/weaveworks-plugins).
If the running plugin was picked up by Scope, you will see it in the
list of `PLUGINS` in the bottom right of the UI.
* [IOWait](https://github.com/weaveworks-plugins/scope-iowait): is a Go plugin that uses [iostat](https://en.wikipedia.org/wiki/Iostat) to provide host-level CPU IO wait or idle metrics.
## Plugin ID
* [HTTP Statistics](https://github.com/weaveworks-plugins/scope-http-statistics): is a Python plugin that uses [bcc](http://iovisor.github.io/bcc/) to track multiple metrics about HTTP per process. It does this without any application-level instrumentation requirements and with a negligible performance toll. This plugin is a work in progress, and implements the following (for more information read the [plugin documentation](https://github.com/weaveworks-plugins/scope-http-statistics)):
* Number of HTTP requests per second.
* Number of HTTP response codes per second (per code).
Each plugin should have an unique ID. It is forbidden to change it
during the plugin's lifetime. The scope probe will get the plugin's ID
from the plugin's socket filename. For example, the socket named
`my-plugin.sock`, the scope probe will deduce the ID as
`my-plugin`. IDs can only contain alphanumeric sequences, optionally
separated with a dash.
* [Traffic Control](https://github.com/weaveworks-plugins/scope-traffic-control): This plugin allows you to modify latency and packet loss for a specific container via controls from the container's detailed view in the Scope user interface.
## Plugin registration
* [Volume Count](https://github.com/weaveworks-plugins/scope-volume-count): This plugin (written in Python) requests the number of mounted volumes for each container, and provides a container-level count.
All plugins should listen for HTTP connections on a unix socket in the
`/var/run/scope/plugins` directory. The scope probe will recursively
scan that directory every 5 seconds, to look for sockets being added
(or removed). It is also valid to put the plugin unix socket in a
sub-directory, in case you want to apply some permissions, or store
other information with the socket.
## How Plugins Communicate with Scope
This section explains the fundamental parts of the plugins structure necessary to understand how a plugin communicates with Scope.
You can find more practical examples in [Weaveworks Plugins](https://github.com/weaveworks-plugins) repositories.
## Protocol
### Plugin IDs
Each plugin must have a unique ID and this ID must not change
during the plugin's lifetime. Scope probes retrieve the plugin's ID
from the plugin's socket filename. For example, if a socket is named
`my-plugin.sock`, the scope probe deduces the ID as
`my-plugin`. IDs may contain only alphanumeric sequences that are optionally
separated by a dash.
### Registering Plugins
All plugins listen for HTTP connections on a UNIX socket in the `/var/run/scope/plugins` directory. The Scope probe recursively scans that directory every 5 seconds and looks for any added or removed sockets.
If you want to apply permissions or store any other information with the socket, you can also put the plugin UNIX socket into a sub-directory.
When a new plugin is detected, the Scope probe begins requesting reports from it via GET /report. It is therefore important that **every plugin implements the report interface**. Implementing the report interface also means handling specific requests.
All plugin endpoints are expected to respond within 500ms, and must respond using the JSON format.
### Protocol
There are several interfaces a plugin may (or must) implement. Usually
implementing an interface means handling specific requests. These
requests are described below.
### Reporter interface
#### Reporter interface
Plugins _must_ implement the reporter interface. Implementing this
interface means listening for HTTP requests at `/report`.
Plugins _must_ implement the reporter interface because Scope uses it to discover which other interfaces the plugin implements.
Implementing this interface means listening for HTTP requests at `/report`.
Add the "reporter" string to the `interfaces` field in the plugin
specification.
**Note**: Plugins must add the "reporter" string to the `interfaces` field in the plugin specification even though this interface is implicitly implemented.
#### Report
When the scope probe discovers a new plugin unix socket it will begin
When a scope probe discovers a new plugin UNIX socket it will begin
periodically making a `GET` request to the `/report` endpoint. The
report data structure returned from this will be merged into the
report data structure returned from this is merged into the
probe's report and sent to the app. An example of the report structure
can be viewed at the `/api/report` endpoint of any scope app.
@@ -80,12 +71,12 @@ For example:
```json
{
"Processes": {},
...,
"Plugins": [
{
"id": "iowait",
"label": "IOWait",
"description": "Adds a graph of CPU IO Wait to hosts",
"id": "plugin-id",
"label": "Human Friendly Name",
"description": "Plugin's brief description",
"interfaces": ["reporter"],
"api_version": "1",
}
@@ -96,29 +87,22 @@ For example:
Note that the `Plugins` section includes exactly one plugin
description. The plugin description fields are:
* `id` is used to check for duplicate plugins. It is
* `id` - checks for duplicate plugins. It is
required. Described in [the Plugin ID section](#plugin-id).
* `label` is a human readable plugin label displayed in the UI. It is
* `label` - a human readable plugin label displayed in the UI. It is
required.
* `description` is displayed in the UI.
* `interfaces` is a list of interfaces which this plugin supports. It
* `description` - displayed in the UI.
* `interfaces` - a list of interfaces which this plugin supports. It
is required, and must contain at least `["reporter"]`.
* `api_version` is used to ensure both the plugin and the scope probe
* `api_version` - ensures that both the plugin and the scope probe
can speak to each other. It is required, and must match the probe.
You may notice a small chicken and egg problem - the plugin reports to
the scope probe what interfaces it supports, but the scope probe can
learn that only by doing a `GET /report` request which will be handled
by the plugin if it implements the "reporter" interface. This is
solved (or worked around) by requiring the plugin to always implements
the "reporter" interface.
#### Controller interface
### Controller interface
Plugins _may_ implement the controller interface. Implementing the
Plugins _may_ also implement the controller interface. Implementing the
controller interface means that the plugin can react to HTTP `POST`
control requests sent by the app. The plugin will receive them only
for controls it exposed in its reports. The requests will come to the
control requests sent by the app. The plugin receives them only
for the controls it exposed in its reports. All such requests come to the
`/control` endpoint.
Add the "controller" string to the `interfaces` field in the plugin
@@ -126,7 +110,7 @@ specification.
#### Control
The `POST` requests will have a JSON-encoded body with the following contents:
The `POST` requests contain a JSON-encoded body with the following contents:
```json
{
@@ -136,10 +120,10 @@ The `POST` requests will have a JSON-encoded body with the following contents:
}
```
The body of the response should also be a JSON-encoded data. Usually
the body would be an empty JSON object (so, "{}" after
serialization). If some error happens during handling the control,
then the plugin can send a response with an `error` field set, for
The body of the response should also be JSON-encoded data. In most cases,
the body is an empty JSON object (so, "{}" after
serialization). If an error happens when handling the control,
then the plugin sends a response with an `error` field set, for
example:
```json
@@ -148,11 +132,11 @@ example:
}
```
Sometimes the control activation can make the control obsolete, so the
Sometimes the control activation can make the control obsolete, and so the
plugin may want to hide it (for example, control for stopping the
container should be hidden after the container is stopped). For this
to work, the plugin can send a shortcut report by filling the
`ShortcutReport` field in the response, like for example:
to work, the plugin sends a shortcut report by filling the
`ShortcutReport` field in the response, like so:
```json
{
@@ -162,11 +146,10 @@ to work, the plugin can send a shortcut report by filling the
##### How to expose controls
Each topology in the report (be it host, pod, endpoint and so on) has
a set of available controls a node in the topology may want to
Each topology in the report (be it host, pod, endpoint and so on) contains
a set of available controls that a node in the topology may want to
show. The following (rather artificial) example shows a topology with
two controls (`ctrl-one` and `ctrl-two`) and two nodes, each having a
different control from the two:
two controls (`ctrl-one` and `ctrl-two`) and two nodes, each with a different control defined:
```json
{
@@ -211,8 +194,8 @@ different control from the two:
}
```
When control "ctrl-one" is activated, the plugin will receive a
request like:
When control "ctrl-one" is activated, the plugin receives a
request as follows:
```json
{
@@ -226,30 +209,31 @@ A short note about the "icon" field of the topology control - the
value for it can be taken from [Font Awesome
Cheatsheet](http://fontawesome.io/cheatsheet/)
##### Node naming
##### Naming Nodes
Very often the controller plugin wants to add some controls to already
existing nodes (like controls for network traffic management to nodes
Often the controller plugin may want to add some controls to already
existing nodes (for example add controls for network traffic management to nodes
representing the running Docker container). To achieve that, it is
important to make sure that the node ID in the plugin's report matches
the ID of the node created by the probe. The ID is a
semicolon-separated list of strings.
For containers, images, hosts and others the ID is usually formatted
For containers, images, hosts and others, the ID is usually formatted
as `${name};<${tag}>`. The `${name}` variable is usually a name of a
thing the node represents, like an ID of the Docker container or the
hostname. The `${tag}` denotes the type of the node. There is a fixed
set of tags used by the probe:
hostname. The `${tag}` denotes the type of the node.
- host
- container
- container_image
- pod
- service
- deployment
- replica_set
There is a fixed set of tags used by the probe:
The examples of "tagged" node names:
- `host`
- `container`
- `container_image`
- `pod`
- `service`
- `deployment`
- `replica_set`
These are examples of "tagged" node names:
- The Docker container with full ID
2299a2ca59dfd821f367e689d5869c4e568272c2305701761888e1d79d7a6f51:
@@ -260,17 +244,19 @@ The examples of "tagged" node names:
The fixed set of tags listed above is not a complete set of names a
node can have though. For example, nodes representing processes are
have ID formatted as `${host};${pid}`. Probably the easiest ways to
have IDs formatted as `${host};${pid}`. The easiest way to
discover how the nodes are named are:
- Read the code in
1. Read the code in
[report/id.go](https://github.com/weaveworks/scope/blob/master/report/id.go).
- Browse the Weave Scope GUI, select some node and search for an `id`
2. Browse the Weave Scope GUI, select some node and search for an `id`
key in the `nodeDetails` array in the address bar.
- For example in the
`http://localhost:4040/#!/state/{"controlPipe":null,"nodeDetails":[{"id":"example.com;<host>","label":"example.com","topologyId":"hosts"}],…`
URL, you can find the `example.com;<host>` which is an ID of the node
representing the host.
- Mentally substitute the `<SLASH>` with `/`. This can appear in
3. Mentally substitute the `<SLASH>` with `/`. This can appear in
Docker image names, so `docker.io/alpine` in the address bar will
be `docker.io<SLASH>alpine`.
For more detailed information visit [https://www.weave.works/documentation/scope-latest-plugins/](https://www.weave.works/documentation/scope-latest-plugins/)

View File

@@ -1,12 +0,0 @@
FROM ubuntu:wily
MAINTAINER Weaveworks Inc <help@weave.works>
LABEL works.weave.role=system
# Install BCC
RUN apt-key adv --keyserver keyserver.ubuntu.com --recv-keys D4284CDD
RUN echo "deb http://52.8.15.63/apt trusty main" | tee /etc/apt/sources.list.d/iovisor.list
RUN apt-get update && apt-get install -y libbcc libbcc-examples python-bcc
# Add our plugin
ADD ./http-requests.c ./http-requests.py /usr/bin/
ENTRYPOINT ["/usr/bin/http-requests.py"]

View File

@@ -1,22 +0,0 @@
.PHONY: run clean
IMAGE=weavescope-http-requests-plugin
UPTODATE=.uptodate
run: $(UPTODATE)
docker run --rm -it \
--privileged --net=host \
-v /lib/modules:/lib/modules \
-v /usr/src:/usr/src \
-v /sys/kernel/debug/:/sys/kernel/debug/ \
-v /var/run/scope/plugins:/var/run/scope/plugins \
--name $(IMAGE) \
$(IMAGE)
$(UPTODATE): Dockerfile http-requests.py http-requests.c
docker build -t $(IMAGE) .
touch $@
clean:
- rm -rf $(UPTODATE)
- docker rmi $(IMAGE)

View File

@@ -1,162 +0,0 @@
#include <linux/skbuff.h>
#include <net/sock.h>
/* Table from (Task group id|Task id) to (Number of received http requests).
We need to gather requests per task and not only per task group (i.e. userspace pid)
so that entries can be cleared up independently when a task exists.
This implies that userspace needs to do the per-process aggregation.
*/
BPF_HASH(received_http_requests, u64, u64);
/* skb_copy_datagram_iter() (Kernels >= 3.19) is in charge of copying socket
buffers from kernel to userspace.
skb_copy_datagram_iter() has an associated tracepoint
(trace_skb_copy_datagram_iovec), which would be more stable than a kprobe but
it lacks the offset argument.
*/
int kprobe__skb_copy_datagram_iter(struct pt_regs *ctx, const struct sk_buff *skb, int offset, void *unused_iovec, int len)
{
/* Inspect the beginning of socket buffers copied to user-space to determine
if they correspond to http requests.
Caveats:
Requests may not appear at the beginning of a packet due to:
* Persistent connections.
* Packet fragmentation.
We could inspect the full packet but:
* It's very inefficient.
* Examining the non-linear (paginated) area of a socket buffer would be
really tricky from ebpf.
*/
/* Verify it's a TCP socket
TODO: is it worth caching it in a socket table?
*/
struct sock *sk = skb->sk;
unsigned short skc_family = sk->__sk_common.skc_family;
switch (skc_family) {
case PF_INET:
case PF_INET6:
case PF_UNIX:
break;
default:
return 0;
}
/* The socket type and protocol are not directly addressable since they are
bitfields. We access them by assuming sk_write_queue is immediately before
them (admittedly pretty hacky).
*/
unsigned int flags = 0;
size_t flags_offset = offsetof(typeof(struct sock), sk_write_queue) + sizeof(sk->sk_write_queue);
bpf_probe_read(&flags, sizeof(flags), ((u8*)sk) + flags_offset);
u16 sk_type = flags >> 16;
if (sk_type != SOCK_STREAM) {
return 0;
}
u8 sk_protocol = flags >> 8 & 0xFF;
/* The protocol is unset (IPPROTO_IP) in Unix sockets */
if ( (sk_protocol != IPPROTO_TCP) && ((skc_family == PF_UNIX) && (sk_protocol != IPPROTO_IP)) ) {
return 0;
}
/* Inline implementation of skb_headlen() */
unsigned int head_len = skb->len - skb->data_len;
/* http://stackoverflow.com/questions/25047905/http-request-minimum-size-in-bytes
minimum length of http request is always greater than 7 bytes
*/
unsigned int available_data = head_len - offset;
if (available_data < 7) {
return 0;
}
/* Check if buffer begins with a method name followed by a space.
To avoid false positives it would be good to do a deeper inspection
(i.e. fully ensure a 'Method SP Request-URI SP HTTP-Version CRLF'
structure) but loops are not allowed in ebpf, making variable-size-data
parsers infeasible.
*/
u8 data[8] = {};
if (available_data >= 8) {
/* We have confirmed having access to 7 bytes, but need 8 bytes to check the
space after OPTIONS. bpf_probe_read() requires its second argument to be
an immediate, so we obtain the data in this unsexy way.
*/
bpf_probe_read(&data, 8, skb->data + offset);
} else {
bpf_probe_read(&data, 7, skb->data + offset);
}
switch (data[0]) {
/* DELETE */
case 'D':
if ((data[1] != 'E') || (data[2] != 'L') || (data[3] != 'E') || (data[4] != 'T') || (data[5] != 'E') || (data[6] != ' ')) {
return 0;
}
break;
/* GET */
case 'G':
if ((data[1] != 'E') || (data[2] != 'T') || (data[3] != ' ')) {
return 0;
}
break;
/* HEAD */
case 'H':
if ((data[1] != 'E') || (data[2] != 'A') || (data[3] != 'D') || (data[4] != ' ')) {
return 0;
}
break;
/* OPTIONS */
case 'O':
if (available_data < 8 || (data[1] != 'P') || (data[2] != 'T') || (data[3] != 'I') || (data[4] != 'O') || (data[5] != 'N') || (data[6] != 'S') || (data[7] != ' ')) {
return 0;
}
break;
/* PATCH/POST/PUT */
case 'P':
switch (data[1]) {
case 'A':
if ((data[2] != 'T') || (data[3] != 'C') || (data[4] != 'H') || (data[5] != ' ')) {
return 0;
}
break;
case 'O':
if ((data[2] != 'S') || (data[3] != 'T') || (data[4] != ' ')) {
return 0;
}
break;
case 'U':
if ((data[2] != 'T') || (data[3] != ' ')) {
return 0;
}
break;
}
break;
default:
return 0;
}
/* Finally, bump the request counter for current task */
u64 pid_tgid = bpf_get_current_pid_tgid();
received_http_requests.increment(pid_tgid);
return 0;
}
/* Clear out request count entries of tasks on exit */
int kprobe__do_exit(struct pt_regs *ctx) {
u64 pid_tgid = bpf_get_current_pid_tgid();
received_http_requests.delete(&pid_tgid);
return 0;
}

View File

@@ -1,195 +0,0 @@
#!/usr/bin/env python
import bcc
import time
import collections
import datetime
import os
import signal
import errno
import json
import urlparse
import threading
import socket
import BaseHTTPServer
import SocketServer
import sys
EBPF_FILE = "http-requests.c"
EBPF_TABLE_NAME = "received_http_requests"
PLUGIN_ID="http-requests"
PLUGIN_UNIX_SOCK = "/var/run/scope/plugins/" + PLUGIN_ID + ".sock"
class KernelInspector(threading.Thread):
def __init__(self):
super(KernelInspector, self).__init__()
self.bpf = bcc.BPF(EBPF_FILE)
self.http_rate_per_pid = dict()
self.lock = threading.Lock()
def update_http_rate_per_pid(self, last_req_count_snapshot):
# Aggregate the kernel's per-task http request counts into userland's
# per-process counts
req_count_table = self.bpf.get_table(EBPF_TABLE_NAME)
new_req_count_snapshot = collections.defaultdict(int)
for pid_tgid, req_count in req_count_table.iteritems():
# Note that the kernel's tgid maps into userland's pid
# (not to be confused by the kernel's pid, which is
# the unique identifier of a kernel task)
pid = pid_tgid.value >> 32
new_req_count_snapshot[pid] += req_count.value
# Compute request rate
new_http_rate_per_pid = dict()
for pid, req_count in new_req_count_snapshot.iteritems():
request_delta = req_count
if pid in last_req_count_snapshot:
request_delta -= last_req_count_snapshot[pid]
new_http_rate_per_pid[pid] = request_delta
self.lock.acquire()
self.http_rate_per_pid = new_http_rate_per_pid
self.lock.release()
return new_req_count_snapshot
def on_http_rate_per_pid(self, f):
self.lock.acquire()
r = f(self.http_rate_per_pid)
self.lock.release()
return r
def run(self):
# Compute request rates based on the requests counts from the last
# second. It would be simpler to clear the table, wait one second but
# clear() is expensive (each entry is individually cleared with a system
# call) and less robust (it contends with the increments done by the
# kernel probe).
req_count_snapshot = collections.defaultdict(int)
while True:
time.sleep(1)
req_count_snapshot = self.update_http_rate_per_pid(req_count_snapshot)
class PluginRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
def __init__(self, *args, **kwargs):
self.request_log = ''
BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def do_GET(self):
self.log_extra = ''
path = urlparse.urlparse(self.path)[2].lower()
if path == '/report':
self.do_report()
else:
self.send_response(404)
self.send_header('Content-length', 0)
self.end_headers()
def get_process_nodes(self, http_rate_per_pid):
# Get current timestamp in RFC3339
date = datetime.datetime.utcnow()
date = date.isoformat('T') + 'Z'
process_nodes = dict()
for pid, http_rate in http_rate_per_pid.iteritems():
# print "\t%-10s %s" % (pid , http_rate)
node_key = "%s;%d" % (self.server.hostname, pid)
process_nodes[node_key] = {
'metrics': {
'http_requests_per_second': {
'samples': [{
'date': date,
'value': float(http_rate),
}]
}
}
}
return process_nodes
def do_report(self):
kernel_inspector = self.server.kernel_inspector
process_nodes = kernel_inspector.on_http_rate_per_pid(self.get_process_nodes)
report = {
'Process': {
'nodes': process_nodes,
'metric_templates': {
'http_requests_per_second': {
'id': 'http_requests_per_second',
'label': 'HTTP Req/Second',
'priority': 0.1,
}
}
},
'Plugins': [
{
'id': PLUGIN_ID,
'label': 'HTTP Requests',
'description': 'Adds http request metrics to processes',
'interfaces': ['reporter'],
'api_version': '1',
}
]
}
body = json.dumps(report)
self.request_log = "resp_size=%d, resp_entry_count=%d" % (len(body), len(process_nodes))
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Content-length', len(body))
self.end_headers()
self.wfile.write(body)
def log_request(self, code='-', size='-'):
request_log = ''
if self.request_log:
request_log = ' (%s)' % self.request_log
self.log_message('"%s" %s %s%s',
self.requestline, str(code), str(size), request_log)
class PluginServer(SocketServer.ThreadingUnixStreamServer):
daemon_threads = True
def __init__(self, socket_file, kernel_inspector):
mkdir_p(os.path.dirname(socket_file))
self.socket_file = socket_file
self.delete_socket_file()
self.kernel_inspector = kernel_inspector
self.hostname = socket.gethostname()
SocketServer.UnixStreamServer.__init__(self, socket_file, PluginRequestHandler)
def finish_request(self, request, _):
# Make the logger happy by providing a phony client_address
self.RequestHandlerClass(request, '-', self)
def delete_socket_file(self):
if os.path.exists(self.socket_file):
os.remove(self.socket_file)
def mkdir_p(path):
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
if __name__ == '__main__':
kernel_inspector = KernelInspector()
kernel_inspector.setDaemon(True)
kernel_inspector.start()
plugin_server = PluginServer(PLUGIN_UNIX_SOCK, kernel_inspector)
def sig_handler(b, a):
plugin_server.delete_socket_file()
exit(0)
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGINT, sig_handler)
try:
plugin_server.serve_forever()
except:
plugin_server.delete_socket_file()
raise

View File

@@ -1 +0,0 @@
iowait

View File

@@ -1,6 +0,0 @@
FROM alpine:3.3
MAINTAINER Weaveworks Inc <help@weave.works>
LABEL works.weave.role=system
COPY ./iowait /usr/bin/iowait
RUN mkdir /lib64 && ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2
ENTRYPOINT ["/usr/bin/iowait"]

View File

@@ -1,25 +0,0 @@
.PHONY: run clean
SUDO=$(shell docker info >/dev/null 2>&1 || echo "sudo -E")
EXE=iowait
IMAGE=weavescope-iowait-plugin
UPTODATE=.$(EXE).uptodate
run: $(UPTODATE)
# --net=host gives us the remote hostname, in case we're being launched against a non-local docker host.
# We could also pass in the `-hostname=foo` flag, but that doesn't work against a remote docker host.
$(SUDO) docker run --rm -it \
--net=host \
-v /var/run/scope/plugins:/var/run/scope/plugins \
--name $(IMAGE) $(IMAGE)
$(UPTODATE): $(EXE) Dockerfile
$(SUDO) docker build -t $(IMAGE) .
touch $@
$(EXE): main.go
$(SUDO) docker run --rm -v "$$PWD":/usr/src/$(EXE) -w /usr/src/$(EXE) golang:1.6 go build -v
clean:
- rm -rf $(UPTODATE) $(EXE)
- $(SUDO) docker rmi $(IMAGE)

View File

@@ -1,384 +0,0 @@
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/exec"
"os/signal"
"strconv"
"strings"
"sync"
"time"
)
func main() {
hostname, _ := os.Hostname()
var (
addr = flag.String("addr", "/var/run/scope/plugins/iowait.sock", "unix socket to listen for connections on")
hostID = flag.String("hostname", hostname, "hostname of the host running this plugin")
)
flag.Parse()
log.Printf("Starting on %s...\n", *hostID)
// Check we can get the iowait for the system
_, err := iowait()
if err != nil {
log.Fatal(err)
}
os.Remove(*addr)
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
go func() {
<-interrupt
os.Remove(*addr)
os.Exit(0)
}()
listener, err := net.Listen("unix", *addr)
if err != nil {
log.Fatal(err)
}
defer func() {
listener.Close()
os.Remove(*addr)
}()
log.Printf("Listening on: unix://%s", *addr)
plugin := &Plugin{HostID: *hostID}
http.HandleFunc("/report", plugin.Report)
http.HandleFunc("/control", plugin.Control)
if err := http.Serve(listener, nil); err != nil {
log.Printf("error: %v", err)
}
}
// Plugin groups the methods a plugin needs
type Plugin struct {
HostID string
lock sync.Mutex
iowaitMode bool
}
type request struct {
NodeID string
Control string
}
type response struct {
ShortcutReport *report `json:"shortcutReport,omitempty"`
}
type report struct {
Host topology
Plugins []pluginSpec
}
type topology struct {
Nodes map[string]node `json:"nodes"`
MetricTemplates map[string]metricTemplate `json:"metric_templates"`
Controls map[string]control `json:"controls"`
}
type node struct {
Metrics map[string]metric `json:"metrics"`
LatestControls map[string]controlEntry `json:"latestControls,omitempty"`
}
type metric struct {
Samples []sample `json:"samples,omitempty"`
Min float64 `json:"min"`
Max float64 `json:"max"`
}
type sample struct {
Date time.Time `json:"date"`
Value float64 `json:"value"`
}
type controlEntry struct {
Timestamp time.Time `json:"timestamp"`
Value controlData `json:"value"`
}
type controlData struct {
Dead bool `json:"dead"`
}
type metricTemplate struct {
ID string `json:"id"`
Label string `json:"label,omitempty"`
Format string `json:"format,omitempty"`
Priority float64 `json:"priority,omitempty"`
}
type control struct {
ID string `json:"id"`
Human string `json:"human"`
Icon string `json:"icon"`
Rank int `json:"rank"`
}
type pluginSpec struct {
ID string `json:"id"`
Label string `json:"label"`
Description string `json:"description,omitempty"`
Interfaces []string `json:"interfaces"`
APIVersion string `json:"api_version,omitempty"`
}
func (p *Plugin) makeReport() (*report, error) {
metrics, err := p.metrics()
if err != nil {
return nil, err
}
rpt := &report{
Host: topology{
Nodes: map[string]node{
p.getTopologyHost(): {
Metrics: metrics,
LatestControls: p.latestControls(),
},
},
MetricTemplates: p.metricTemplates(),
Controls: p.controls(),
},
Plugins: []pluginSpec{
{
ID: "iowait",
Label: "iowait",
Description: "Adds a graph of CPU IO Wait to hosts",
Interfaces: []string{"reporter", "controller"},
APIVersion: "1",
},
},
}
return rpt, nil
}
func (p *Plugin) metrics() (map[string]metric, error) {
value, err := p.metricValue()
if err != nil {
return nil, err
}
id, _ := p.metricIDAndName()
metrics := map[string]metric{
id: {
Samples: []sample{
{
Date: time.Now(),
Value: value,
},
},
Min: 0,
Max: 100,
},
}
return metrics, nil
}
func (p *Plugin) latestControls() map[string]controlEntry {
ts := time.Now()
ctrls := map[string]controlEntry{}
for _, details := range p.allControlDetails() {
ctrls[details.id] = controlEntry{
Timestamp: ts,
Value: controlData{
Dead: details.dead,
},
}
}
return ctrls
}
func (p *Plugin) metricTemplates() map[string]metricTemplate {
id, name := p.metricIDAndName()
return map[string]metricTemplate{
id: {
ID: id,
Label: name,
Format: "percent",
Priority: 0.1,
},
}
}
func (p *Plugin) controls() map[string]control {
ctrls := map[string]control{}
for _, details := range p.allControlDetails() {
ctrls[details.id] = control{
ID: details.id,
Human: details.human,
Icon: details.icon,
Rank: 1,
}
}
return ctrls
}
// Report is called by scope when a new report is needed. It is part of the
// "reporter" interface, which all plugins must implement.
func (p *Plugin) Report(w http.ResponseWriter, r *http.Request) {
p.lock.Lock()
defer p.lock.Unlock()
log.Println(r.URL.String())
rpt, err := p.makeReport()
if err != nil {
log.Printf("error: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
raw, err := json.Marshal(*rpt)
if err != nil {
log.Printf("error: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write(raw)
}
// Control is called by scope when a control is activated. It is part
// of the "controller" interface.
func (p *Plugin) Control(w http.ResponseWriter, r *http.Request) {
	p.lock.Lock()
	defer p.lock.Unlock()
	log.Println(r.URL.String())
	xreq := request{}
	err := json.NewDecoder(r.Body).Decode(&xreq)
	if err != nil {
		log.Printf("Bad request: %v", err)
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	// Only this probe's own host node carries our controls; reject
	// requests addressed to any other node.
	thisNodeID := p.getTopologyHost()
	if xreq.NodeID != thisNodeID {
		log.Printf("Bad nodeID, expected %q, got %q", thisNodeID, xreq.NodeID)
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	// Only the currently live control can be activated (the other one
	// is marked dead in the report).
	expectedControlID, _, _ := p.controlDetails()
	if expectedControlID != xreq.Control {
		log.Printf("Bad control, expected %q, got %q", expectedControlID, xreq.Control)
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	// Toggle between idle and iowait reporting modes.
	p.iowaitMode = !p.iowaitMode
	// Return a shortcut report in the response so the UI updates
	// immediately instead of waiting for the next report cycle.
	rpt, err := p.makeReport()
	if err != nil {
		log.Printf("error: %v", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	res := response{ShortcutReport: rpt}
	raw, err := json.Marshal(res)
	if err != nil {
		log.Printf("error: %v", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
	w.Write(raw)
}
// getTopologyHost returns the node ID of this probe's host in Scope's
// host topology ("<hostid>;<host>").
func (p *Plugin) getTopologyHost() string {
	return p.HostID + ";<host>"
}

// metricIDAndName reports the ID and human-readable name of the metric
// currently being published.
func (p *Plugin) metricIDAndName() (string, string) {
	if !p.iowaitMode {
		return "idle", "Idle"
	}
	return "iowait", "IO Wait"
}

// metricValue samples the currently selected metric from iostat.
func (p *Plugin) metricValue() (float64, error) {
	if !p.iowaitMode {
		return idle()
	}
	return iowait()
}
// controlDetails describes one mode-switch button shown in the Scope UI.
type controlDetails struct {
	id    string
	human string
	icon  string
	dead  bool // true when the control is currently inactive
}

// allControlDetails lists both mode-switch controls; exactly one is
// live at any time, depending on the current reporting mode.
func (p *Plugin) allControlDetails() []controlDetails {
	toIdle := controlDetails{
		id:    "switchToIdle",
		human: "Switch to idle",
		icon:  "fa-beer",
		dead:  !p.iowaitMode,
	}
	toIOWait := controlDetails{
		id:    "switchToIOWait",
		human: "Switch to IO wait",
		icon:  "fa-hourglass",
		dead:  p.iowaitMode,
	}
	return []controlDetails{toIdle, toIOWait}
}

// controlDetails returns the single live (non-dead) control, or empty
// strings if none is live.
func (p *Plugin) controlDetails() (string, string, string) {
	for _, d := range p.allControlDetails() {
		if d.dead {
			continue
		}
		return d.id, d.human, d.icon
	}
	return "", "", ""
}
// iowait returns the %iowait column of the iostat CPU line.
func iowait() (float64, error) {
	return iostatValue(3)
}

// idle returns the %idle column of the iostat CPU line.
func idle() (float64, error) {
	return iostatValue(5)
}

// iostatValue parses field idx of the iostat values line as a float,
// guarding against out-of-range indices.
func iostatValue(idx int) (float64, error) {
	fields, err := iostat()
	if err != nil {
		return 0, err
	}
	if idx >= len(fields) {
		return 0, fmt.Errorf("invalid iostat field index %d", idx)
	}
	return strconv.ParseFloat(fields[idx], 64)
}
// iostat runs `iostat -c` and returns the six whitespace-separated
// fields of the avg-cpu values line (the fourth line of the output).
func iostat() ([]string, error) {
	out, err := exec.Command("iostat", "-c").Output()
	if err != nil {
		return nil, fmt.Errorf("iowait: %v", err)
	}
	// Expected shape:
	// Linux 4.2.0-25-generic (a109563eab38) 	04/01/16 	_x86_64_	(4 CPU)
	//
	// avg-cpu:  %user   %nice %system %iowait  %steal   %idle
	//            2.37    0.00    1.58    0.01    0.00   96.04
	lines := strings.Split(string(out), "\n")
	if len(lines) < 4 {
		return nil, fmt.Errorf("iowait: unexpected output: %q", out)
	}
	fields := strings.Fields(lines[3])
	if len(fields) != 6 {
		return nil, fmt.Errorf("iowait: unexpected output: %q", out)
	}
	return fields, nil
}

View File

@@ -1 +0,0 @@
/traffic-control

View File

@@ -1,7 +0,0 @@
FROM alpine:3.3
MAINTAINER Weaveworks Inc <help@weave.works>
# Mark this container as a Scope system component.
LABEL works.weave.role=system
COPY ./traffic-control /usr/bin/traffic-control
# The binary expects a glibc-style dynamic loader path; point it at
# Alpine's musl libc. (assumes an x86_64 image — TODO confirm)
RUN mkdir /lib64 && ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2
# iproute2 provides the `tc` binary the plugin shells out to; drop the
# apk cache to keep the image small.
RUN apk add --update iproute2 && rm -rf /var/cache/apk/*
ENTRYPOINT ["/usr/bin/traffic-control"]

View File

@@ -1,33 +0,0 @@
.PHONY: run clean

# Use sudo only when the current user cannot already talk to the Docker daemon.
SUDO=$(shell docker info >/dev/null 2>&1 || echo "sudo -E")
EXE=traffic-control
IMAGE=weaveworks/scope-$(EXE)-plugin
NAME=weaveworks-scope-${EXE}-plugin
# Stamp file recording that the image is up to date with the sources.
UPTODATE=.$(EXE).uptodate

run: $(UPTODATE)
# --net=host gives us the remote hostname, in case we're being launched against a non-local docker host.
# We could also pass in the `-hostname=foo` flag, but that doesn't work against a remote docker host.
	$(SUDO) docker run --rm -it \
		--net=host \
		--pid=host \
		--privileged \
		-v /var/run:/var/run \
		--name $(NAME) $(IMAGE)

$(UPTODATE): $(EXE) Dockerfile
	$(SUDO) docker build -t $(IMAGE) .
	touch $@

# Build the plugin binary inside a Go container so no local toolchain is
# needed; the repo's vendor dir is mounted in for dependencies.
$(EXE): $(shell find . -name *.go)
	$(SUDO) docker run --rm \
		-v "$$PWD":/go/src/hosting/org/$(EXE) \
		-v $(shell pwd)/../../../vendor:/go/src/hosting/org/$(EXE)/vendor \
		-w /go/src/hosting/org/$(EXE) \
		golang:1.6 go build -v

clean:
	- rm -rf $(UPTODATE) $(EXE)
	- $(SUDO) docker rmi $(IMAGE)

View File

@@ -1,17 +0,0 @@
# Traffic Control Plugin
The Traffic Control plugin lets you modify the performance parameters of a container's network interfaces. The following images show a simple example of how **status** and **controls** are displayed in the Scope UI.
<img src="imgs/container_view.png" width="200" alt="Scope Probe plugin screenshot" align="center">
## Visualization
The parameters are shown in a table named **Traffic Control**. The plugin shows the values of latency and packet loss that are enforced on the network interface. A "-" means that no value is set for that parameter; latency is displayed in *milliseconds* (ms) and packet loss as a *percentage*.
## Controls
The Traffic Control plugin provides a simple interface to change the value of latency (hourglass buttons) and packet loss (scissor button), or to remove a value that was set (circled cross button). These buttons are displayed at the top of the container detail view, just above the STATUS section (see the picture below; the controls are shown inside the red rectangle).
<img src="imgs/controls.png" width="400" alt="Scope Probe plugin screenshot" align="center">
The *hourglass* buttons control the latency value; from left to right they set 2000ms, 1000ms, and 500ms. The *scissor* button controls the packet loss value: it sets a 10% packet loss. The *circled cross* button clears any previous settings.

View File

@@ -1,144 +0,0 @@
package main
import (
"fmt"
"time"
log "github.com/Sirupsen/logrus"
docker "github.com/fsouza/go-dockerclient"
)
// DockerClient watches the local Docker daemon and mirrors container
// state into the shared store.
type DockerClient struct {
	store  *Store
	client *docker.Client
}

// NewDockerClient connects to the local Docker daemon over its unix
// socket and returns a client bound to the given store.
func NewDockerClient(store *Store) (*DockerClient, error) {
	client, err := docker.NewClient("unix:///var/run/docker.sock")
	if err != nil {
		return nil, fmt.Errorf("failed to connect to docker daemon: %v", err)
	}
	return &DockerClient{
		store:  store,
		client: client,
	}, nil
}
// Start watches the Docker event stream forever, reconnecting after any
// failure with a one-second pause. It never returns; run it in its own
// goroutine.
func (c *DockerClient) Start() {
	for {
		func() {
			events := make(chan *docker.APIEvents)
			if err := c.client.AddEventListener(events); err != nil {
				log.Error(err)
				return
			}
			// Always detach the listener when this connection attempt
			// ends, so re-registration on the next loop iteration is clean.
			defer func() {
				if err := c.client.RemoveEventListener(events); err != nil {
					log.Error(err)
				}
			}()
			// Seed the store with the current container list before
			// consuming events, so pre-existing containers are not missed.
			if err := c.getContainers(); err != nil {
				log.Error(err)
				return
			}
			for {
				event, ok := <-events
				if !ok {
					log.Error("event listener unexpectedly disconnected")
					return
				}
				c.handleEvent(event)
			}
		}()
		// The anonymous function only returns on failure; back off
		// briefly before reconnecting to avoid a hot loop.
		time.Sleep(time.Second)
	}
}
// getContainers lists all containers (running or not) and seeds the
// store with their current state and PID. Per-container inspection
// failures are logged and skipped rather than aborting the scan.
func (c *DockerClient) getContainers() error {
	containers, err := c.client.ListContainers(docker.ListContainersOptions{All: true})
	if err != nil {
		return err
	}
	for _, apiContainer := range containers {
		cs, err := c.getContainerState(apiContainer.ID)
		if err != nil {
			log.Error(err)
			continue
		}
		var state State
		switch {
		case cs.Dead || cs.Paused || cs.Restarting || cs.OOMKilled:
			state = Stopped
		case cs.Running:
			state = Running
		default:
			state = Destroyed
		}
		c.updateContainer(apiContainer.ID, state, cs.Pid)
	}
	return nil
}
// handleEvent maps a Docker lifecycle event onto a store update; any
// event status other than the six listed below is ignored.
func (c *DockerClient) handleEvent(event *docker.APIEvents) {
	state, known := map[string]State{
		"create":  Created,
		"destroy": Destroyed,
		"start":   Running,
		"unpause": Running,
		"die":     Stopped,
		"pause":   Stopped,
	}[event.Status]
	if !known {
		return
	}
	pid, err := c.getContainerPID(event.ID)
	if err != nil {
		log.Error(err)
		return
	}
	c.updateContainer(event.ID, state, pid)
}
// getContainerPID returns the PID of the container's init process.
// NOTE(review): when the container no longer exists, getContainerState
// returns (nil, nil), so this reports PID 0 with a nil error — callers
// should treat PID 0 as "unknown".
func (c *DockerClient) getContainerPID(containerID string) (int, error) {
	containerState, err := c.getContainerState(containerID)
	if containerState == nil {
		return 0, err
	}
	return containerState.Pid, nil
}

// getContainerState inspects the container and returns a pointer to its
// State struct; returns (nil, nil) if the container does not exist.
func (c *DockerClient) getContainerState(containerID string) (*docker.State, error) {
	dockerContainer, err := c.getContainer(containerID)
	if dockerContainer == nil {
		return nil, err
	}
	return &dockerContainer.State, nil
}
// getContainer inspects a container by ID. A missing container is not
// an error: it yields (nil, nil) so callers can distinguish "gone" from
// a real daemon failure.
func (c *DockerClient) getContainer(containerID string) (*docker.Container, error) {
	container, err := c.client.InspectContainer(containerID)
	if err == nil {
		return container, nil
	}
	if _, isNoSuch := err.(*docker.NoSuchContainer); isNoSuch {
		return nil, nil
	}
	return nil, err
}
// updateContainer records the container's latest state and PID in the
// store; Destroyed containers are removed instead of stored.
func (c *DockerClient) updateContainer(containerID string, state State, pid int) {
	if state == Destroyed {
		c.store.DeleteContainer(containerID)
		return
	}
	c.store.SetContainer(containerID, Container{
		State: state,
		PID:   pid,
	})
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 145 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 49 KiB

View File

@@ -1,227 +0,0 @@
package main
import (
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"path/filepath"
log "github.com/Sirupsen/logrus"
)
// TODO:
//
// do not try to install the qdiscs on the network interface every
// time, skip this step if it is already installed (currently we do
// "replace" instead of "add", but this check may be a way of avoiding
// more one-time installation steps in future).
//
// somehow inform the user about the current traffic control state
// (either add some metadata about latency or maybe add background
// color to buttons to denote whether the button is active); this may
// involve sending shortcut reports as a part of a response to the
// control request
//
// detect if ip and tc binaries are in $PATH
//
// detect if required sch_netem kernel module is loaded; note that in
// some (rare) cases this might be compiled in the kernel instead of
// being a separate module; probably check if tc works, if it does not
// return something like "not implemented".
//
// add traffic control on ingress traffic too (ifb kernel module will
// be required)
//
// currently we can control latency, add controls for packet loss and
// bandwidth
//
// port to eBPF?
// containerClient is anything that can watch a container runtime and
// keep the store up to date (currently only the Docker client).
type containerClient interface {
	Start()
}

// Plugin is the internal data structure
type Plugin struct {
	reporter *Reporter       // builds the Scope reports served on /report
	clients  []containerClient // container watchers started by NewPlugin
}
// TrafficControlStatus keeps track of the traffic-control parameters
// currently applied to one network namespace. The convention is that an
// unset parameter is represented by "-".
type TrafficControlStatus struct {
	latency    string
	packetLoss string
}

// String renders the internal status as "<latency> <packetLoss>".
// Useful for debugging.
func (tcs *TrafficControlStatus) String() string {
	return fmt.Sprintf("%s %s", tcs.latency, tcs.packetLoss)
}

// SetLatency sets the latency value; an empty input is normalized to
// "-" per the unset-value convention.
// (Fixes a bug where the "-" was immediately overwritten by the empty
// string, breaking the convention.)
func (tcs *TrafficControlStatus) SetLatency(latency string) {
	if latency == "" {
		tcs.latency = "-"
		return
	}
	tcs.latency = latency
}

// SetPacketLoss sets the packet loss value; an empty input is
// normalized to "-" per the unset-value convention.
// (Same fix as SetLatency.)
func (tcs *TrafficControlStatus) SetPacketLoss(packetLoss string) {
	if packetLoss == "" {
		tcs.packetLoss = "-"
		return
	}
	tcs.packetLoss = packetLoss
}
// TrafficControlStatusInit returns a status with both parameters unset
// (the convention is that unset values are represented by "-").
func TrafficControlStatusInit() *TrafficControlStatus {
	return &TrafficControlStatus{
		latency:    "-",
		packetLoss: "-",
	}
}

// trafficControlStatusCache maps a network-namespace ID to the last
// known traffic-control settings for that namespace.
var trafficControlStatusCache map[string]*TrafficControlStatus
// main wires the plugin together and serves the Scope plugin API
// (/report and /control) over a unix socket.
func main() {
	// Scope discovers plugins by scanning this well-known directory.
	const socket = "/var/run/scope/plugins/traffic-control.sock"
	// Handle the exit signal: remove the socket file on interrupt.
	setupSignals(socket)
	listener, err := setupSocket(socket)
	if err != nil {
		log.Fatalf("Failed to setup socket: %v", err)
	}
	plugin, err := NewPlugin()
	if err != nil {
		log.Fatalf("Failed to create a plugin: %v", err)
	}
	// Initialize the per-namespace status cache used by tc helpers.
	trafficControlStatusCache = make(map[string]*TrafficControlStatus)
	trafficControlServeMux := http.NewServeMux()
	// Report request handler
	reportHandler := http.HandlerFunc(plugin.report)
	trafficControlServeMux.Handle("/report", reportHandler)
	// Control request handler
	controlHandler := http.HandlerFunc(plugin.control)
	trafficControlServeMux.Handle("/control", controlHandler)
	log.Println("Listening...")
	// Blocks until the listener fails or the process is interrupted.
	if err = http.Serve(listener, trafficControlServeMux); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
// setupSignals removes the plugin socket and exits when the process is
// interrupted, so a stale socket file never blocks the next start.
func setupSignals(socket string) {
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	go func() {
		<-interrupt
		os.Remove(socket)
		os.Exit(0)
	}()
}

// setupSocket ensures the plugin directory exists and listens on the
// given unix socket, removing any stale socket file first.
func setupSocket(socket string) (net.Listener, error) {
	os.Remove(socket)
	dir := filepath.Dir(socket)
	if err := os.MkdirAll(dir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create directory %q: %v", dir, err)
	}
	listener, err := net.Listen("unix", socket)
	if err != nil {
		return nil, fmt.Errorf("failed to listen on %q: %v", socket, err)
	}
	log.Printf("Listening on: unix://%s", socket)
	return listener, nil
}
// NewPlugin wires together the store, the Docker watcher, and the
// reporter, and starts every container client in its own goroutine.
func NewPlugin() (*Plugin, error) {
	store := NewStore()
	dockerClient, err := NewDockerClient(store)
	if err != nil {
		return nil, fmt.Errorf("failed to create a docker client: %v", err)
	}
	p := &Plugin{
		reporter: NewReporter(store),
		clients:  []containerClient{dockerClient},
	}
	for _, client := range p.clients {
		go client.Start()
	}
	return p, nil
}
// report serves the current topology report as JSON; failures to build
// the report surface as a 500 with the error message in the body.
func (p *Plugin) report(w http.ResponseWriter, r *http.Request) {
	raw, err := p.reporter.RawReport()
	if err != nil {
		msg := fmt.Sprintf("error: failed to get raw report: %v", err)
		log.Print(msg)
		http.Error(w, msg, http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
	w.Write(raw)
}
// request is the JSON body Scope sends when a control is activated.
type request struct {
	NodeID  string
	Control string
}

// response is the JSON body returned by the control endpoint; an empty
// Error means the control succeeded.
type response struct {
	Error string `json:"error,omitempty"`
}
// control decodes a control request, resolves the handler for the
// (node, control) pair, runs it, and reports the outcome as JSON.
func (p *Plugin) control(w http.ResponseWriter, r *http.Request) {
	var xreq request
	if err := json.NewDecoder(r.Body).Decode(&xreq); err != nil {
		log.Printf("Bad request: %v", err)
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	handler, err := p.reporter.GetHandler(xreq.NodeID, xreq.Control)
	if err != nil {
		sendResponse(w, fmt.Errorf("failed to get handler: %v", err))
		return
	}
	if err := handler(); err != nil {
		sendResponse(w, fmt.Errorf("handler failed: %v", err))
		return
	}
	sendResponse(w, nil)
}
// sendResponse writes the JSON control response; a non-nil err is
// reported in the body's "error" field with a 200 status, not as an
// HTTP error.
func sendResponse(w http.ResponseWriter, err error) {
	var res response
	if err != nil {
		res.Error = err.Error()
	}
	raw, marshalErr := json.Marshal(res)
	if marshalErr != nil {
		log.Printf("Internal server error: %v", marshalErr)
		http.Error(w, marshalErr.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
	w.Write(raw)
}

View File

@@ -1,326 +0,0 @@
package main
import (
"encoding/json"
"fmt"
"strings"
"time"
)
const (
	// trafficControlTablePrefix namespaces every latest-entry key and
	// control ID owned by this plugin.
	trafficControlTablePrefix = "traffic-control-table-"
)

// report is the top-level document sent to Scope: one container
// topology plus the plugin description list.
type report struct {
	Container topology
	Plugins   []pluginSpec
}

// topology carries the container nodes plus the templates and controls
// that tell the Scope UI how to render them.
type topology struct {
	Nodes             map[string]node             `json:"nodes"`
	Controls          map[string]control          `json:"controls"`
	MetadataTemplates map[string]metadataTemplate `json:"metadata_templates,omitempty"`
	TableTemplates    map[string]tableTemplate    `json:"table_templates,omitempty"`
}

// tableTemplate declares a UI table whose rows are the latest entries
// whose keys share Prefix.
type tableTemplate struct {
	ID     string `json:"id"`
	Label  string `json:"label"`
	Prefix string `json:"prefix"`
}

// metadataTemplate declares one metadata row in the node detail view.
type metadataTemplate struct {
	ID       string  `json:"id"`
	Label    string  `json:"label,omitempty"`    // Human-readable descriptor for this row
	Truncate int     `json:"truncate,omitempty"` // If > 0, truncate the value to this length.
	Datatype string  `json:"dataType,omitempty"`
	Priority float64 `json:"priority,omitempty"`
	From     string  `json:"from,omitempty"` // Defines how to get the value from a report node
}

// node is one container node: its control states and latest key/values.
type node struct {
	LatestControls map[string]controlEntry `json:"latestControls,omitempty"`
	Latest         map[string]stringEntry  `json:"latest,omitempty"`
}

// controlEntry is the state of one control on a node at a point in time.
type controlEntry struct {
	Timestamp time.Time   `json:"timestamp"`
	Value     controlData `json:"value"`
}

// controlData marks whether a control is currently inactive (dead).
type controlData struct {
	Dead bool `json:"dead"`
}

// control describes one button shown in the Scope UI.
type control struct {
	ID    string `json:"id"`
	Human string `json:"human"`
	Icon  string `json:"icon"` // icon name, e.g. "fa-cut"
	Rank  int    `json:"rank"`
}

// stringEntry is a timestamped string value for a node's Latest map.
type stringEntry struct {
	Timestamp time.Time `json:"timestamp"`
	Value     string    `json:"value"`
}

// pluginSpec identifies this plugin and the interfaces it implements.
type pluginSpec struct {
	ID          string   `json:"id"`
	Label       string   `json:"label"`
	Description string   `json:"description,omitempty"`
	Interfaces  []string `json:"interfaces"`
	APIVersion  string   `json:"api_version,omitempty"`
}
// Reporter builds Scope reports from the container store.
type Reporter struct {
	store *Store
}

// NewReporter returns a Reporter backed by the given store.
func NewReporter(store *Store) *Reporter {
	return &Reporter{store: store}
}
// RawReport builds the current report — container nodes plus the UI
// templates and controls — and returns it marshalled as JSON.
func (r *Reporter) RawReport() ([]byte, error) {
	rpt := &report{
		Container: topology{
			Nodes:             r.getContainerNodes(),
			Controls:          getTrafficControls(),
			MetadataTemplates: getMetadataTemplate(),
			TableTemplates:    getTableTemplate(),
		},
		// Describe this plugin to Scope: it both reports data and
		// accepts control requests.
		Plugins: []pluginSpec{
			{
				ID:          "traffic-control",
				Label:       "Traffic control",
				Description: "Adds traffic controls to the running Docker containers",
				Interfaces:  []string{"reporter", "controller"},
				APIVersion:  "1",
			},
		},
	}
	raw, err := json.Marshal(rpt)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal the report: %v", err)
	}
	return raw, nil
}
// GetHandler resolves a (nodeID, controlID) pair into a ready-to-call
// closure that applies the control's action to the container's PID.
func (r *Reporter) GetHandler(nodeID, controlID string) (func() error, error) {
	containerID, err := nodeIDToContainerID(nodeID)
	if err != nil {
		return nil, fmt.Errorf("failed to get container ID from node ID %q: %v", nodeID, err)
	}
	container, found := r.store.Container(containerID)
	if !found {
		return nil, fmt.Errorf("container %s not found", containerID)
	}
	for _, c := range getControls() {
		if c.control.ID != controlID {
			continue
		}
		handler := c.handler
		pid := container.PID
		return func() error {
			return handler(pid)
		}, nil
	}
	return nil, fmt.Errorf("unknown control ID %q for node ID %q", controlID, nodeID)
}
// getContainerNodes builds one report node per known container.
// states:
// created, destroyed - don't create any node
// running, not running - create node with controls
func (r *Reporter) getContainerNodes() map[string]node {
	nodes := map[string]node{}
	timestamp := time.Now()
	r.store.ForEach(func(containerID string, container Container) {
		dead := false
		switch container.State {
		case Created, Destroyed:
			// do nothing, to prevent adding a stale node
			// to a report
		case Stopped:
			dead = true
			// Stopped containers are reported like Running ones, only
			// with dead (inactive) controls.
			fallthrough
		case Running:
			nodeID := containerIDToNodeID(containerID)
			// Errors deliberately ignored: getLatency/getPacketLoss
			// return "-" on failure, which renders as "unset".
			latency, _ := getLatency(container.PID)
			packetLoss, _ := getPacketLoss(container.PID)
			nodes[nodeID] = node{
				LatestControls: getTrafficNodeControls(timestamp, dead),
				Latest: map[string]stringEntry{
					fmt.Sprintf("%s%s", trafficControlTablePrefix, "latency"): {
						Timestamp: timestamp,
						Value:     latency,
					},
					fmt.Sprintf("%s%s", trafficControlTablePrefix, "pktloss"): {
						Timestamp: timestamp,
						Value:     packetLoss,
					},
				},
			}
		}
	})
	return nodes
}
// getMetadataTemplate declares the two rows (latency, packet loss)
// shown for each container in the Traffic Control table.
func getMetadataTemplate() map[string]metadataTemplate {
	return map[string]metadataTemplate{
		"traffic-control-latency": {
			ID:       "traffic-control-latency",
			Label:    "Latency",
			Truncate: 0,
			Datatype: "",
			Priority: 13.5,
			From:     "latest",
		},
		"traffic-control-pktloss": {
			ID:       "traffic-control-pktloss",
			Label:    "Packet Loss",
			Truncate: 0,
			Datatype: "",
			// Slightly after latency, so the two rows appear in order.
			Priority: 13.6,
			From:     "latest",
		},
	}
}

// getTableTemplate declares the Traffic Control table itself; rows are
// collected from latest entries sharing trafficControlTablePrefix.
func getTableTemplate() map[string]tableTemplate {
	return map[string]tableTemplate{
		"traffic-control-table": {
			ID:     "traffic-control-table",
			Label:  "Traffic Control",
			Prefix: trafficControlTablePrefix,
		},
	}
}
// getTrafficNodeControls stamps every known control with the same
// timestamp and dead flag, for attachment to a single node.
func getTrafficNodeControls(timestamp time.Time, dead bool) map[string]controlEntry {
	entry := controlEntry{
		Timestamp: timestamp,
		Value:     controlData{Dead: dead},
	}
	out := map[string]controlEntry{}
	for _, c := range getControls() {
		out[c.control.ID] = entry
	}
	return out
}

// getTrafficControls indexes every control definition by its ID.
func getTrafficControls() map[string]control {
	out := map[string]control{}
	for _, c := range getControls() {
		out[c.control.ID] = c.control
	}
	return out
}
// extControl pairs a UI control definition with the handler that
// implements it for a given container PID.
type extControl struct {
	control control
	handler func(pid int) error
}
// getLatencyControls returns the three hourglass buttons that set a
// fixed egress latency (2000ms, 1000ms, 500ms) on a container.
// (fmt.Sprintf("%s%s", a, b) replaced with plain concatenation — the
// idiomatic form for joining two strings.)
func getLatencyControls() []extControl {
	return []extControl{
		{
			control: control{
				ID:    trafficControlTablePrefix + "slow",
				Human: "Traffic speed: slow",
				Icon:  "fa-hourglass-1",
				Rank:  20,
			},
			handler: func(pid int) error {
				return ApplyLatency(pid, "2000ms")
			},
		},
		{
			control: control{
				ID:    trafficControlTablePrefix + "medium",
				Human: "Traffic speed: medium",
				Icon:  "fa-hourglass-2",
				Rank:  21,
			},
			handler: func(pid int) error {
				return ApplyLatency(pid, "1000ms")
			},
		},
		{
			control: control{
				ID:    trafficControlTablePrefix + "fast",
				Human: "Traffic speed: fast",
				Icon:  "fa-hourglass-3",
				Rank:  22,
			},
			handler: func(pid int) error {
				return ApplyLatency(pid, "500ms")
			},
		},
	}
}

// getPacketLossControls returns the scissor button that drops 10% of
// the container's egress packets.
func getPacketLossControls() []extControl {
	return []extControl{
		{
			control: control{
				ID:    trafficControlTablePrefix + "pkt-drop-low",
				Human: "Packet drop: low",
				Icon:  "fa-cut",
				Rank:  23,
			},
			handler: func(pid int) error {
				return ApplyPacketLoss(pid, "10%")
			},
		},
	}
}

// getGeneralControls returns the circled-cross button that clears every
// traffic-control setting on the container.
func getGeneralControls() []extControl {
	return []extControl{
		{
			control: control{
				ID:    trafficControlTablePrefix + "clear",
				Human: "Clear traffic control settings",
				Icon:  "fa-times-circle",
				Rank:  24,
			},
			handler: func(pid int) error {
				return ClearTrafficControlSettings(pid)
			},
		},
	}
}
// getControls returns every control (latency, packet loss, clear) in
// UI rank order.
func getControls() []extControl {
	controls := getLatencyControls()
	// The TODO in the original asked why append(controls, getPacketLossControls())
	// did not work: appending a slice as a single element does not
	// compile — the "..." is required to splice its elements in.
	controls = append(controls, getPacketLossControls()...)
	controls = append(controls, getGeneralControls()...)
	return controls
}
// nodeSuffix is appended to a container ID to form a Scope node ID in
// the container topology.
const nodeSuffix = ";<container>"

// containerIDToNodeID converts a Docker container ID to a Scope node ID.
// (fmt.Sprintf("%s%s", ...) replaced with idiomatic concatenation.)
func containerIDToNodeID(containerID string) string {
	return containerID + nodeSuffix
}

// nodeIDToContainerID converts a Scope node ID back to a container ID,
// failing if the ID does not come from the container topology.
func nodeIDToContainerID(nodeID string) (string, error) {
	if !strings.HasSuffix(nodeID, nodeSuffix) {
		return "", fmt.Errorf("no suffix %q in node ID %q", nodeSuffix, nodeID)
	}
	return strings.TrimSuffix(nodeID, nodeSuffix), nil
}

View File

@@ -1,69 +0,0 @@
package main
import (
"sync"
)
// State enumerates the lifecycle states a container can be in.
type State int

const (
	// Created state
	Created State = iota
	// Running state
	Running
	// Stopped state
	Stopped
	// Destroyed state
	Destroyed
)

// Container holds what the plugin tracks about one container.
type Container struct {
	State State
	PID   int
}

// Store is a mutex-guarded map of container ID to Container.
type Store struct {
	lock       sync.Mutex
	containers map[string]Container
}

// NewStore returns an empty, ready-to-use Store.
func NewStore() *Store {
	return &Store{containers: map[string]Container{}}
}

// Container looks up a container by ID, reporting whether it exists.
func (s *Store) Container(containerID string) (Container, bool) {
	s.lock.Lock()
	defer s.lock.Unlock()
	c, ok := s.containers[containerID]
	return c, ok
}

// SetContainer inserts or replaces the container stored under the ID.
func (s *Store) SetContainer(containerID string, container Container) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.containers[containerID] = container
}

// DeleteContainer removes the container stored under the ID, if any.
func (s *Store) DeleteContainer(containerID string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	delete(s.containers, containerID)
}

// ForEach runs callback on every stored container while holding the
// lock; callbacks must not call back into the Store or they deadlock.
func (s *Store) ForEach(callback func(ID string, c Container)) {
	s.lock.Lock()
	defer s.lock.Unlock()
	for id, c := range s.containers {
		callback(id, c)
	}
}

View File

@@ -1,206 +0,0 @@
package main
import (
"fmt"
"os"
"os/exec"
"strings"
log "github.com/Sirupsen/logrus"
"github.com/containernetworking/cni/pkg/ns"
)
// applyTrafficControlRules set the network policies
//
// It enters the container's network namespace (/proc/<pid>/ns/net) and
// runs two tc commands on eth0: "replace" ensures a netem root qdisc
// exists, then "change" applies the given rules to it. Returns the
// namespace ID so callers can key the status cache.
func applyTrafficControlRules(pid int, rules []string) (netNSID string, err error) {
	cmds := [][]string{
		strings.Fields("tc qdisc replace dev eth0 root handle 1: netem"),
	}
	cmd := strings.Fields("tc qdisc change dev eth0 root handle 1: netem")
	cmd = append(cmd, rules...)
	cmds = append(cmds, cmd)
	netNS := fmt.Sprintf("/proc/%d/ns/net", pid)
	err = ns.WithNetNSPath(netNS, func(hostNS ns.NetNS) error {
		for _, cmd := range cmds {
			if output, err := exec.Command(cmd[0], cmd[1:]...).CombinedOutput(); err != nil {
				// tc prints its diagnostics on stdout/stderr; log them
				// since the returned error only carries the exit status.
				log.Error(string(output))
				return fmt.Errorf("failed to execute command: %v", err)
			}
		}
		return nil
	})
	if err != nil {
		return "", fmt.Errorf("failed to perform traffic control: %v", err)
	}
	netNSID, err = getNSID(netNS)
	if err != nil {
		return "", err
	}
	return netNSID, nil
}
// ApplyLatency sets the egress latency on the container's eth0 while
// preserving the currently cached packet-loss setting (netem replaces
// the whole rule set, so both must be re-applied together). An empty
// latency is a no-op.
func ApplyLatency(pid int, latency string) error {
	if latency == "" {
		return nil
	}
	rules := strings.Fields(fmt.Sprintf("delay %s", latency))
	// Get cached packet loss so it survives the rule replacement.
	packetLoss, err := getPacketLoss(pid)
	if err != nil {
		return err
	}
	if packetLoss != "-" {
		rules = append(rules, strings.Fields(fmt.Sprintf("loss %s", packetLoss))...)
	}
	netNSID, err := applyTrafficControlRules(pid, rules)
	// Bug fix: the error was previously ignored, so a failed tc run
	// still updated the cache under the empty-string namespace key.
	if err != nil {
		return err
	}
	// Update cached values.
	if trafficControlStatusCache[netNSID] == nil {
		trafficControlStatusCache[netNSID] = TrafficControlStatusInit()
	}
	trafficControlStatusCache[netNSID].SetLatency(latency)
	trafficControlStatusCache[netNSID].SetPacketLoss(packetLoss)
	return nil
}
// ApplyPacketLoss sets the egress packet loss on the container's eth0
// while preserving the currently cached latency setting (netem replaces
// the whole rule set, so both must be re-applied together). An empty
// packet loss is a no-op.
func ApplyPacketLoss(pid int, packetLoss string) error {
	if packetLoss == "" {
		return nil
	}
	rules := strings.Fields(fmt.Sprintf("loss %s", packetLoss))
	// Get cached latency so it survives the rule replacement.
	latency, err := getLatency(pid)
	if err != nil {
		return err
	}
	if latency != "-" {
		rules = append(rules, strings.Fields(fmt.Sprintf("delay %s", latency))...)
	}
	netNSID, err := applyTrafficControlRules(pid, rules)
	// Bug fix: the error was previously ignored, so a failed tc run
	// still updated the cache under the empty-string namespace key.
	if err != nil {
		return err
	}
	// Update cached values.
	if trafficControlStatusCache[netNSID] == nil {
		trafficControlStatusCache[netNSID] = TrafficControlStatusInit()
	}
	trafficControlStatusCache[netNSID].SetLatency(latency)
	trafficControlStatusCache[netNSID].SetPacketLoss(packetLoss)
	return nil
}
// ClearTrafficControlSettings clear all parameters of the qdisc with tc
//
// Running "replace" with no rules resets the netem qdisc to defaults;
// the cache entry for the namespace is dropped so the next getStatus
// re-reads the real state.
func ClearTrafficControlSettings(pid int) error {
	cmds := [][]string{
		strings.Fields("tc qdisc replace dev eth0 root handle 1: netem"),
	}
	netNS := fmt.Sprintf("/proc/%d/ns/net", pid)
	err := ns.WithNetNSPath(netNS, func(hostNS ns.NetNS) error {
		for _, cmd := range cmds {
			if output, err := exec.Command(cmd[0], cmd[1:]...).CombinedOutput(); err != nil {
				log.Error(string(output))
				return fmt.Errorf("failed to execute command: %v", err)
			}
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to perform traffic control: %v", err)
	}
	// clear cached parameters
	netNSID, err := getNSID(netNS)
	if err != nil {
		log.Error(netNSID)
		return fmt.Errorf("failed to get network namespace ID: %v", err)
	}
	delete(trafficControlStatusCache, netNSID)
	return nil
}
func getLatency(pid int) (string, error) {
var status *TrafficControlStatus
var err error
if status, err = getStatus(pid); err != nil {
return "-", err
} else if status == nil {
return "-", fmt.Errorf("status for PID %d does not exist", pid)
}
return status.latency, nil
}
func getPacketLoss(pid int) (string, error) {
var status *TrafficControlStatus
var err error
if status, err = getStatus(pid); err != nil {
return "-", err
} else if status == nil {
return "-", fmt.Errorf("status for PID %d does not exist", pid)
}
return status.packetLoss, nil
}
// getStatus returns the traffic-control status of the container's
// network namespace, reading it from the cache when possible and
// otherwise parsing the output of `tc qdisc show dev eth0` inside the
// namespace.
func getStatus(pid int) (*TrafficControlStatus, error) {
	netNS := fmt.Sprintf("/proc/%d/ns/net", pid)
	netNSID, err := getNSID(netNS)
	if err != nil {
		log.Error(netNSID)
		return nil, fmt.Errorf("failed to get network namespace ID: %v", err)
	}
	if status, ok := trafficControlStatusCache[netNSID]; ok {
		return status, nil
	}
	cmd := strings.Fields("tc qdisc show dev eth0")
	var output string
	err = ns.WithNetNSPath(netNS, func(hostNS ns.NetNS) error {
		cmdOut, err := exec.Command(cmd[0], cmd[1:]...).CombinedOutput()
		if err != nil {
			log.Error(string(cmdOut))
			return fmt.Errorf("failed to execute command: tc qdisc show dev eth0: %v", err)
		}
		output = string(cmdOut)
		return nil
	})
	// Bug fix: previously a failed tc run still cached a "-"/"-" entry,
	// masking the real settings on every later call for this namespace.
	if err != nil {
		return nil, err
	}
	// cache parameters
	status := &TrafficControlStatus{
		latency:    parseLatency(output),
		packetLoss: parsePacketLoss(output),
	}
	trafficControlStatusCache[netNSID] = status
	return status, nil
}
// parseLatency extracts the netem "delay" value from tc output.
func parseLatency(statusString string) string {
	return parseAttribute(statusString, "delay")
}

// parsePacketLoss extracts the netem "loss" value from tc output.
func parsePacketLoss(statusString string) string {
	return parseAttribute(statusString, "loss")
}

// parseAttribute returns the token following the first occurrence of
// attribute in the whitespace-split status string, or "-" when the
// attribute is absent or has no following token.
func parseAttribute(statusString string, attribute string) string {
	fields := strings.Fields(statusString)
	for i := 0; i+1 < len(fields); i++ {
		if fields[i] == attribute {
			return strings.Trim(fields[i+1], "\n")
		}
	}
	return "-"
}
// getNSID extracts the numeric namespace ID from a /proc/<pid>/ns/net
// symlink, whose target looks like "net:[4026531992]".
// NOTE(review): the slice below assumes that exact target shape and
// panics on shorter targets — confirm against the kernel's procfs docs.
func getNSID(nsPath string) (string, error) {
	target, err := os.Readlink(nsPath)
	if err != nil {
		return "", fmt.Errorf("failed read \"%s\": %v", nsPath, err)
	}
	return target[5 : len(target)-1], nil
}