From 0fb74d6781516a005eeebee75de9063bc1d24cda Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 14 Dec 2016 16:42:15 -0800 Subject: [PATCH] ecs client: more refactoring for nice code pulls the inner function of describeServices into its own top-level function, makes the lock part of the client object as a result --- probe/awsecs/client.go | 107 +++++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 51 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index 54fb51531..cb1eeb9de 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -16,8 +16,11 @@ import ( type ecsClient struct { client *ecs.ECS cluster string - taskCache map[string]ecsTask // keys are task ARNs - serviceCache map[string]ecsService // keys are service names + taskCache map[string]ecsTask // Keys are task ARNs. + serviceCache map[string]ecsService // Keys are service names. + // Governs write access to serviceCache. Note that care must still be taken + // that no one is reading serviceCache at the same time. 
+ serviceCacheLock *sync.Mutex } // Since we're caching tasks heavily, we ensure no mistakes by casting into a structure @@ -70,10 +73,11 @@ func newClient(cluster string) (*ecsClient, error) { } return &ecsClient{ - client: ecs.New(sess, &aws.Config{Region: aws.String(region)}), - cluster: cluster, - taskCache: map[string]ecsTask{}, - serviceCache: map[string]ecsService{}, + client: ecs.New(sess, &aws.Config{Region: aws.String(region)}), + cluster: cluster, + taskCache: map[string]ecsTask{}, + serviceCache: map[string]ecsService{}, + serviceCacheLock: &sync.Mutex{}, }, nil } @@ -92,9 +96,9 @@ func newECSTask(task *ecs.Task) ecsTask { func newECSService(service *ecs.Service) ecsService { now := time.Now() - deploymentIDs := make([]string, 0, len(service.Deployments)) - for _, deployment := range service.Deployments { - deploymentIDs = append(deploymentIDs, *deployment.Id) + deploymentIDs := make([]string, len(service.Deployments)) + for i, deployment := range service.Deployments { + deploymentIDs[i] = *deployment.Id } return ecsService{ serviceName: *service.ServiceName, @@ -136,62 +140,37 @@ func (c ecsClient) listServices() <-chan string { // Returns (input, done) channels. Service ARNs given to input are batched and details are fetched, // with full ecsService objects being put into the cache. Closes done when finished. 
-func (c ecsClient) describeServices() (chan<- string, <-chan bool) { +func (c ecsClient) describeServices() (chan<- string, <-chan struct{}) { input := make(chan string) - done := make(chan bool) + done := make(chan struct{}) log.Debugf("Describing ECS services") go func() { const maxServices = 10 // How many services we can put in one Describe command group := sync.WaitGroup{} - lock := sync.Mutex{} // mediates access to the service cache when writing results - describePage := func(arns []string) { - defer group.Done() + // count and calls are just for logging + count := 0 + calls := 0 - arnPtrs := make([]*string, 0, len(arns)) - for i := range arns { - arnPtrs = append(arnPtrs, &arns[i]) - } - - resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{ - Cluster: &c.cluster, - Services: arnPtrs, - }) - if err != nil { - log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err) - return - } - - for _, failure := range resp.Failures { - log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, failure.Reason) - } - - lock.Lock() - for _, service := range resp.Services { - c.serviceCache[*service.ServiceName] = newECSService(service) - } - lock.Unlock() - } - - count := 0 // this is just for logging - calls := 0 // this is just for logging - page := make([]string, 0, maxServices) + batch := make([]string, 0, maxServices) for arn := range input { - page = append(page, arn) - if len(page) == maxServices { + batch = append(batch, arn) + if len(batch) == maxServices { group.Add(1) - go describePage(page) - count += len(page) + go func(arns []string) { + defer group.Done() + c.describeServicesBatch(arns) + }(batch) + count += len(batch) calls++ - page = make([]string, 0, maxServices) + batch = make([]string, 0, maxServices) } } - if len(page) > 0 { - group.Add(1) - go describePage(page) - count += len(page) + if len(batch) > 0 { + c.describeServicesBatch(batch) + count += 
len(batch) calls++ } @@ -203,6 +182,32 @@ func (c ecsClient) describeServices() (chan<- string, <-chan bool) { return input, done } +func (c ecsClient) describeServicesBatch(arns []string) { + arnPtrs := make([]*string, 0, len(arns)) + for i := range arns { + arnPtrs = append(arnPtrs, &arns[i]) + } + + resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{ + Cluster: &c.cluster, + Services: arnPtrs, + }) + if err != nil { + log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err) + return + } + + for _, failure := range resp.Failures { + log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, failure.Reason) + } + + c.serviceCacheLock.Lock() + for _, service := range resp.Services { + c.serviceCache[*service.ServiceName] = newECSService(service) + } + c.serviceCacheLock.Unlock() +} + // get details on given tasks, updating cache with the results func (c ecsClient) getTasks(taskARNs []string) { log.Debugf("Describing %d ECS tasks", len(taskARNs))