Add local networks for each k8s service (#1490)

This commit is contained in:
Alfonso Acosta
2016-05-12 17:05:22 +01:00
committed by Tom Wilkie
parent 809a97d194
commit 1e63d7a23d
4 changed files with 37 additions and 5 deletions

View File

@@ -12,6 +12,7 @@ import (
"github.com/weaveworks/scope/probe"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/scope/report"
)
@@ -88,15 +89,17 @@ type Reporter struct {
pipes controls.PipeClient
probeID string
probe *probe.Probe
hostID string
}
// NewReporter makes a new Reporter
func NewReporter(client Client, pipes controls.PipeClient, probeID string, probe *probe.Probe) *Reporter {
func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe) *Reporter {
reporter := &Reporter{
client: client,
pipes: pipes,
probeID: probeID,
probe: probe,
hostID: hostID,
}
reporter.registerControls()
client.WatchPods(reporter.podEvent)
@@ -180,6 +183,10 @@ func (r *Reporter) Report() (report.Report, error) {
if err != nil {
return result, err
}
hostTopology := r.hostTopology(services)
if err != nil {
return result, err
}
deploymentTopology, deployments, err := r.deploymentTopology(r.probeID)
if err != nil {
return result, err
@@ -194,6 +201,7 @@ func (r *Reporter) Report() (report.Report, error) {
}
result.Pod = result.Pod.Merge(podTopology)
result.Service = result.Service.Merge(serviceTopology)
result.Host = result.Host.Merge(hostTopology)
result.Deployment = result.Deployment.Merge(deploymentTopology)
result.ReplicaSet = result.ReplicaSet.Merge(replicaSetTopology)
return result, nil
@@ -214,6 +222,25 @@ func (r *Reporter) serviceTopology() (report.Topology, []Service, error) {
return result, services, err
}
// FIXME: Hideous hack to remove persistent-connection edges to virtual service
// IPs attributed to the internet. We add each service IP as a /32 network
// (the global service-cluster-ip-range is not exposed by the API
// server so we treat each IP as a /32 network see
// https://github.com/kubernetes/kubernetes/issues/25533).
// The right way of fixing this is performing DNAT mapping on persistent
// connections for which we don't have a robust solution
// (see https://github.com/weaveworks/scope/issues/1491)
func (r *Reporter) hostTopology(services []Service) report.Topology {
localNetworks := report.EmptyStringSet
for _, service := range services {
localNetworks = localNetworks.Add(service.ClusterIP() + "/32")
}
node := report.MakeNode(report.MakeHostNodeID(r.hostID))
node = node.WithSets(report.EmptySets.
Add(host.LocalNetworks, localNetworks))
return report.MakeTopology().AddNode(node)
}
func (r *Reporter) deploymentTopology(probeID string) (report.Topology, []Deployment, error) {
var (
result = report.MakeTopology().

View File

@@ -184,7 +184,7 @@ func TestReporter(t *testing.T) {
pod1ID := report.MakePodNodeID(pod1UID)
pod2ID := report.MakePodNodeID(pod2UID)
serviceID := report.MakeServiceNodeID(serviceUID)
rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "", nil).Report()
rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "", "foo", nil).Report()
// Reporter should have added the following pods
for _, pod := range []struct {
@@ -247,7 +247,7 @@ func TestTagger(t *testing.T) {
docker.LabelPrefix + "io.kubernetes.pod.uid": "123456",
}))
rpt, err := kubernetes.NewReporter(newMockClient(), nil, "", nil).Tag(rpt)
rpt, err := kubernetes.NewReporter(newMockClient(), nil, "", "", nil).Tag(rpt)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
@@ -275,7 +275,7 @@ func TestReporterGetLogs(t *testing.T) {
client := newMockClient()
pipes := mockPipeClient{}
reporter := kubernetes.NewReporter(client, pipes, "", nil)
reporter := kubernetes.NewReporter(client, pipes, "", "", nil)
// Should error on invalid IDs
{

View File

@@ -16,6 +16,7 @@ type Service interface {
Meta
GetNode() report.Node
Selector() labels.Selector
ClusterIP() string
}
type service struct {
@@ -42,3 +43,7 @@ func (s *service) GetNode() report.Node {
}
return s.MetaNode(report.MakeServiceNodeID(s.UID())).WithLatests(latest)
}
func (s *service) ClusterIP() string {
return s.Spec.ClusterIP
}

View File

@@ -160,7 +160,7 @@ func probeMain(flags probeFlags) {
if flags.kubernetesEnabled {
if client, err := kubernetes.NewClient(flags.kubernetesAPI, flags.kubernetesInterval); err == nil {
defer client.Stop()
reporter := kubernetes.NewReporter(client, clients, probeID, p)
reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p)
defer reporter.Stop()
p.AddReporter(reporter)
p.AddTagger(reporter)