Update demo for tracer

This commit is contained in:
Tom Wilkie
2016-03-10 11:58:04 +00:00
parent b2c0c93d5e
commit bd760f6e76
14 changed files with 207 additions and 48 deletions

View File

@@ -2,7 +2,7 @@ CC=gcc
CFLAGS=-g -lpthread
BUILD_IN_CONTAINER=true
all: qotd.marker app.marker client.marker searchapp.marker shout.marker frontend.marker echo.marker
all: .qotd.marker .app.marker .client.marker .searchapp.marker .shout.marker .frontend.marker .echo.marker .trace_app.marker
searchapp/searchapp: searchapp/app.go
shout/shout: shout/shout.go
@@ -23,14 +23,15 @@ shout/shout searchapp/searchapp:
go build -ldflags "-extldflags \"-static\"" -tags netgo -o $@ ./$(@D)
endif
qotd.marker: qotd/* qotd/qotd
app.marker: app/*
client.marker: client/*
searchapp.marker: searchapp/* searchapp/searchapp
shout.marker: shout/* shout/shout
frontend.marker: frontend/*
echo.marker: echo/*
%.marker:
.qotd.marker: qotd/* qotd/qotd
.app.marker: app/*
.client.marker: client/*
.searchapp.marker: searchapp/* searchapp/searchapp
.shout.marker: shout/* shout/shout
.frontend.marker: frontend/*
.echo.marker: echo/*
.trace_app.marker: trace_app/*
.%.marker:
docker build -t tomwilkie/$(<D) $(<D)/
touch $@

View File

@@ -1,3 +1,4 @@
import argparse
import requests
import random
import threading
@@ -6,21 +7,25 @@ import logging
import socket
import sys
frontend = 'frontend'
concurrency = 2
def do_request(s):
addrs = socket.getaddrinfo(frontend, 80)
def do_request(s, args):
addrs = socket.getaddrinfo(args.target, 80)
addrs = [a
for a in addrs
if a[0] == socket.AF_INET]
logging.info("got %s", addrs)
if len(addrs) <= 0:
return
addr = random.choice(addrs)
s.get("http://%s:%d" % addr[4], timeout=1.0)
def do_requests():
def do_requests(args):
s = requests.Session()
while True:
try:
do_request(s)
if args.persist:
do_request(s, args)
else:
do_request(requests.Session(), args)
except:
logging.error("Error doing request", exc_info=sys.exc_info())
@@ -28,8 +33,15 @@ def do_requests():
logging.info("Did request")
def main():
logging.info("Starting %d threads", concurrency)
threads = [threading.Thread(target=do_requests) for i in range(concurrency)]
parser = argparse.ArgumentParser()
parser.add_argument('-target', default="frontend")
parser.add_argument('-concurrency', default=2, type=int)
parser.add_argument('-persist', default=True, type=bool)
args = parser.parse_args()
logging.info("Starting %d threads, targeting %s", args.concurrency, args.target)
threads = [threading.Thread(target=do_requests, args=(args,))
for i in range(args.concurrency)]
for thread in threads:
thread.start()
for thread in threads:

View File

@@ -0,0 +1,45 @@
#!/bin/bash
set -ex
readonly ARG="$1"
eval $(weave env)
start_container() {
local replicas=$1
local image=$2
local basename=$3
shift 3
local hostname=${basename}.weave.local
local docker_args=
while [ "$#" -gt 0 ]; do
case "$1" in
--)
shift
break
;;
*)
docker_args="${docker_args} $1"
shift
;;
esac
done
local container_args="$@"
for i in $(seq ${replicas}); do
if docker inspect ${basename}${i} >/dev/null 2>&1; then
docker rm -f ${basename}${i}
fi
docker run -d -e CHECKPOINT_DISABLE --name=${basename}${i} --hostname=${hostname} \
${docker_args} ${image} ${container_args}
done
}
start_container 1 tomwilkie/qotd qotd
start_container 1 tomwilkie/echo echo
start_container 1 tomwilkie/trace_app app
start_container 1 tomwilkie/client client -- -target app.weave.local \
-concurrency 1 -persist False

View File

@@ -0,0 +1,8 @@
FROM python:2.7
MAINTAINER Weaveworks Inc <help@weave.works>
WORKDIR /home/weave
ADD requirements.txt /home/weave/
RUN pip install -r /home/weave/requirements.txt
ADD app.py /home/weave/
EXPOSE 5000
ENTRYPOINT ["python", "/home/weave/app.py"]

View File

@@ -0,0 +1,69 @@
import os
import socket
import sys
import requests
import random
import threading
import logging
from concurrent.futures import ThreadPoolExecutor
from flask import Flask
from redis import Redis
from werkzeug.serving import WSGIRequestHandler
app = Flask(__name__)
redis = Redis(host='redis', port=6379)
pool = ThreadPoolExecutor(max_workers=10)
sessions = threading.local()
searchapps = ['http://searchapp:8080/']
def do_redis():
redis.incr('hits')
return redis.get('hits')
def do_qotd():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect(("qotd", 4446))
s.send("Hello")
return s.recv(1024)
finally:
s.close()
def do_search():
if getattr(sessions, 'session', None) == None:
sessions.session = requests.Session()
r = sessions.session.get(random.choice(searchapps))
return r.text
def do_echo(text):
r = requests.get("http://echo/", data=text)
return r.text
def ignore_error(f):
try:
return str(f())
except:
logging.error("Error executing function", exc_info=sys.exc_info())
return "Error"
# this root is for the tracing demo
@app.route('/hello')
def hello():
qotd_msg = do_qotd()
qotd_msg = do_echo(qotd_msg)
return qotd_msg
# this is for normal demos
@app.route('/')
def root():
#counter_future = pool.submit(do_redis)
#search_future = pool.submit(do_search)
result = do_echo(do_qotd())
return result
if __name__ == "__main__":
logging.basicConfig(format='%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s', level=logging.INFO)
WSGIRequestHandler.protocol_version = "HTTP/1.1"
app.run(host="0.0.0.0", port=80, debug=True)

View File

@@ -0,0 +1,4 @@
flask
redis
requests
futures

View File

@@ -1,8 +1,8 @@
BUILD_IN_CONTAINER=true
tracer.tar: main/main main/Dockerfile
docker build -t weaveworks/tracer main/
docker save weaveworks/tracer:latest >$@
docker build -t tomwilkie/tracer main/
docker save tomwilkie/tracer:latest >$@
main/main: main/*.go main/static.go ptrace/*.go

View File

@@ -32,7 +32,7 @@ func (t *tracer) Stop() {
}
func main() {
dockerRegistry, err := docker.NewRegistry(pollInterval, nil)
dockerRegistry, err := docker.NewRegistry(pollInterval, nil, false)
if err != nil {
log.Fatalf("Could start docker watcher: %v", err)
}

View File

@@ -168,5 +168,13 @@ func (s *store) Traces() []*trace {
}
sort.Sort(byKey(traces))
// only return last 15 traces
start := 0
if len(traces) > 15 {
start = len(traces) - 15
}
traces = traces[start:]
return traces
}

View File

@@ -8,6 +8,7 @@ usage() {
PORT=6060
CONTAINER_NAME=weavetracer
IMAGE_NAME=tomwilkie/tracer
[ $# -gt 0 ] || usage
COMMAND=$1
@@ -16,8 +17,9 @@ shift 1
case "$COMMAND" in
launch)
docker rm -f $CONTAINER_NAME || true
docker run --privileged --net=host --pid=host -ti -v /var/run/docker.sock:/var/run/docker.sock \
--name $CONTAINER_NAME weaveworks/tracer
--name $CONTAINER_NAME $IMAGE_NAME
;;
stop)

View File

@@ -16,7 +16,7 @@ import (
func TestControls(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil)
registry, _ := docker.NewRegistry(10*time.Second, nil, false)
defer registry.Stop()
for _, tc := range []struct{ command, result string }{
@@ -56,7 +56,7 @@ func TestPipes(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil)
registry, _ := docker.NewRegistry(10*time.Second, nil, false)
defer registry.Stop()
test.Poll(t, 100*time.Millisecond, true, func() interface{} {

View File

@@ -42,10 +42,11 @@ type ContainerUpdateWatcher func(c Container)
type registry struct {
sync.RWMutex
quit chan chan struct{}
interval time.Duration
client Client
pipes controls.PipeClient
quit chan chan struct{}
interval time.Duration
collectStats bool
client Client
pipes controls.PipeClient
watchers []ContainerUpdateWatcher
containers map[string]Container
@@ -76,7 +77,7 @@ func newDockerClient(endpoint string) (Client, error) {
}
// NewRegistry returns a usable Registry. Don't forget to Stop it.
func NewRegistry(interval time.Duration, pipes controls.PipeClient) (Registry, error) {
func NewRegistry(interval time.Duration, pipes controls.PipeClient, collectStats bool) (Registry, error) {
client, err := NewDockerClientStub(endpoint)
if err != nil {
return nil, err
@@ -87,10 +88,11 @@ func NewRegistry(interval time.Duration, pipes controls.PipeClient) (Registry, e
containersByPID: map[int]Container{},
images: map[string]*docker_client.APIImages{},
client: client,
pipes: pipes,
interval: interval,
quit: make(chan chan struct{}),
client: client,
pipes: pipes,
interval: interval,
collectStats: collectStats,
quit: make(chan chan struct{}),
}
r.registerControls()
@@ -178,8 +180,10 @@ func (r *registry) listenForEvents() bool {
r.Lock()
defer r.Unlock()
for _, c := range r.containers {
c.StopGatheringStats()
if r.collectStats {
for _, c := range r.containers {
c.StopGatheringStats()
}
}
close(ch)
return false
@@ -191,8 +195,10 @@ func (r *registry) reset() {
r.Lock()
defer r.Unlock()
for _, c := range r.containers {
c.StopGatheringStats()
if r.collectStats {
for _, c := range r.containers {
c.StopGatheringStats()
}
}
r.containers = map[string]Container{}
@@ -257,7 +263,9 @@ func (r *registry) updateContainerState(containerID string) {
delete(r.containers, containerID)
delete(r.containersByPID, container.PID())
container.StopGatheringStats()
if r.collectStats {
container.StopGatheringStats()
}
return
}
@@ -283,13 +291,15 @@ func (r *registry) updateContainerState(containerID string) {
}
// And finally, ensure we gather stats for it
if dockerContainer.State.Running {
if err := c.StartGatheringStats(); err != nil {
log.Errorf("Error gather stats for container: %s", containerID)
return
if r.collectStats {
if dockerContainer.State.Running {
if err := c.StartGatheringStats(); err != nil {
log.Errorf("Error gather stats for container: %s", containerID)
return
}
} else {
c.StopGatheringStats()
}
} else {
c.StopGatheringStats()
}
}

View File

@@ -253,7 +253,7 @@ func allImages(r docker.Registry) []*client.APIImages {
func TestRegistry(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil)
registry, _ := docker.NewRegistry(10*time.Second, nil, true)
defer registry.Stop()
runtime.Gosched()
@@ -276,7 +276,7 @@ func TestRegistry(t *testing.T) {
func TestLookupByPID(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil)
registry, _ := docker.NewRegistry(10*time.Second, nil, true)
defer registry.Stop()
want := docker.Container(&mockContainer{container1})
@@ -293,7 +293,7 @@ func TestLookupByPID(t *testing.T) {
func TestRegistryEvents(t *testing.T) {
mdc := newMockClient()
setupStubs(mdc, func() {
registry, _ := docker.NewRegistry(10*time.Second, nil)
registry, _ := docker.NewRegistry(10*time.Second, nil, true)
defer registry.Stop()
runtime.Gosched()

View File

@@ -150,7 +150,7 @@ func probeMain() {
if err := report.AddLocalBridge(*dockerBridge); err != nil {
log.Errorf("Docker: problem with bridge %s: %v", *dockerBridge, err)
}
if registry, err := docker.NewRegistry(*dockerInterval, clients); err == nil {
if registry, err := docker.NewRegistry(*dockerInterval, clients, true); err == nil {
defer registry.Stop()
p.AddTagger(docker.NewTagger(registry, processCache))
p.AddReporter(docker.NewReporter(registry, hostID, probeID, p))