mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-04 02:30:45 +00:00
Changes to how ECS AWS API is used to minimize API calls
Due to AWS API rate limits, we need to minimize API calls as much as possible. Our stated objectives: * for all displayed tasks and services to have up-to-date metadata * for all tasks to map to services if able My approach here: * Tasks only contain immutable fields (that we care about). We cache tasks forever. We only DescribeTasks the first time we see a new task. * We attempt to match tasks to services with what info we have. Any "referenced" services, ie. a service with at least one matching task, needs to be updated to refresh changing data. * In the event that a task doesn't match any of the (updated) services, ie. a new service entirely needs to be found, we do a full list and detail of all services (we don't re-detail ones we just refreshed). * To avoid unbounded memory usage, we evict tasks and services from the cache after 1 minute without use. This should be long enough for things like temporary failures to be glossed over. This gives us exactly one call per task, and one call per referenced service per report, which is unavoidable to maintain fresh data. Expensive "describe all" service queries are kept to only when newly-referenced services appear, which should be rare. We could make a few very minor improvements here, such as trying to refresh unreferenced but known services before doing a list query, or getting details one by one when "describing all" and stopping when all matches have been found, but I believe these would produce very minor, if any, gains in number of calls while having an unjustifiable effect on latency since we wouldn't be able to do requests as concurrently. Speaking of which, this change has a minor performance impact. Even though we're now doing less calls, we can't do them as concurrently. Old code: concurrently: describe tasks (1 call) sequentially: list services (1 call) describe services (N calls concurrently) Assuming full concurrency, total latency: 2 end-to-end calls New code (worst case): sequentially: describe tasks (1 call) describe services (N calls concurrently) list services (1 call) describe services (N calls concurrently) Assuming full concurrency, total latency: 4 end-to-end calls In practical terms, I don't expect this to matter.
This commit is contained in:
@@ -2,6 +2,7 @@ package awsecs
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
@@ -14,11 +15,49 @@ import (
|
||||
type ecsClient struct {
|
||||
client *ecs.ECS
|
||||
cluster string
|
||||
taskCache map[string]ecsTask
|
||||
serviceCache map[string]ecsService
|
||||
}
|
||||
|
||||
// Since we're caching tasks heavily, we ensure no mistakes by casting into a structure
|
||||
// that only contains immutable attributes of the resource.
|
||||
type ecsTask struct {
|
||||
taskARN string
|
||||
createdAt time.Time
|
||||
taskDefinitionARN string
|
||||
|
||||
// These started fields are immutable once set, and guarenteed to be set once the task is running,
|
||||
// which we know it is because otherwise we wouldn't be looking at it.
|
||||
startedAt time.Time
|
||||
startedBy string // tag or deployment id
|
||||
|
||||
// Metadata about this cache copy
|
||||
fetchedAt time.Time
|
||||
lastUsedAt time.Time
|
||||
}
|
||||
|
||||
// Services are highly mutable and so we can only cache them on a best-effort basis.
|
||||
// We have to refresh referenced (ie. has an associated task) services each report
|
||||
// but we avoid re-listing services unless we can't find a service for a task.
|
||||
type ecsService struct {
|
||||
serviceName string
|
||||
createdAt time.Time
|
||||
|
||||
// The following values may be stale in a cached copy
|
||||
deploymentIDs []string
|
||||
desiredCount int64
|
||||
pendingCount int64
|
||||
runningCount int64
|
||||
taskDefinitionARN string
|
||||
|
||||
// Metadata about this cache copy
|
||||
fetchedAt time.Time
|
||||
lastUsedAt time.Time
|
||||
}
|
||||
|
||||
type ecsInfo struct {
|
||||
tasks map[string]*ecs.Task
|
||||
services map[string]*ecs.Service
|
||||
tasks map[string]ecsTask
|
||||
services map[string]ecsService
|
||||
taskServiceMap map[string]string
|
||||
}
|
||||
|
||||
@@ -33,67 +72,116 @@ func newClient(cluster string) (*ecsClient, error) {
|
||||
return &ecsClient{
|
||||
client: ecs.New(sess, &aws.Config{Region: aws.String(region)}),
|
||||
cluster: cluster,
|
||||
taskCache: map[string]ecsTask{},
|
||||
serviceCache: map[string]ecsService{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// returns a map from deployment ids to service names
|
||||
func (c ecsClient) getDeploymentMap(services map[string]*ecs.Service) map[string]string {
|
||||
results := map[string]string{}
|
||||
for serviceName, service := range services {
|
||||
for _, deployment := range service.Deployments {
|
||||
results[*deployment.Id] = serviceName
|
||||
func newECSTask(task *ecs.Task) ecsTask {
|
||||
now = time.Now()
|
||||
return ecsTask{
|
||||
taskARN: *task.TaskARN,
|
||||
createdAt: *task.CreatedAt,
|
||||
taskDefinitionARN: *task.TaskDefinitionArn,
|
||||
startedAt: *task.StartedAt,
|
||||
startedBy: *task.StartedBy,
|
||||
fetchedAt: now,
|
||||
lastUsedAt: now,
|
||||
}
|
||||
}
|
||||
|
||||
func newECSService(service *ecs.Service) ecsService {
|
||||
now = time.Now()
|
||||
deploymentIDs = make([]string, 0, len(service.Deployments))
|
||||
for _, deployment := range service.Deployments {
|
||||
deploymentIDs = append(deploymentIDs, *deployment.ID)
|
||||
}
|
||||
return ecsService{
|
||||
serviceName: *service.ServiceName,
|
||||
createdAt: *service.CreatedAt,
|
||||
deploymentIDs: deploymentIDs,
|
||||
desiredCount: *service.DesiredCount,
|
||||
pendingCount: *service.PendingCount,
|
||||
runningCount: *service.RunningCount,
|
||||
taskDefinitionARN: *service.TaskDefinitionARN,
|
||||
fetchedAt: now,
|
||||
lastUsedAt: now,
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a channel from which service ARNs can be read.
|
||||
// Cannot fail as it will attempt to deliver partial results, though that may end up being no results.
|
||||
func (c ecsClient) listServices() <-chan string {
|
||||
results := make(chan string)
|
||||
go func() {
|
||||
err := c.client.ListServicesPages(
|
||||
&ecs.ListServicesInput{Cluster: &c.cluster},
|
||||
func(page *ecs.ListServicesOutput, lastPage bool) bool {
|
||||
for _, arn := range page.ServiceArns {
|
||||
results <- arn
|
||||
}
|
||||
}
|
||||
)
|
||||
if err != nil {
|
||||
log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err)
|
||||
}
|
||||
}
|
||||
close(results)
|
||||
}()
|
||||
return results
|
||||
}
|
||||
|
||||
// cannot fail as it will attempt to deliver partial results, though that may end up being no results
|
||||
func (c ecsClient) getServices() map[string]*ecs.Service {
|
||||
results := map[string]*ecs.Service{}
|
||||
lock := sync.Mutex{} // lock mediates access to results
|
||||
// Returns (input, done) channels. Service ARNs given to input are batched and details are fetched,
|
||||
// with full ecsService objects being put into the cache. Closes done when finished.
|
||||
func (c ecsClient) describeServices() (chan<- string, <-chan bool) {
|
||||
input := make(chan string)
|
||||
|
||||
group := sync.WaitGroup{}
|
||||
go func() {
|
||||
const MAX_SERVICES = 10 // How many services we can put in one Describe command
|
||||
group := sync.WaitGroup{}
|
||||
lock := sync.Mutex{} // mediates access to the service cache when writing results
|
||||
|
||||
err := c.client.ListServicesPages(
|
||||
&ecs.ListServicesInput{Cluster: &c.cluster},
|
||||
func(page *ecs.ListServicesOutput, lastPage bool) bool {
|
||||
// describe each page of 10 (the max for one describe command) concurrently
|
||||
group.Add(1)
|
||||
serviceArns := page.ServiceArns
|
||||
go func() {
|
||||
defer group.Done()
|
||||
page := make([]string, 0, MAX_SERVICES)
|
||||
for arn := range input {
|
||||
page = append(page, arn)
|
||||
if len(page) == MAX_SERVICES {
|
||||
group.Add(1)
|
||||
|
||||
resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{
|
||||
Cluster: &c.cluster,
|
||||
Services: serviceArns,
|
||||
})
|
||||
if err != nil {
|
||||
log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err)
|
||||
return
|
||||
}
|
||||
go func(arns []string) {
|
||||
defer group.Done()
|
||||
|
||||
for _, failure := range resp.Failures {
|
||||
log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, *failure.Reason)
|
||||
}
|
||||
resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{
|
||||
Cluster: &c.cluster,
|
||||
Services: arns,
|
||||
})
|
||||
if err != nil {
|
||||
log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
lock.Lock()
|
||||
for _, service := range resp.Services {
|
||||
results[*service.ServiceName] = service
|
||||
}
|
||||
lock.Unlock()
|
||||
}()
|
||||
return true
|
||||
},
|
||||
)
|
||||
group.Wait()
|
||||
for _, failure := range resp.Failures {
|
||||
log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, failure.Reason)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err)
|
||||
}
|
||||
return results
|
||||
mutex.Lock()
|
||||
for _, service := range resp.Services {
|
||||
c.serviceCache[*service.ServiceName] = newECSService(service)
|
||||
}
|
||||
mutex.Unlock()
|
||||
}(page)
|
||||
|
||||
page = make([]string, 0, MAX_SERVICES)
|
||||
}
|
||||
}
|
||||
|
||||
group.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
return input, done
|
||||
}
|
||||
|
||||
func (c ecsClient) getTasks(taskArns []string) (map[string]*ecs.Task, error) {
|
||||
// get details on given tasks, updating cache with the results
|
||||
func (c ecsClient) getTasks(taskArns []string) {
|
||||
taskPtrs := make([]*string, len(taskArns))
|
||||
for i := range taskArns {
|
||||
taskPtrs[i] = &taskArns[i]
|
||||
@@ -106,45 +194,160 @@ func (c ecsClient) getTasks(taskArns []string) (map[string]*ecs.Task, error) {
|
||||
Tasks: taskPtrs,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
log.Warnf("Failed to describe ECS tasks, ECS service report may be incomplete: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, failure := range resp.Failures {
|
||||
log.Warnf("Failed to describe ECS task %s, ECS service report may be incomplete: %s", *failure.Arn, *failure.Reason)
|
||||
}
|
||||
|
||||
results := make(map[string]*ecs.Task, len(resp.Tasks))
|
||||
for _, task := range resp.Tasks {
|
||||
results[*task.TaskArn] = task
|
||||
c.taskCache[*task.TaskArn] = newECSTask(task)
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// returns a map from task ARNs to service names
|
||||
func (c ecsClient) getInfo(taskArns []string) (ecsInfo, error) {
|
||||
servicesChan := make(chan map[string]*ecs.Service)
|
||||
go func() {
|
||||
servicesChan <- c.getServices()
|
||||
}()
|
||||
// Evict entries from the caches which have not been used within the eviction interval.
|
||||
func (c ecsClient) evictOldCacheItems() {
|
||||
const EVICT_TIME = time.Minute
|
||||
now = time.Now()
|
||||
|
||||
// do these two fetches in parallel
|
||||
tasks, err := c.getTasks(taskArns)
|
||||
services := <-servicesChan
|
||||
|
||||
if err != nil {
|
||||
return ecsInfo{}, err
|
||||
}
|
||||
|
||||
deploymentMap := c.getDeploymentMap(services)
|
||||
|
||||
taskServiceMap := map[string]string{}
|
||||
for taskArn, task := range tasks {
|
||||
// Note not all tasks map to a deployment, or we could otherwise mismatch due to races.
|
||||
// It's safe to just ignore all these cases and consider them "non-service" tasks.
|
||||
if serviceName, ok := deploymentMap[*task.StartedBy]; ok {
|
||||
taskServiceMap[taskArn] = serviceName
|
||||
for arn, task := range c.taskCache {
|
||||
if now - task.lastUsedAt > EVICT_TIME {
|
||||
delete(c.taskCache, arn)
|
||||
}
|
||||
}
|
||||
|
||||
return ecsInfo{services: services, tasks: tasks, taskServiceMap: taskServiceMap}, nil
|
||||
for name, service := range c.serviceCache {
|
||||
if now - service.lastUsedAt > EVICT_TIME {
|
||||
delete(c.serviceCache, name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Try to match a list of task ARNs to service names using cached info.
|
||||
// Returns (task to service map, unmatched tasks). Ignores tasks whose startedby values
|
||||
// don't appear to point to a service.
|
||||
func (c ecsClient) matchTasksServices(taskARNs []string) (map[string]string, []string) {
|
||||
const SERVICE_PREFIX = "aws-svc" // TODO confirm this
|
||||
|
||||
deploymentMap := map[string]string{}
|
||||
for serviceName, service := range c.serviceCache {
|
||||
for _, deployment := range service.DeploymentIDs {
|
||||
deploymentMap[deployment] = serviceName
|
||||
}
|
||||
}
|
||||
|
||||
results := map[string]string{}
|
||||
unmatched := []string{}
|
||||
for _, taskARN := range taskARNs {
|
||||
task, ok := c.taskCache[taskARN]
|
||||
if ! ok {
|
||||
// this can happen if we have a failure while describing tasks, just pretend the task doesn't exist
|
||||
continue
|
||||
}
|
||||
if ! strings.HasPrefix(task.startedBy, SERVICE_PREFIX) {
|
||||
// task was not started by a service
|
||||
continue
|
||||
}
|
||||
if service, ok := deploymentMap[task.startedBy]; ok {
|
||||
results[taskARN] = service.serviceName
|
||||
} else {
|
||||
unmatched = append(unmatched, taskARN)
|
||||
}
|
||||
}
|
||||
|
||||
return results, unmatched
|
||||
}
|
||||
|
||||
// Returns a ecsInfo struct containing data needed for a report.
|
||||
func (c ecsClient) getInfo(taskARNs []string) ecsInfo {
|
||||
|
||||
// We do a weird order of operations here to minimize unneeded cache refreshes.
|
||||
// First, we ensure we have all the tasks we need, and fetch the ones we don't.
|
||||
// We also mark the tasks as being used here to prevent eviction.
|
||||
tasksToFetch := []string{}
|
||||
now = Time.Now()
|
||||
for _, taskARN := range taskARNs {
|
||||
if task, ok := c.taskCache[taskARN]; ok {
|
||||
task.lastUsedAt = now
|
||||
} else {
|
||||
tasksToFetch = append(tasksToFetch, taskARN)
|
||||
}
|
||||
}
|
||||
// This might not fully succeed, but we only try once and ignore any further missing tasks.
|
||||
c.getTasks(tasksToFetch)
|
||||
|
||||
// We're going to do this matching process potentially several times, but that's ok - it's quite cheap.
|
||||
// First, we want to see how far we get with existing data, and identify the set of services
|
||||
// we'll need to refresh regardless.
|
||||
taskServiceMap, unmatched := c.matchTasksServices(taskARNs)
|
||||
|
||||
// In order to ensure service details are fresh, we need to refresh any referenced services
|
||||
toDescribe, done := describeServices()
|
||||
servicesRefreshed := map[string]bool{}
|
||||
for taskARN, serviceName := range taskServiceMap {
|
||||
if servicesRefreshed[serviceName] {
|
||||
continue
|
||||
}
|
||||
toDescribe <- serviceName
|
||||
servicesRefreshed[serviceName] = true
|
||||
}
|
||||
close(toDescribe)
|
||||
<-done
|
||||
|
||||
// In refreshing, we may have picked up any new deployment ids.
|
||||
// If we still have tasks unmatched, we try again.
|
||||
if len(unmatched) > 0 {
|
||||
taskServiceMap, unmatched = c.matchTasksServices(taskARNs)
|
||||
}
|
||||
|
||||
// If we still have tasks unmatched, we'll have to try harder. Get a list of all services and,
|
||||
// if not already refreshed, fetch them.
|
||||
if len(unmatched) > 0 {
|
||||
serviceNamesChan := listServices()
|
||||
toDescribe, done := describeServices()
|
||||
go func() {
|
||||
for serviceName := range serviceNamesChan {
|
||||
if ! servicesRefreshed[serviceName] {
|
||||
toDescribe <- serviceName
|
||||
servicesRefreshed[serviceName] = true
|
||||
}
|
||||
close(toDescribe)
|
||||
}
|
||||
}()
|
||||
<-done
|
||||
|
||||
taskServiceMap, unmatched = c.matchTasksServices(taskARNs)
|
||||
// If we still have unmatched at this point, we don't care - this may be due to partial failures,
|
||||
// race conditions, and other weirdness.
|
||||
}
|
||||
|
||||
// The maps to return are the referenced subsets of the full caches
|
||||
tasks := map[string]ecsTask{}
|
||||
for _, taskARN := range taskARNs {
|
||||
// It's possible that tasks could still be missing from the cache if describe tasks failed.
|
||||
// We'll just pretend they don't exist.
|
||||
if task, ok := c.taskCache[taskARN]; ok {
|
||||
tasks[taskARN] = task
|
||||
}
|
||||
}
|
||||
|
||||
services := map[string]ecsService{}
|
||||
for taskARN, serviceName := range taskServiceMap {
|
||||
if _, ok := taskServiceMap[serviceName]; ok {
|
||||
// Already present. This is expected since multiple tasks can map to the same service.
|
||||
continue
|
||||
}
|
||||
if service, ok := c.serviceCache[serviceName]; ok {
|
||||
services[serviceName] = service
|
||||
} else {
|
||||
log.Errorf("Service %s referenced by task %s in service map but not found in cache, this shouldn't be able to happen. Removing task and continuing.", serviceName, taskARN)
|
||||
delete(taskServiceMap, taskARN)
|
||||
}
|
||||
}
|
||||
|
||||
c.evictOldCacheItems()
|
||||
|
||||
return ecsInfo{services: services, tasks: tasks, taskServiceMap: taskServiceMap}
|
||||
}
|
||||
|
||||
@@ -99,18 +99,15 @@ func (Reporter) Tag(rpt report.Report) (report.Report, error) {
|
||||
taskArns = append(taskArns, taskArn)
|
||||
}
|
||||
|
||||
ecsInfo, err := client.getInfo(taskArns)
|
||||
if err != nil {
|
||||
return rpt, err
|
||||
}
|
||||
ecsInfo := client.getInfo(taskArns)
|
||||
|
||||
// Create all the services first
|
||||
for serviceName, service := range ecsInfo.services {
|
||||
serviceID := report.MakeECSServiceNodeID(serviceName)
|
||||
rpt.ECSService = rpt.ECSService.AddNode(report.MakeNodeWith(serviceID, map[string]string{
|
||||
Cluster: cluster,
|
||||
ServiceDesiredCount: fmt.Sprintf("%d", *service.DesiredCount),
|
||||
ServiceRunningCount: fmt.Sprintf("%d", *service.RunningCount),
|
||||
ServiceDesiredCount: fmt.Sprintf("%d", service.desiredCount),
|
||||
ServiceRunningCount: fmt.Sprintf("%d", service.runningCount),
|
||||
}))
|
||||
}
|
||||
log.Debugf("Created %v ECS service nodes", len(ecsInfo.services))
|
||||
@@ -127,7 +124,7 @@ func (Reporter) Tag(rpt report.Report) (report.Report, error) {
|
||||
node := report.MakeNodeWith(taskID, map[string]string{
|
||||
TaskFamily: info.family,
|
||||
Cluster: cluster,
|
||||
CreatedAt: task.CreatedAt.Format(time.RFC3339Nano),
|
||||
CreatedAt: task.createdAt.Format(time.RFC3339Nano),
|
||||
})
|
||||
rpt.ECSTask = rpt.ECSTask.AddNode(node)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user