mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-05-07 01:37:30 +00:00
* Run `go generate tls_tapper.go` * Add `golang_uprobes.c` * Add Golang hooks and offsets * Add `golangConnection` struct and implement `pollGolangReadWrite` method * Upgrade `github.com/cilium/ebpf` version to `v0.8.1` * Fix the linter error * Move map related stuff to `maps.h` and run `go generate tls_tapper.go` * Remove unused parameter * Add an environment variable to test Golang locally * Replace `Libssl` occurrences with `Ssllib` for consistency * Fix exe path finding * Temporarily disable OpenSSL * Fix the mixed offsets and dissection preparation * Change the read symbol from `net/http.(*persistConn).Read` to `crypto/tls.(*Conn).Read` * Remove `len` and `cap` fields * Fix the indent * Fix the read data address * Make `golang_dial_writes` key `__u64` and include the PID * Fix the read data address one more time * Temporarily disable the PCAP capture * Add a uprobe for `net/http.(*gzipReader).Read` to read chunked HTTP response body * Cancel `golang_crypto_tls_read_uprobe` if it's a gzip read * Make hash map names more meaningful * Pass the connection address from `write` to `gzip` through a common address between `gzip` and `dial` * Fix the probed line number links * Add `golangReader` struct and implement its `Read` method * Have a single counter pair and request response matcher per Golang connection * Add `MIZU_GLOBAL_GOLANG_PATH` environment variable * `NULL` terminate the bytes with `unix.ByteSliceToString` * Temporarily reject the gzip chunks * Add malformed TODOs * Revert "`NULL` terminate the bytes with `unix.ByteSliceToString`" This reverts commit7ee7ef7e44. * Bring back `len` and `cap` fields * Set `len` and `cap` in `golang_net_http_gzipreader_read_uprobe` as well * Remove two `TODO`s * Fix the `key_gzip` offsets * Compress if it's gzip chunk (probably wrong!) * Revert "Compress if it's gzip chunk (probably wrong!)" This reverts commit094a7c3da4. 
* Remove `golang_net_http_gzipreader_read_uprobe` * Read constant 4KiB * Use constant read length * Get the correct len of bytes (saw the second entry) * Set all buffer sizes to `CHUNK_SIZE` * Remove a `TODO` * Revert "Temporarily disable the PCAP capture" This reverts commita2da15ef2d. * Update `golang_crypto_tls_read_uprobe` * Set the `reader` field of `tlsStream` to fix a `nil pointer dereference` error * Don't export any fields of `golangConnection` * Close the reader when we drop the connection * Add a tracepoint for `sys_enter_close` to detect socket closes * Rename `socket` struct to `golang_socket` * Call `should_tap` in Golang uprobes * Add `log_error` calls * Revert "Temporarily disable OpenSSL" This reverts commitf54d9a453f. * Fix linter * Revert "Revert "Temporarily disable OpenSSL"" This reverts commit2433d867af. * Change `golang_read_writes` map type from `BPF_RINGBUF` to `BPF_PERF_OUTPUT` * Rename `golang_read_write` to `golang_event` * Define an error * Add comments * Revert "Revert "Revert "Temporarily disable OpenSSL""" This reverts commite5a1de9c71. * Fix `pollGolang` * Revert "Revert "Revert "Revert "Temporarily disable OpenSSL"""" This reverts commit6e1bd5d4f3. * Fix `panic: send on closed channel` * Revert "Revert "Revert "Revert "Revert "Temporarily disable OpenSSL""""" This reverts commit57d0584655. * Use `findLibraryByPid` * Revert "Revert "Revert "Revert "Revert "Revert "Temporarily disable OpenSSL"""""" This reverts commit46f3d290b0. * Revert "Revert "Revert "Revert "Revert "Revert "Revert "Temporarily disable OpenSSL""""""" This reverts commit775c833c06. 
* Log tapping Golang * Fix `Poll` * Refactor `golang_net_http_dialconn_uprobe` * Remove an excess error check * Fix `can only use path@version syntax with 'go get' and 'go install' in module-aware mode` error in `tap/tlstapper/bpf-builder/build.sh` * Unify Golang and OpenSSL under a single perf event buffer and `tls_chunk` struct * Generate `tlsTapperChunkType` type (enum) as well * Use kernel page size for the `sys_closes` perf buffer * Fix the linter error * Fix `MIZU_GLOBAL_GOLANG_PID` environment variable's functionality * Rely on tracepoints for file descriptor retrieval in Golang implementation * Remove the unnecessary changes * Move common functions into `common.c` * Declare `lookup_ssl_info` function to reduce duplication * Fix linter * Add comments and TODOs * Remove `MIZU_GLOBAL_GOLANG_PATH` environment variable * Update the object files * Fix indentation * Update object files * Add `go_abi_internal.h` * Fix `lookup_ssl_info` * Convert indentation to spaces * Add header guard comment * Add more comments * Find the `ret` instructions using Capstone Engine and `uprobe` the `return` statements * Implement `get_fd_from_tcp_conn` function * Separate SSL contexts to OpenSSL and Go * Move `get_count_bytes` from `common.c` to `openssl_uprobes.c` * Rename everything contains Golang to Go * Reduce duplication in `go_uprobes.c` * Update the comments * Install Capstone in CI and Docker native builds * Update `devops/install-capstone.sh` * Add Capstone to AArch64 cross-compilation target * Fix some of the issues on ARM64 * Delete the map element in `_ex_urpobe` * Remove an unsued `LOG_` macro * Rename `aquynh` to `capstone-engine` * Add comment * Revert "Fix some of the issues on ARM64" This reverts commit0b3eceddf4. * Revert "Revert "Fix some of the issues on ARM64"" This reverts commit681534ada1. 
* Update object files * Remove unnecessary return * Increase timeout * #run_acceptance_tests * #run_acceptance_tests * Fix the `arm64v8` sourced builds * #run_acceptance_tests
322 lines
9.7 KiB
Go
322 lines
9.7 KiB
Go
// Copyright 2012 Google, Inc. All rights reserved.
|
|
//
|
|
// Use of this source code is governed by a BSD-style license
|
|
// that can be found in the LICENSE file in the root of the source
|
|
// tree.
|
|
|
|
// The pcapdump binary implements a tcpdump-like command line tool with gopacket
|
|
// using pcap as a backend data collection mechanism.
|
|
package tap
|
|
|
|
import (
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/shirou/gopsutil/cpu"
|
|
"github.com/struCoder/pidusage"
|
|
"github.com/up9inc/mizu/logger"
|
|
"github.com/up9inc/mizu/tap/api"
|
|
"github.com/up9inc/mizu/tap/diagnose"
|
|
"github.com/up9inc/mizu/tap/source"
|
|
"github.com/up9inc/mizu/tap/tlstapper"
|
|
v1 "k8s.io/api/core/v1"
|
|
)
|
|
|
|
// cleanPeriod is the value passed to the Cleaner as its cleanPeriod field.
const cleanPeriod = 10 * time.Second

// General command-line flags of the tapper. Every variable is a pointer
// returned by the flag package and only holds the parsed value after
// flag.Parse has run.
var (
	// maxcount backs -c.
	maxcount = flag.Int64("c", -1, "Only grab this many packets, then exit")

	// decoder backs -decoder.
	decoder = flag.String("decoder", "", "Name of the decoder to use (default: guess from capture)")

	// statsevery backs -stats; it is the period, in seconds, of the periodic statistics output.
	statsevery = flag.Int("stats", 60, "Output statistics every N seconds")

	// lazy backs -lazy.
	lazy = flag.Bool("lazy", false, "If true, do lazy decoding")

	// nodefrag backs -nodefrag.
	nodefrag = flag.Bool("nodefrag", false, "If true, do not do IPv4 defrag")

	// checksum backs -checksum.
	checksum = flag.Bool("checksum", false, "Check TCP checksum") // global

	// nooptcheck backs -nooptcheck.
	nooptcheck = flag.Bool("nooptcheck", true, "Do not check TCP options (useful to ignore MSS on captures with TSO)") // global

	// ignorefsmerr backs -ignorefsmerr.
	ignorefsmerr = flag.Bool("ignorefsmerr", true, "Ignore TCP FSM errors") // global

	// allowmissinginit backs -allowmissinginit.
	allowmissinginit = flag.Bool("allowmissinginit", true, "Support streams without SYN/SYN+ACK/ACK sequence") // global

	// verbose backs -verbose.
	verbose = flag.Bool("verbose", false, "Be verbose")

	// debug backs -debug.
	debug = flag.Bool("debug", false, "Display debug information")

	// quiet backs -quiet.
	quiet = flag.Bool("quiet", false, "Be quiet regarding errors")

	// hexdumppkt backs -dumppkt.
	hexdumppkt = flag.Bool("dumppkt", false, "Dump packet as hex")

	// procfs backs -procfs.
	procfs = flag.String("procfs", "/proc", "The procfs directory, used when mapping host volumes into a container")

	// ignoredPorts backs -ignore-ports; the raw comma separated list is parsed by buildIgnoredPortsList.
	ignoredPorts = flag.String("ignore-ports", "", "A comma separated list of ports to ignore")
)
|
|
|
|
// Capture-related command-line flags. Every variable is a pointer returned
// by the flag package and only holds the parsed value after flag.Parse has run.
var (
	// iface backs -i.
	iface = flag.String("i", "en0", "Interface to read packets from")

	// fname backs -r.
	fname = flag.String("r", "", "Filename to read from, overrides -i")

	// snaplen backs -s.
	snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per packet)")

	// tstype backs -timestamp_type.
	tstype = flag.String("timestamp_type", "", "Type of timestamps to use")

	// promisc backs -promisc.
	promisc = flag.Bool("promisc", true, "Set promiscuous mode")

	// staleTimeoutSeconds backs -staletimout. The flag name is misspelled,
	// but it is part of the command-line interface and is kept as is.
	staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")

	// servicemesh backs -servicemesh.
	servicemesh = flag.Bool("servicemesh", false, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls")

	// tls backs -tls; StartPassiveTapper only starts the TLS tapper when it is set.
	tls = flag.Bool("tls", false, "Enable TLS tapper")

	// memprofile backs -memprofile.
	memprofile = flag.String("memprofile", "", "Write memory profile")
)
|
|
|
|
// TapOpts carries the caller-supplied options of the passive tapper. It is
// passed to StartPassiveTapper and forwarded to the TCP assembler.
type TapOpts struct {
	// HostMode is supplied by the caller and forwarded untouched.
	// NOTE(review): its exact meaning is not visible in this file; confirm
	// against the assembler and packet source code.
	HostMode bool

	// IgnoredPorts lists ports to ignore. initializePassiveTapper appends
	// the ports parsed from the -ignore-ports flag to this slice.
	IgnoredPorts []uint16
}
|
|
|
|
var extensions []*api.Extension // global
|
|
var filteringOptions *api.TrafficFilteringOptions // global
|
|
var tapTargets []v1.Pod // global
|
|
var packetSourceManager *source.PacketSourceManager // global
|
|
var mainPacketInputChan chan source.TcpPacketInfo // global
|
|
var tlsTapperInstance *tlstapper.TlsTapper // global
|
|
|
|
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
|
|
extensions = extensionsRef
|
|
filteringOptions = options
|
|
|
|
streamsMap := NewTcpStreamMap()
|
|
|
|
if *tls {
|
|
for _, e := range extensions {
|
|
if e.Protocol.Name == "http" {
|
|
tlsTapperInstance = startTlsTapper(e, outputItems, options, streamsMap)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if GetMemoryProfilingEnabled() {
|
|
diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds))
|
|
}
|
|
|
|
assembler := initializePassiveTapper(opts, outputItems, streamsMap)
|
|
go startPassiveTapper(streamsMap, assembler)
|
|
}
|
|
|
|
func UpdateTapTargets(newTapTargets []v1.Pod) {
|
|
success := true
|
|
|
|
tapTargets = newTapTargets
|
|
|
|
packetSourceManager.UpdatePods(tapTargets, !*nodefrag, mainPacketInputChan)
|
|
|
|
if tlsTapperInstance != nil && os.Getenv("MIZU_GLOBAL_GOLANG_PID") == "" {
|
|
if err := tlstapper.UpdateTapTargets(tlsTapperInstance, &tapTargets, *procfs); err != nil {
|
|
tlstapper.LogError(err)
|
|
success = false
|
|
}
|
|
}
|
|
|
|
printNewTapTargets(success)
|
|
}
|
|
|
|
func printNewTapTargets(success bool) {
|
|
printStr := ""
|
|
for _, tapTarget := range tapTargets {
|
|
printStr += fmt.Sprintf("%s (%s), ", tapTarget.Status.PodIP, tapTarget.Name)
|
|
}
|
|
printStr = strings.TrimRight(printStr, ", ")
|
|
|
|
if success {
|
|
logger.Log.Infof("Now tapping: %s", printStr)
|
|
} else {
|
|
logger.Log.Errorf("Failed to start tapping: %s", printStr)
|
|
}
|
|
}
|
|
|
|
func printPeriodicStats(cleaner *Cleaner) {
|
|
statsPeriod := time.Second * time.Duration(*statsevery)
|
|
ticker := time.NewTicker(statsPeriod)
|
|
|
|
logicalCoreCount, err := cpu.Counts(true)
|
|
if err != nil {
|
|
logicalCoreCount = -1
|
|
}
|
|
|
|
physicalCoreCount, err := cpu.Counts(false)
|
|
if err != nil {
|
|
physicalCoreCount = -1
|
|
}
|
|
|
|
for {
|
|
<-ticker.C
|
|
|
|
// Since the start
|
|
errorMapLen, errorsSummery := diagnose.TapErrors.GetErrorsSummary()
|
|
|
|
logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
|
|
time.Since(diagnose.AppStats.StartTime),
|
|
diagnose.TapErrors.ErrorsCount,
|
|
errorMapLen,
|
|
errorsSummery,
|
|
)
|
|
|
|
// At this moment
|
|
memStats := runtime.MemStats{}
|
|
runtime.ReadMemStats(&memStats)
|
|
sysInfo, err := pidusage.GetStat(os.Getpid())
|
|
if err != nil {
|
|
sysInfo = &pidusage.SysInfo{
|
|
CPU: -1,
|
|
Memory: -1,
|
|
}
|
|
}
|
|
logger.Log.Infof(
|
|
"mem: %d, goroutines: %d, cpu: %f, cores: %d/%d, rss: %f",
|
|
memStats.HeapAlloc,
|
|
runtime.NumGoroutine(),
|
|
sysInfo.CPU,
|
|
logicalCoreCount,
|
|
physicalCoreCount,
|
|
sysInfo.Memory)
|
|
|
|
// Since the last print
|
|
cleanStats := cleaner.dumpStats()
|
|
logger.Log.Infof(
|
|
"cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d",
|
|
cleanStats.flushed,
|
|
cleanStats.closed,
|
|
cleanStats.deleted,
|
|
)
|
|
currentAppStats := diagnose.AppStats.DumpStats()
|
|
appStatsJSON, _ := json.Marshal(currentAppStats)
|
|
logger.Log.Infof("app stats - %v", string(appStatsJSON))
|
|
}
|
|
}
|
|
|
|
func initializePacketSources() error {
|
|
if packetSourceManager != nil {
|
|
packetSourceManager.Close()
|
|
}
|
|
|
|
var bpffilter string
|
|
if len(flag.Args()) > 0 {
|
|
bpffilter = strings.Join(flag.Args(), " ")
|
|
}
|
|
|
|
behaviour := source.TcpPacketSourceBehaviour{
|
|
SnapLength: *snaplen,
|
|
Promisc: *promisc,
|
|
Tstype: *tstype,
|
|
DecoderName: *decoder,
|
|
Lazy: *lazy,
|
|
BpfFilter: bpffilter,
|
|
}
|
|
|
|
var err error
|
|
packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour, !*nodefrag, mainPacketInputChan)
|
|
return err
|
|
}
|
|
|
|
func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, streamsMap api.TcpStreamMap) *tcpAssembler {
|
|
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
|
|
diagnose.InitializeTapperInternalStats()
|
|
|
|
mainPacketInputChan = make(chan source.TcpPacketInfo)
|
|
|
|
if err := initializePacketSources(); err != nil {
|
|
logger.Log.Fatal(err)
|
|
}
|
|
|
|
opts.IgnoredPorts = append(opts.IgnoredPorts, buildIgnoredPortsList(*ignoredPorts)...)
|
|
|
|
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
|
|
|
|
return assembler
|
|
}
|
|
|
|
func startPassiveTapper(streamsMap api.TcpStreamMap, assembler *tcpAssembler) {
|
|
go streamsMap.CloseTimedoutTcpStreamChannels()
|
|
|
|
diagnose.AppStats.SetStartTime(time.Now())
|
|
|
|
staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds)
|
|
cleaner := Cleaner{
|
|
assembler: assembler.Assembler,
|
|
assemblerMutex: &assembler.assemblerMutex,
|
|
cleanPeriod: cleanPeriod,
|
|
connectionTimeout: staleConnectionTimeout,
|
|
streamsMap: streamsMap,
|
|
}
|
|
cleaner.start()
|
|
|
|
go printPeriodicStats(&cleaner)
|
|
|
|
assembler.processPackets(*hexdumppkt, mainPacketInputChan)
|
|
|
|
if diagnose.TapErrors.OutputLevel >= 2 {
|
|
assembler.dumpStreamPool()
|
|
}
|
|
|
|
if err := diagnose.DumpMemoryProfile(*memprofile); err != nil {
|
|
logger.Log.Errorf("Error dumping memory profile %v", err)
|
|
}
|
|
|
|
assembler.waitAndDump()
|
|
|
|
diagnose.InternalStats.PrintStatsSummary()
|
|
diagnose.TapErrors.PrintSummary()
|
|
logger.Log.Infof("AppStats: %v", diagnose.AppStats)
|
|
}
|
|
|
|
func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChannelItem,
|
|
options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) *tlstapper.TlsTapper {
|
|
tls := tlstapper.TlsTapper{}
|
|
chunksBufferSize := os.Getpagesize() * 100
|
|
logBufferSize := os.Getpagesize()
|
|
|
|
if err := tls.Init(chunksBufferSize, logBufferSize, *procfs, extension); err != nil {
|
|
tlstapper.LogError(err)
|
|
return nil
|
|
}
|
|
|
|
if err := tlstapper.UpdateTapTargets(&tls, &tapTargets, *procfs); err != nil {
|
|
tlstapper.LogError(err)
|
|
return nil
|
|
}
|
|
|
|
// A quick way to instrument libssl.so without PID filtering - used for debuging and troubleshooting
|
|
//
|
|
if os.Getenv("MIZU_GLOBAL_SSL_LIBRARY") != "" {
|
|
if err := tls.GlobalSsllibTap(os.Getenv("MIZU_GLOBAL_SSL_LIBRARY")); err != nil {
|
|
tlstapper.LogError(err)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// A quick way to instrument Go `crypto/tls` without PID filtering - used for debuging and troubleshooting
|
|
//
|
|
if os.Getenv("MIZU_GLOBAL_GOLANG_PID") != "" {
|
|
if err := tls.GlobalGoTap(*procfs, os.Getenv("MIZU_GLOBAL_GOLANG_PID")); err != nil {
|
|
tlstapper.LogError(err)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
var emitter api.Emitter = &api.Emitting{
|
|
AppStats: &diagnose.AppStats,
|
|
OutputChannel: outputItems,
|
|
}
|
|
|
|
go tls.PollForLogging()
|
|
go tls.Poll(emitter, options, streamsMap)
|
|
|
|
return &tls
|
|
}
|
|
|
|
// buildIgnoredPortsList parses a comma separated list of port numbers.
//
// Whitespace around each entry is ignored. Entries that are not a decimal
// number in the range 0-65535 (including empty entries) are skipped, so the
// result contains only the ports that were actually specified; an empty
// input yields an empty slice. The ports are returned in input order.
func buildIgnoredPortsList(ignoredPorts string) []uint16 {
	fields := strings.Split(ignoredPorts, ",")
	ports := make([]uint16, 0, len(fields))

	for _, field := range fields {
		// A bit size of 16 makes ParseUint reject values that would
		// otherwise silently wrap around when converted to uint16.
		port, err := strconv.ParseUint(strings.TrimSpace(field), 10, 16)
		if err != nil {
			continue
		}

		ports = append(ports, uint16(port))
	}

	return ports
}
|