mirror of
https://github.com/weaveworks/scope.git
synced 2026-02-14 18:09:59 +00:00
``` $ git grep -l Sirupsen | grep -v vendor | xargs sed -i '' 's:github.com/Sirupsen/logrus:github.com/sirupsen/logrus:g' $ gofmt -s -w app $ gofmt -s -w common $ gofmt -s -w probe $ gofmt -s -w prog $ gofmt -s -w tools ```
444 lines
14 KiB
Go
444 lines
14 KiB
Go
package awsecs
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go/aws"
|
|
"github.com/aws/aws-sdk-go/aws/ec2metadata"
|
|
"github.com/aws/aws-sdk-go/aws/session"
|
|
"github.com/aws/aws-sdk-go/service/ecs"
|
|
"github.com/bluele/gcache"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// servicePrefix is how an ECS task's StartedBy field begins when the task was
// launched by a service (StartedBy then holds a deployment ID).
const servicePrefix = "ecs-svc"
|
|
|
|
// EcsClient is a wrapper around an AWS client that makes all the needed calls and just exposes the final results.
|
|
// We create an interface so we can mock for testing.
|
|
type EcsClient interface {
|
|
// Returns a EcsInfo struct containing data needed for a report.
|
|
GetInfo([]string) EcsInfo
|
|
// Scales a service up or down by amount
|
|
ScaleService(string, int) error
|
|
}
|
|
|
|
// actual implementation
|
|
type ecsClientImpl struct {
|
|
client *ecs.ECS
|
|
cluster string
|
|
taskCache gcache.Cache // Keys are task ARNs.
|
|
serviceCache gcache.Cache // Keys are service names.
|
|
}
|
|
|
|
// EcsTask describes the parts of ECS tasks we care about.
// Since we're caching tasks heavily, we ensure no mistakes by casting into a structure
// that only contains immutable attributes of the resource.
// Absent (nil) fields in the AWS response are stored as zero values.
// Exported for test.
type EcsTask struct {
	// TaskARN is the task's ARN; it is also the key used in the task cache.
	TaskARN string
	// CreatedAt is when the task was created.
	CreatedAt time.Time
	// TaskDefinitionARN is the ARN of the task definition the task was launched from.
	TaskDefinitionARN string

	// These started fields are immutable once set, and guaranteed to be set once the task is running,
	// which we know it is because otherwise we wouldn't be looking at it.
	StartedAt time.Time
	// StartedBy begins with servicePrefix when a service launched the task,
	// in which case it is compared against service deployment IDs.
	StartedBy string // tag or deployment id
}
|
|
|
|
// EcsService describes the parts of ECS services we care about.
// Services are highly mutable and so we can only cache them on a best-effort basis.
// We have to refresh referenced (ie. has an associated task) services each report
// but we avoid re-listing services unless we can't find a service for a task.
// Exported for test.
type EcsService struct {
	// ServiceName is the service's name; it is also the key used in the service cache.
	ServiceName string
	// The following values may be stale in a cached copy
	// DeploymentIDs lists the service's deployment IDs; a task is attributed to
	// this service when its StartedBy equals one of them.
	DeploymentIDs []string
	// DesiredCount, PendingCount and RunningCount are the task counts as last
	// reported by the ECS API (zero if the API omitted them).
	DesiredCount int64
	PendingCount int64
	RunningCount int64
	// TaskDefinitionARN is the service's task definition as reported by the API.
	TaskDefinitionARN string
}
|
|
|
|
// EcsInfo is exported for test
// It is the result of EcsClient.GetInfo.
type EcsInfo struct {
	// Tasks maps task ARN to task details, for the requested tasks that could be described.
	Tasks map[string]EcsTask
	// Services maps service name to service details, for services referenced by TaskServiceMap.
	Services map[string]EcsService
	// TaskServiceMap maps task ARN to the name of the service that started the task.
	TaskServiceMap map[string]string
}
|
|
|
|
func newClient(cluster string, cacheSize int, cacheExpiry time.Duration, clusterRegion string) (EcsClient, error) {
|
|
sess := session.New()
|
|
var err error
|
|
|
|
if clusterRegion == "" {
|
|
clusterRegion, err = ec2metadata.New(sess).Region()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return &ecsClientImpl{
|
|
client: ecs.New(sess, &aws.Config{Region: aws.String(clusterRegion)}),
|
|
cluster: cluster,
|
|
taskCache: gcache.New(cacheSize).LRU().Expiration(cacheExpiry).Build(),
|
|
serviceCache: gcache.New(cacheSize).LRU().Expiration(cacheExpiry).Build(),
|
|
}, nil
|
|
}
|
|
|
|
// The helpers below translate the AWS SDK convention of pointer-valued fields
// (nil meaning "absent") into ordinary Go zero values.

// stringOrBlank dereferences s, yielding "" when s is nil.
func stringOrBlank(s *string) string {
	var out string
	if s != nil {
		out = *s
	}
	return out
}
|
|
|
|
// timeOrZero dereferences t, yielding the zero time.Time when t is nil.
func timeOrZero(t *time.Time) time.Time {
	var out time.Time
	if t != nil {
		out = *t
	}
	return out
}
|
|
|
|
// int64OrZero dereferences i, yielding 0 when i is nil.
func int64OrZero(i *int64) int64 {
	if i != nil {
		return *i
	}
	return 0
}
|
|
|
|
func newECSTask(task *ecs.Task) EcsTask {
|
|
return EcsTask{
|
|
TaskARN: stringOrBlank(task.TaskArn),
|
|
CreatedAt: timeOrZero(task.CreatedAt),
|
|
TaskDefinitionARN: stringOrBlank(task.TaskDefinitionArn),
|
|
StartedAt: timeOrZero(task.StartedAt),
|
|
StartedBy: stringOrBlank(task.StartedBy),
|
|
}
|
|
}
|
|
|
|
func newECSService(service *ecs.Service) EcsService {
|
|
deploymentIDs := make([]string, len(service.Deployments))
|
|
for i, deployment := range service.Deployments {
|
|
deploymentIDs[i] = stringOrBlank(deployment.Id)
|
|
}
|
|
return EcsService{
|
|
ServiceName: stringOrBlank(service.ServiceName),
|
|
DeploymentIDs: deploymentIDs,
|
|
DesiredCount: int64OrZero(service.DesiredCount),
|
|
PendingCount: int64OrZero(service.PendingCount),
|
|
RunningCount: int64OrZero(service.RunningCount),
|
|
TaskDefinitionARN: stringOrBlank(service.TaskDefinition),
|
|
}
|
|
}
|
|
|
|
// IsServiceManaged returns true if the task was started by a service.
|
|
func (t EcsTask) IsServiceManaged() bool {
|
|
return strings.HasPrefix(t.StartedBy, servicePrefix)
|
|
}
|
|
|
|
// Fetches a task from the cache, returning (task, ok) as per map[]
|
|
func (c ecsClientImpl) getCachedTask(taskARN string) (EcsTask, bool) {
|
|
if taskRaw, err := c.taskCache.Get(taskARN); err == nil {
|
|
return taskRaw.(EcsTask), true
|
|
}
|
|
return EcsTask{}, false
|
|
}
|
|
|
|
// Fetches a service from the cache, returning (service, ok) as per map[]
|
|
func (c ecsClientImpl) getCachedService(serviceName string) (EcsService, bool) {
|
|
if serviceRaw, err := c.serviceCache.Get(serviceName); err == nil {
|
|
return serviceRaw.(EcsService), true
|
|
}
|
|
return EcsService{}, false
|
|
}
|
|
|
|
// Returns a list of service names.
|
|
// Cannot fail as it will attempt to deliver partial results, though that may end up being no results.
|
|
func (c ecsClientImpl) listServices() []string {
|
|
log.Debugf("Listing ECS services")
|
|
results := []string{}
|
|
err := c.client.ListServicesPages(
|
|
&ecs.ListServicesInput{Cluster: &c.cluster},
|
|
func(page *ecs.ListServicesOutput, lastPage bool) bool {
|
|
if page == nil {
|
|
return true
|
|
}
|
|
for _, name := range page.ServiceArns {
|
|
if name != nil {
|
|
results = append(results, *name)
|
|
}
|
|
}
|
|
return true
|
|
},
|
|
)
|
|
if err != nil {
|
|
log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err)
|
|
}
|
|
log.Debugf("Listed %d services", len(results))
|
|
return results
|
|
}
|
|
|
|
// Service names given are batched and details are fetched,
|
|
// with full EcsService objects being put into the cache.
|
|
// Cannot fail as it will attempt to deliver partial results.
|
|
func (c ecsClientImpl) describeServices(services []string) {
|
|
const maxServices = 10 // How many services we can put in one Describe command
|
|
group := sync.WaitGroup{}
|
|
|
|
log.Debugf("Describing ECS services")
|
|
|
|
// split into batches
|
|
batches := make([][]string, 0, len(services)/maxServices+1)
|
|
for len(services) > maxServices {
|
|
batch := services[:maxServices]
|
|
services = services[maxServices:]
|
|
batches = append(batches, batch)
|
|
}
|
|
if len(services) > 0 {
|
|
batches = append(batches, services)
|
|
}
|
|
|
|
for _, batch := range batches {
|
|
group.Add(1)
|
|
go func(names []string) {
|
|
defer group.Done()
|
|
c.describeServicesBatch(names)
|
|
}(batch)
|
|
}
|
|
|
|
group.Wait()
|
|
}
|
|
|
|
func (c ecsClientImpl) describeServicesBatch(names []string) {
|
|
namePtrs := make([]*string, 0, len(names))
|
|
for i := range names {
|
|
namePtrs = append(namePtrs, &names[i])
|
|
}
|
|
|
|
resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{
|
|
Cluster: &c.cluster,
|
|
Services: namePtrs,
|
|
})
|
|
if err != nil {
|
|
log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err)
|
|
return
|
|
}
|
|
|
|
for _, failure := range resp.Failures {
|
|
log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", stringOrBlank(failure.Arn), stringOrBlank(failure.Reason))
|
|
}
|
|
|
|
for _, service := range resp.Services {
|
|
if service != nil && service.ServiceName != nil {
|
|
c.serviceCache.Set(*service.ServiceName, newECSService(service))
|
|
}
|
|
}
|
|
}
|
|
|
|
// get details on given tasks, updating cache with the results
|
|
func (c ecsClientImpl) getTasks(taskARNs []string) {
|
|
log.Debugf("Describing %d ECS tasks", len(taskARNs))
|
|
|
|
taskPtrs := make([]*string, len(taskARNs))
|
|
for i := range taskARNs {
|
|
taskPtrs[i] = &taskARNs[i]
|
|
}
|
|
|
|
// You'd think there's a limit on how many tasks can be described here,
|
|
// but the docs don't mention anything.
|
|
resp, err := c.client.DescribeTasks(&ecs.DescribeTasksInput{
|
|
Cluster: &c.cluster,
|
|
Tasks: taskPtrs,
|
|
})
|
|
if err != nil {
|
|
log.Warnf("Failed to describe ECS tasks, ECS service report may be incomplete: %v", err)
|
|
return
|
|
}
|
|
|
|
for _, failure := range resp.Failures {
|
|
log.Warnf("Failed to describe ECS task %s, ECS service report may be incomplete: %s", stringOrBlank(failure.Arn), stringOrBlank(failure.Reason))
|
|
}
|
|
|
|
for _, task := range resp.Tasks {
|
|
if task != nil && task.TaskArn != nil {
|
|
c.taskCache.Set(*task.TaskArn, newECSTask(task))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Try to match a list of task ARNs to service names using cached info.
|
|
// Returns (task to service map, unmatched tasks). Ignores tasks whose startedby values
|
|
// don't appear to point to a service.
|
|
func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string, []string) {
|
|
deploymentMap := map[string]string{}
|
|
for _, serviceNameRaw := range c.serviceCache.Keys() {
|
|
serviceName := serviceNameRaw.(string)
|
|
service, ok := c.getCachedService(serviceName)
|
|
if !ok {
|
|
// This is rare, but possible if service was evicted after the loop began
|
|
continue
|
|
}
|
|
for _, deployment := range service.DeploymentIDs {
|
|
deploymentMap[deployment] = serviceName
|
|
}
|
|
}
|
|
log.Debugf("Mapped %d deployments from %d services", len(deploymentMap), c.serviceCache.Len())
|
|
|
|
results := map[string]string{}
|
|
unmatched := []string{}
|
|
for _, taskARN := range taskARNs {
|
|
task, ok := c.getCachedTask(taskARN)
|
|
if !ok {
|
|
// this can happen if we have a failure while describing tasks, just pretend the task doesn't exist
|
|
continue
|
|
}
|
|
if !task.IsServiceManaged() {
|
|
continue
|
|
}
|
|
if serviceName, ok := deploymentMap[task.StartedBy]; ok {
|
|
results[taskARN] = serviceName
|
|
} else {
|
|
unmatched = append(unmatched, taskARN)
|
|
}
|
|
}
|
|
|
|
log.Debugf("Matched %d from %d tasks, %d unmatched", len(results), len(taskARNs), len(unmatched))
|
|
return results, unmatched
|
|
}
|
|
|
|
func (c ecsClientImpl) ensureTasksAreCached(taskARNs []string) {
|
|
tasksToFetch := []string{}
|
|
for _, taskARN := range taskARNs {
|
|
if _, err := c.taskCache.Get(taskARN); err != nil {
|
|
tasksToFetch = append(tasksToFetch, taskARN)
|
|
}
|
|
}
|
|
if len(tasksToFetch) > 0 {
|
|
// This might not fully succeed, but we only try once and ignore any further missing tasks.
|
|
c.getTasks(tasksToFetch)
|
|
}
|
|
}
|
|
|
|
func (c ecsClientImpl) refreshServices(taskServiceMap map[string]string) map[string]bool {
|
|
servicesRefreshed := map[string]bool{}
|
|
toDescribe := []string{}
|
|
for _, serviceName := range taskServiceMap {
|
|
if servicesRefreshed[serviceName] {
|
|
continue
|
|
}
|
|
toDescribe = append(toDescribe, serviceName)
|
|
servicesRefreshed[serviceName] = true
|
|
}
|
|
c.describeServices(toDescribe)
|
|
return servicesRefreshed
|
|
}
|
|
|
|
func (c ecsClientImpl) describeAllServices(servicesRefreshed map[string]bool) {
|
|
toDescribe := []string{}
|
|
for _, serviceName := range c.listServices() {
|
|
if !servicesRefreshed[serviceName] {
|
|
toDescribe = append(toDescribe, serviceName)
|
|
servicesRefreshed[serviceName] = true
|
|
}
|
|
}
|
|
c.describeServices(toDescribe)
|
|
}
|
|
|
|
func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string]string) EcsInfo {
|
|
// The maps to return are the referenced subsets of the full caches
|
|
tasks := map[string]EcsTask{}
|
|
for _, taskARN := range taskARNs {
|
|
// It's possible that tasks could still be missing from the cache if describe tasks failed.
|
|
// We'll just pretend they don't exist.
|
|
if task, ok := c.getCachedTask(taskARN); ok {
|
|
tasks[taskARN] = task
|
|
}
|
|
}
|
|
|
|
services := map[string]EcsService{}
|
|
for taskARN, serviceName := range taskServiceMap {
|
|
if _, ok := taskServiceMap[serviceName]; ok {
|
|
// Already present. This is expected since multiple tasks can map to the same service.
|
|
continue
|
|
}
|
|
if service, ok := c.getCachedService(serviceName); ok {
|
|
services[serviceName] = service
|
|
} else {
|
|
log.Errorf("Service %s referenced by task %s in service map but not found in cache, this shouldn't be able to happen. Removing task and continuing.", serviceName, taskARN)
|
|
delete(taskServiceMap, taskARN)
|
|
}
|
|
}
|
|
|
|
return EcsInfo{Services: services, Tasks: tasks, TaskServiceMap: taskServiceMap}
|
|
}
|
|
|
|
// Implements EcsClient.GetInfo
|
|
func (c ecsClientImpl) GetInfo(taskARNs []string) EcsInfo {
|
|
log.Debugf("Getting ECS info on %d tasks", len(taskARNs))
|
|
|
|
// We do a weird order of operations here to minimize unneeded cache refreshes.
|
|
// First, we ensure we have all the tasks we need, and fetch the ones we don't.
|
|
// We also mark the tasks as being used here to prevent eviction.
|
|
c.ensureTasksAreCached(taskARNs)
|
|
|
|
// We're going to do this matching process potentially several times, but that's ok - it's quite cheap.
|
|
// First, we want to see how far we get with existing data, and identify the set of services
|
|
// we'll need to refresh regardless.
|
|
taskServiceMap, unmatched := c.matchTasksServices(taskARNs)
|
|
|
|
// In order to ensure service details are fresh, we need to refresh any referenced services
|
|
log.Debugf("Refreshing ECS services")
|
|
servicesRefreshed := c.refreshServices(taskServiceMap)
|
|
|
|
// In refreshing, we may have picked up any new deployment ids.
|
|
// If we still have tasks unmatched, we try again.
|
|
if len(unmatched) > 0 {
|
|
taskServiceMap, unmatched = c.matchTasksServices(taskARNs)
|
|
}
|
|
|
|
// If we still have tasks unmatched, we'll have to try harder. Get a list of all services and,
|
|
// if not already refreshed, fetch them.
|
|
log.Debugf("After refreshing services, %d tasks unmatched", len(unmatched))
|
|
if len(unmatched) > 0 {
|
|
c.describeAllServices(servicesRefreshed)
|
|
|
|
taskServiceMap, unmatched = c.matchTasksServices(taskARNs)
|
|
// If we still have unmatched at this point, we don't care - this may be due to partial failures,
|
|
// race conditions, and other weirdness.
|
|
}
|
|
|
|
info := c.makeECSInfo(taskARNs, taskServiceMap)
|
|
|
|
return info
|
|
}
|
|
|
|
// Implements EcsClient.ScaleService
|
|
func (c ecsClientImpl) ScaleService(serviceName string, amount int) error {
|
|
// Note this is inherently racey, due to needing to get, modify, then update the DesiredCount.
|
|
|
|
// refresh service in cache
|
|
c.describeServices([]string{serviceName})
|
|
// now check the cache to see if it worked
|
|
service, ok := c.getCachedService(serviceName)
|
|
if !ok {
|
|
return fmt.Errorf("Service %s not found", serviceName)
|
|
}
|
|
|
|
newCount := service.DesiredCount + int64(amount)
|
|
if newCount < 1 {
|
|
return fmt.Errorf("Cannot reduce count below one")
|
|
}
|
|
_, err := c.client.UpdateService(&ecs.UpdateServiceInput{
|
|
Cluster: &c.cluster,
|
|
Service: &serviceName,
|
|
DesiredCount: &newCount,
|
|
})
|
|
return err
|
|
}
|