Allow disabling controls in probes

This commit is contained in:
Alfonso Acosta
2016-07-01 14:40:20 +00:00
parent 682edd37e4
commit 6f1e52cd0d
9 changed files with 47 additions and 19 deletions

View File

@@ -43,7 +43,7 @@ func TestControl(t *testing.T) {
Value: "foo",
}
})
client, err := appclient.NewAppClient(probeConfig, ip+":"+port, ip+":"+port, controlHandler)
client, err := appclient.NewAppClient(probeConfig, ip+":"+port, ip+":"+port, controlHandler, false)
if err != nil {
t.Fatal(err)
}

View File

@@ -83,7 +83,7 @@ func TestPipeClose(t *testing.T) {
probeConfig := appclient.ProbeConfig{
ProbeID: "foo",
}
client, err := appclient.NewAppClient(probeConfig, ip+":"+port, ip+":"+port, nil)
client, err := appclient.NewAppClient(probeConfig, ip+":"+port, ip+":"+port, nil, false)
if err != nil {
t.Fatal(err)
}

View File

@@ -57,11 +57,12 @@ type appClient struct {
readers chan io.Reader
// For controls
control xfer.ControlHandler
noControls bool
control xfer.ControlHandler
}
// NewAppClient makes a new appClient.
func NewAppClient(pc ProbeConfig, hostname, target string, control xfer.ControlHandler) (AppClient, error) {
func NewAppClient(pc ProbeConfig, hostname, target string, control xfer.ControlHandler, noControls bool) (AppClient, error) {
httpTransport, err := pc.getHTTPTransport(hostname)
if err != nil {
return nil, err
@@ -82,9 +83,10 @@ func NewAppClient(pc ProbeConfig, hostname, target string, control xfer.ControlH
wsDialer: websocket.Dialer{
TLSClientConfig: httpTransport.TLSClientConfig,
},
conns: map[string]xfer.Websocket{},
readers: make(chan io.Reader, 2),
control: control,
conns: map[string]xfer.Websocket{},
readers: make(chan io.Reader, 2),
noControls: noControls,
control: control,
}, nil
}
@@ -231,6 +233,9 @@ func (c *appClient) controlConnection() (bool, error) {
}
func (c *appClient) ControlConnection() {
if c.noControls {
return
}
go func() {
log.Infof("Control connection to %s starting", c.target)
defer log.Infof("Control connection to %s exiting", c.target)

View File

@@ -109,14 +109,14 @@ func TestAppClientPublish(t *testing.T) {
Insecure: false,
}
p, err := NewAppClient(pc, u.Host, s.URL, nil)
p, err := NewAppClient(pc, u.Host, s.URL, nil, false)
if err != nil {
t.Fatal(err)
}
defer p.Stop()
// First few reports might be dropped as the client is spinning up.
rp := NewReportPublisher(p)
rp := NewReportPublisher(p, false)
for i := 0; i < 10; i++ {
if err := rp.Publish(rpt); err != nil {
t.Error(err)
@@ -158,7 +158,7 @@ func TestAppClientDetails(t *testing.T) {
ProbeID: "",
Insecure: false,
}
p, err := NewAppClient(pc, u.Host, s.URL, nil)
p, err := NewAppClient(pc, u.Host, s.URL, nil, false)
if err != nil {
t.Fatal(err)
}
@@ -203,12 +203,12 @@ func TestStop(t *testing.T) {
Insecure: false,
}
p, err := NewAppClient(pc, u.Host, s.URL, nil)
p, err := NewAppClient(pc, u.Host, s.URL, nil, false)
if err != nil {
t.Fatal(err)
}
rp := NewReportPublisher(p)
rp := NewReportPublisher(p, false)
// Make sure the app received our report and is stuck
for done := false; !done; {

View File

@@ -8,18 +8,37 @@ import (
// A ReportPublisher uses a buffer pool to serialise reports, which it
// then passes to a publisher
type ReportPublisher struct {
publisher Publisher
publisher Publisher
noControls bool
}
// NewReportPublisher creates a new report publisher
func NewReportPublisher(publisher Publisher) *ReportPublisher {
func NewReportPublisher(publisher Publisher, noControls bool) *ReportPublisher {
return &ReportPublisher{
publisher: publisher,
publisher: publisher,
noControls: noControls,
}
}
func removeControls(r report.Report) report.Report {
r.Endpoint.Controls = report.Controls{}
r.Process.Controls = report.Controls{}
r.Container.Controls = report.Controls{}
r.ContainerImage.Controls = report.Controls{}
r.Pod.Controls = report.Controls{}
r.Service.Controls = report.Controls{}
r.Deployment.Controls = report.Controls{}
r.ReplicaSet.Controls = report.Controls{}
r.Host.Controls = report.Controls{}
r.Overlay.Controls = report.Controls{}
return r
}
// Publish serialises and compresses a report, then passes it to a publisher
func (p *ReportPublisher) Publish(r report.Report) error {
if p.noControls {
r = removeControls(r)
}
buf := &bytes.Buffer{}
r.WriteBinary(buf)
return p.publisher.Publish(buf)

View File

@@ -68,11 +68,12 @@ type Ticker interface {
func New(
spyInterval, publishInterval time.Duration,
publisher appclient.Publisher,
noControls bool,
) *Probe {
result := &Probe{
spyInterval: spyInterval,
publishInterval: publishInterval,
publisher: appclient.NewReportPublisher(publisher),
publisher: appclient.NewReportPublisher(publisher, noControls),
quit: make(chan struct{}),
spiedReports: make(chan report.Report, reportBufferSize),
shortcutReports: make(chan report.Report, reportBufferSize),

View File

@@ -19,7 +19,7 @@ func TestApply(t *testing.T) {
endpointNode = report.MakeNodeWith(endpointNodeID, map[string]string{"5": "6"})
)
p := New(0, 0, nil)
p := New(0, 0, nil, false)
p.AddTagger(NewTopologyTagger())
r := report.MakeReport()
@@ -95,7 +95,7 @@ func TestProbe(t *testing.T) {
pub := mockPublisher{make(chan report.Report, 10)}
p := New(10*time.Millisecond, 100*time.Millisecond, pub)
p := New(10*time.Millisecond, 100*time.Millisecond, pub, false)
p.AddReporter(mockReporter{want})
p.Start()
defer p.Stop()

View File

@@ -67,6 +67,7 @@ type probeFlags struct {
logLevel string
resolver string
noApp bool
noControls bool
useConntrack bool // Use conntrack for endpoint topo
spyProcs bool // Associate endpoints with processes (must be root)
@@ -145,6 +146,7 @@ func main() {
flag.DurationVar(&flags.probe.publishInterval, "probe.publish.interval", 3*time.Second, "publish (output) interval")
flag.DurationVar(&flags.probe.spyInterval, "probe.spy.interval", time.Second, "spy (scan) interval")
flag.StringVar(&flags.probe.pluginsRoot, "probe.plugins.root", "/var/run/scope/plugins", "Root directory to search for plugins")
flag.BoolVar(&flags.probe.noControls, "probe.no-controls", false, "Disable controls (read-only mode)")
flag.BoolVar(&flags.probe.insecure, "probe.insecure", false, "(SSL) explicitly allow \"insecure\" SSL connections and transfers")
flag.StringVar(&flags.probe.resolver, "probe.resolver", "", "IP address & port of resolver to use. Default is to use system resolver.")

View File

@@ -117,6 +117,7 @@ func probeMain(flags probeFlags) {
return appclient.NewAppClient(
probeConfig, hostname, endpoint,
xfer.ControlHandlerFunc(controls.HandleControlRequest),
flags.noControls,
)
})
defer clients.Stop()
@@ -128,7 +129,7 @@ func probeMain(flags probeFlags) {
resolver := appclient.NewResolver(targets, dnsLookupFn, clients.Set)
defer resolver.Stop()
p := probe.New(flags.spyInterval, flags.publishInterval, clients)
p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.noControls)
hostReporter := host.NewReporter(hostID, hostName, probeID, version, clients)
defer hostReporter.Stop()