aws ecs probe: Use a size and time bound LRU gcache for caching

instead of our own hand-rolled size-unbound cache
This commit is contained in:
Mike Lang
2017-01-11 18:19:24 -08:00
parent e220ae822f
commit 513977081d
2 changed files with 28 additions and 67 deletions

View File

@@ -10,6 +10,7 @@ import (
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/bluele/gcache"
)
// A wrapper around an AWS client that makes all the needed calls and just exposes the final results.
@@ -23,11 +24,8 @@ type ecsClient interface {
type ecsClientImpl struct {
client *ecs.ECS
cluster string
taskCache map[string]ecsTask // Keys are task ARNs.
serviceCache map[string]ecsService // Keys are service names.
// Governs write access to serviceCache. Note care must still be taken
// that no-one is also trying to read.
serviceCacheLock *sync.Mutex
taskCache gcache.Cache // Keys are task ARNs.
serviceCache gcache.Cache // Keys are service names.
}
// Since we're caching tasks heavily, we ensure no mistakes by casting into a structure
@@ -41,10 +39,6 @@ type ecsTask struct {
// which we know it is because otherwise we wouldn't be looking at it.
startedAt time.Time
startedBy string // tag or deployment id
// Metadata about this cache copy
fetchedAt time.Time
lastUsedAt time.Time
}
// Services are highly mutable and so we can only cache them on a best-effort basis.
@@ -52,17 +46,12 @@ type ecsTask struct {
// but we avoid re-listing services unless we can't find a service for a task.
type ecsService struct {
serviceName string
// The following values may be stale in a cached copy
deploymentIDs []string
desiredCount int64
pendingCount int64
runningCount int64
taskDefinitionARN string
// Metadata about this cache copy
fetchedAt time.Time
lastUsedAt time.Time
}
type ecsInfo struct {
@@ -71,7 +60,7 @@ type ecsInfo struct {
taskServiceMap map[string]string
}
func newClient(cluster string) (*ecsClient, error) {
func newClient(cluster string, cacheSize int, cacheExpiry time.Duration) (*ecsClient, error) {
sess := session.New()
region, err := ec2metadata.New(sess).Region()
@@ -80,29 +69,24 @@ func newClient(cluster string) (*ecsClient, error) {
}
return &ecsClientImpl{
client: ecs.New(sess, &aws.Config{Region: aws.String(region)}),
cluster: cluster,
taskCache: map[string]ecsTask{},
serviceCache: map[string]ecsService{},
serviceCacheLock: &sync.Mutex{},
client: ecs.New(sess, &aws.Config{Region: aws.String(region)}),
cluster: cluster,
taskCache: gcache.New(cacheSize).LRU().Expiration(cacheExpiry).Build(),
serviceCache: gcache.New(cacheSize).LRU().Expiration(cacheExpiry).Build(),
}, nil
}
func newECSTask(task *ecs.Task) ecsTask {
now := time.Now()
return ecsTask{
taskARN: *task.TaskArn,
createdAt: *task.CreatedAt,
taskDefinitionARN: *task.TaskDefinitionArn,
startedAt: *task.StartedAt,
startedBy: *task.StartedBy,
fetchedAt: now,
lastUsedAt: now,
}
}
func newECSService(service *ecs.Service) ecsService {
now := time.Now()
deploymentIDs := make([]string, len(service.Deployments))
for i, deployment := range service.Deployments {
deploymentIDs[i] = *deployment.Id
@@ -114,8 +98,6 @@ func newECSService(service *ecs.Service) ecsService {
pendingCount: *service.PendingCount,
runningCount: *service.RunningCount,
taskDefinitionARN: *service.TaskDefinition,
fetchedAt: now,
lastUsedAt: now,
}
}
@@ -208,11 +190,9 @@ func (c ecsClientImpl) describeServicesBatch(arns []string) {
log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, failure.Reason)
}
c.serviceCacheLock.Lock()
for _, service := range resp.Services {
c.serviceCache[*service.ServiceName] = newECSService(service)
c.serviceCache.Set(*service.ServiceName, newECSService(service))
}
c.serviceCacheLock.Unlock()
}
// get details on given tasks, updating cache with the results
@@ -240,34 +220,10 @@ func (c ecsClientImpl) getTasks(taskARNs []string) {
}
for _, task := range resp.Tasks {
c.taskCache[*task.TaskArn] = newECSTask(task)
c.taskCache.Set(*task.TaskArn, newECSTask(task))
}
}
// Evict entries from the caches which have not been used within the eviction interval.
func (c ecsClientImpl) evictOldCacheItems() {
const evictTime = time.Minute
now := time.Now()
count := 0
for arn, task := range c.taskCache {
if now.Sub(task.lastUsedAt) > evictTime {
delete(c.taskCache, arn)
count++
}
}
log.Debugf("Evicted %d old tasks", count)
count = 0
for name, service := range c.serviceCache {
if now.Sub(service.lastUsedAt) > evictTime {
delete(c.serviceCache, name)
count++
}
}
log.Debugf("Evicted %d old services", count)
}
// Try to match a list of task ARNs to service names using cached info.
// Returns (task to service map, unmatched tasks). Ignores tasks whose startedby values
// don't appear to point to a service.
@@ -275,21 +231,29 @@ func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string,
const servicePrefix = "ecs-svc"
deploymentMap := map[string]string{}
for serviceName, service := range c.serviceCache {
for _, serviceNameRaw := range c.serviceCache.Keys() {
serviceRaw, err := c.serviceCache.Get(serviceNameRaw)
if err != nil {
// This is rare, but possible if service was evicted after the loop began
continue
}
serviceName := serviceNameRaw.(string)
service := serviceRaw.(ecsService)
for _, deployment := range service.deploymentIDs {
deploymentMap[deployment] = serviceName
}
}
log.Debugf("Mapped %d deployments from %d services", len(deploymentMap), len(c.serviceCache))
log.Debugf("Mapped %d deployments from %d services", len(deploymentMap), c.serviceCache.Len())
results := map[string]string{}
unmatched := []string{}
for _, taskARN := range taskARNs {
task, ok := c.taskCache[taskARN]
if !ok {
taskRaw, err := c.taskCache.Get(taskARN)
if err != nil {
// this can happen if we have a failure while describing tasks, just pretend the task doesn't exist
continue
}
task := taskRaw.(ecsTask)
if !strings.HasPrefix(task.startedBy, servicePrefix) {
// task was not started by a service
continue
@@ -307,11 +271,8 @@ func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string,
func (c ecsClientImpl) ensureTasks(taskARNs []string) {
tasksToFetch := []string{}
now := time.Now()
for _, taskARN := range taskARNs {
if task, ok := c.taskCache[taskARN]; ok {
task.lastUsedAt = now
} else {
if _, err := c.taskCache.Get(taskARN); err != nil {
tasksToFetch = append(tasksToFetch, taskARN)
}
}
@@ -357,7 +318,8 @@ func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string]
for _, taskARN := range taskARNs {
// It's possible that tasks could still be missing from the cache if describe tasks failed.
// We'll just pretend they don't exist.
if task, ok := c.taskCache[taskARN]; ok {
if taskRaw, err := c.taskCache.Get(taskARN); err == nil {
task := taskRaw.(ecsTask)
tasks[taskARN] = task
}
}
@@ -368,7 +330,8 @@ func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string]
// Already present. This is expected since multiple tasks can map to the same service.
continue
}
if service, ok := c.serviceCache[serviceName]; ok {
if serviceRaw, err := c.serviceCache.Get(serviceName); err == nil {
service := serviceRaw.(ecsService)
services[serviceName] = service
} else {
log.Errorf("Service %s referenced by task %s in service map but not found in cache, this shouldn't be able to happen. Removing task and continuing.", serviceName, taskARN)
@@ -416,7 +379,5 @@ func (c ecsClientImpl) getInfo(taskARNs []string) ecsInfo {
info := c.makeECSInfo(taskARNs, taskServiceMap)
c.evictOldCacheItems()
return info
}

View File

@@ -101,7 +101,7 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) {
if !ok {
log.Debugf("Creating new ECS client")
var err error
client, err = newClient(cluster)
client, err = newClient(cluster, 1e6, time.Hour) // TODO remove these temporary magic values
if err != nil {
return rpt, err
}