awsecs client: simplify list/describe services

by removing ability to stream results between them, since this is such a minor optimization
and greatly complicates the code.
This commit is contained in:
Mike Lang
2017-01-23 12:48:50 -08:00
parent baffe94538
commit c4eb0960f9

View File

@@ -129,85 +129,67 @@ func (c ecsClientImpl) getCachedService(serviceName string) (EcsService, bool) {
return EcsService{}, false
}
// Returns a channel from which service ARNs can be read.
// Returns a list of service names.
// Cannot fail as it will attempt to deliver partial results, though that may end up being no results.
func (c ecsClientImpl) listServices() <-chan string {
func (c ecsClientImpl) listServices() []string {
log.Debugf("Listing ECS services")
results := make(chan string)
go func() {
count := 0
err := c.client.ListServicesPages(
&ecs.ListServicesInput{Cluster: &c.cluster},
func(page *ecs.ListServicesOutput, lastPage bool) bool {
for _, arn := range page.ServiceArns {
count++
results <- *arn
}
return true
},
)
if err != nil {
log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err)
}
log.Debugf("Listed %d services", count)
close(results)
}()
results := []string{}
err := c.client.ListServicesPages(
&ecs.ListServicesInput{Cluster: &c.cluster},
func(page *ecs.ListServicesOutput, lastPage bool) bool {
for _, name := range page.ServiceArns {
results = append(results, *name)
}
return true
},
)
if err != nil {
log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err)
}
log.Debugf("Listed %d services", len(results))
return results
}
// Returns (input, done) channels. Service ARNs given to input are batched and details are fetched,
// with full EcsService objects being put into the cache. Closes done when finished.
func (c ecsClientImpl) describeServices() (chan<- string, <-chan struct{}) {
input := make(chan string)
done := make(chan struct{})
// Service names given are batched and details are fetched,
// with full EcsService objects being put into the cache.
// Cannot fail as it will attempt to deliver partial results.
func (c ecsClientImpl) describeServices(services []string) {
const maxServices = 10 // How many services we can put in one Describe command
group := sync.WaitGroup{}
log.Debugf("Describing ECS services")
go func() {
const maxServices = 10 // How many services we can put in one Describe command
group := sync.WaitGroup{}
// split into batches
batches := make([][]string, 0, len(services)/maxServices+1)
for len(services) > maxServices {
batch := services[:maxServices]
services = services[maxServices:]
batches = append(batches, batch)
}
if len(services) > 0 {
batches = append(batches, services)
}
// count and calls is just for logging
count := 0
calls := 0
for _, batch := range batches {
group.Add(1)
go func(names []string) {
defer group.Done()
c.describeServicesBatch(names)
}(batch)
}
batch := make([]string, 0, maxServices)
for arn := range input {
batch = append(batch, arn)
if len(batch) == maxServices {
group.Add(1)
go func(arns []string) {
defer group.Done()
c.describeServicesBatch(arns)
}(batch)
count += len(batch)
calls++
batch = make([]string, 0, maxServices)
}
}
if len(batch) > 0 {
c.describeServicesBatch(batch)
count += len(batch)
calls++
}
log.Debugf("Described %d services in %d calls", count, calls)
group.Wait()
close(done)
}()
return input, done
group.Wait()
}
func (c ecsClientImpl) describeServicesBatch(arns []string) {
arnPtrs := make([]*string, 0, len(arns))
for i := range arns {
arnPtrs = append(arnPtrs, &arns[i])
func (c ecsClientImpl) describeServicesBatch(names []string) {
namePtrs := make([]*string, 0, len(names))
for i := range names {
namePtrs = append(namePtrs, &names[i])
}
resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{
Cluster: &c.cluster,
Services: arnPtrs,
Services: namePtrs,
})
if err != nil {
log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err)
@@ -306,33 +288,28 @@ func (c ecsClientImpl) ensureTasksAreCached(taskARNs []string) {
}
func (c ecsClientImpl) refreshServices(taskServiceMap map[string]string) map[string]bool {
toDescribe, done := c.describeServices()
servicesRefreshed := map[string]bool{}
toDescribe := []string{}
for _, serviceName := range taskServiceMap {
if servicesRefreshed[serviceName] {
continue
}
toDescribe <- serviceName
toDescribe = append(toDescribe, serviceName)
servicesRefreshed[serviceName] = true
}
close(toDescribe)
<-done
c.describeServices(toDescribe)
return servicesRefreshed
}
func (c ecsClientImpl) describeAllServices(servicesRefreshed map[string]bool) {
serviceNamesChan := c.listServices()
toDescribe, done := c.describeServices()
go func() {
for serviceName := range serviceNamesChan {
if !servicesRefreshed[serviceName] {
toDescribe <- serviceName
servicesRefreshed[serviceName] = true
}
toDescribe := []string{}
for _, serviceName := range c.listServices() {
if !servicesRefreshed[serviceName] {
toDescribe = append(toDescribe, serviceName)
servicesRefreshed[serviceName] = true
}
close(toDescribe)
}()
<-done
}
c.describeServices(toDescribe)
}
func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string]string) EcsInfo {