mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-04 02:30:45 +00:00
Add scheduler from weave
This commit is contained in:
1
scheduler/.gitignore
vendored
Normal file
1
scheduler/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
lib
|
||||
6
scheduler/README.md
Normal file
6
scheduler/README.md
Normal file
@@ -0,0 +1,6 @@
|
||||
To upload newer version:
|
||||
|
||||
```
|
||||
pip install -r requirements.txt -t lib
|
||||
appcfg.py update .
|
||||
```
|
||||
13
scheduler/app.yaml
Normal file
13
scheduler/app.yaml
Normal file
@@ -0,0 +1,13 @@
|
||||
application: positive-cocoa-90213
|
||||
version: 1
|
||||
runtime: python27
|
||||
api_version: 1
|
||||
threadsafe: true
|
||||
|
||||
handlers:
|
||||
- url: .*
|
||||
script: main.app
|
||||
|
||||
libraries:
|
||||
- name: webapp2
|
||||
version: latest
|
||||
3
scheduler/appengine_config.py
Normal file
3
scheduler/appengine_config.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from google.appengine.ext import vendor
|
||||
|
||||
vendor.add('lib')
|
||||
4
scheduler/cron.yaml
Normal file
4
scheduler/cron.yaml
Normal file
@@ -0,0 +1,4 @@
|
||||
cron:
|
||||
- description: periodic gc
|
||||
url: /tasks/gc
|
||||
schedule: every 5 minutes
|
||||
112
scheduler/main.py
Normal file
112
scheduler/main.py
Normal file
@@ -0,0 +1,112 @@
|
||||
import collections
|
||||
import json
|
||||
import logging
|
||||
import operator
|
||||
import re
|
||||
|
||||
import flask
|
||||
from oauth2client.client import GoogleCredentials
|
||||
from googleapiclient import discovery
|
||||
|
||||
from google.appengine.api import urlfetch
|
||||
from google.appengine.ext import ndb
|
||||
|
||||
app = flask.Flask('scheduler')
|
||||
app.debug = True
|
||||
|
||||
# We use exponential moving average to record
|
||||
# test run times. Higher alpha discounts historic
|
||||
# observations faster.
|
||||
alpha = 0.3
|
||||
|
||||
PROJECT = 'positive-cocoa-90213'
|
||||
ZONE = 'us-central1-a'
|
||||
|
||||
class Test(ndb.Model):
|
||||
total_run_time = ndb.FloatProperty(default=0.) # Not total, but a EWMA
|
||||
total_runs = ndb.IntegerProperty(default=0)
|
||||
|
||||
class Schedule(ndb.Model):
|
||||
shards = ndb.JsonProperty()
|
||||
|
||||
@app.route('/record/<path:test_name>/<runtime>', methods=['POST'])
|
||||
@ndb.transactional
|
||||
def record(test_name, runtime):
|
||||
test = Test.get_by_id(test_name)
|
||||
if test is None:
|
||||
test = Test(id=test_name)
|
||||
test.total_run_time = (test.total_run_time * (1-alpha)) + (float(runtime) * alpha)
|
||||
test.total_runs += 1
|
||||
test.put()
|
||||
return ('', 204)
|
||||
|
||||
@app.route('/schedule/<test_run>/<int:shard_count>/<int:shard>', methods=['POST'])
|
||||
def schedule(test_run, shard_count, shard):
|
||||
# read tests from body
|
||||
test_names = flask.request.get_json(force=True)['tests']
|
||||
|
||||
# first see if we have a scedule already
|
||||
schedule_id = "%s-%d" % (test_run, shard_count)
|
||||
schedule = Schedule.get_by_id(schedule_id)
|
||||
if schedule is not None:
|
||||
return flask.json.jsonify(tests=schedule.shards[str(shard)])
|
||||
|
||||
# if not, do simple greedy algorithm
|
||||
test_times = ndb.get_multi(ndb.Key(Test, test_name) for test_name in test_names)
|
||||
def avg(test):
|
||||
if test is not None:
|
||||
return test.total_run_time
|
||||
return 1
|
||||
test_times = [(test_name, avg(test)) for test_name, test in zip(test_names, test_times)]
|
||||
test_times_dict = dict(test_times)
|
||||
test_times.sort(key=operator.itemgetter(1))
|
||||
|
||||
shards = {i: [] for i in xrange(shard_count)}
|
||||
while test_times:
|
||||
test_name, time = test_times.pop()
|
||||
|
||||
# find shortest shard and put it in that
|
||||
s, _ = min(((i, sum(test_times_dict[t] for t in shards[i]))
|
||||
for i in xrange(shard_count)), key=operator.itemgetter(1))
|
||||
|
||||
shards[s].append(test_name)
|
||||
|
||||
# atomically insert or retrieve existing schedule
|
||||
schedule = Schedule.get_or_insert(schedule_id, shards=shards)
|
||||
return flask.json.jsonify(tests=schedule.shards[str(shard)])
|
||||
|
||||
NAME_RE = re.compile(r'^host(?P<index>\d+)-(?P<build>\d+)-(?P<shard>\d+)$')
|
||||
|
||||
@app.route('/tasks/gc')
|
||||
def gc():
|
||||
# Get list of running VMs, pick build id out of VM name
|
||||
credentials = GoogleCredentials.get_application_default()
|
||||
compute = discovery.build('compute', 'v1', credentials=credentials)
|
||||
instances = compute.instances().list(project=PROJECT, zone=ZONE).execute()
|
||||
host_by_build = collections.defaultdict(list)
|
||||
for instance in instances['items']:
|
||||
matches = NAME_RE.match(instance['name'])
|
||||
if matches is None:
|
||||
continue
|
||||
host_by_build[int(matches.group('build'))].append(instance['name'])
|
||||
logging.info("Running VMs by build: %r", host_by_build)
|
||||
|
||||
# Get list of builds, filter down to runnning builds
|
||||
result = urlfetch.fetch('https://circleci.com/api/v1/project/weaveworks/weave',
|
||||
headers={'Accept': 'application/json'})
|
||||
assert result.status_code == 200
|
||||
builds = json.loads(result.content)
|
||||
running = {build['build_num'] for build in builds if build['status'] == 'running'}
|
||||
logging.info("Runnings builds: %r", running)
|
||||
|
||||
# Stop VMs for builds that aren't running
|
||||
stopped = []
|
||||
for build, names in host_by_build.iteritems():
|
||||
if build in running:
|
||||
continue
|
||||
for name in names:
|
||||
stopped.append(name)
|
||||
logging.info("Stopping VM %s", name)
|
||||
compute.instances().delete(project=PROJECT, zone=ZONE, instance=name).execute()
|
||||
|
||||
return (flask.json.jsonify(running=list(running), stopped=stopped), 200)
|
||||
2
scheduler/requirements.txt
Normal file
2
scheduler/requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
||||
flask
|
||||
google-api-python-client
|
||||
Reference in New Issue
Block a user