From c4eb0960f93931ed2486617a7a5fa322d0b470e5 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 23 Jan 2017 12:48:50 -0800 Subject: [PATCH] awsecs client: simplify list/describe services by removing ability to stream results between them, since this is such a minor optimization and greatly complicates the code. --- probe/awsecs/client.go | 133 +++++++++++++++++------------------------ 1 file changed, 55 insertions(+), 78 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index 114bcbc26..1ee8a9d55 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -129,85 +129,67 @@ func (c ecsClientImpl) getCachedService(serviceName string) (EcsService, bool) { return EcsService{}, false } -// Returns a channel from which service ARNs can be read. +// Returns a list of service names. // Cannot fail as it will attempt to deliver partial results, though that may end up being no results. -func (c ecsClientImpl) listServices() <-chan string { +func (c ecsClientImpl) listServices() []string { log.Debugf("Listing ECS services") - results := make(chan string) - go func() { - count := 0 - err := c.client.ListServicesPages( - &ecs.ListServicesInput{Cluster: &c.cluster}, - func(page *ecs.ListServicesOutput, lastPage bool) bool { - for _, arn := range page.ServiceArns { - count++ - results <- *arn - } - return true - }, - ) - if err != nil { - log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err) - } - log.Debugf("Listed %d services", count) - close(results) - }() + results := []string{} + err := c.client.ListServicesPages( + &ecs.ListServicesInput{Cluster: &c.cluster}, + func(page *ecs.ListServicesOutput, lastPage bool) bool { + for _, name := range page.ServiceArns { + results = append(results, *name) + } + return true + }, + ) + if err != nil { + log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err) + } + log.Debugf("Listed %d services", len(results)) return results } -// Returns (input, done) channels. Service ARNs given to input are batched and details are fetched, -// with full EcsService objects being put into the cache. Closes done when finished. -func (c ecsClientImpl) describeServices() (chan<- string, <-chan struct{}) { - input := make(chan string) - done := make(chan struct{}) +// Service names given are batched and details are fetched, +// with full EcsService objects being put into the cache. +// Cannot fail as it will attempt to deliver partial results. +func (c ecsClientImpl) describeServices(services []string) { + const maxServices = 10 // How many services we can put in one Describe command + group := sync.WaitGroup{} log.Debugf("Describing ECS services") - go func() { - const maxServices = 10 // How many services we can put in one Describe command - group := sync.WaitGroup{} + // split into batches + batches := make([][]string, 0, len(services)/maxServices+1) + for len(services) > maxServices { + batch := services[:maxServices] + services = services[maxServices:] + batches = append(batches, batch) + } + if len(services) > 0 { + batches = append(batches, services) + } - // count and calls is just for logging - count := 0 - calls := 0 + for _, batch := range batches { + group.Add(1) + go func(names []string) { + defer group.Done() + c.describeServicesBatch(names) + }(batch) + } - batch := make([]string, 0, maxServices) - for arn := range input { - batch = append(batch, arn) - if len(batch) == maxServices { - group.Add(1) - go func(arns []string) { - defer group.Done() - c.describeServicesBatch(arns) - }(batch) - count += len(batch) - calls++ - batch = make([]string, 0, maxServices) - } - } - if len(batch) > 0 { - c.describeServicesBatch(batch) - count += len(batch) - calls++ - } - - log.Debugf("Described %d services in %d calls", count, calls) - group.Wait() - close(done) - }() - - return input, done + group.Wait() } -func (c ecsClientImpl) describeServicesBatch(arns []string) { - arnPtrs := make([]*string, 0, len(arns)) - for i := range arns { - arnPtrs = append(arnPtrs, &arns[i]) +func (c ecsClientImpl) describeServicesBatch(names []string) { + namePtrs := make([]*string, 0, len(names)) + for i := range names { + namePtrs = append(namePtrs, &names[i]) } resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{ Cluster: &c.cluster, - Services: arnPtrs, + Services: namePtrs, }) if err != nil { log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err) @@ -306,33 +288,28 @@ func (c ecsClientImpl) ensureTasksAreCached(taskARNs []string) { } func (c ecsClientImpl) refreshServices(taskServiceMap map[string]string) map[string]bool { - toDescribe, done := c.describeServices() servicesRefreshed := map[string]bool{} + toDescribe := []string{} for _, serviceName := range taskServiceMap { if servicesRefreshed[serviceName] { continue } - toDescribe <- serviceName + toDescribe = append(toDescribe, serviceName) servicesRefreshed[serviceName] = true } - close(toDescribe) - <-done + c.describeServices(toDescribe) return servicesRefreshed } func (c ecsClientImpl) describeAllServices(servicesRefreshed map[string]bool) { - serviceNamesChan := c.listServices() - toDescribe, done := c.describeServices() - go func() { - for serviceName := range serviceNamesChan { - if !servicesRefreshed[serviceName] { - toDescribe <- serviceName - servicesRefreshed[serviceName] = true - } + toDescribe := []string{} + for _, serviceName := range c.listServices() { + if !servicesRefreshed[serviceName] { + toDescribe = append(toDescribe, serviceName) + servicesRefreshed[serviceName] = true } - close(toDescribe) - }() - <-done + } + c.describeServices(toDescribe) } func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string]string) EcsInfo {