Add some basic metadata to ECS nodes

This commit is contained in:
Mike Lang
2016-11-28 13:32:24 -08:00
parent 9a10e9650d
commit 003ef6b4ea
2 changed files with 81 additions and 37 deletions

View File

@@ -16,6 +16,12 @@ type ecsClient struct {
cluster string
}
type ecsInfo struct {
tasks map[string]*ecs.Task
services map[string]*ecs.Service
taskServiceMap map[string]string
}
func newClient(cluster string) (*ecsClient, error) {
sess := session.New()
@@ -31,9 +37,19 @@ func newClient(cluster string) (*ecsClient, error) {
}
// returns a map from deployment ids to service names
// cannot fail as it will attempt to deliver partial results, though that may end up being no results
func (c ecsClient) getDeploymentMap() map[string]string {
func (c ecsClient) getDeploymentMap(services map[string]*ecs.Service) map[string]string {
results := map[string]string{}
for serviceName, service := range services {
for _, deployment := range service.Deployments {
results[*deployment.Id] = serviceName
}
}
return results
}
// cannot fail as it will attempt to deliver partial results, though that may end up being no results
func (c ecsClient) getServices() map[string]*ecs.Service {
results := map[string]*ecs.Service{}
lock := sync.Mutex{} // lock mediates access to results
group := sync.WaitGroup{}
@@ -62,9 +78,7 @@ func (c ecsClient) getDeploymentMap() map[string]string {
lock.Lock()
for _, service := range resp.Services {
for _, deployment := range service.Deployments {
results[*deployment.Id] = *service.ServiceName
}
results[*service.ServiceName] = service
}
lock.Unlock()
}()
@@ -79,8 +93,7 @@ func (c ecsClient) getDeploymentMap() map[string]string {
return results
}
// returns a map from task ARNs to deployment ids
func (c ecsClient) getTaskDeployments(taskArns []string) (map[string]string, error) {
func (c ecsClient) getTasks(taskArns []string) (map[string]*ecs.Task, error) {
taskPtrs := make([]*string, len(taskArns))
for i := range taskArns {
taskPtrs[i] = &taskArns[i]
@@ -100,36 +113,38 @@ func (c ecsClient) getTaskDeployments(taskArns []string) (map[string]string, err
log.Warnf("Failed to describe ECS task %s, ECS service report may be incomplete: %s", failure.Arn, failure.Reason)
}
results := make(map[string]string, len(resp.Tasks))
results := make(map[string]*ecs.Task, len(resp.Tasks))
for _, task := range resp.Tasks {
results[*task.TaskArn] = *task.StartedBy
results[*task.TaskArn] = task
}
return results, nil
}
// returns a map from task ARNs to service names
func (c ecsClient) getTaskServices(taskArns []string) (map[string]string, error) {
deploymentMapChan := make(chan map[string]string)
func (c ecsClient) getInfo(taskArns []string) (ecsInfo, error) {
servicesChan := make(chan map[string]*ecs.Service)
go func() {
deploymentMapChan <- c.getDeploymentMap()
servicesChan <- c.getServices()
}()
// do these two fetches in parallel
taskDeployments, err := c.getTaskDeployments(taskArns)
deploymentMap := <-deploymentMapChan
tasks, err := c.getTasks(taskArns)
services := <-servicesChan
if err != nil {
return nil, err
return ecsInfo{}, err
}
results := map[string]string{}
for taskArn, depID := range taskDeployments {
deploymentMap := c.getDeploymentMap(services)
taskServiceMap := map[string]string{}
for taskArn, task := range tasks {
// Note not all tasks map to a deployment, or we could otherwise mismatch due to races.
// It's safe to just ignore all these cases and consider them "non-service" tasks.
if service, ok := deploymentMap[depID]; ok {
results[taskArn] = service
if serviceName, ok := deploymentMap[*task.StartedBy]; ok {
taskServiceMap[taskArn] = serviceName
}
}
return results, nil
return ecsInfo{services: services, tasks: tasks, taskServiceMap: taskServiceMap}, nil
}

View File

@@ -1,6 +1,8 @@
package awsecs
import (
"time"
log "github.com/Sirupsen/logrus"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
@@ -8,17 +10,35 @@ import (
// TaskFamily is the key that stores the task family of an ECS Task
const (
TaskFamily = "ecs_task_family"
Cluster = "ecs_cluster"
CreatedAt = "ecs_created_at"
TaskFamily = "ecs_task_family"
ServiceDesiredCount = "ecs_service_desired_count"
ServiceRunningCount = "ecs_service_running_count"
)
type taskInfo struct {
var (
taskMetadata = report.MetadataTemplates{
Cluster: {ID: Cluster, Label: "Cluster", From: report.FromLatest, Priority: 0},
CreatedAt: {ID: CreatedAt, Label: "Created At", From: report.FromLatest, Priority: 1, Datatype: "datetime"},
TaskFamily: {ID: TaskFamily, Label: "Family", From: report.FromLatest, Priority: 2},
}
serviceMetadata = report.MetadataTemplates{
Cluster: {ID: Cluster, Label: "Cluster", From: report.FromLatest, Priority: 0},
CreatedAt: {ID: CreatedAt, Label: "Created At", From: report.FromLatest, Priority: 1, Datatype: "datetime"},
ServiceDesiredCount: {ID: ServiceDesiredCount, Label: "Desired Task Count", From: report.FromLatest, Priority: 2, Datatype: "number"},
ServiceRunningCount: {ID: ServiceRunningCount, Label: "Running Task Count", From: report.FromLatest, Priority: 3, Datatype: "number"},
}
)
type taskLabelInfo struct {
containerIDs []string
family string
}
// return map from cluster to map of task arns to task infos
func getLabelInfo(rpt report.Report) map[string]map[string]*taskInfo {
results := map[string]map[string]*taskInfo{}
func getLabelInfo(rpt report.Report) map[string]map[string]*taskLabelInfo {
results := map[string]map[string]*taskLabelInfo{}
log.Debug("scanning for ECS containers")
for nodeID, node := range rpt.Container.Nodes {
@@ -39,13 +59,13 @@ func getLabelInfo(rpt report.Report) map[string]map[string]*taskInfo {
taskMap, ok := results[cluster]
if !ok {
taskMap = map[string]*taskInfo{}
taskMap = map[string]*taskLabelInfo{}
results[cluster] = taskMap
}
task, ok := taskMap[taskArn]
if !ok {
task = &taskInfo{containerIDs: []string{}, family: family}
task = &taskLabelInfo{containerIDs: []string{}, family: family}
taskMap[taskArn] = task
}
@@ -78,33 +98,42 @@ func (Reporter) Tag(rpt report.Report) (report.Report, error) {
taskArns = append(taskArns, taskArn)
}
taskServices, err := client.getTaskServices(taskArns)
ecsInfo, err := client.getInfo(taskArns)
if err != nil {
return rpt, err
}
// Create all the services first
unique := map[string]bool{}
for _, serviceName := range taskServices {
if !unique[serviceName] {
serviceID := report.MakeECSServiceNodeID(serviceName)
rpt.ECSService = rpt.ECSService.AddNode(report.MakeNode(serviceID))
unique[serviceName] = true
}
for serviceName, service := range ecsInfo.services {
serviceID := report.MakeECSServiceNodeID(serviceName)
rpt.ECSService = rpt.ECSService.AddNode(report.MakeNodeWith(serviceID, map[string]string{ // TODO add task metadata
Cluster: cluster,
ServiceDesiredCount: string(*service.DesiredCount),
ServiceRunningCount: string(*service.RunningCount),
}))
}
log.Debugf("Created %v ECS service nodes", len(taskServices))
log.Debugf("Created %v ECS service nodes", len(ecsInfo.services))
for taskArn, info := range taskMap {
task, ok := ecsInfo.tasks[taskArn]
if !ok {
// can happen due to partial failures, just skip it
continue
}
// new task node
taskID := report.MakeECSTaskNodeID(taskArn)
node := report.MakeNodeWith(taskID, map[string]string{TaskFamily: info.family})
node := report.MakeNodeWith(taskID, map[string]string{
TaskFamily: info.family,
Cluster: cluster,
CreatedAt: task.CreatedAt.Format(time.RFC3339Nano),
})
rpt.ECSTask = rpt.ECSTask.AddNode(node)
// parents sets to merge into all matching container nodes
parentsSets := report.MakeSets()
parentsSets = parentsSets.Add(report.ECSTask, report.MakeStringSet(taskID))
if serviceName, ok := taskServices[taskArn]; ok {
if serviceName, ok := ecsInfo.taskServiceMap[taskArn]; ok {
serviceID := report.MakeECSServiceNodeID(serviceName)
parentsSets = parentsSets.Add(report.ECSService, report.MakeStringSet(serviceID))
}