Add report tagger for populating ECS topologies

This commit is contained in:
Mike Lang
2016-11-22 15:12:57 -08:00
parent db23e64e9c
commit 511f6dad6a
2 changed files with 256 additions and 0 deletions

138
probe/awsecs/client.go Normal file
View File

@@ -0,0 +1,138 @@
package awsecs
import (
"sync"
log "github.com/Sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ecs"
)
// a wrapper around an AWS client that makes all the needed calls and just exposes the final results
type ecsClient struct {
client *ecs.ECS
cluster string
}
func newClient(cluster string) (*ecsClient, error) {
sess := session.New()
region, err := ec2metadata.New(sess).Region()
if err != nil {
return nil, err
}
return &ecsClient{
client: ecs.New(sess, &aws.Config{Region: aws.String(region)}),
cluster: cluster,
}, nil
}
// returns a map from deployment ids to service names
// cannot fail as it will attempt to deliver partial results, though that may end up being no results
func (c ecsClient) getDeploymentMap() map[string]string {
results := make(map[string]string)
lock := sync.Mutex{} // lock mediates access to results
group := sync.WaitGroup{}
err := c.client.ListServicesPages(
&ecs.ListServicesInput{Cluster: &c.cluster},
func(page *ecs.ListServicesOutput, lastPage bool) bool {
// describe each page of 10 (the max for one describe command) concurrently
group.Add(1)
go func() {
defer group.Done()
resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{
Cluster: &c.cluster,
Services: page.ServiceArns,
})
if err != nil {
// rather than trying to propogate errors up, just log a warning here
log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err)
return
}
for _, failure := range resp.Failures {
// log the failures but still continue with what succeeded
log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", failure.Arn, failure.Reason)
}
lock.Lock()
for _, service := range resp.Services {
for _, deployment := range service.Deployments {
results[*deployment.Id] = *service.ServiceName
}
}
lock.Unlock()
}()
return true
},
)
group.Wait()
if err != nil {
// We want to still return partial results if we have any, so just log a warning
log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err)
}
return results
}
// returns a map from task ARNs to deployment ids
func (c ecsClient) getTaskDeployments(taskArns []string) (map[string]string, error) {
taskPtrs := make([]*string, len(taskArns))
for i := range taskArns {
taskPtrs[i] = &taskArns[i]
}
// You'd think there's a limit on how many tasks can be described here,
// but the docs don't mention anything.
resp, err := c.client.DescribeTasks(&ecs.DescribeTasksInput{
Cluster: &c.cluster,
Tasks: taskPtrs,
})
if err != nil {
return nil, err
}
for _, failure := range resp.Failures {
// log the failures but still continue with what succeeded
log.Warnf("Failed to describe ECS task %s, ECS service report may be incomplete: %s", failure.Arn, failure.Reason)
}
results := make(map[string]string)
for _, task := range resp.Tasks {
results[*task.TaskArn] = *task.StartedBy
}
return results, nil
}
// returns a map from task ARNs to service names
func (c ecsClient) getTaskServices(taskArns []string) (map[string]string, error) {
deploymentMapChan := make(chan map[string]string)
go func() {
deploymentMapChan <- c.getDeploymentMap()
}()
// do these two fetches in parallel
taskDeployments, err := c.getTaskDeployments(taskArns)
deploymentMap := <-deploymentMapChan
if err != nil {
return nil, err
}
results := make(map[string]string)
for taskArn, depID := range taskDeployments {
// Note not all tasks map to a deployment, or we could otherwise mismatch due to races.
// It's safe to just ignore all these cases and consider them "non-service" tasks.
if service, ok := deploymentMap[depID]; ok {
results[taskArn] = service
}
}
return results, nil
}

118
probe/awsecs/reporter.go Normal file
View File

@@ -0,0 +1,118 @@
package awsecs
import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
)
type taskInfo struct {
containerIDs []string
family string
}
// return map from cluster to map of task arns to task infos
func getLabelInfo(rpt report.Report) map[string]map[string]*taskInfo {
results := make(map[string]map[string]*taskInfo)
log.Debug("scanning for ECS containers")
for nodeID, node := range rpt.Container.Nodes {
taskArn, taskArnOk := node.Latest.Lookup(docker.LabelPrefix + "com.amazonaws.ecs.task-arn")
cluster, clusterOk := node.Latest.Lookup(docker.LabelPrefix + "com.amazonaws.ecs.cluster")
family, familyOk := node.Latest.Lookup(docker.LabelPrefix + "com.amazonaws.ecs.task-definition-family")
if taskArnOk && clusterOk && familyOk {
taskMap, ok := results[cluster]
if !ok {
taskMap = make(map[string]*taskInfo)
results[cluster] = taskMap
}
task, ok := taskMap[taskArn]
if !ok {
task = &taskInfo{containerIDs: make([]string, 0), family: family}
taskMap[taskArn] = task
}
task.containerIDs = append(task.containerIDs, nodeID)
}
}
log.Debug("Got ECS container info: %v", results)
return results
}
// implements Tagger
type Reporter struct {
}
// Tag needed for Tagger
func (r Reporter) Tag(rpt report.Report) (report.Report, error) {
rpt = rpt.Copy()
clusterMap := getLabelInfo(rpt)
for cluster, taskMap := range clusterMap {
log.Debugf("Fetching ECS info for cluster %v with %v tasks", cluster, len(taskMap))
client, err := newClient(cluster)
if err != nil {
return rpt, err
}
taskArns := make([]string, 0, len(taskMap))
for taskArn := range taskMap {
taskArns = append(taskArns, taskArn)
}
taskServices, err := client.getTaskServices(taskArns)
if err != nil {
return rpt, err
}
// Create all the services first
unique := make(map[string]bool)
for _, serviceName := range taskServices {
if !unique[serviceName] {
rpt.ECSService = rpt.ECSService.AddNode(report.MakeNode(serviceNodeID(serviceName)))
unique[serviceName] = true
}
}
log.Debugf("Created %v ECS service nodes", len(taskServices))
for taskArn, info := range taskMap {
// new task node
node := report.MakeNodeWith(taskNodeID(taskArn), map[string]string{"family": info.family})
rpt.ECSTask = rpt.ECSTask.AddNode(node)
for _, containerID := range info.containerIDs {
// TODO set task node as parent of container
log.Debugf("task %v has container %v", taskArn, containerID)
}
if serviceName, ok := taskServices[taskArn]; ok {
// TODO set service node as parent of task node
log.Debugf("service %v has task %v", serviceName, taskArn)
}
}
}
return rpt, nil
}
// Name needed for Tagger
func (r Reporter) Name() string {
return "awsecs"
}
func serviceNodeID(id string) string {
return fmt.Sprintf("%s;ECSService", id)
}
func taskNodeID(id string) string {
return fmt.Sprintf("%s;ECSTask", id)
}