ecs client: more refactoring for nice code

pulls the inner function of describeServices into its own top-level function,
makes the lock part of the client object as a result
This commit is contained in:
Mike Lang
2016-12-14 16:42:15 -08:00
parent adb6f9d4a1
commit 0fb74d6781

View File

@@ -16,8 +16,11 @@ import (
type ecsClient struct {
client *ecs.ECS
cluster string
taskCache map[string]ecsTask // keys are task ARNs
serviceCache map[string]ecsService // keys are service names
taskCache map[string]ecsTask // Keys are task ARNs.
serviceCache map[string]ecsService // Keys are service names.
// Governs write access to serviceCache. Note care must still be taken
// that no-one is also trying to read.
serviceCacheLock *sync.Mutex
}
// Since we're caching tasks heavily, we ensure no mistakes by casting into a structure
@@ -70,10 +73,11 @@ func newClient(cluster string) (*ecsClient, error) {
}
return &ecsClient{
client: ecs.New(sess, &aws.Config{Region: aws.String(region)}),
cluster: cluster,
taskCache: map[string]ecsTask{},
serviceCache: map[string]ecsService{},
client: ecs.New(sess, &aws.Config{Region: aws.String(region)}),
cluster: cluster,
taskCache: map[string]ecsTask{},
serviceCache: map[string]ecsService{},
serviceCacheLock: &sync.Mutex{},
}, nil
}
@@ -92,9 +96,9 @@ func newECSTask(task *ecs.Task) ecsTask {
func newECSService(service *ecs.Service) ecsService {
now := time.Now()
deploymentIDs := make([]string, 0, len(service.Deployments))
for _, deployment := range service.Deployments {
deploymentIDs = append(deploymentIDs, *deployment.Id)
deploymentIDs := make([]string, len(service.Deployments))
for i, deployment := range service.Deployments {
deploymentIDs[i] = *deployment.Id
}
return ecsService{
serviceName: *service.ServiceName,
@@ -136,62 +140,37 @@ func (c ecsClient) listServices() <-chan string {
// Returns (input, done) channels. Service ARNs given to input are batched and details are fetched,
// with full ecsService objects being put into the cache. Closes done when finished.
func (c ecsClient) describeServices() (chan<- string, <-chan bool) {
func (c ecsClient) describeServices() (chan<- string, <-chan struct{}) {
input := make(chan string)
done := make(chan bool)
done := make(chan struct{})
log.Debugf("Describing ECS services")
go func() {
const maxServices = 10 // How many services we can put in one Describe command
group := sync.WaitGroup{}
lock := sync.Mutex{} // mediates access to the service cache when writing results
describePage := func(arns []string) {
defer group.Done()
// count and calls is just for logging
count := 0
calls := 0
arnPtrs := make([]*string, 0, len(arns))
for i := range arns {
arnPtrs = append(arnPtrs, &arns[i])
}
resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{
Cluster: &c.cluster,
Services: arnPtrs,
})
if err != nil {
log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err)
return
}
for _, failure := range resp.Failures {
log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, failure.Reason)
}
lock.Lock()
for _, service := range resp.Services {
c.serviceCache[*service.ServiceName] = newECSService(service)
}
lock.Unlock()
}
count := 0 // this is just for logging
calls := 0 // this is just for logging
page := make([]string, 0, maxServices)
batch := make([]string, 0, maxServices)
for arn := range input {
page = append(page, arn)
if len(page) == maxServices {
batch = append(batch, arn)
if len(batch) == maxServices {
group.Add(1)
go describePage(page)
count += len(page)
go func(arns []string) {
defer group.Done()
c.describeServicesBatch(arns)
}(batch)
count += len(batch)
calls++
page = make([]string, 0, maxServices)
batch = make([]string, 0, maxServices)
}
}
if len(page) > 0 {
group.Add(1)
go describePage(page)
count += len(page)
if len(batch) > 0 {
c.describeServicesBatch(batch)
count += len(batch)
calls++
}
@@ -203,6 +182,32 @@ func (c ecsClient) describeServices() (chan<- string, <-chan bool) {
return input, done
}
func (c ecsClient) describeServicesBatch(arns []string) {
arnPtrs := make([]*string, 0, len(arns))
for i := range arns {
arnPtrs = append(arnPtrs, &arns[i])
}
resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{
Cluster: &c.cluster,
Services: arnPtrs,
})
if err != nil {
log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err)
return
}
for _, failure := range resp.Failures {
log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, failure.Reason)
}
c.serviceCacheLock.Lock()
for _, service := range resp.Services {
c.serviceCache[*service.ServiceName] = newECSService(service)
}
c.serviceCacheLock.Unlock()
}
// get details on given tasks, updating cache with the results
func (c ecsClient) getTasks(taskARNs []string) {
log.Debugf("Describing %d ECS tasks", len(taskARNs))