From 511f6dad6a9e85aef68cea45b3503259b6e9885b Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Tue, 22 Nov 2016 15:12:57 -0800
Subject: [PATCH] Add report tagger for populating ECS topologies

---
 probe/awsecs/client.go   | 138 +++++++++++++++++++++++++++++++++++++++
 probe/awsecs/reporter.go | 118 +++++++++++++++++++++++++++++++++
 2 files changed, 256 insertions(+)
 create mode 100644 probe/awsecs/client.go
 create mode 100644 probe/awsecs/reporter.go

diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go
new file mode 100644
index 000000000..bf5b1a38d
--- /dev/null
+++ b/probe/awsecs/client.go
@@ -0,0 +1,138 @@
+package awsecs
+
+import (
+	"sync"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/ec2metadata"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/ecs"
+)
+
+// ecsClient wraps an AWS ECS client for one cluster, making all the needed calls and exposing only the final results
+type ecsClient struct {
+	client  *ecs.ECS
+	cluster string
+}
+
+func newClient(cluster string) (*ecsClient, error) {
+	sess := session.New()
+
+	region, err := ec2metadata.New(sess).Region()
+	if err != nil {
+		return nil, err
+	}
+
+	return &ecsClient{
+		client:  ecs.New(sess, &aws.Config{Region: aws.String(region)}),
+		cluster: cluster,
+	}, nil
+}
+
+// returns a map from deployment ids to service names
+// cannot fail as it will attempt to deliver partial results, though that may end up being no results
+func (c ecsClient) getDeploymentMap() map[string]string {
+	results := make(map[string]string)
+	lock := sync.Mutex{} // lock mediates access to results
+
+	group := sync.WaitGroup{}
+
+	err := c.client.ListServicesPages(
+		&ecs.ListServicesInput{Cluster: &c.cluster},
+		func(page *ecs.ListServicesOutput, lastPage bool) bool {
+			// describe each page of 10 (the max for one describe command) concurrently
+			group.Add(1)
+			go func() {
+				defer group.Done()
+
+				resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{
+					Cluster:  &c.cluster,
+					Services: page.ServiceArns,
+				})
+				if err != nil {
+					// rather than trying to propagate errors up, just log a warning here
+					log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err)
+					return
+				}
+
+				for _, failure := range resp.Failures {
+					// log the failures but still continue with what succeeded
+					log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", aws.StringValue(failure.Arn), aws.StringValue(failure.Reason))
+				}
+
+				lock.Lock()
+				for _, service := range resp.Services {
+					for _, deployment := range service.Deployments {
+						results[*deployment.Id] = *service.ServiceName
+					}
+				}
+				lock.Unlock()
+			}()
+			return true
+		},
+	)
+	group.Wait()
+
+	if err != nil {
+		// We want to still return partial results if we have any, so just log a warning
+		log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err)
+	}
+	return results
+}
+
+// returns a map from task ARNs to deployment ids
+func (c ecsClient) getTaskDeployments(taskArns []string) (map[string]string, error) {
+	taskPtrs := make([]*string, len(taskArns))
+	for i := range taskArns {
+		taskPtrs[i] = &taskArns[i]
+	}
+
+	// NOTE: the ECS API documents a limit of 100 tasks per DescribeTasks call,
+	// so this request will need batching if more tasks than that are ever passed in.
+	resp, err := c.client.DescribeTasks(&ecs.DescribeTasksInput{
+		Cluster: &c.cluster,
+		Tasks:   taskPtrs,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	for _, failure := range resp.Failures {
+		// log the failures but still continue with what succeeded
+		log.Warnf("Failed to describe ECS task %s, ECS service report may be incomplete: %s", aws.StringValue(failure.Arn), aws.StringValue(failure.Reason))
+	}
+
+	results := make(map[string]string)
+	for _, task := range resp.Tasks {
+		results[*task.TaskArn] = aws.StringValue(task.StartedBy) // StartedBy is optional and may be nil
+	}
+	return results, nil
+}
+
+// returns a map from task ARNs to service names
+func (c ecsClient) getTaskServices(taskArns []string) (map[string]string, error) {
+	deploymentMapChan := make(chan map[string]string)
+	go func() {
+		deploymentMapChan <- c.getDeploymentMap()
+	}()
+
+	// do these two fetches in parallel
+	taskDeployments, err := c.getTaskDeployments(taskArns)
+	deploymentMap := <-deploymentMapChan
+
+	if err != nil {
+		return nil, err
+	}
+
+	results := make(map[string]string)
+	for taskArn, depID := range taskDeployments {
+		// Note not all tasks map to a deployment, or we could otherwise mismatch due to races.
+		// It's safe to just ignore all these cases and consider them "non-service" tasks.
+		if service, ok := deploymentMap[depID]; ok {
+			results[taskArn] = service
+		}
+	}
+
+	return results, nil
+}
diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go
new file mode 100644
index 000000000..3c7ef70f7
--- /dev/null
+++ b/probe/awsecs/reporter.go
@@ -0,0 +1,118 @@
+package awsecs
+
+import (
+	"fmt"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/weaveworks/scope/probe/docker"
+	"github.com/weaveworks/scope/report"
+)
+
+type taskInfo struct {
+	containerIDs []string
+	family       string
+}
+
+// return map from cluster to map of task arns to task infos
+func getLabelInfo(rpt report.Report) map[string]map[string]*taskInfo {
+	results := make(map[string]map[string]*taskInfo)
+	log.Debug("scanning for ECS containers")
+	for nodeID, node := range rpt.Container.Nodes {
+
+		taskArn, taskArnOk := node.Latest.Lookup(docker.LabelPrefix + "com.amazonaws.ecs.task-arn")
+		cluster, clusterOk := node.Latest.Lookup(docker.LabelPrefix + "com.amazonaws.ecs.cluster")
+		family, familyOk := node.Latest.Lookup(docker.LabelPrefix + "com.amazonaws.ecs.task-definition-family")
+
+		if taskArnOk && clusterOk && familyOk {
+			taskMap, ok := results[cluster]
+			if !ok {
+				taskMap = make(map[string]*taskInfo)
+				results[cluster] = taskMap
+			}
+
+			task, ok := taskMap[taskArn]
+			if !ok {
+				task = &taskInfo{containerIDs: make([]string, 0), family: family}
+				taskMap[taskArn] = task
+			}
+
+			task.containerIDs = append(task.containerIDs, nodeID)
+		}
+	}
+	log.Debugf("Got ECS container info: %v", results)
+	return results
+}
+
+// Reporter implements Tagger, adding ECS service and task nodes to reports
+type Reporter struct {
+}
+
+// Tag needed for Tagger
+func (r Reporter) Tag(rpt report.Report) (report.Report, error) {
+	rpt = rpt.Copy()
+
+	clusterMap := getLabelInfo(rpt)
+
+	for cluster, taskMap := range clusterMap {
+		log.Debugf("Fetching ECS info for cluster %v with %v tasks", cluster, len(taskMap))
+
+		client, err := newClient(cluster)
+		if err != nil {
+			return rpt, err
+		}
+
+		taskArns := make([]string, 0, len(taskMap))
+		for taskArn := range taskMap {
+			taskArns = append(taskArns, taskArn)
+		}
+
+		taskServices, err := client.getTaskServices(taskArns)
+		if err != nil {
+			return rpt, err
+		}
+
+		// Create all the services first
+		unique := make(map[string]bool)
+		for _, serviceName := range taskServices {
+			if !unique[serviceName] {
+				rpt.ECSService = rpt.ECSService.AddNode(report.MakeNode(serviceNodeID(serviceName)))
+				unique[serviceName] = true
+			}
+		}
+		log.Debugf("Created %v ECS service nodes", len(unique))
+
+		for taskArn, info := range taskMap {
+
+			// new task node
+			node := report.MakeNodeWith(taskNodeID(taskArn), map[string]string{"family": info.family})
+
+			rpt.ECSTask = rpt.ECSTask.AddNode(node)
+
+			for _, containerID := range info.containerIDs {
+				// TODO set task node as parent of container
+				log.Debugf("task %v has container %v", taskArn, containerID)
+			}
+
+			if serviceName, ok := taskServices[taskArn]; ok {
+				// TODO set service node as parent of task node
+				log.Debugf("service %v has task %v", serviceName, taskArn)
+			}
+		}
+
+	}
+
+	return rpt, nil
+}
+
+// Name needed for Tagger
+func (r Reporter) Name() string {
+	return "awsecs"
+}
+
+func serviceNodeID(id string) string {
+	return fmt.Sprintf("%s;ECSService", id)
+}
+
+func taskNodeID(id string) string {
+	return fmt.Sprintf("%s;ECSTask", id)
+}