Files
kubeshark/tap/source/packet_source_manager.go
M. Mert Yıldıran 0eb7883a47 Rename the project to Kubeshark (#1226)
* Rename `mizu` to `kubeshark`

* Rename `up9inc` to `kubeshark`

* Change the logo, title, motto and the main color

* Replace the favicon

* Update the docs link

* Change the copyright text in C files

* Remove a comment

* Rewrite the `README.md` and update the logo and screenshots used in it

* Add a `TODO`

* Fix the grammar

* Fix the bottom text in the filtering guide

* Change the Docker Hub username of cross-compilation intermediate images

* Add an install script

* Fix `docker/login-action` in the CI

* Delete `build-custom-branch.yml` GitHub workflow

* Update `README.md`

* Remove `install.sh`

* Change the motto back to "Traffic viewer for Kubernetes"
2022-11-19 11:13:15 +03:00

182 lines
4.7 KiB
Go

package source
import (
"fmt"
"strings"
"github.com/kubeshark/kubeshark/logger"
"github.com/kubeshark/kubeshark/tap/api"
v1 "k8s.io/api/core/v1"
)
const (
	// bpfFilterMaxPods is the largest pod count for which setBPFFilter builds
	// a per-pod host expression; above it only the port exclusion is applied.
	bpfFilterMaxPods = 150

	// hostSourcePid is the key under which the host packet source is kept in
	// PacketSourceManager.sources.
	hostSourcePid = "0"
)
// PacketSourceManagerConfig holds the settings that NewPacketSourceManager
// records and that UpdatePods later reuses when it refreshes the sources.
type PacketSourceManagerConfig struct {
// mtls gates the per-pid source refresh performed by UpdatePods.
mtls bool
// procfs is forwarded to the envoy/linkerd pid discovery helpers and to
// newNetnsPacketSource; presumably the procfs mount path - confirm.
procfs string
// interfaceName is forwarded to newNetnsPacketSource for every new source.
interfaceName string
// packetCapture is forwarded to newNetnsPacketSource; presumably selects the
// capture backend - confirm against newTcpPacketSource.
packetCapture string
// behaviour is forwarded unchanged to every packet source that is created.
behaviour TcpPacketSourceBehaviour
}
// PacketSourceManager owns a set of TCP packet sources and keeps them in sync
// with the pods passed to UpdatePods.
type PacketSourceManager struct {
// sources maps a pid string to its packet source; the host source is stored
// under hostSourcePid.
sources map[string]*tcpPacketSource
// config is the configuration recorded by NewPacketSourceManager.
config PacketSourceManagerConfig
}
// NewPacketSourceManager creates the host packet source from filename and
// interfaceName, registers it under hostSourcePid and starts a goroutine that
// reads its packets into the packets channel.
//
// procfs, interfaceName, mtls, packetCapture and behaviour are stored in the
// manager's config for use by later UpdatePods calls. pods is accepted but not
// used here. If the host source cannot be created, its error is returned and
// no manager is built.
func NewPacketSourceManager(procfs string, filename string, interfaceName string,
	mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour, ipdefrag bool,
	packetCapture string, packets chan<- TcpPacketInfo) (*PacketSourceManager, error) {
	hostSrc, err := newHostPacketSource(filename, interfaceName, packetCapture, behaviour)
	if err != nil {
		return nil, err
	}

	manager := &PacketSourceManager{
		sources: map[string]*tcpPacketSource{hostSourcePid: hostSrc},
		config: PacketSourceManagerConfig{
			mtls:          mtls,
			procfs:        procfs,
			interfaceName: interfaceName,
			packetCapture: packetCapture,
			behaviour:     behaviour,
		},
	}

	// The host source is read for the lifetime of the manager.
	go hostSrc.readPackets(ipdefrag, packets)

	return manager, nil
}
// newHostPacketSource builds the api.Pcap packet source for the host. The
// source is named "file-<filename>" when filename is set and
// "host-<interfaceName>" otherwise; all other arguments are passed through to
// newTcpPacketSource. On failure it returns nil and the underlying error.
func newHostPacketSource(filename string, interfaceName string, packetCapture string,
	behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
	// Default to the file-based name and switch to the interface-based one
	// only when no file was given.
	name := "file-" + filename
	if filename == "" {
		name = "host-" + interfaceName
	}

	src, err := newTcpPacketSource(name, filename, interfaceName, packetCapture, behaviour, api.Pcap)
	if err != nil {
		return nil, err
	}

	return src, nil
}
// UpdatePods brings the manager in line with the given pods. When mtls is
// enabled in the stored config, the per-pid sources are refreshed first; the
// BPF filter is then recomputed for every source regardless of that setting.
func (m *PacketSourceManager) UpdatePods(pods []v1.Pod, ipdefrag bool, packets chan<- TcpPacketInfo) {
	cfg := &m.config
	if cfg.mtls {
		m.updateMtlsPods(cfg.procfs, pods, cfg.interfaceName, cfg.packetCapture, cfg.behaviour, ipdefrag, packets)
	}

	m.setBPFFilter(pods)
}
// updateMtlsPods reconciles m.sources with the pids returned by
// getRelevantPids for the given pods.
//
// Sources whose pid is no longer relevant are closed and removed. For every
// relevant pid without a source, a new netns packet source is created, stored
// under that pid, and read on its own goroutine into packets. A pid whose
// source cannot be created is skipped with a warning instead of being dropped
// silently, so the failure is visible in the logs.
func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
	interfaceName string, packetCapture string, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) {
	relevantPids := m.getRelevantPids(procfs, pods)
	logger.Log.Infof("Updating mtls pods (new: %v) (current: %v)", relevantPids, m.sources)

	// Drop sources that are no longer relevant. Deleting from a map while
	// ranging over it is well-defined in Go.
	for pid, src := range m.sources {
		if _, ok := relevantPids[pid]; !ok {
			src.close()
			delete(m.sources, pid)
		}
	}

	// Start sources for newly relevant pids.
	for pid, origin := range relevantPids {
		if _, ok := m.sources[pid]; ok {
			continue
		}

		source, err := newNetnsPacketSource(procfs, pid, interfaceName, packetCapture, behaviour, origin)
		if err != nil {
			logger.Log.Warningf("Unable to create packet source for pid %s - %v", pid, err)
			continue
		}

		go source.readPackets(ipdefrag, packets)
		m.sources[pid] = source
	}
}
// getRelevantPids returns the pids that should have a packet source, mapped
// to the capture origin of each.
//
// hostSourcePid is always present with api.Pcap. Pids reported by
// discoverRelevantEnvoyPids are added as api.Envoy, then pids reported by
// discoverRelevantLinkerdPids as api.Linkerd, so a pid reported by both ends
// up as api.Linkerd. A discovery failure is logged as a warning and that
// discovery's pids are left out.
func (m *PacketSourceManager) getRelevantPids(procfs string, pods []v1.Pod) map[string]api.Capture {
	relevantPids := make(map[string]api.Capture)
	relevantPids[hostSourcePid] = api.Pcap

	// %v rather than %w: the wrapping verb is only understood by fmt.Errorf
	// and renders as "%!w(...)" in ordinary formatted output.
	if envoyPids, err := discoverRelevantEnvoyPids(procfs, pods); err != nil {
		logger.Log.Warningf("Unable to discover envoy pids - %v", err)
	} else {
		for _, pid := range envoyPids {
			relevantPids[pid] = api.Envoy
		}
	}

	if linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods); err != nil {
		logger.Log.Warningf("Unable to discover linkerd pids - %v", err)
	} else {
		for _, pid := range linkerdPids {
			relevantPids[pid] = api.Linkerd
		}
	}

	return relevantPids
}
// buildBPFExpr returns a pcap filter expression that matches traffic to or
// from any of the given pods' IPs, excluding port 443:
//
//	host <ip1> or host <ip2> ... and port not 443
//
// Pods with an empty Status.PodIP are skipped, because a bare "host "
// primitive is not a valid filter expression. If no pod has an IP, only the
// port exclusion "port not 443" is returned, so the result is always a
// well-formed expression.
func buildBPFExpr(pods []v1.Pod) string {
	hostsFilter := make([]string, 0, len(pods))
	for i := range pods {
		ip := pods[i].Status.PodIP
		if ip == "" {
			continue
		}
		hostsFilter = append(hostsFilter, fmt.Sprintf("host %s", ip))
	}

	if len(hostsFilter) == 0 {
		return "port not 443"
	}

	return fmt.Sprintf("%s and port not 443", strings.Join(hostsFilter, " or "))
}
// setBPFFilter computes a pcap BPF expression for pods and applies it to
// every source held by the manager.
//
// With no pods the filters are left untouched. With more than
// bpfFilterMaxPods pods only "port not 443" is applied; otherwise the
// expression comes from buildBPFExpr. A source that rejects the expression is
// reported with a warning and the remaining sources are still updated.
func (m *PacketSourceManager) setBPFFilter(pods []v1.Pod) {
	if len(pods) == 0 {
		logger.Log.Info("No pods provided, skipping pcap bpf filter")
		return
	}

	var expr string
	if len(pods) > bpfFilterMaxPods {
		// Infof, not Info: the message carries a format verb.
		logger.Log.Infof("Too many pods for setting ebpf filter %d, setting just not 443", len(pods))
		expr = "port not 443"
	} else {
		expr = buildBPFExpr(pods)
	}

	logger.Log.Infof("Setting pcap bpf filter %s", expr)

	for pid, src := range m.sources {
		if err := src.setBPFFilter(expr); err != nil {
			// %v rather than %w: the wrapping verb is only understood by
			// fmt.Errorf.
			logger.Log.Warningf("Error setting bpf filter for %s %v - %v", pid, src, err)
		}
	}
}
// Close closes every packet source held by the manager. The sources are not
// removed from the manager's map.
func (m *PacketSourceManager) Close() {
	for pid := range m.sources {
		m.sources[pid].close()
	}
}
// Stats returns a single string that concatenates one bracketed entry per
// packet source: "[<source>: rec: <n> dropped: <n>]" on success, or
// "[<source>: err:<error>]" when the source's statistics cannot be read.
// Entries follow map iteration order, so their order is not stable.
func (m *PacketSourceManager) Stats() string {
	var sb strings.Builder

	for _, src := range m.sources {
		received, dropped, err := src.Stats()
		if err == nil {
			fmt.Fprintf(&sb, "[%s: rec: %d dropped: %d]", src.String(), received, dropped)
			continue
		}
		fmt.Fprintf(&sb, "[%s: err:%s]", src.String(), err)
	}

	return sb.String()
}