Files
kubeshark/tap/passive_tapper.go
Nimrod Gilboa Markevich a5c35d7d90 Update tap targets over ws (#901)
Update tappers via websocket instead of by env var. This way the DaemonSet doesn't have to be applied just to notify the tappers that the tap targets changed. The number of tapper restarts is reduced. The DaemonSet still gets applied when there is a need to add/remove a tapper from a node.
2022-03-23 13:50:33 +02:00

269 lines
8.1 KiB
Go

// Copyright 2012 Google, Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style license
// that can be found in the LICENSE file in the root of the source
// tree.
// The pcapdump binary implements a tcpdump-like command line tool with gopacket
// using pcap as a backend data collection mechanism.
package tap
import (
"encoding/json"
"flag"
"fmt"
"os"
"runtime"
"strings"
"time"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
"github.com/up9inc/mizu/tap/source"
"github.com/up9inc/mizu/tap/tlstapper"
v1 "k8s.io/api/core/v1"
)
const cleanPeriod = time.Second * 10
//lint:ignore U1000 will be used in the future
var remoteOnlyOutboundPorts = []int{80, 443}
var maxcount = flag.Int64("c", -1, "Only grab this many packets, then exit")
var decoder = flag.String("decoder", "", "Name of the decoder to use (default: guess from capture)")
var statsevery = flag.Int("stats", 60, "Output statistics every N seconds")
var lazy = flag.Bool("lazy", false, "If true, do lazy decoding")
var nodefrag = flag.Bool("nodefrag", false, "If true, do not do IPv4 defrag")
var checksum = flag.Bool("checksum", false, "Check TCP checksum") // global
var nooptcheck = flag.Bool("nooptcheck", true, "Do not check TCP options (useful to ignore MSS on captures with TSO)") // global
var ignorefsmerr = flag.Bool("ignorefsmerr", true, "Ignore TCP FSM errors") // global
var allowmissinginit = flag.Bool("allowmissinginit", true, "Support streams without SYN/SYN+ACK/ACK sequence") // global
var verbose = flag.Bool("verbose", false, "Be verbose")
var debug = flag.Bool("debug", false, "Display debug information")
var quiet = flag.Bool("quiet", false, "Be quiet regarding errors")
var hexdumppkt = flag.Bool("dumppkt", false, "Dump packet as hex")
var procfs = flag.String("procfs", "/proc", "The procfs directory, used when mapping host volumes into a container")
// capture
var iface = flag.String("i", "en0", "Interface to read packets from")
var fname = flag.String("r", "", "Filename to read from, overrides -i")
var snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per packet")
var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
var servicemesh = flag.Bool("servicemesh", false, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls")
var tls = flag.Bool("tls", false, "Enable TLS tapper")
var memprofile = flag.String("memprofile", "", "Write memory profile")
type TapOpts struct {
HostMode bool
}
var extensions []*api.Extension // global
var filteringOptions *api.TrafficFilteringOptions // global
var tapTargets []v1.Pod // global
var packetSourceManager *source.PacketSourceManager // global
var mainPacketInputChan chan source.TcpPacketInfo // global
func inArrayInt(arr []int, valueToCheck int) bool {
for _, value := range arr {
if value == valueToCheck {
return true
}
}
return false
}
func inArrayString(arr []string, valueToCheck string) bool {
for _, value := range arr {
if value == valueToCheck {
return true
}
}
return false
}
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
extensions = extensionsRef
filteringOptions = options
if *tls {
for _, e := range extensions {
if e.Protocol.Name == "http" {
startTlsTapper(e, outputItems, options)
break
}
}
}
if GetMemoryProfilingEnabled() {
diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds))
}
go startPassiveTapper(opts, outputItems)
}
func UpdateTapTargets(newTapTargets []v1.Pod) {
tapTargets = newTapTargets
if err := initializePacketSources(); err != nil {
logger.Log.Fatal(err)
}
printNewTapTargets()
}
func printNewTapTargets() {
printStr := ""
for _, tapTarget := range tapTargets {
printStr += fmt.Sprintf("%s (%s), ", tapTarget.Status.PodIP, tapTarget.Name)
}
printStr = strings.TrimRight(printStr, ", ")
logger.Log.Infof("Now tapping: %s", printStr)
}
func printPeriodicStats(cleaner *Cleaner) {
statsPeriod := time.Second * time.Duration(*statsevery)
ticker := time.NewTicker(statsPeriod)
for {
<-ticker.C
// Since the start
errorMapLen, errorsSummery := diagnose.TapErrors.GetErrorsSummary()
logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
time.Since(diagnose.AppStats.StartTime),
diagnose.TapErrors.ErrorsCount,
errorMapLen,
errorsSummery,
)
// At this moment
memStats := runtime.MemStats{}
runtime.ReadMemStats(&memStats)
logger.Log.Infof(
"mem: %d, goroutines: %d",
memStats.HeapAlloc,
runtime.NumGoroutine(),
)
// Since the last print
cleanStats := cleaner.dumpStats()
logger.Log.Infof(
"cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d",
cleanStats.flushed,
cleanStats.closed,
cleanStats.deleted,
)
currentAppStats := diagnose.AppStats.DumpStats()
appStatsJSON, _ := json.Marshal(currentAppStats)
logger.Log.Infof("app stats - %v", string(appStatsJSON))
}
}
func initializePacketSources() error {
if packetSourceManager != nil {
packetSourceManager.Close()
}
var bpffilter string
if len(flag.Args()) > 0 {
bpffilter = strings.Join(flag.Args(), " ")
}
behaviour := source.TcpPacketSourceBehaviour{
SnapLength: *snaplen,
Promisc: *promisc,
Tstype: *tstype,
DecoderName: *decoder,
Lazy: *lazy,
BpfFilter: bpffilter,
}
var err error
if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil {
return err
} else {
packetSourceManager.ReadPackets(!*nodefrag, mainPacketInputChan)
return nil
}
}
func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) {
streamsMap := NewTcpStreamMap()
go streamsMap.closeTimedoutTcpStreamChannels()
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
diagnose.InitializeTapperInternalStats()
mainPacketInputChan = make(chan source.TcpPacketInfo)
if err := initializePacketSources(); err != nil {
logger.Log.Fatal(err)
}
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
diagnose.AppStats.SetStartTime(time.Now())
staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds)
cleaner := Cleaner{
assembler: assembler.Assembler,
assemblerMutex: &assembler.assemblerMutex,
cleanPeriod: cleanPeriod,
connectionTimeout: staleConnectionTimeout,
streamsMap: streamsMap,
}
cleaner.start()
go printPeriodicStats(&cleaner)
assembler.processPackets(*hexdumppkt, mainPacketInputChan)
if diagnose.TapErrors.OutputLevel >= 2 {
assembler.dumpStreamPool()
}
if err := diagnose.DumpMemoryProfile(*memprofile); err != nil {
logger.Log.Errorf("Error dumping memory profile %v", err)
}
assembler.waitAndDump()
diagnose.InternalStats.PrintStatsSummary()
diagnose.TapErrors.PrintSummary()
logger.Log.Infof("AppStats: %v", diagnose.AppStats)
}
func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChannelItem, options *api.TrafficFilteringOptions) {
tls := tlstapper.TlsTapper{}
tlsPerfBufferSize := os.Getpagesize() * 100
if err := tls.Init(tlsPerfBufferSize, *procfs, extension); err != nil {
tlstapper.LogError(err)
return
}
// A quick way to instrument libssl.so without PID filtering - used for debuging and troubleshooting
//
if os.Getenv("MIZU_GLOBAL_SSL_LIBRARY") != "" {
if err := tls.GlobalTap(os.Getenv("MIZU_GLOBAL_SSL_LIBRARY")); err != nil {
tlstapper.LogError(err)
return
}
}
if err := tlstapper.UpdateTapTargets(&tls, &tapTargets, *procfs); err != nil {
tlstapper.LogError(err)
return
}
var emitter api.Emitter = &api.Emitting{
AppStats: &diagnose.AppStats,
OutputChannel: outputItems,
}
go tls.Poll(emitter, options)
}