From c5cc9814fe33b8a2d7d7b4e792f29a50b020648d Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Fri, 22 Jul 2016 16:02:37 +0200 Subject: [PATCH] Add an example traffic control plugin As it is an initial implementation, it only controls latency of the outgoing (egress) traffic. There is also a TODO to turn this plugin into something more serious. Also, at some point we may consider moving this plugin outside of "example" directory. --- examples/plugins/traffic-control/.gitignore | 1 + examples/plugins/traffic-control/Dockerfile | 7 + examples/plugins/traffic-control/Makefile | 32 +++ examples/plugins/traffic-control/docker.go | 143 ++++++++++++++ examples/plugins/traffic-control/main.go | 178 +++++++++++++++++ examples/plugins/traffic-control/report.go | 207 ++++++++++++++++++++ examples/plugins/traffic-control/store.go | 57 ++++++ examples/plugins/traffic-control/tc.go | 51 +++++ 8 files changed, 676 insertions(+) create mode 100644 examples/plugins/traffic-control/.gitignore create mode 100644 examples/plugins/traffic-control/Dockerfile create mode 100644 examples/plugins/traffic-control/Makefile create mode 100644 examples/plugins/traffic-control/docker.go create mode 100644 examples/plugins/traffic-control/main.go create mode 100644 examples/plugins/traffic-control/report.go create mode 100644 examples/plugins/traffic-control/store.go create mode 100644 examples/plugins/traffic-control/tc.go diff --git a/examples/plugins/traffic-control/.gitignore b/examples/plugins/traffic-control/.gitignore new file mode 100644 index 000000000..df5531de4 --- /dev/null +++ b/examples/plugins/traffic-control/.gitignore @@ -0,0 +1 @@ +/traffic-control diff --git a/examples/plugins/traffic-control/Dockerfile b/examples/plugins/traffic-control/Dockerfile new file mode 100644 index 000000000..3ccf9b633 --- /dev/null +++ b/examples/plugins/traffic-control/Dockerfile @@ -0,0 +1,7 @@ +FROM alpine:3.3 +MAINTAINER Weaveworks Inc +LABEL works.weave.role=system +COPY ./traffic-control 
/usr/bin/traffic-control +RUN mkdir /lib64 && ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2 +RUN apk add --update iproute2 && rm -rf /var/cache/apk/* +ENTRYPOINT ["/usr/bin/traffic-control"] diff --git a/examples/plugins/traffic-control/Makefile b/examples/plugins/traffic-control/Makefile new file mode 100644 index 000000000..702b17a9f --- /dev/null +++ b/examples/plugins/traffic-control/Makefile @@ -0,0 +1,32 @@ +.PHONY: run clean + +SUDO=$(shell docker info >/dev/null 2>&1 || echo "sudo -E") +EXE=traffic-control +IMAGE=weavescope-$(EXE)-plugin +UPTODATE=.$(EXE).uptodate + +run: $(UPTODATE) + # --net=host gives us the remote hostname, in case we're being launched against a non-local docker host. + # We could also pass in the `-hostname=foo` flag, but that doesn't work against a remote docker host. + $(SUDO) docker run --rm -it \ + --net=host \ + --pid=host \ + --privileged \ + -v /var/run:/var/run \ + --name $(IMAGE) $(IMAGE) + +$(UPTODATE): $(EXE) Dockerfile + $(SUDO) docker build -t $(IMAGE) . 
+ touch $@ + +$(EXE): $(wildcard *.go) + $(SUDO) docker run --rm \ + -v "$$PWD":/go/src/hosting/org/$(EXE) \ + -v $(shell pwd)/../../../vendor:/go/src/hosting/org/$(EXE)/vendor \ + -w /go/src/hosting/org/$(EXE) \ + golang:1.6 go build -v + + +clean: + - rm -rf $(UPTODATE) $(EXE) + - $(SUDO) docker rmi $(IMAGE) diff --git a/examples/plugins/traffic-control/docker.go b/examples/plugins/traffic-control/docker.go new file mode 100644 index 000000000..07852c27b --- /dev/null +++ b/examples/plugins/traffic-control/docker.go @@ -0,0 +1,143 @@ +package main + +import ( + "fmt" + "time" + + log "github.com/Sirupsen/logrus" + docker "github.com/fsouza/go-dockerclient" +) + +type DockerClient struct { + store *Store + client *docker.Client +} + +func NewDockerClient(store *Store) (*DockerClient, error) { + dc, err := docker.NewClient("unix:///var/run/docker.sock") + if err != nil { + return nil, fmt.Errorf("failed to connect to docker daemon: %v", err) + } + return &DockerClient{ + store: store, + client: dc, + }, nil +} + +func (c *DockerClient) Start() { + for { + c.loopIteration() + time.Sleep(time.Second) + } +} + +func (c *DockerClient) loopIteration() { + events := make(chan *docker.APIEvents) + if err := c.client.AddEventListener(events); err != nil { + log.Error(err) + return + } + defer func() { + if err := c.client.RemoveEventListener(events); err != nil { + log.Error(err) + } + }() + if err := c.getContainers(); err != nil { + log.Error(err) + return + } + for { + event, ok := <-events + if !ok { + log.Error("event listener unexpectedly disconnected") + return + } + c.handleEvent(event) + } +} + +func (c *DockerClient) getContainers() error { + apiContainers, err := c.client.ListContainers(docker.ListContainersOptions{All: true}) + if err != nil { + return err + } + + for _, apiContainer := range apiContainers { + containerState, err := c.getContainerState(apiContainer.ID) + if containerState == nil { + log.Errorf("skipping container %s, state unavailable: %v", apiContainer.ID, err) + continue + } + state := Destroyed + switch { + case 
containerState.Dead || containerState.Paused || containerState.Restarting || containerState.OOMKilled: + state = Stopped + case containerState.Running: + state = Running + } + c.updateContainer(apiContainer.ID, state, containerState.Pid) + } + + return nil +} + +func (c *DockerClient) handleEvent(event *docker.APIEvents) { + var state State + switch event.Status { + case "create": + state = Created + case "destroy": + state = Destroyed + case "start", "unpause": + state = Running + case "die", "pause": + state = Stopped + default: + return + } + pid, err := c.getContainerPID(event.ID) + if err != nil { + log.Error(err) + return + } + c.updateContainer(event.ID, state, pid) +} + +func (c *DockerClient) getContainerPID(containerID string) (int, error) { + containerState, err := c.getContainerState(containerID) + if containerState == nil { + return 0, err + } + return containerState.Pid, nil +} + +func (c *DockerClient) getContainerState(containerID string) (*docker.State, error) { + dockerContainer, err := c.getContainer(containerID) + if dockerContainer == nil { + return nil, err + } + return &dockerContainer.State, nil +} + +func (c *DockerClient) getContainer(containerID string) (*docker.Container, error) { + dockerContainer, err := c.client.InspectContainer(containerID) + if err != nil { + if _, ok := err.(*docker.NoSuchContainer); !ok { + return nil, err + } + return nil, nil + } + return dockerContainer, nil +} + +func (c *DockerClient) updateContainer(containerID string, state State, pid int) { + if state == Destroyed { + c.store.DeleteContainer(containerID) + return + } + cont := Container{ + State: state, + PID: pid, + } + c.store.SetContainer(containerID, cont) +} diff --git a/examples/plugins/traffic-control/main.go b/examples/plugins/traffic-control/main.go new file mode 100644 index 000000000..84c2a1471 --- /dev/null +++ b/examples/plugins/traffic-control/main.go @@ -0,0 +1,178 @@ +package main + +import ( + "encoding/json" + "fmt" + "net" + "net/http" + 
"os" + "os/signal" + "path/filepath" + + log "github.com/Sirupsen/logrus" +) + +// TODO: +// +// do not try to install the qdisc on the network interface every +// time, skip this step if it is already installed (currently we do +// "replace" instead of "add", but this check may be a way of avoiding +// more one-time installation steps in future). +// +// somehow inform the user about the current traffic control state +// (either add some metadata about latency or maybe add background +// color to buttons to denote whether the button is active); this may +// involve sending shortcut reports as a part of a response to the +// control request +// +// detect if ip and tc binaries are in $PATH +// +// detect if required sch_netem kernel module is loaded; note that in +// some (rare) cases this might be compiled in the kernel instead of +// being a separate module; probably check if tc works, if it does not +// return something like "not implemented". +// +// add traffic control on ingress traffic too (ifb kernel module will +// be required) +// +// currently we can control latency, add controls for packet loss and +// bandwidth +// +// port to eBPF? 
+ +type containerClient interface { + Start() +} + +type Plugin struct { + reporter *Reporter + + clients []containerClient +} + +func main() { + const socket = "/var/run/scope/plugins/traffic-control.sock" + + setupSignals(socket) + + listener, err := setupSocket(socket) + if err != nil { + log.Fatalf("Failed to setup socket: %v", err) + } + defer func() { + listener.Close() + os.Remove(socket) + }() + + plugin, err := NewPlugin() + if err != nil { + log.Fatalf("Failed to create a plugin: %v", err) + } + if err := plugin.Serve(listener); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} + +func setupSignals(socket string) { + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + go func() { + <-interrupt + os.Remove(socket) + os.Exit(0) + }() +} + +func setupSocket(socket string) (net.Listener, error) { + os.Remove(socket) + if err := os.MkdirAll(filepath.Dir(socket), 0755); err != nil { + return nil, fmt.Errorf("failed to create directory %q: %v", filepath.Dir(socket), err) + } + listener, err := net.Listen("unix", socket) + if err != nil { + return nil, fmt.Errorf("failed to listen on %q: %v", socket, err) + } + + log.Printf("Listening on: unix://%s", socket) + return listener, nil +} + +func NewPlugin() (*Plugin, error) { + store := NewStore() + dockerClient, err := NewDockerClient(store) + if err != nil { + return nil, fmt.Errorf("failed to create a docker client: %v", err) + } + reporter := NewReporter(store) + plugin := &Plugin{ + reporter: reporter, + clients: []containerClient{ + dockerClient, + }, + } + for _, client := range plugin.clients { + go client.Start() + } + return plugin, nil +} + +func (p *Plugin) Serve(listener net.Listener) error { + http.HandleFunc("/report", p.report) + http.HandleFunc("/control", p.control) + return http.Serve(listener, nil) +} + +func (p *Plugin) report(w http.ResponseWriter, r *http.Request) { + raw, err := p.reporter.RawReport() + if err != nil { + msg := fmt.Sprintf("error: failed 
to get raw report: %v", err) + log.Print(msg) + http.Error(w, msg, http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + w.Write(raw) +} + +type request struct { + NodeID string + Control string +} + +type response struct { + Error string `json:"error,omitempty"` +} + +func (p *Plugin) control(w http.ResponseWriter, r *http.Request) { + xreq := request{} + if err := json.NewDecoder(r.Body).Decode(&xreq); err != nil { + log.Printf("Bad request: %v", err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + handler, err := p.reporter.GetHandler(xreq.NodeID, xreq.Control) + if err != nil { + sendResponse(w, fmt.Errorf("failed to get handler: %v", err)) + return + } + if err := handler(); err != nil { + sendResponse(w, fmt.Errorf("handler failed: %v", err)) + return + } + sendResponse(w, nil) +} + +func sendResponse(w http.ResponseWriter, err error) { + res := response{} + if err != nil { + res.Error = err.Error() + } + raw, err := json.Marshal(res) + if err != nil { + log.Printf("Internal server error: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + w.Write(raw) +} diff --git a/examples/plugins/traffic-control/report.go b/examples/plugins/traffic-control/report.go new file mode 100644 index 000000000..ddfce5262 --- /dev/null +++ b/examples/plugins/traffic-control/report.go @@ -0,0 +1,207 @@ +package main + +import ( + "encoding/json" + "fmt" + "strings" + "time" +) + +type report struct { + Container topology + Plugins []pluginSpec +} + +type topology struct { + Nodes map[string]node `json:"nodes"` + Controls map[string]control `json:"controls"` +} + +type node struct { + LatestControls map[string]controlEntry `json:"latestControls,omitempty"` +} + +type controlEntry struct { + Timestamp time.Time `json:"timestamp"` + Value controlData `json:"value"` +} + +type controlData struct { + Dead bool `json:"dead"` +} + +type control struct { + ID string `json:"id"` + 
Human string `json:"human"` + Icon string `json:"icon"` + Rank int `json:"rank"` +} + +type pluginSpec struct { + ID string `json:"id"` + Label string `json:"label"` + Description string `json:"description,omitempty"` + Interfaces []string `json:"interfaces"` + APIVersion string `json:"api_version,omitempty"` +} + +type Reporter struct { + store *Store +} + +func NewReporter(store *Store) *Reporter { + return &Reporter{ + store: store, + } +} + +func (r *Reporter) RawReport() ([]byte, error) { + rpt := &report{ + Container: topology{ + Nodes: r.getContainerNodes(), + Controls: getTrafficControls(), + }, + Plugins: []pluginSpec{ + { + ID: "traffic-control", + Label: "Traffic control", + Description: "Adds traffic controls to the running Docker containers", + Interfaces: []string{"reporter", "controller"}, + APIVersion: "1", + }, + }, + } + raw, err := json.Marshal(rpt) + if err != nil { + return nil, fmt.Errorf("failed to marshal the report: %v", err) + } + return raw, nil +} + +func (r *Reporter) GetHandler(nodeID, controlID string) (func() error, error) { + containerID, err := nodeIDToContainerID(nodeID) + if err != nil { + return nil, fmt.Errorf("failed to get container ID from node ID %q: %v", nodeID, err) + } + container, found := r.store.Container(containerID) + if !found { + return nil, fmt.Errorf("container %s not found", containerID) + } + var handler func(pid int) error + for _, c := range getControls() { + if c.control.ID == controlID { + handler = c.handler + break + } + } + if handler == nil { + return nil, fmt.Errorf("unknown control ID %q for node ID %q", controlID, nodeID) + } + return func() error { + return handler(container.PID) + }, nil +} + +// states: +// created, destroyed - don't create any node +// running, not running - create node with controls + +func (r *Reporter) getContainerNodes() map[string]node { + nodes := map[string]node{} + timestamp := time.Now() + r.store.ForEach(func(containerID string, container Container) { + dead := false + switch 
container.State { + case Created, Destroyed: + // do nothing, to prevent adding a stale node + // to a report + case Stopped: + dead = true + fallthrough + case Running: + nodeID := containerIDToNodeID(containerID) + nodes[nodeID] = node{ + LatestControls: getTrafficNodeControls(timestamp, dead), + } + } + }) + return nodes +} + +func getTrafficNodeControls(timestamp time.Time, dead bool) map[string]controlEntry { + controls := map[string]controlEntry{} + entry := controlEntry{ + Timestamp: timestamp, + Value: controlData{ + Dead: dead, + }, + } + for _, c := range getControls() { + controls[c.control.ID] = entry + } + return controls +} + +func getTrafficControls() map[string]control { + controls := map[string]control{} + for _, c := range getControls() { + controls[c.control.ID] = c.control + } + return controls +} + +type extControl struct { + control control + handler func(pid int) error +} + +func getControls() []extControl { + return []extControl{ + { + control: control{ + ID: "slow", + Human: "Traffic speed: slow", + Icon: "fa-hourglass-1", + Rank: 20, + }, + handler: func(pid int) error { + return DoTrafficControl(pid, 2000) + }, + }, + { + control: control{ + ID: "medium", + Human: "Traffic speed: medium", + Icon: "fa-hourglass-2", + Rank: 21, + }, + handler: func(pid int) error { + return DoTrafficControl(pid, 300) + }, + }, + { + control: control{ + ID: "fast", + Human: "Traffic speed: fast", + Icon: "fa-hourglass-3", + Rank: 22, + }, + handler: func(pid int) error { + return DoTrafficControl(pid, 1) + }, + }, + } +} + +const nodeSuffix = ";" + +func containerIDToNodeID(containerID string) string { + return fmt.Sprintf("%s%s", containerID, nodeSuffix) +} + +func nodeIDToContainerID(nodeID string) (string, error) { + if !strings.HasSuffix(nodeID, nodeSuffix) { + return "", fmt.Errorf("no suffix %q in node ID %q", nodeSuffix, nodeID) + } + return strings.TrimSuffix(nodeID, nodeSuffix), nil +} diff --git a/examples/plugins/traffic-control/store.go 
b/examples/plugins/traffic-control/store.go new file mode 100644 index 000000000..c8b60db4c --- /dev/null +++ b/examples/plugins/traffic-control/store.go @@ -0,0 +1,57 @@ +package main + +import ( + "sync" +) + +type State int + +const ( + Created State = iota + Running + Stopped + Destroyed +) + +type Container struct { + State State + PID int +} + +type Store struct { + lock sync.Mutex + containers map[string]Container +} + +func NewStore() *Store { + return &Store{ + containers: map[string]Container{}, + } +} + +func (s *Store) Container(containerID string) (Container, bool) { + s.lock.Lock() + defer s.lock.Unlock() + container, found := s.containers[containerID] + return container, found +} + +func (s *Store) SetContainer(containerID string, container Container) { + s.lock.Lock() + defer s.lock.Unlock() + s.containers[containerID] = container +} + +func (s *Store) DeleteContainer(containerID string) { + s.lock.Lock() + defer s.lock.Unlock() + delete(s.containers, containerID) +} + +func (s *Store) ForEach(callback func(ID string, c Container)) { + s.lock.Lock() + defer s.lock.Unlock() + for containerID, container := range s.containers { + callback(containerID, container) + } +} diff --git a/examples/plugins/traffic-control/tc.go b/examples/plugins/traffic-control/tc.go new file mode 100644 index 000000000..d9b9819db --- /dev/null +++ b/examples/plugins/traffic-control/tc.go @@ -0,0 +1,51 @@ +package main + +import ( + "fmt" + "os/exec" + "strings" + + log "github.com/Sirupsen/logrus" + "github.com/containernetworking/cni/pkg/ns" +) + +func DoTrafficControl(pid int, delay uint32) error { + cmds := [][]string{ + split("tc qdisc replace dev eth0 root handle 1: netem"), + + // These steps are not required, since we don't do + // ingress traffic control, only egress, see the TODO + // at the beginning of the file. 
+ + //split("ip link add ifb0 type ifb"), + //split("ip link set ifb0 up"), + //split("tc qdisc add dev eth0 handle ffff: ingress"), + //split("tc filter add dev eth0 parent ffff: protocol ip u32 match u32 0 0 action mirred egress redirect dev ifb0"), + //split("tc qdisc replace dev ifb0 handle 1:0 root netem"), + + // Add "loss %d%% rate %dkbit" when we add the + // possibility to control the packet loss and + // bandwidth. See the TODO at the beginning of the + // file. + + split(fmt.Sprintf("tc qdisc change dev eth0 root handle 1: netem delay %dms", delay)), + } + netNS := fmt.Sprintf("/proc/%d/ns/net", pid) + err := ns.WithNetNSPath(netNS, func(hostNS ns.NetNS) error { + for _, cmd := range cmds { + if output, err := exec.Command(cmd[0], cmd[1:]...).CombinedOutput(); err != nil { + log.Error(string(output)) + return fmt.Errorf("failed to execute command: %v", err) + } + } + return nil + }) + if err != nil { + return fmt.Errorf("failed to perform traffic control: %v", err) + } + return nil +} + +func split(cmd string) []string { + return strings.Split(cmd, " ") +}