// Command runner executes the test scripts named on the command line against
// the hosts listed in the HOSTS environment variable, either one after another
// or in parallel, optionally asking a remote scheduler which tests this shard
// should run.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/mgutz/ansi"
	"github.com/weaveworks/common/mflag"
)

const (
	defaultSchedulerHost = "positive-cocoa-90213.appspot.com"
	jsonContentType      = "application/json"
)

var (
	start = ansi.ColorCode("black+ub")
	fail  = ansi.ColorCode("red+b")
	succ  = ansi.ColorCode("green+b")
	reset = ansi.ColorCode("reset")

	schedulerHost = defaultSchedulerHost
	useScheduler  = false
	runParallel   = false
	verbose       = false
	timeout       = 180 // In seconds. Three minutes ought to be enough for any test

	// consoleLock serialises writes to stdout so that output from tests
	// running concurrently does not interleave.
	consoleLock = sync.Mutex{}
)

// test is a single test script together with the number of hosts it needs.
type test struct {
	name  string
	hosts int
}

// schedule is the JSON payload exchanged with the scheduler.
type schedule struct {
	Tests []string `json:"tests"`
}

// result is what a finished test reports back to parallel: the test itself,
// whether it failed, and the hosts it was given (so they can be reused).
type result struct {
	test
	errored bool
	hosts   []string
}

// tests is a sortable list of tests, ordered by host count and then by name.
type tests []test

func (ts tests) Len() int      { return len(ts) }
func (ts tests) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
func (ts tests) Less(i, j int) bool {
	if ts[i].hosts != ts[j].hosts {
		return ts[i].hosts < ts[j].hosts
	}
	return ts[i].name < ts[j].name
}

// pick removes and returns the first test in the list that needs no more than
// available hosts. The boolean is false when no test fits.
func (ts *tests) pick(available int) (test, bool) {
	list := *ts
	for i := range list {
		if list[i].hosts > available {
			continue
		}
		picked := list[i]
		*ts = append(list[:i], list[i+1:]...)
		return picked, true
	}
	return test{}, false
}

// lockedBuffer is a bytes.Buffer guarded by a mutex. A command that timed out
// may still be writing its output while run reads it, so both sides go through
// the lock.
type lockedBuffer struct {
	mu  sync.Mutex
	buf bytes.Buffer
}

// Write appends p to the buffer.
func (b *lockedBuffer) Write(p []byte) (int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.buf.Write(p)
}

// String returns everything written so far.
func (b *lockedBuffer) String() string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.buf.String()
}

// hostsEnv returns env with its HOSTS variable set to the space-separated
// hosts. An existing entry is overwritten in place; otherwise one is appended.
// Only the exact variable name matches, so HOSTS_FOO and friends are left alone.
func hostsEnv(env, hosts []string) []string {
	entry := "HOSTS=" + strings.Join(hosts, " ")
	for i, kv := range env {
		if strings.HasPrefix(kv, "HOSTS=") {
			env[i] = entry
			return env
		}
	}
	return append(env, entry)
}

// runWithTimeout starts cmd and waits for it to exit, giving up after limit.
// It returns the command's own error, or a "timed out" error if the limit was
// reached, in which case the process is killed.
func runWithTimeout(cmd *exec.Cmd, limit time.Duration) error {
	if err := cmd.Start(); err != nil {
		return err
	}

	// Buffered so the waiting goroutine can always deliver its result and
	// exit, even after we have stopped listening.
	done := make(chan error, 1)
	go func() { done <- cmd.Wait() }()

	timer := time.NewTimer(limit)
	defer timer.Stop()

	select {
	case err := <-done:
		return err
	case <-timer.C:
		// Best effort: the process may have exited in the meantime, and
		// there is nothing more useful to do if the kill fails. Note this
		// only signals the direct child, not processes it spawned. We do
		// not wait for Wait to return because descendants holding the
		// output pipe open could block it indefinitely.
		_ = cmd.Process.Kill()
		return fmt.Errorf("timed out")
	}
}

// run executes the test script with HOSTS set to the given hosts and prints
// the outcome. The script's combined output is printed when it fails or when
// verbose is set. It returns true if the test failed or timed out.
func (t test) run(hosts []string) bool {
	consoleLock.Lock()
	fmt.Printf("%s>>> Running %s on %s%s\n", start, t.name, hosts, reset)
	consoleLock.Unlock()

	out := &lockedBuffer{}
	cmd := exec.Command(t.name)
	cmd.Env = hostsEnv(os.Environ(), hosts)
	cmd.Stdout = out
	cmd.Stderr = out

	began := time.Now()
	err := runWithTimeout(cmd, time.Duration(timeout)*time.Second)
	duration := time.Since(began).Seconds()

	consoleLock.Lock()
	if err != nil {
		fmt.Printf("%s>>> Test %s finished after %0.1f secs with error: %v%s\n", fail, t.name, duration, err, reset)
	} else {
		fmt.Printf("%s>>> Test %s finished with success after %0.1f secs%s\n", succ, t.name, duration, reset)
	}
	if err != nil || verbose {
		fmt.Print(out.String())
		fmt.Println()
	}
	consoleLock.Unlock()

	// NOTE(review): durations are only reported for failed tests; this looks
	// like it may have been meant to be err == nil. Left as is; confirm
	// against what the scheduler expects.
	if err != nil && useScheduler {
		updateScheduler(t.name, duration)
	}

	return err != nil
}

// updateScheduler POSTs the duration (in seconds) of the named test to the
// scheduler. Failures are printed and otherwise ignored.
func updateScheduler(test string, duration float64) {
	req := &http.Request{
		Method: "POST",
		Host:   schedulerHost,
		URL: &url.URL{
			Opaque: fmt.Sprintf("/record/%s/%0.2f", url.QueryEscape(test), duration),
			Scheme: "http",
			Host:   schedulerHost,
		},
		Close: true,
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Printf("Error updating scheduler: %v\n", err)
		return
	}
	resp.Body.Close()
}

// getSchedule sends the candidate test names to the scheduler and returns the
// names the scheduler replies with. The request URL is built from the
// CIRCLE_* environment variables. An error is returned if the request fails,
// the scheduler answers with a non-2xx status, or the reply is not valid JSON.
func getSchedule(tests []string) ([]string, error) {
	var (
		userName    = os.Getenv("CIRCLE_PROJECT_USERNAME")
		project     = os.Getenv("CIRCLE_PROJECT_REPONAME")
		buildNum    = os.Getenv("CIRCLE_BUILD_NUM")
		testRun     = userName + "-" + project + "-integration-" + buildNum
		shardCount  = os.Getenv("CIRCLE_NODE_TOTAL")
		shardID     = os.Getenv("CIRCLE_NODE_INDEX")
		requestBody = &bytes.Buffer{}
	)
	if err := json.NewEncoder(requestBody).Encode(schedule{tests}); err != nil {
		return nil, err
	}

	endpoint := fmt.Sprintf("http://%s/schedule/%s/%s/%s", schedulerHost, testRun, shardCount, shardID)
	resp, err := http.Post(endpoint, jsonContentType, requestBody)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Surface the HTTP status instead of an opaque JSON decoding error.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("scheduler returned %s", resp.Status)
	}

	var sched schedule
	if err := json.NewDecoder(resp.Body).Decode(&sched); err != nil {
		return nil, err
	}
	return sched.Tests, nil
}

// getTests turns test names into tests. When the scheduler is enabled the
// names are first replaced by whatever the scheduler returns. The host count
// is taken from the last "_"-separated component of the name once a
// "_test.sh" suffix is removed; if that component is not a number, or is
// less than one, the test is assumed to need a single host.
func getTests(testNames []string) (tests, error) {
	if useScheduler {
		scheduled, err := getSchedule(testNames)
		if err != nil {
			return nil, err
		}
		testNames = scheduled
	}

	parsed := make(tests, 0, len(testNames))
	for _, name := range testNames {
		parts := strings.Split(strings.TrimSuffix(name, "_test.sh"), "_")
		numHosts, err := strconv.Atoi(parts[len(parts)-1])
		if err != nil || numHosts < 1 {
			// A count below one would make parallel slice the host
			// list with a negative bound.
			numHosts = 1
		}
		parsed = append(parsed, test{name, numHosts})
		fmt.Printf("Test %s needs %d hosts\n", name, numHosts)
	}
	return parsed, nil
}

// summary prints how many tests ran and, if any failed, the name of each
// failed test.
func summary(tests, failed tests) {
	if len(failed) == 0 {
		fmt.Printf("%s>>> Ran %d tests, all succeeded%s\n", succ, len(tests), reset)
		return
	}

	fmt.Printf("%s>>> Ran %d tests, %d failed%s\n", fail, len(tests), len(failed), reset)
	for _, t := range failed {
		fmt.Printf("%s>>> Fail %s%s\n", fail, t.name, reset)
	}
}

// parallel runs the tests concurrently, handing each one as many hosts from
// the pool as it needs and returning them to the pool when it finishes. Tests
// needing the most hosts are tried first. A test that needs more hosts than
// the pool can ever supply is reported as failed rather than waited on
// forever. It returns true if any test failed.
func parallel(ts tests, hosts []string) bool {
	// Shares its backing array with ts, which pick rearranges below; only
	// its length is used by summary.
	testsCopy := ts
	sort.Sort(sort.Reverse(ts))

	resultsChan := make(chan result)
	outstanding := 0
	failed := tests{}

	for len(ts) > 0 || outstanding > 0 {
		// While we have some free hosts, try and schedule
		// a test on them
		for len(hosts) > 0 {
			next, ok := ts.pick(len(hosts))
			if !ok {
				break
			}
			testHosts := hosts[:next.hosts]
			hosts = hosts[next.hosts:]
			go func() {
				errored := next.run(testHosts)
				resultsChan <- result{next, errored, testHosts}
			}()
			outstanding++
		}

		// Nothing is running, so every host is back in the pool, and
		// still nothing fits: the remaining tests can never be
		// scheduled. Waiting on resultsChan here would block forever.
		if outstanding == 0 {
			for _, t := range ts {
				fmt.Printf("%s>>> Test %s needs %d hosts but only %d are available%s\n", fail, t.name, t.hosts, len(hosts), reset)
				failed = append(failed, t)
			}
			break
		}

		// Otherwise, wait for the test to finish and return
		// the hosts to the pool
		res := <-resultsChan
		hosts = append(hosts, res.hosts...)
		outstanding--
		if res.errored {
			failed = append(failed, res.test)
		}
	}

	summary(testsCopy, failed)
	return len(failed) > 0
}

// sequential runs the tests one at a time, giving each of them every host.
// It returns true if any test failed.
func sequential(ts tests, hosts []string) bool {
	failed := tests{}
	for _, t := range ts {
		if t.run(hosts) {
			failed = append(failed, t)
		}
	}
	summary(ts, failed)
	return len(failed) > 0
}

func main() {
	mflag.BoolVar(&useScheduler, []string{"scheduler"}, false, "Use scheduler to distribute tests across shards")
	mflag.BoolVar(&runParallel, []string{"parallel"}, false, "Run tests in parallel on hosts where possible")
	mflag.BoolVar(&verbose, []string{"v"}, false, "Print output from all tests (Also enabled via DEBUG=1)")
	mflag.StringVar(&schedulerHost, []string{"scheduler-host"}, defaultSchedulerHost, "Hostname of scheduler.")
	mflag.IntVar(&timeout, []string{"timeout"}, 180, "Max time to run one test for, in seconds")
	mflag.Parse()

	if len(os.Getenv("DEBUG")) > 0 {
		verbose = true
	}

	testArgs := mflag.Args()
	tests, err := getTests(testArgs)
	if err != nil {
		fmt.Printf("Error parsing tests: %v (%v)\n", err, testArgs)
		os.Exit(1)
	}

	hosts := strings.Fields(os.Getenv("HOSTS"))
	maxHosts := len(hosts)
	if maxHosts == 0 {
		fmt.Print("No HOSTS specified.\n")
		os.Exit(1)
	}

	var errored bool
	if runParallel {
		errored = parallel(tests, hosts)
	} else {
		errored = sequential(tests, hosts)
	}

	if errored {
		os.Exit(1)
	}
}