mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-15 02:19:54 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d844d6eb04 | ||
|
|
6979441422 | ||
|
|
9ec8347c6c |
@@ -78,8 +78,8 @@ RUN go build -ldflags="-extldflags=-static -s -w \
|
||||
-X 'github.com/up9inc/mizu/agent/pkg/version.Ver=${VER}'" -o mizuagent .
|
||||
|
||||
# Download Basenine executable, verify the sha1sum
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.5.4/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.5.4/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.6.3/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.6.3/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
|
||||
RUN shasum -a 256 -c basenine_linux_${GOARCH}.sha256
|
||||
RUN chmod +x ./basenine_linux_${GOARCH}
|
||||
RUN mv ./basenine_linux_${GOARCH} ./basenine
|
||||
|
||||
@@ -22,7 +22,7 @@ require (
|
||||
github.com/ory/kratos-client-go v0.8.2-alpha.1
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/stretchr/testify v1.7.0
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220302182733-74dc40dc2ef0
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e
|
||||
github.com/up9inc/mizu/shared v0.0.0
|
||||
github.com/up9inc/mizu/tap v0.0.0
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
|
||||
10
agent/go.sum
10
agent/go.sum
@@ -853,14 +853,8 @@ github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn
|
||||
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
||||
github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ=
|
||||
github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220220204122-0ef8cb24fab1 h1:0XN8s3HtwUBr9hbWRAFulFMsu1f2cabfJbwpz/sOoLA=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220220204122-0ef8cb24fab1/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220301135911-d2111357b14e h1:nv/A/AeF8PcU91aHAj6o2cU8fl/46v0ZLj7wgIKjv+o=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220301135911-d2111357b14e/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220302073458-c32e0adf1500 h1:T1QHxt65NMete/GobVSvcHnwZAQibvahhrMTCgtnSS4=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220302073458-c32e0adf1500/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220302182733-74dc40dc2ef0 h1:mSqZuJJV4UZyaAoC8x7/AO7DLidlXepFyU18Vm3rFiA=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220302182733-74dc40dc2ef0/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e h1:/9dFXqvRDHcwPQdIGHP6iz6M0iAWBPOxYf6C+Ntq5w0=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
|
||||
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg=
|
||||
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
|
||||
|
||||
@@ -11,7 +11,7 @@ require (
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
|
||||
github.com/spf13/cobra v1.3.0
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/up9inc/basenine/server/lib v0.0.0-20220302182733-74dc40dc2ef0
|
||||
github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e
|
||||
github.com/up9inc/mizu/shared v0.0.0
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
|
||||
@@ -35,6 +35,7 @@ require (
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/alecthomas/participle/v2 v2.0.0-alpha7 // indirect
|
||||
github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5 // indirect
|
||||
github.com/clbanning/mxj/v2 v2.5.5 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dlclark/regexp2 v1.4.0 // indirect
|
||||
github.com/docker/go-units v0.4.0 // indirect
|
||||
|
||||
@@ -120,6 +120,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
|
||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
|
||||
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
|
||||
github.com/clbanning/mxj/v2 v2.5.5 h1:oT81vUeEiQQ/DcHbzSytRngP6Ky9O+L+0Bw0zSJag9E=
|
||||
github.com/clbanning/mxj/v2 v2.5.5/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
|
||||
@@ -598,8 +600,8 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
||||
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
|
||||
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
|
||||
github.com/up9inc/basenine/server/lib v0.0.0-20220302182733-74dc40dc2ef0 h1:9PQamOq285DyVsRlS4KB/x2+xkr5QlpiT9Y/BPutS4A=
|
||||
github.com/up9inc/basenine/server/lib v0.0.0-20220302182733-74dc40dc2ef0/go.mod h1:R9bG4y/iq89jNC0xZ25uKDqenyKFTR3X9acGDOkKWSE=
|
||||
github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e h1:reG/QwyxdfvGObfdrae7DZc3rTMiGwQ6S/4PRkwtBoE=
|
||||
github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e/go.mod h1:ZIkxWiJm65jYQIso9k+OZKhR7gQ1we2jNyE2kQX9IQI=
|
||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
|
||||
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
|
||||
github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk=
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"sort"
|
||||
@@ -18,6 +19,9 @@ import (
|
||||
|
||||
const mizuTestEnvVar = "MIZU_TEST"
|
||||
|
||||
var UnknownIp net.IP = net.IP{0, 0, 0, 0}
|
||||
var UnknownPort uint16 = 0
|
||||
|
||||
type Protocol struct {
|
||||
Name string `json:"name"`
|
||||
LongName string `json:"longName"`
|
||||
|
||||
@@ -52,7 +52,6 @@ var snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per
|
||||
var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
|
||||
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
|
||||
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
|
||||
var pids = flag.String("pids", "", "A comma separated list of PIDs to capture their network namespaces")
|
||||
var servicemesh = flag.Bool("servicemesh", false, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls")
|
||||
var tls = flag.Bool("tls", false, "Enable TLS tapper")
|
||||
|
||||
@@ -190,7 +189,7 @@ func initializePacketSources() error {
|
||||
}
|
||||
|
||||
var err error
|
||||
if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil {
|
||||
if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil {
|
||||
return err
|
||||
} else {
|
||||
packetSourceManager.ReadPackets(!*nodefrag, mainPacketInputChan)
|
||||
@@ -248,7 +247,7 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
|
||||
tls := tlstapper.TlsTapper{}
|
||||
tlsPerfBufferSize := os.Getpagesize() * 100
|
||||
|
||||
if err := tls.Init(tlsPerfBufferSize); err != nil {
|
||||
if err := tls.Init(tlsPerfBufferSize, *procfs, extension); err != nil {
|
||||
tlstapper.LogError(err)
|
||||
return
|
||||
}
|
||||
@@ -272,6 +271,5 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
|
||||
OutputChannel: outputItems,
|
||||
}
|
||||
|
||||
poller := tlstapper.NewTlsPoller(&tls, extension)
|
||||
go poller.Poll(extension, emitter, options)
|
||||
go tls.Poll(emitter, options)
|
||||
}
|
||||
|
||||
83
tap/source/netns_packet_source.go
Normal file
83
tap/source/netns_packet_source.go
Normal file
@@ -0,0 +1,83 @@
|
||||
package source
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/vishvananda/netns"
|
||||
)
|
||||
|
||||
func newNetnsPacketSource(procfs string, pid string,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
||||
nsh, err := netns.GetFromPath(fmt.Sprintf("%s/%s/ns/net", procfs, pid))
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Unable to get netns of pid %s - %w", pid, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
src, err := newPacketSourceFromNetnsHandle(pid, nsh, interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Error starting netns packet source for %s - %w", pid, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return src, nil
|
||||
}
|
||||
|
||||
func newPacketSourceFromNetnsHandle(pid string, nsh netns.NsHandle, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
||||
|
||||
done := make(chan *tcpPacketSource)
|
||||
errors := make(chan error)
|
||||
|
||||
go func(done chan<- *tcpPacketSource) {
|
||||
// Setting a netns should be done from a dedicated OS thread.
|
||||
//
|
||||
// goroutines are not really OS threads, we try to mimic the issue by
|
||||
// locking the OS thread to this goroutine
|
||||
//
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
oldnetns, err := netns.Get()
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Unable to get netns of current thread %w", err)
|
||||
errors <- err
|
||||
return
|
||||
}
|
||||
|
||||
if err := netns.Set(nsh); err != nil {
|
||||
logger.Log.Errorf("Unable to set netns of pid %s - %w", pid, err)
|
||||
errors <- err
|
||||
return
|
||||
}
|
||||
|
||||
name := fmt.Sprintf("netns-%s-%s", pid, interfaceName)
|
||||
src, err := newTcpPacketSource(name, "", interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Error listening to PID %s - %w", pid, err)
|
||||
errors <- err
|
||||
return
|
||||
}
|
||||
|
||||
if err := netns.Set(oldnetns); err != nil {
|
||||
logger.Log.Errorf("Unable to set back netns of current thread %w", err)
|
||||
errors <- err
|
||||
return
|
||||
}
|
||||
|
||||
done <- src
|
||||
}(done)
|
||||
|
||||
select {
|
||||
case err := <-errors:
|
||||
return nil, err
|
||||
case source := <-done:
|
||||
return source, nil
|
||||
}
|
||||
}
|
||||
@@ -2,109 +2,46 @@ package source
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/vishvananda/netns"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
const bpfFilterMaxPods = 150
|
||||
const hostSourcePid = "0"
|
||||
|
||||
type PacketSourceManager struct {
|
||||
sources []*tcpPacketSource
|
||||
sources map[string]*tcpPacketSource
|
||||
}
|
||||
|
||||
func NewPacketSourceManager(procfs string, pids string, filename string, interfaceName string,
|
||||
func NewPacketSourceManager(procfs string, filename string, interfaceName string,
|
||||
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
|
||||
sources := make([]*tcpPacketSource, 0)
|
||||
sources, err := createHostSource(sources, filename, interfaceName, behaviour)
|
||||
|
||||
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sources = createSourcesFromPids(sources, procfs, pids, interfaceName, behaviour)
|
||||
sources = createSourcesFromEnvoy(sources, mtls, procfs, pods, interfaceName, behaviour)
|
||||
sources = createSourcesFromLinkerd(sources, mtls, procfs, pods, interfaceName, behaviour)
|
||||
|
||||
return &PacketSourceManager{
|
||||
sources: sources,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func createHostSource(sources []*tcpPacketSource, filename string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) ([]*tcpPacketSource, error) {
|
||||
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
return sources, err
|
||||
sourceManager := &PacketSourceManager{
|
||||
sources: map[string]*tcpPacketSource{
|
||||
hostSourcePid: hostSource,
|
||||
},
|
||||
}
|
||||
|
||||
return append(sources, hostSource), nil
|
||||
}
|
||||
|
||||
func createSourcesFromPids(sources []*tcpPacketSource, procfs string, pids string,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
if pids == "" {
|
||||
return sources
|
||||
}
|
||||
|
||||
netnsSources := newNetnsPacketSources(procfs, strings.Split(pids, ","), interfaceName, behaviour)
|
||||
sources = append(sources, netnsSources...)
|
||||
return sources
|
||||
}
|
||||
|
||||
func createSourcesFromEnvoy(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
if !mtls {
|
||||
return sources
|
||||
}
|
||||
|
||||
envoyPids, err := discoverRelevantEnvoyPids(procfs, pods)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Warningf("Unable to discover envoy pids - %v", err)
|
||||
return sources
|
||||
}
|
||||
|
||||
netnsSources := newNetnsPacketSources(procfs, envoyPids, interfaceName, behaviour)
|
||||
sources = append(sources, netnsSources...)
|
||||
|
||||
return sources
|
||||
}
|
||||
|
||||
func createSourcesFromLinkerd(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
if !mtls {
|
||||
return sources
|
||||
}
|
||||
|
||||
linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Warningf("Unable to discover linkerd pids - %v", err)
|
||||
return sources
|
||||
}
|
||||
|
||||
netnsSources := newNetnsPacketSources(procfs, linkerdPids, interfaceName, behaviour)
|
||||
sources = append(sources, netnsSources...)
|
||||
|
||||
return sources
|
||||
sourceManager.UpdatePods(mtls, procfs, pods, interfaceName, behaviour)
|
||||
return sourceManager, nil
|
||||
}
|
||||
|
||||
func newHostPacketSource(filename string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
||||
var name string
|
||||
|
||||
if filename == "" {
|
||||
name = fmt.Sprintf("host-%v", interfaceName)
|
||||
name = fmt.Sprintf("host-%s", interfaceName)
|
||||
} else {
|
||||
name = fmt.Sprintf("file-%v", filename)
|
||||
name = fmt.Sprintf("file-%s", filename)
|
||||
}
|
||||
|
||||
source, err := newTcpPacketSource(name, filename, interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -112,90 +49,93 @@ func newHostPacketSource(filename string, interfaceName string,
|
||||
return source, nil
|
||||
}
|
||||
|
||||
func newNetnsPacketSources(procfs string, pids []string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
result := make([]*tcpPacketSource, 0)
|
||||
|
||||
for _, pidstr := range pids {
|
||||
pid, err := strconv.Atoi(pidstr)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Invalid PID: %v - %v", pid, err)
|
||||
continue
|
||||
}
|
||||
|
||||
nsh, err := netns.GetFromPath(fmt.Sprintf("%v/%v/ns/net", procfs, pid))
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Unable to get netns of pid %v - %v", pid, err)
|
||||
continue
|
||||
}
|
||||
|
||||
src, err := newNetnsPacketSource(pid, nsh, interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Error starting netns packet source for %v - %v", pid, err)
|
||||
continue
|
||||
}
|
||||
|
||||
result = append(result, src)
|
||||
func (m *PacketSourceManager) UpdatePods(mtls bool, procfs string, pods []v1.Pod,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) {
|
||||
if mtls {
|
||||
m.updateMtlsPods(procfs, pods, interfaceName, behaviour)
|
||||
}
|
||||
|
||||
return result
|
||||
m.setBPFFilter(pods)
|
||||
}
|
||||
|
||||
func newNetnsPacketSource(pid int, nsh netns.NsHandle, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
||||
func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) {
|
||||
|
||||
done := make(chan *tcpPacketSource)
|
||||
errors := make(chan error)
|
||||
relevantPids := m.getRelevantPids(procfs, pods)
|
||||
logger.Log.Infof("Updating mtls pods (new: %v) (current: %v)", relevantPids, m.sources)
|
||||
|
||||
go func(done chan<- *tcpPacketSource) {
|
||||
// Setting a netns should be done from a dedicated OS thread.
|
||||
//
|
||||
// goroutines are not really OS threads, we try to mimic the issue by
|
||||
// locking the OS thread to this goroutine
|
||||
//
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
oldnetns, err := netns.Get()
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Unable to get netns of current thread %v", err)
|
||||
errors <- err
|
||||
return
|
||||
for pid, src := range m.sources {
|
||||
if _, ok := relevantPids[pid]; !ok {
|
||||
src.close()
|
||||
delete(m.sources, pid)
|
||||
}
|
||||
}
|
||||
|
||||
if err := netns.Set(nsh); err != nil {
|
||||
logger.Log.Errorf("Unable to set netns of pid %v - %v", pid, err)
|
||||
errors <- err
|
||||
return
|
||||
for pid := range relevantPids {
|
||||
if _, ok := m.sources[pid]; !ok {
|
||||
source, err := newNetnsPacketSource(procfs, pid, interfaceName, behaviour)
|
||||
|
||||
if err == nil {
|
||||
m.sources[pid] = source
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
name := fmt.Sprintf("netns-%v-%v", pid, interfaceName)
|
||||
src, err := newTcpPacketSource(name, "", interfaceName, behaviour)
|
||||
func (m *PacketSourceManager) getRelevantPids(procfs string, pods []v1.Pod) map[string]bool {
|
||||
relevantPids := make(map[string]bool)
|
||||
relevantPids[hostSourcePid] = true
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Error listening to PID %v - %v", pid, err)
|
||||
errors <- err
|
||||
return
|
||||
if envoyPids, err := discoverRelevantEnvoyPids(procfs, pods); err != nil {
|
||||
logger.Log.Warningf("Unable to discover envoy pids - %w", err)
|
||||
} else {
|
||||
for _, pid := range envoyPids {
|
||||
relevantPids[pid] = true
|
||||
}
|
||||
}
|
||||
|
||||
if err := netns.Set(oldnetns); err != nil {
|
||||
logger.Log.Errorf("Unable to set back netns of current thread %v", err)
|
||||
errors <- err
|
||||
return
|
||||
if linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods); err != nil {
|
||||
logger.Log.Warningf("Unable to discover linkerd pids - %w", err)
|
||||
} else {
|
||||
for _, pid := range linkerdPids {
|
||||
relevantPids[pid] = true
|
||||
}
|
||||
}
|
||||
|
||||
done <- src
|
||||
}(done)
|
||||
return relevantPids
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-errors:
|
||||
return nil, err
|
||||
case source := <-done:
|
||||
return source, nil
|
||||
func buildBPFExpr(pods []v1.Pod) string {
|
||||
hostsFilter := make([]string, 0)
|
||||
|
||||
for _, pod := range pods {
|
||||
hostsFilter = append(hostsFilter, fmt.Sprintf("host %s", pod.Status.PodIP))
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s and port not 443", strings.Join(hostsFilter, " or "))
|
||||
}
|
||||
|
||||
func (m *PacketSourceManager) setBPFFilter(pods []v1.Pod) {
|
||||
if len(pods) == 0 {
|
||||
logger.Log.Info("No pods provided, skipping pcap bpf filter")
|
||||
return
|
||||
}
|
||||
|
||||
var expr string
|
||||
|
||||
if len(pods) > bpfFilterMaxPods {
|
||||
logger.Log.Info("Too many pods for setting ebpf filter %d, setting just not 443", len(pods))
|
||||
expr = "port not 443"
|
||||
} else {
|
||||
expr = buildBPFExpr(pods)
|
||||
}
|
||||
|
||||
logger.Log.Infof("Setting pcap bpf filter %s", expr)
|
||||
|
||||
for pid, src := range m.sources {
|
||||
if err := src.setBPFFilter(expr); err != nil {
|
||||
logger.Log.Warningf("Error setting bpf filter for %s %v - %w", pid, src, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -98,6 +98,14 @@ func newTcpPacketSource(name, filename string, interfaceName string,
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (source *tcpPacketSource) String() string {
|
||||
return source.name
|
||||
}
|
||||
|
||||
func (source *tcpPacketSource) setBPFFilter(expr string) (err error) {
|
||||
return source.handle.SetBPFFilter(expr)
|
||||
}
|
||||
|
||||
func (source *tcpPacketSource) close() {
|
||||
if source.handle != nil {
|
||||
source.handle.Close()
|
||||
|
||||
@@ -10,7 +10,7 @@ Copyright (C) UP9 Inc.
|
||||
#define FLAGS_IS_CLIENT_BIT (1 << 0)
|
||||
#define FLAGS_IS_READ_BIT (1 << 1)
|
||||
|
||||
// The same struct can be found in Chunk.go
|
||||
// The same struct can be found in chunk.go
|
||||
//
|
||||
// Be careful when editing, alignment and padding should be exactly the same in go/c.
|
||||
//
|
||||
|
||||
@@ -8,20 +8,20 @@ import (
|
||||
"github.com/go-errors/errors"
|
||||
)
|
||||
|
||||
const FLAGS_IS_CLIENT_BIT int32 = (1 << 0)
|
||||
const FLAGS_IS_READ_BIT int32 = (1 << 1)
|
||||
const FLAGS_IS_CLIENT_BIT uint32 = (1 << 0)
|
||||
const FLAGS_IS_READ_BIT uint32 = (1 << 1)
|
||||
|
||||
// The same struct can be found in maps.h
|
||||
//
|
||||
// Be careful when editing, alignment and padding should be exactly the same in go/c.
|
||||
//
|
||||
type tlsChunk struct {
|
||||
Pid int32
|
||||
Tgid int32
|
||||
Len int32
|
||||
Recorded int32
|
||||
Fd int32
|
||||
Flags int32
|
||||
Pid uint32
|
||||
Tgid uint32
|
||||
Len uint32
|
||||
Recorded uint32
|
||||
Fd uint32
|
||||
Flags uint32
|
||||
Address [16]byte
|
||||
Data [4096]byte
|
||||
}
|
||||
@@ -68,3 +68,7 @@ func (c *tlsChunk) isWrite() bool {
|
||||
func (c *tlsChunk) getRecordedData() []byte {
|
||||
return c.Data[:c.Recorded]
|
||||
}
|
||||
|
||||
func (c *tlsChunk) isRequest() bool {
|
||||
return (c.isClient() && c.isWrite()) || (c.isServer() && c.isRead())
|
||||
}
|
||||
|
||||
102
tap/tlstapper/sockfd_info.go
Normal file
102
tap/tlstapper/sockfd_info.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package tlstapper
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/go-errors/errors"
|
||||
)
|
||||
|
||||
var socketInodeRegex = regexp.MustCompile(`socket:\[(\d+)\]`)
|
||||
|
||||
const (
|
||||
SRC_ADDRESS_FILED_INDEX = 1
|
||||
DST_ADDRESS_FILED_INDEX = 2
|
||||
INODE_FILED_INDEX = 9
|
||||
)
|
||||
|
||||
// This file helps to extract Ip and Port out of a Socket file descriptor.
|
||||
//
|
||||
// The equivalent bash commands are:
|
||||
//
|
||||
// > ls -l /proc/<pid>/fd/<fd>
|
||||
// Output something like "socket:[1234]" for sockets - 1234 is the inode of the socket
|
||||
// > cat /proc/<pid>/net/tcp | grep <inode>
|
||||
// Output a line per ipv4 socket, the 9th field is the inode of the socket
|
||||
// The 1st and 2nd fields are the source and dest ip and ports in a Hex format
|
||||
// 0100007F:50 is 127.0.0.1:80
|
||||
|
||||
func getAddressBySockfd(procfs string, pid uint32, fd uint32, src bool) (net.IP, uint16, error) {
|
||||
inode, err := getSocketInode(procfs, pid, fd)
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
tcppath := fmt.Sprintf("%s/%d/net/tcp", procfs, pid)
|
||||
tcp, err := ioutil.ReadFile(tcppath)
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
for _, line := range strings.Split(string(tcp), "\n") {
|
||||
parts := strings.Fields(line)
|
||||
|
||||
if len(parts) < 10 {
|
||||
continue
|
||||
}
|
||||
|
||||
if inode == parts[INODE_FILED_INDEX] {
|
||||
if src {
|
||||
return parseHexAddress(parts[SRC_ADDRESS_FILED_INDEX])
|
||||
} else {
|
||||
return parseHexAddress(parts[DST_ADDRESS_FILED_INDEX])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, 0, errors.Errorf("address not found [pid: %d] [sockfd: %d] [inode: %s]", pid, fd, inode)
|
||||
}
|
||||
|
||||
func getSocketInode(procfs string, pid uint32, fd uint32) (string, error) {
|
||||
fdlinkPath := fmt.Sprintf("%s/%d/fd/%d", procfs, pid, fd)
|
||||
fdlink, err := os.Readlink(fdlinkPath)
|
||||
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
tokens := socketInodeRegex.FindStringSubmatch(fdlink)
|
||||
|
||||
if tokens == nil || len(tokens) < 1 {
|
||||
return "", errors.Errorf("socket inode not found [pid: %d] [sockfd: %d] [link: %s]", pid, fd, fdlink)
|
||||
}
|
||||
|
||||
return tokens[1], nil
|
||||
}
|
||||
|
||||
// Format looks like 0100007F:50 for 127.0.0.1:80
|
||||
//
|
||||
func parseHexAddress(addr string) (net.IP, uint16, error) {
|
||||
addrParts := strings.Split(addr, ":")
|
||||
|
||||
port, err := strconv.ParseUint(addrParts[1], 16, 16)
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
ip, err := strconv.ParseUint(addrParts[0], 16, 32)
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
return net.IP{uint8(ip), uint8(ip >> 8), uint8(ip >> 16), uint8(ip >> 24)}, uint16(port), nil
|
||||
}
|
||||
@@ -2,7 +2,6 @@ package tlstapper
|
||||
|
||||
import (
|
||||
"debug/elf"
|
||||
"fmt"
|
||||
|
||||
"github.com/go-errors/errors"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
@@ -47,7 +46,7 @@ func findBaseAddress(sslElf *elf.File, sslLibraryPath string) (uint64, error) {
|
||||
}
|
||||
}
|
||||
|
||||
return 0, errors.New(fmt.Sprintf("Program header not found in %v", sslLibraryPath))
|
||||
return 0, errors.Errorf("Program header not found in %v", sslLibraryPath)
|
||||
}
|
||||
|
||||
func findSslOffsets(sslElf *elf.File, base uint64) (sslOffsets, error) {
|
||||
|
||||
@@ -2,43 +2,64 @@ package tlstapper
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/cilium/ebpf/perf"
|
||||
"github.com/go-errors/errors"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
const UNKNOWN_PORT uint16 = 80
|
||||
const UNKNOWN_HOST string = "127.0.0.1"
|
||||
|
||||
type tlsPoller struct {
|
||||
tls *TlsTapper
|
||||
readers map[string]*tlsReader
|
||||
closedReaders chan string
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
chunksReader *perf.Reader
|
||||
extension *api.Extension
|
||||
procfs string
|
||||
}
|
||||
|
||||
func NewTlsPoller(tls *TlsTapper, extension *api.Extension) *tlsPoller {
|
||||
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller {
|
||||
return &tlsPoller{
|
||||
tls: tls,
|
||||
readers: make(map[string]*tlsReader),
|
||||
closedReaders: make(chan string, 100),
|
||||
reqResMatcher: extension.Dissector.NewResponseRequestMatcher(),
|
||||
extension: extension,
|
||||
chunksReader: nil,
|
||||
procfs: procfs,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *tlsPoller) Poll(extension *api.Extension,
|
||||
emitter api.Emitter, options *api.TrafficFilteringOptions) {
|
||||
func (p *tlsPoller) init(bpfObjects *tlsTapperObjects, bufferSize int) error {
|
||||
var err error
|
||||
|
||||
p.chunksReader, err = perf.NewReader(bpfObjects.ChunksBuffer, bufferSize)
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *tlsPoller) close() error {
|
||||
return p.chunksReader.Close()
|
||||
}
|
||||
|
||||
func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptions) {
|
||||
chunks := make(chan *tlsChunk)
|
||||
|
||||
go p.tls.pollPerf(chunks)
|
||||
go p.pollChunksPerfBuffer(chunks)
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -47,7 +68,7 @@ func (p *tlsPoller) Poll(extension *api.Extension,
|
||||
return
|
||||
}
|
||||
|
||||
if err := p.handleTlsChunk(chunk, extension, emitter, options); err != nil {
|
||||
if err := p.handleTlsChunk(chunk, p.extension, emitter, options); err != nil {
|
||||
LogError(err)
|
||||
}
|
||||
case key := <-p.closedReaders:
|
||||
@@ -56,6 +77,41 @@ func (p *tlsPoller) Poll(extension *api.Extension,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *tlsPoller) pollChunksPerfBuffer(chunks chan<- *tlsChunk) {
|
||||
logger.Log.Infof("Start polling for tls events")
|
||||
|
||||
for {
|
||||
record, err := p.chunksReader.Read()
|
||||
|
||||
if err != nil {
|
||||
close(chunks)
|
||||
|
||||
if errors.Is(err, perf.ErrClosed) {
|
||||
return
|
||||
}
|
||||
|
||||
LogError(errors.Errorf("Error reading chunks from tls perf, aborting TLS! %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
if record.LostSamples != 0 {
|
||||
logger.Log.Infof("Buffer is full, dropped %d chunks", record.LostSamples)
|
||||
continue
|
||||
}
|
||||
|
||||
buffer := bytes.NewReader(record.RawSample)
|
||||
|
||||
var chunk tlsChunk
|
||||
|
||||
if err := binary.Read(buffer, binary.LittleEndian, &chunk); err != nil {
|
||||
LogError(errors.Errorf("Error parsing chunk %v", err))
|
||||
continue
|
||||
}
|
||||
|
||||
chunks <- &chunk
|
||||
}
|
||||
}
|
||||
|
||||
func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
|
||||
emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
ip, port, err := chunk.getAddress()
|
||||
@@ -75,7 +131,7 @@ func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
|
||||
reader.chunks <- chunk
|
||||
|
||||
if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" {
|
||||
logTls(chunk, ip, port)
|
||||
p.logTls(chunk, ip, port)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -92,10 +148,9 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, k
|
||||
},
|
||||
}
|
||||
|
||||
isRequest := (chunk.isClient() && chunk.isWrite()) || (chunk.isServer() && chunk.isRead())
|
||||
tcpid := buildTcpId(isRequest, ip, port)
|
||||
tcpid := p.buildTcpId(chunk, ip, port)
|
||||
|
||||
go dissect(extension, reader, isRequest, &tcpid, emitter, options, p.reqResMatcher)
|
||||
go dissect(extension, reader, chunk.isRequest(), &tcpid, emitter, options, p.reqResMatcher)
|
||||
return reader
|
||||
}
|
||||
|
||||
@@ -120,27 +175,36 @@ func buildTlsKey(chunk *tlsChunk, ip net.IP, port uint16) string {
|
||||
return fmt.Sprintf("%v:%v-%v:%v", chunk.isClient(), chunk.isRead(), ip, port)
|
||||
}
|
||||
|
||||
func buildTcpId(isRequest bool, ip net.IP, port uint16) api.TcpID {
|
||||
if isRequest {
|
||||
func (p *tlsPoller) buildTcpId(chunk *tlsChunk, ip net.IP, port uint16) api.TcpID {
|
||||
myIp, myPort, err := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, chunk.isClient())
|
||||
|
||||
if err != nil {
|
||||
// May happen if the socket already closed, very likely to happen for localhost
|
||||
//
|
||||
myIp = api.UnknownIp
|
||||
myPort = api.UnknownPort
|
||||
}
|
||||
|
||||
if chunk.isRequest() {
|
||||
return api.TcpID{
|
||||
SrcIP: UNKNOWN_HOST,
|
||||
SrcIP: myIp.String(),
|
||||
DstIP: ip.String(),
|
||||
SrcPort: strconv.Itoa(int(UNKNOWN_PORT)),
|
||||
DstPort: strconv.FormatInt(int64(port), 10),
|
||||
SrcPort: strconv.FormatUint(uint64(myPort), 10),
|
||||
DstPort: strconv.FormatUint(uint64(port), 10),
|
||||
Ident: "",
|
||||
}
|
||||
} else {
|
||||
return api.TcpID{
|
||||
SrcIP: ip.String(),
|
||||
DstIP: UNKNOWN_HOST,
|
||||
SrcPort: strconv.FormatInt(int64(port), 10),
|
||||
DstPort: strconv.Itoa(int(UNKNOWN_PORT)),
|
||||
DstIP: myIp.String(),
|
||||
SrcPort: strconv.FormatUint(uint64(port), 10),
|
||||
DstPort: strconv.FormatUint(uint64(myPort), 10),
|
||||
Ident: "",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func logTls(chunk *tlsChunk, ip net.IP, port uint16) {
|
||||
func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) {
|
||||
var flagsStr string
|
||||
|
||||
if chunk.isClient() {
|
||||
@@ -155,8 +219,13 @@ func logTls(chunk *tlsChunk, ip net.IP, port uint16) {
|
||||
flagsStr += "W"
|
||||
}
|
||||
|
||||
srcIp, srcPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, true)
|
||||
dstIp, dstPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, false)
|
||||
|
||||
str := strings.ReplaceAll(strings.ReplaceAll(string(chunk.Data[0:chunk.Recorded]), "\n", " "), "\r", "")
|
||||
|
||||
logger.Log.Infof("PID: %v (tid: %v) (fd: %v) (client: %v) (addr: %v:%v) (recorded %v out of %v) - %v - %v",
|
||||
chunk.Pid, chunk.Tgid, chunk.Fd, flagsStr, ip, port, chunk.Recorded, chunk.Len, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
|
||||
logger.Log.Infof("PID: %v (tid: %v) (fd: %v) (client: %v) (addr: %v:%v) (fdaddr %v:%v>%v:%v) (recorded %v out of %v) - %v - %v",
|
||||
chunk.Pid, chunk.Tgid, chunk.Fd, flagsStr, ip, port,
|
||||
srcIp, srcPort, dstIp, dstPort,
|
||||
chunk.Recorded, chunk.Len, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
|
||||
}
|
||||
|
||||
@@ -1,13 +1,10 @@
|
||||
package tlstapper
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
|
||||
"github.com/cilium/ebpf/perf"
|
||||
"github.com/cilium/ebpf/rlimit"
|
||||
"github.com/go-errors/errors"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go tlsTapper bpf/tls_tapper.c -- -O2 -g -D__TARGET_ARCH_x86
|
||||
@@ -16,10 +13,10 @@ type TlsTapper struct {
|
||||
bpfObjects tlsTapperObjects
|
||||
syscallHooks syscallHooks
|
||||
sslHooksStructs []sslHooks
|
||||
reader *perf.Reader
|
||||
poller *tlsPoller
|
||||
}
|
||||
|
||||
func (t *TlsTapper) Init(bufferSize int) error {
|
||||
func (t *TlsTapper) Init(bufferSize int, procfs string, extension *api.Extension) error {
|
||||
logger.Log.Infof("Initializing tls tapper (bufferSize: %v)", bufferSize)
|
||||
|
||||
if err := setupRLimit(); err != nil {
|
||||
@@ -27,55 +24,23 @@ func (t *TlsTapper) Init(bufferSize int) error {
|
||||
}
|
||||
|
||||
t.bpfObjects = tlsTapperObjects{}
|
||||
|
||||
if err := loadTlsTapperObjects(&t.bpfObjects, nil); err != nil {
|
||||
return errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
t.syscallHooks = syscallHooks{}
|
||||
|
||||
if err := t.syscallHooks.installSyscallHooks(&t.bpfObjects); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.sslHooksStructs = make([]sslHooks, 0)
|
||||
|
||||
return t.initChunksReader(bufferSize)
|
||||
t.poller = newTlsPoller(t, extension, procfs)
|
||||
return t.poller.init(&t.bpfObjects, bufferSize)
|
||||
}
|
||||
|
||||
func (t *TlsTapper) pollPerf(chunks chan<- *tlsChunk) {
|
||||
logger.Log.Infof("Start polling for tls events")
|
||||
|
||||
for {
|
||||
record, err := t.reader.Read()
|
||||
|
||||
if err != nil {
|
||||
close(chunks)
|
||||
|
||||
if errors.Is(err, perf.ErrClosed) {
|
||||
return
|
||||
}
|
||||
|
||||
LogError(errors.Errorf("Error reading chunks from tls perf, aborting TLS! %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
if record.LostSamples != 0 {
|
||||
logger.Log.Infof("Buffer is full, dropped %d chunks", record.LostSamples)
|
||||
continue
|
||||
}
|
||||
|
||||
buffer := bytes.NewReader(record.RawSample)
|
||||
|
||||
var chunk tlsChunk
|
||||
|
||||
if err := binary.Read(buffer, binary.LittleEndian, &chunk); err != nil {
|
||||
LogError(errors.Errorf("Error parsing chunk %v", err))
|
||||
continue
|
||||
}
|
||||
|
||||
chunks <- &chunk
|
||||
}
|
||||
func (t *TlsTapper) Poll(emitter api.Emitter, options *api.TrafficFilteringOptions) {
|
||||
t.poller.poll(emitter, options)
|
||||
}
|
||||
|
||||
func (t *TlsTapper) GlobalTap(sslLibrary string) error {
|
||||
@@ -118,7 +83,7 @@ func (t *TlsTapper) Close() []error {
|
||||
errors = append(errors, sslHooks.close()...)
|
||||
}
|
||||
|
||||
if err := t.reader.Close(); err != nil {
|
||||
if err := t.poller.close(); err != nil {
|
||||
errors = append(errors, err)
|
||||
}
|
||||
|
||||
@@ -135,18 +100,6 @@ func setupRLimit() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TlsTapper) initChunksReader(bufferSize int) error {
|
||||
var err error
|
||||
|
||||
t.reader, err = perf.NewReader(t.bpfObjects.ChunksBuffer, bufferSize)
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TlsTapper) tapPid(pid uint32, sslLibrary string) error {
|
||||
logger.Log.Infof("Tapping TLS (pid: %v) (sslLibrary: %v)", pid, sslLibrary)
|
||||
|
||||
|
||||
@@ -52,12 +52,12 @@ export default class Api {
|
||||
}
|
||||
|
||||
getEntry = async (id, query) => {
|
||||
const response = await this.client.get(`/entries/${id}?query=${query}`);
|
||||
const response = await this.client.get(`/entries/${id}?query=${encodeURIComponent(query)}`);
|
||||
return response.data;
|
||||
}
|
||||
|
||||
fetchEntries = async (leftOff, direction, query, limit, timeoutMs) => {
|
||||
const response = await this.client.get(`/entries/?leftOff=${leftOff}&direction=${direction}&query=${query}&limit=${limit}&timeoutMs=${timeoutMs}`).catch(function (thrown) {
|
||||
const response = await this.client.get(`/entries/?leftOff=${leftOff}&direction=${direction}&query=${encodeURIComponent(query)}&limit=${limit}&timeoutMs=${timeoutMs}`).catch(function (thrown) {
|
||||
console.error(thrown.message);
|
||||
return {};
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user