diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index b9d3a10e3..6eeccee60 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -10,6 +10,7 @@ import ( "github.com/aws/aws-sdk-go/aws/ec2metadata" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ecs" + "github.com/bluele/gcache" ) // A wrapper around an AWS client that makes all the needed calls and just exposes the final results. @@ -23,11 +24,8 @@ type ecsClient interface { type ecsClientImpl struct { client *ecs.ECS cluster string - taskCache map[string]ecsTask // Keys are task ARNs. - serviceCache map[string]ecsService // Keys are service names. - // Governs write access to serviceCache. Note care must still be taken - // that no-one is also trying to read. - serviceCacheLock *sync.Mutex + taskCache gcache.Cache // Keys are task ARNs. + serviceCache gcache.Cache // Keys are service names. } // Since we're caching tasks heavily, we ensure no mistakes by casting into a structure @@ -41,10 +39,6 @@ type ecsTask struct { // which we know it is because otherwise we wouldn't be looking at it. startedAt time.Time startedBy string // tag or deployment id - - // Metadata about this cache copy - fetchedAt time.Time - lastUsedAt time.Time } // Services are highly mutable and so we can only cache them on a best-effort basis. @@ -52,17 +46,12 @@ type ecsTask struct { // but we avoid re-listing services unless we can't find a service for a task. type ecsService struct { serviceName string - // The following values may be stale in a cached copy deploymentIDs []string desiredCount int64 pendingCount int64 runningCount int64 taskDefinitionARN string - - // Metadata about this cache copy - fetchedAt time.Time - lastUsedAt time.Time } type ecsInfo struct { @@ -71,7 +60,7 @@ type ecsInfo struct { taskServiceMap map[string]string } -func newClient(cluster string) (*ecsClient, error) { +func newClient(cluster string, cacheSize int, cacheExpiry time.Duration) (*ecsClient, error) { sess := session.New() region, err := ec2metadata.New(sess).Region() @@ -80,29 +69,24 @@ func newClient(cluster string) (*ecsClient, error) { } return &ecsClientImpl{ - client: ecs.New(sess, &aws.Config{Region: aws.String(region)}), - cluster: cluster, - taskCache: map[string]ecsTask{}, - serviceCache: map[string]ecsService{}, - serviceCacheLock: &sync.Mutex{}, + client: ecs.New(sess, &aws.Config{Region: aws.String(region)}), + cluster: cluster, + taskCache: gcache.New(cacheSize).LRU().Expiration(cacheExpiry).Build(), + serviceCache: gcache.New(cacheSize).LRU().Expiration(cacheExpiry).Build(), }, nil } func newECSTask(task *ecs.Task) ecsTask { - now := time.Now() return ecsTask{ taskARN: *task.TaskArn, createdAt: *task.CreatedAt, taskDefinitionARN: *task.TaskDefinitionArn, startedAt: *task.StartedAt, startedBy: *task.StartedBy, - fetchedAt: now, - lastUsedAt: now, } } func newECSService(service *ecs.Service) ecsService { - now := time.Now() deploymentIDs := make([]string, len(service.Deployments)) for i, deployment := range service.Deployments { deploymentIDs[i] = *deployment.Id @@ -114,8 +98,6 @@ func newECSService(service *ecs.Service) ecsService { pendingCount: *service.PendingCount, runningCount: *service.RunningCount, taskDefinitionARN: *service.TaskDefinition, - fetchedAt: now, - lastUsedAt: now, } } @@ -208,11 +190,9 @@ func (c ecsClientImpl) describeServicesBatch(arns []string) { log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, failure.Reason) } - c.serviceCacheLock.Lock() for _, service := range resp.Services { - c.serviceCache[*service.ServiceName] = newECSService(service) + c.serviceCache.Set(*service.ServiceName, newECSService(service)) } - c.serviceCacheLock.Unlock() } // get details on given tasks, updating cache with the results @@ -240,34 +220,10 @@ func (c ecsClientImpl) getTasks(taskARNs []string) { } for _, task := range resp.Tasks { - c.taskCache[*task.TaskArn] = newECSTask(task) + c.taskCache.Set(*task.TaskArn, newECSTask(task)) } } -// Evict entries from the caches which have not been used within the eviction interval. -func (c ecsClientImpl) evictOldCacheItems() { - const evictTime = time.Minute - now := time.Now() - - count := 0 - for arn, task := range c.taskCache { - if now.Sub(task.lastUsedAt) > evictTime { - delete(c.taskCache, arn) - count++ - } - } - log.Debugf("Evicted %d old tasks", count) - - count = 0 - for name, service := range c.serviceCache { - if now.Sub(service.lastUsedAt) > evictTime { - delete(c.serviceCache, name) - count++ - } - } - log.Debugf("Evicted %d old services", count) -} - // Try to match a list of task ARNs to service names using cached info. // Returns (task to service map, unmatched tasks). Ignores tasks whose startedby values // don't appear to point to a service. @@ -275,21 +231,29 @@ func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string, const servicePrefix = "ecs-svc" deploymentMap := map[string]string{} - for serviceName, service := range c.serviceCache { + for _, serviceNameRaw := range c.serviceCache.Keys() { + serviceRaw, err := c.serviceCache.Get(serviceNameRaw) + if err != nil { + // This is rare, but possible if service was evicted after the loop began + continue + } + serviceName := serviceNameRaw.(string) + service := serviceRaw.(ecsService) for _, deployment := range service.deploymentIDs { deploymentMap[deployment] = serviceName } } - log.Debugf("Mapped %d deployments from %d services", len(deploymentMap), len(c.serviceCache)) + log.Debugf("Mapped %d deployments from %d services", len(deploymentMap), c.serviceCache.Len()) results := map[string]string{} unmatched := []string{} for _, taskARN := range taskARNs { - task, ok := c.taskCache[taskARN] - if !ok { + taskRaw, err := c.taskCache.Get(taskARN) + if err != nil { // this can happen if we have a failure while describing tasks, just pretend the task doesn't exist continue } + task := taskRaw.(ecsTask) if !strings.HasPrefix(task.startedBy, servicePrefix) { // task was not started by a service continue @@ -307,11 +271,8 @@ func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string, func (c ecsClientImpl) ensureTasks(taskARNs []string) { tasksToFetch := []string{} - now := time.Now() for _, taskARN := range taskARNs { - if task, ok := c.taskCache[taskARN]; ok { - task.lastUsedAt = now - } else { + if _, err := c.taskCache.Get(taskARN); err != nil { tasksToFetch = append(tasksToFetch, taskARN) } } @@ -357,7 +318,8 @@ func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string] for _, taskARN := range taskARNs { // It's possible that tasks could still be missing from the cache if describe tasks failed. // We'll just pretend they don't exist. - if task, ok := c.taskCache[taskARN]; ok { + if taskRaw, err := c.taskCache.Get(taskARN); err == nil { + task := taskRaw.(ecsTask) tasks[taskARN] = task } } @@ -368,7 +330,8 @@ func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string] // Already present. This is expected since multiple tasks can map to the same service. continue } - if service, ok := c.serviceCache[serviceName]; ok { + if serviceRaw, err := c.serviceCache.Get(serviceName); err == nil { + service := serviceRaw.(ecsService) services[serviceName] = service } else { log.Errorf("Service %s referenced by task %s in service map but not found in cache, this shouldn't be able to happen. Removing task and continuing.", serviceName, taskARN) @@ -416,7 +379,5 @@ func (c ecsClientImpl) getInfo(taskARNs []string) ecsInfo { info := c.makeECSInfo(taskARNs, taskServiceMap) - c.evictOldCacheItems() - return info } diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index a3199ba0b..982f5d704 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -101,7 +101,7 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { if !ok { log.Debugf("Creating new ECS client") var err error - client, err = newClient(cluster) + client, err = newClient(cluster, 1e6, time.Hour) // TODO remove these temporary magic values if err != nil { return rpt, err }