diff --git a/scheduler/.gitignore b/scheduler/.gitignore new file mode 100644 index 000000000..a65b41774 --- /dev/null +++ b/scheduler/.gitignore @@ -0,0 +1 @@ +lib diff --git a/scheduler/README.md b/scheduler/README.md new file mode 100644 index 000000000..8489d7870 --- /dev/null +++ b/scheduler/README.md @@ -0,0 +1,6 @@ +To upload newer version: + +``` +pip install -r requirements.txt -t lib +appcfg.py update . +``` diff --git a/scheduler/app.yaml b/scheduler/app.yaml new file mode 100644 index 000000000..8bc59f004 --- /dev/null +++ b/scheduler/app.yaml @@ -0,0 +1,13 @@ +application: positive-cocoa-90213 +version: 1 +runtime: python27 +api_version: 1 +threadsafe: true + +handlers: +- url: .* + script: main.app + +libraries: +- name: webapp2 + version: latest diff --git a/scheduler/appengine_config.py b/scheduler/appengine_config.py new file mode 100644 index 000000000..f4489ff96 --- /dev/null +++ b/scheduler/appengine_config.py @@ -0,0 +1,3 @@ +from google.appengine.ext import vendor + +vendor.add('lib') diff --git a/scheduler/cron.yaml b/scheduler/cron.yaml new file mode 100644 index 000000000..652aed802 --- /dev/null +++ b/scheduler/cron.yaml @@ -0,0 +1,4 @@ +cron: +- description: periodic gc + url: /tasks/gc + schedule: every 5 minutes diff --git a/scheduler/main.py b/scheduler/main.py new file mode 100644 index 000000000..ed0c78e31 --- /dev/null +++ b/scheduler/main.py @@ -0,0 +1,112 @@ +import collections +import json +import logging +import operator +import re + +import flask +from oauth2client.client import GoogleCredentials +from googleapiclient import discovery + +from google.appengine.api import urlfetch +from google.appengine.ext import ndb + +app = flask.Flask('scheduler') +app.debug = True + +# We use exponential moving average to record +# test run times. Higher alpha discounts historic +# observations faster. 
# Smoothing factor for the exponentially weighted moving average (EWMA) of
# test run times. A higher alpha discounts historic observations faster.
alpha = 0.3

# GCE project and zone whose instances the gc task inspects and deletes.
PROJECT = 'positive-cocoa-90213'
ZONE = 'us-central1-a'


class Test(ndb.Model):
    """Run-time statistics for a single test; the entity id is the test name."""
    total_run_time = ndb.FloatProperty(default=0.)  # Not total, but a EWMA
    total_runs = ndb.IntegerProperty(default=0)


class Schedule(ndb.Model):
    """Stored shard assignment; the entity id is "<test_run>-<shard_count>"."""
    # Mapping of shard index -> list of test names assigned to that shard.
    shards = ndb.JsonProperty()


# NOTE(review): this rule previously read '/record//' -- the URL variables
# were missing, so the view could never be called with its arguments. The
# variable names are reconstructed from the function signature; confirm them
# against the clients that POST here.
@app.route('/record/<test_name>/<runtime>', methods=['POST'])
@ndb.transactional
def record(test_name, runtime):
    """Fold one observed run time into the stored EWMA of a test.

    Args:
      test_name: id of the Test entity to update; created if it is missing.
      runtime: the observed run time, as text parseable by float().

    Returns:
      An empty 204 response. Responds 400 if runtime is not a number.
    """
    try:
        observed = float(runtime)
    except ValueError:
        flask.abort(400)

    test = Test.get_by_id(test_name)
    if test is None:
        test = Test(id=test_name)
    # NOTE(review): a new entity starts from the 0.0 default, so the first
    # few observations are averaged against zero -- confirm this is intended.
    test.total_run_time = (test.total_run_time * (1 - alpha)) + (observed * alpha)
    test.total_runs += 1
    test.put()
    return ('', 204)


# NOTE(review): this rule previously read '/schedule///'. shard_count and
# shard use the int converter because the body formats shard_count with %d
# and passes it to xrange(); confirm the exact rule against the clients.
@app.route('/schedule/<test_run>/<int:shard_count>/<int:shard>', methods=['POST'])
def schedule(test_run, shard_count, shard):
    """Return the tests assigned to one shard of a test run.

    The request body is JSON with a 'tests' list of test names. The first
    request for a given (test_run, shard_count) computes and stores an
    assignment; later requests are answered from the stored Schedule.

    Args:
      test_run: identifier of the test run.
      shard_count: total number of shards the tests are split across.
      shard: zero-based index of the shard being asked for.

    Returns:
      A JSON response of the form {"tests": [...]}. Responds 400 if shard is
      not in range(shard_count).
    """
    # read tests from body
    test_names = flask.request.get_json(force=True)['tests']

    if not 0 <= shard < shard_count:
        flask.abort(400)

    # first see if we have a schedule already
    schedule_id = "%s-%d" % (test_run, shard_count)
    stored = Schedule.get_by_id(schedule_id)
    if stored is not None:
        return flask.json.jsonify(tests=stored.shards[str(shard)])

    # if not, do simple greedy algorithm
    tests = ndb.get_multi([ndb.Key(Test, name) for name in test_names])

    def avg(test):
        # Tests that were never recorded get a nominal run time of 1.
        if test is not None:
            return test.total_run_time
        return 1

    test_times = [(name, avg(test)) for name, test in zip(test_names, tests)]
    test_times.sort(key=operator.itemgetter(1))

    shards = {i: [] for i in xrange(shard_count)}
    # Running total per shard, so each placement does not re-sum every shard.
    shard_totals = [0] * shard_count
    # Longest test first: walk the ascending list from its end.
    for test_name, run_time in reversed(test_times):
        # find shortest shard and put it in that (lowest index wins ties)
        s = min(xrange(shard_count), key=shard_totals.__getitem__)
        shards[s].append(test_name)
        shard_totals[s] += run_time

    # atomically insert or retrieve existing schedule
    stored = Schedule.get_or_insert(schedule_id, shards=shards)
    # NOTE(review): the lookup uses str(shard), presumably because the JSON
    # round trip turns the integer keys into strings -- confirm.
    return flask.json.jsonify(tests=stored.shards[str(shard)])


# NOTE(review): the group names were missing from this pattern ('(?P\d+)' is
# not valid regex and fails at import). Only the 'build' group is read below;
# the index/build/shard order is an assumption -- confirm it against the
# script that names the VMs.
NAME_RE = re.compile(r'^host(?P<index>\d+)-(?P<build>\d+)-(?P<shard>\d+)$')


@app.route('/tasks/gc')
def gc():
    """Delete VMs whose build is not listed as running on CircleCI.

    Returns:
      A (JSON response, 200) tuple; the JSON has 'running' (build numbers
      reported as running) and 'stopped' (names of the deleted VMs).

    Raises:
      RuntimeError: if the CircleCI API does not answer with status 200.
    """
    # Get list of running VMs, pick build id out of VM name
    credentials = GoogleCredentials.get_application_default()
    compute = discovery.build('compute', 'v1', credentials=credentials)
    host_by_build = collections.defaultdict(list)
    request = compute.instances().list(project=PROJECT, zone=ZONE)
    while request is not None:
        response = request.execute()
        # 'items' is absent from the response when there are no instances.
        for instance in response.get('items', []):
            matches = NAME_RE.match(instance['name'])
            if matches is None:
                continue
            host_by_build[int(matches.group('build'))].append(instance['name'])
        # list_next() returns None once the last page has been consumed.
        request = compute.instances().list_next(
            previous_request=request, previous_response=response)
    logging.info("Running VMs by build: %r", host_by_build)

    # Get list of builds, filter down to running builds
    result = urlfetch.fetch('https://circleci.com/api/v1/project/weaveworks/weave',
                            headers={'Accept': 'application/json'})
    # Explicit check rather than assert: asserts are stripped under -O, and
    # continuing with a bad response could delete VMs of live builds.
    if result.status_code != 200:
        raise RuntimeError(
            'CircleCI API returned status %d' % result.status_code)
    builds = json.loads(result.content)
    running = {build['build_num'] for build in builds if build['status'] == 'running'}
    logging.info("Runnings builds: %r", running)

    # Stop VMs for builds that aren't running
    stopped = []
    for build, names in host_by_build.iteritems():
        if build in running:
            continue
        for name in names:
            stopped.append(name)
            logging.info("Stopping VM %s", name)
            compute.instances().delete(project=PROJECT, zone=ZONE, instance=name).execute()

    return (flask.json.jsonify(running=list(running), stopped=stopped), 200)


# The patch this code comes from also adds scheduler/requirements.txt with
# these two lines:
#   flask
#   google-api-python-client