Compare commits

..

7 Commits

Author SHA1 Message Date
David Levanon
6979441422 tls missing addresses (#825)
* stream seen file descriptors from ebpf

* re-generate bpf object files

* fixing pr comments
2022-03-14 15:40:27 +02:00
David Levanon
9ec8347c6c set bpf filter for pcap (#865)
* set bpf filter for pcap

* implement pod updating mechanism

* Update tap/source/netns_packet_source.go

* Update tap/source/netns_packet_source.go

* minor pr fixes

Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>
2022-03-14 15:35:49 +02:00
Igor Gov
617fb89ca5 Build custom branch Github action (#890)
* Build custom branch github action #build_and_publish_custom_image

* #build_and_publish_custom_image

* #build_and_publish_custom_image

* #build_and_publish_custom_image

* #build_and_publish_custom_image

* #build_and_publish_custom_image

* .
2022-03-14 13:15:28 +02:00
Igor Gov
1cbd9cb199 Adding dev latest tag for each pre-release docker (#888) 2022-03-13 09:47:49 +02:00
Igor Gov
23c1b66855 Adding dev latest tag for each pre-release docker (#885) 2022-03-10 17:26:56 +02:00
RoyUP9
f5fa9ff270 Added mizu install template (#884) 2022-03-09 17:52:55 +02:00
David Levanon
4159938cea add minikube over virtualbox cgroup format (#882) 2022-03-09 11:19:11 +02:00
24 changed files with 649 additions and 304 deletions

View File

@@ -0,0 +1,44 @@
name: Build Custom Branch
on: push
concurrency:
group: custom-branch-build-${{ github.ref }}
cancel-in-progress: true
jobs:
build:
name: Push custom branch image to GCR
runs-on: ubuntu-latest
if: ${{ contains(github.event.head_commit.message, '#build_and_publish_custom_image') }}
steps:
- name: Check out the repo
uses: actions/checkout@v2
- id: 'auth'
uses: 'google-github-actions/auth@v0'
with:
credentials_json: '${{ secrets.GCR_JSON_KEY }}'
- name: 'Set up Cloud SDK'
uses: 'google-github-actions/setup-gcloud@v0'
- name: Get base image name
shell: bash
run: echo "##[set-output name=image;]$(echo gcr.io/up9-docker-hub/mizu/${GITHUB_REF#refs/heads/})"
id: base_image_step
- name: Login to GCR
uses: docker/login-action@v1
with:
registry: gcr.io
username: _json_key
password: ${{ secrets.GCR_JSON_KEY }}
- name: Build and push
uses: docker/build-push-action@v2
with:
context: .
push: true
tags: ${{ steps.base_image_step.outputs.image }}:latest

View File

@@ -59,6 +59,7 @@ jobs:
tags: |
type=raw,${{ steps.versioning.outputs.version }}
type=raw,value=latest,enable=${{ steps.condval.outputs.value == 'stable' }}
type=raw,value=dev-latest,enable=${{ steps.condval.outputs.value == 'dev' }}
flavor: |
latest=auto
prefix=
@@ -145,6 +146,7 @@ jobs:
tags: |
type=raw,${{ steps.versioning.outputs.version }}
type=raw,value=latest,enable=${{ steps.condval.outputs.value == 'stable' }}
type=raw,value=dev-latest,enable=${{ steps.condval.outputs.value == 'dev' }}
flavor: |
latest=auto
prefix=
@@ -208,7 +210,7 @@ jobs:
tags: |
type=raw,${{ steps.versioning.outputs.version }}
type=raw,value=latest,enable=${{ steps.condval.outputs.value == 'stable' }}
type=raw,value=dev-latest,enable=${{ steps.condval.outputs.value == 'dev' }}
- name: Login to Docker Hub
uses: docker/login-action@v1
with:

View File

@@ -4,11 +4,10 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"github.com/up9inc/mizu/cli/utils"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/up9inc/mizu/shared/kubernetes"
@@ -59,7 +58,7 @@ func (provider *Provider) TestConnection() error {
func (provider *Provider) isReachable() (bool, error) {
echoUrl := fmt.Sprintf("%s/echo", provider.url)
if _, err := provider.get(echoUrl); err != nil {
if _, err := utils.Get(echoUrl, provider.client); err != nil {
return false, err
} else {
return true, nil
@@ -72,7 +71,7 @@ func (provider *Provider) ReportTapperStatus(tapperStatus shared.TapperStatus) e
if jsonValue, err := json.Marshal(tapperStatus); err != nil {
return fmt.Errorf("failed Marshal the tapper status %w", err)
} else {
if _, err := provider.post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
if _, err := utils.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue), provider.client); err != nil {
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
} else {
logger.Log.Debugf("Reported to server API about tapper status: %v", tapperStatus)
@@ -89,7 +88,7 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
if jsonValue, err := json.Marshal(podInfos); err != nil {
return fmt.Errorf("failed Marshal the tapped pods %w", err)
} else {
if _, err := provider.post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
if _, err := utils.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue), provider.client); err != nil {
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
} else {
logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
@@ -101,7 +100,7 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
func (provider *Provider) GetGeneralStats() (map[string]interface{}, error) {
generalStatsUrl := fmt.Sprintf("%s/status/general", provider.url)
response, requestErr := provider.get(generalStatsUrl)
response, requestErr := utils.Get(generalStatsUrl, provider.client)
if requestErr != nil {
return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr)
}
@@ -126,7 +125,7 @@ func (provider *Provider) GetVersion() (string, error) {
Method: http.MethodGet,
URL: versionUrl,
}
statusResp, err := provider.do(req)
statusResp, err := utils.Do(req, provider.client)
if err != nil {
return "", err
}
@@ -139,40 +138,3 @@ func (provider *Provider) GetVersion() (string, error) {
return versionResponse.Ver, nil
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) get(url string) (*http.Response, error) {
return provider.checkError(provider.client.Get(url))
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) post(url, contentType string, body io.Reader) (*http.Response, error) {
return provider.checkError(provider.client.Post(url, contentType, body))
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) do(req *http.Request) (*http.Response, error) {
return provider.checkError(provider.client.Do(req))
}
func (provider *Provider) checkError(response *http.Response, errInOperation error) (*http.Response, error) {
if (errInOperation != nil) {
return response, errInOperation
// Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success.
} else if response.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(response.Body)
response.Body.Close()
response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
if err != nil {
return response, err
}
errorMsg := strings.ReplaceAll((string(body)), "\n", ";")
return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg)
}
return response, nil
}

42
cli/bucket/provider.go Normal file
View File

@@ -0,0 +1,42 @@
package bucket
import (
"fmt"
"github.com/up9inc/mizu/cli/utils"
"io/ioutil"
"net/http"
"time"
)
type Provider struct {
url string
client *http.Client
}
const DefaultTimeout = 2 * time.Second
func NewProvider(url string, timeout time.Duration) *Provider {
return &Provider{
url: url,
client: &http.Client{
Timeout: timeout,
},
}
}
func (provider *Provider) GetInstallTemplate(templateName string) (string, error) {
url := fmt.Sprintf("%s/%v", provider.url, templateName)
response, err := utils.Get(url, provider.client)
if err != nil {
return "", err
}
defer response.Body.Close()
installTemplate, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", err
}
return string(installTemplate), nil
}

View File

@@ -36,8 +36,8 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
return
}
apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiProvider.TestConnection(); err != nil {
provider := apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := provider.TestConnection(); err != nil {
logger.Log.Debugf("Couldn't connect using proxy, stopping proxy and trying to create port-forward")
if err := httpServer.Shutdown(ctx); err != nil {
logger.Log.Debugf("Error occurred while stopping proxy %v", errormessage.FormatError(err))
@@ -51,8 +51,8 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
return
}
apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiProvider.TestConnection(); err != nil {
provider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := provider.TestConnection(); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
return

View File

@@ -3,7 +3,6 @@ package cmd
import (
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger"
)
var installCmd = &cobra.Command{
@@ -11,14 +10,7 @@ var installCmd = &cobra.Command{
Short: "Installs mizu components",
RunE: func(cmd *cobra.Command, args []string) error {
go telemetry.ReportRun("install", nil)
logger.Log.Infof("This command has been deprecated, please use helm as described below.\n\n")
logger.Log.Infof("To install stable build of Mizu on your cluster using helm, run the following command:")
logger.Log.Infof(" helm install mizu up9mizu --repo https://static.up9.com/mizu/helm --namespace=mizu --create-namespace\n\n")
logger.Log.Infof("To install development build of Mizu on your cluster using helm, run the following command:")
logger.Log.Infof(" helm install mizu up9mizu --repo https://static.up9.com/mizu/helm-develop --namespace=mizu --create-namespace\n")
runMizuInstall()
return nil
},
}

19
cli/cmd/installRunner.go Normal file
View File

@@ -0,0 +1,19 @@
package cmd
import (
"fmt"
"github.com/up9inc/mizu/cli/bucket"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/shared/logger"
)
func runMizuInstall() {
bucketProvider := bucket.NewProvider(config.Config.Install.TemplateUrl, bucket.DefaultTimeout)
installTemplate, err := bucketProvider.GetInstallTemplate(config.Config.Install.TemplateName)
if err != nil {
logger.Log.Errorf("Failed getting install template, err: %v", err)
return
}
fmt.Print(installTemplate)
}

View File

@@ -23,6 +23,7 @@ const (
type ConfigStruct struct {
Tap configStructs.TapConfig `yaml:"tap"`
Check configStructs.CheckConfig `yaml:"check"`
Install configStructs.InstallConfig `yaml:"install"`
Version configStructs.VersionConfig `yaml:"version"`
View configStructs.ViewConfig `yaml:"view"`
Logs configStructs.LogsConfig `yaml:"logs"`

View File

@@ -0,0 +1,6 @@
package configStructs
type InstallConfig struct {
TemplateUrl string `yaml:"template-url" default:"https://storage.googleapis.com/static.up9.io/mizu/helm-template"`
TemplateName string `yaml:"template-name" default:"helm-template.yaml"`
}

47
cli/utils/httpUtils.go Normal file
View File

@@ -0,0 +1,47 @@
package utils
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
)
// Get - When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func Get(url string, client *http.Client) (*http.Response, error) {
return checkError(client.Get(url))
}
// Post - When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func Post(url, contentType string, body io.Reader, client *http.Client) (*http.Response, error) {
return checkError(client.Post(url, contentType, body))
}
// Do - When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func Do(req *http.Request, client *http.Client) (*http.Response, error) {
return checkError(client.Do(req))
}
func checkError(response *http.Response, errInOperation error) (*http.Response, error) {
if errInOperation != nil {
return response, errInOperation
// Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success.
} else if response.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(response.Body)
response.Body.Close()
response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
if err != nil {
return response, err
}
errorMsg := strings.ReplaceAll(string(body), "\n", ";")
return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg)
}
return response, nil
}

View File

@@ -325,15 +325,22 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
tapperSyncer.config.MizuApiFilteringOptions,
tapperSyncer.config.LogLevel,
tapperSyncer.config.ServiceMesh,
tapperSyncer.config.Tls,
); err != nil {
tapperSyncer.config.Tls); err != nil {
return err
}
logger.Log.Debugf("Successfully created %v tappers", len(tapperSyncer.nodeToTappedPodMap))
} else {
if err := tapperSyncer.kubernetesProvider.RemoveDaemonSet(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, TapperDaemonSetName); err != nil {
if err := tapperSyncer.kubernetesProvider.ResetMizuTapperDaemonSet(
tapperSyncer.context,
tapperSyncer.config.MizuResourcesNamespace,
TapperDaemonSetName,
tapperSyncer.config.AgentImage,
TapperPodName); err != nil {
return err
}
logger.Log.Debugf("Successfully reset tapper daemon set")
}
return nil

View File

@@ -449,9 +449,9 @@ func (provider *Provider) CanI(ctx context.Context, namespace string, resource s
Spec: auth.SelfSubjectAccessReviewSpec{
ResourceAttributes: &auth.ResourceAttributes{
Namespace: namespace,
Resource: resource,
Verb: verb,
Group: group,
Resource: resource,
Verb: verb,
Group: group,
},
},
}
@@ -995,6 +995,55 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
return err
}
func (provider *Provider) ResetMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string) error {
agentContainer := applyconfcore.Container()
agentContainer.WithName(tapperPodName)
agentContainer.WithImage(podImage)
nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()
nodeSelectorRequirement.WithKey("mizu-non-existing-label")
nodeSelectorRequirement.WithOperator(core.NodeSelectorOpExists)
nodeSelectorTerm := applyconfcore.NodeSelectorTerm()
nodeSelectorTerm.WithMatchExpressions(nodeSelectorRequirement)
nodeSelector := applyconfcore.NodeSelector()
nodeSelector.WithNodeSelectorTerms(nodeSelectorTerm)
nodeAffinity := applyconfcore.NodeAffinity()
nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector)
affinity := applyconfcore.Affinity()
affinity.WithNodeAffinity(nodeAffinity)
podSpec := applyconfcore.PodSpec()
podSpec.WithContainers(agentContainer)
podSpec.WithAffinity(affinity)
podTemplate := applyconfcore.PodTemplateSpec()
podTemplate.WithLabels(map[string]string{
"app": tapperPodName,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
})
podTemplate.WithSpec(podSpec)
labelSelector := applyconfmeta.LabelSelector()
labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName})
applyOptions := metav1.ApplyOptions{
Force: true,
FieldManager: fieldManagerName,
}
daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace)
daemonSet.
WithLabels(map[string]string{
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
}).
WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate))
_, err := provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions)
return err
}
func (provider *Provider) listPodsImpl(ctx context.Context, regex *regexp.Regexp, namespaces []string, listOptions metav1.ListOptions) ([]core.Pod, error) {
var pods []core.Pod
for _, namespace := range namespaces {
@@ -1038,7 +1087,7 @@ func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, r
return matchingPods, nil
}
func(provider *Provider) ListPodsByAppLabel(ctx context.Context, namespaces string, labelName string) ([]core.Pod, error) {
func (provider *Provider) ListPodsByAppLabel(ctx context.Context, namespaces string, labelName string) ([]core.Pod, error) {
pods, err := provider.clientSet.CoreV1().Pods(namespaces).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("app=%s", labelName)})
if err != nil {
return nil, err

View File

@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"sort"
@@ -18,6 +19,9 @@ import (
const mizuTestEnvVar = "MIZU_TEST"
var UnknownIp net.IP = net.IP{0, 0, 0, 0}
var UnknownPort uint16 = 0
type Protocol struct {
Name string `json:"name"`
LongName string `json:"longName"`

View File

@@ -52,7 +52,6 @@ var snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per
var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
var pids = flag.String("pids", "", "A comma separated list of PIDs to capture their network namespaces")
var servicemesh = flag.Bool("servicemesh", false, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls")
var tls = flag.Bool("tls", false, "Enable TLS tapper")
@@ -190,7 +189,7 @@ func initializePacketSources() error {
}
var err error
if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil {
if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil {
return err
} else {
packetSourceManager.ReadPackets(!*nodefrag, mainPacketInputChan)
@@ -248,7 +247,7 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
tls := tlstapper.TlsTapper{}
tlsPerfBufferSize := os.Getpagesize() * 100
if err := tls.Init(tlsPerfBufferSize); err != nil {
if err := tls.Init(tlsPerfBufferSize, *procfs, extension); err != nil {
tlstapper.LogError(err)
return
}
@@ -272,6 +271,5 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
OutputChannel: outputItems,
}
poller := tlstapper.NewTlsPoller(&tls, extension)
go poller.Poll(extension, emitter, options)
go tls.Poll(emitter, options)
}

View File

@@ -0,0 +1,83 @@
package source
import (
"fmt"
"runtime"
"github.com/up9inc/mizu/shared/logger"
"github.com/vishvananda/netns"
)
func newNetnsPacketSource(procfs string, pid string,
interfaceName string, behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
nsh, err := netns.GetFromPath(fmt.Sprintf("%s/%s/ns/net", procfs, pid))
if err != nil {
logger.Log.Errorf("Unable to get netns of pid %s - %w", pid, err)
return nil, err
}
src, err := newPacketSourceFromNetnsHandle(pid, nsh, interfaceName, behaviour)
if err != nil {
logger.Log.Errorf("Error starting netns packet source for %s - %w", pid, err)
return nil, err
}
return src, nil
}
func newPacketSourceFromNetnsHandle(pid string, nsh netns.NsHandle, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
done := make(chan *tcpPacketSource)
errors := make(chan error)
go func(done chan<- *tcpPacketSource) {
// Setting a netns should be done from a dedicated OS thread.
//
// goroutines are not really OS threads, we try to mimic the issue by
// locking the OS thread to this goroutine
//
runtime.LockOSThread()
defer runtime.UnlockOSThread()
oldnetns, err := netns.Get()
if err != nil {
logger.Log.Errorf("Unable to get netns of current thread %w", err)
errors <- err
return
}
if err := netns.Set(nsh); err != nil {
logger.Log.Errorf("Unable to set netns of pid %s - %w", pid, err)
errors <- err
return
}
name := fmt.Sprintf("netns-%s-%s", pid, interfaceName)
src, err := newTcpPacketSource(name, "", interfaceName, behaviour)
if err != nil {
logger.Log.Errorf("Error listening to PID %s - %w", pid, err)
errors <- err
return
}
if err := netns.Set(oldnetns); err != nil {
logger.Log.Errorf("Unable to set back netns of current thread %w", err)
errors <- err
return
}
done <- src
}(done)
select {
case err := <-errors:
return nil, err
case source := <-done:
return source, nil
}
}

View File

@@ -2,109 +2,46 @@ package source
import (
"fmt"
"runtime"
"strconv"
"strings"
"github.com/up9inc/mizu/shared/logger"
"github.com/vishvananda/netns"
v1 "k8s.io/api/core/v1"
)
const bpfFilterMaxPods = 150
const hostSourcePid = "0"
type PacketSourceManager struct {
sources []*tcpPacketSource
sources map[string]*tcpPacketSource
}
func NewPacketSourceManager(procfs string, pids string, filename string, interfaceName string,
func NewPacketSourceManager(procfs string, filename string, interfaceName string,
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
sources := make([]*tcpPacketSource, 0)
sources, err := createHostSource(sources, filename, interfaceName, behaviour)
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
if err != nil {
return nil, err
}
sources = createSourcesFromPids(sources, procfs, pids, interfaceName, behaviour)
sources = createSourcesFromEnvoy(sources, mtls, procfs, pods, interfaceName, behaviour)
sources = createSourcesFromLinkerd(sources, mtls, procfs, pods, interfaceName, behaviour)
return &PacketSourceManager{
sources: sources,
}, nil
}
func createHostSource(sources []*tcpPacketSource, filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) ([]*tcpPacketSource, error) {
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
if err != nil {
return sources, err
sourceManager := &PacketSourceManager{
sources: map[string]*tcpPacketSource{
hostSourcePid: hostSource,
},
}
return append(sources, hostSource), nil
}
func createSourcesFromPids(sources []*tcpPacketSource, procfs string, pids string,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if pids == "" {
return sources
}
netnsSources := newNetnsPacketSources(procfs, strings.Split(pids, ","), interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
}
func createSourcesFromEnvoy(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if !mtls {
return sources
}
envoyPids, err := discoverRelevantEnvoyPids(procfs, pods)
if err != nil {
logger.Log.Warningf("Unable to discover envoy pids - %v", err)
return sources
}
netnsSources := newNetnsPacketSources(procfs, envoyPids, interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
}
func createSourcesFromLinkerd(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if !mtls {
return sources
}
linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods)
if err != nil {
logger.Log.Warningf("Unable to discover linkerd pids - %v", err)
return sources
}
netnsSources := newNetnsPacketSources(procfs, linkerdPids, interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
sourceManager.UpdatePods(mtls, procfs, pods, interfaceName, behaviour)
return sourceManager, nil
}
func newHostPacketSource(filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
var name string
if filename == "" {
name = fmt.Sprintf("host-%v", interfaceName)
name = fmt.Sprintf("host-%s", interfaceName)
} else {
name = fmt.Sprintf("file-%v", filename)
name = fmt.Sprintf("file-%s", filename)
}
source, err := newTcpPacketSource(name, filename, interfaceName, behaviour)
if err != nil {
return nil, err
}
@@ -112,90 +49,93 @@ func newHostPacketSource(filename string, interfaceName string,
return source, nil
}
func newNetnsPacketSources(procfs string, pids []string, interfaceName string,
behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
result := make([]*tcpPacketSource, 0)
for _, pidstr := range pids {
pid, err := strconv.Atoi(pidstr)
if err != nil {
logger.Log.Errorf("Invalid PID: %v - %v", pid, err)
continue
}
nsh, err := netns.GetFromPath(fmt.Sprintf("%v/%v/ns/net", procfs, pid))
if err != nil {
logger.Log.Errorf("Unable to get netns of pid %v - %v", pid, err)
continue
}
src, err := newNetnsPacketSource(pid, nsh, interfaceName, behaviour)
if err != nil {
logger.Log.Errorf("Error starting netns packet source for %v - %v", pid, err)
continue
}
result = append(result, src)
func (m *PacketSourceManager) UpdatePods(mtls bool, procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour) {
if mtls {
m.updateMtlsPods(procfs, pods, interfaceName, behaviour)
}
return result
m.setBPFFilter(pods)
}
func newNetnsPacketSource(pid int, nsh netns.NsHandle, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour) {
done := make(chan *tcpPacketSource)
errors := make(chan error)
relevantPids := m.getRelevantPids(procfs, pods)
logger.Log.Infof("Updating mtls pods (new: %v) (current: %v)", relevantPids, m.sources)
go func(done chan<- *tcpPacketSource) {
// Setting a netns should be done from a dedicated OS thread.
//
// goroutines are not really OS threads, we try to mimic the issue by
// locking the OS thread to this goroutine
//
runtime.LockOSThread()
defer runtime.UnlockOSThread()
oldnetns, err := netns.Get()
if err != nil {
logger.Log.Errorf("Unable to get netns of current thread %v", err)
errors <- err
return
for pid, src := range m.sources {
if _, ok := relevantPids[pid]; !ok {
src.close()
delete(m.sources, pid)
}
}
if err := netns.Set(nsh); err != nil {
logger.Log.Errorf("Unable to set netns of pid %v - %v", pid, err)
errors <- err
return
for pid := range relevantPids {
if _, ok := m.sources[pid]; !ok {
source, err := newNetnsPacketSource(procfs, pid, interfaceName, behaviour)
if err == nil {
m.sources[pid] = source
}
}
}
}
name := fmt.Sprintf("netns-%v-%v", pid, interfaceName)
src, err := newTcpPacketSource(name, "", interfaceName, behaviour)
func (m *PacketSourceManager) getRelevantPids(procfs string, pods []v1.Pod) map[string]bool {
relevantPids := make(map[string]bool)
relevantPids[hostSourcePid] = true
if err != nil {
logger.Log.Errorf("Error listening to PID %v - %v", pid, err)
errors <- err
return
if envoyPids, err := discoverRelevantEnvoyPids(procfs, pods); err != nil {
logger.Log.Warningf("Unable to discover envoy pids - %w", err)
} else {
for _, pid := range envoyPids {
relevantPids[pid] = true
}
}
if err := netns.Set(oldnetns); err != nil {
logger.Log.Errorf("Unable to set back netns of current thread %v", err)
errors <- err
return
if linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods); err != nil {
logger.Log.Warningf("Unable to discover linkerd pids - %w", err)
} else {
for _, pid := range linkerdPids {
relevantPids[pid] = true
}
}
done <- src
}(done)
return relevantPids
}
select {
case err := <-errors:
return nil, err
case source := <-done:
return source, nil
func buildBPFExpr(pods []v1.Pod) string {
hostsFilter := make([]string, 0)
for _, pod := range pods {
hostsFilter = append(hostsFilter, fmt.Sprintf("host %s", pod.Status.PodIP))
}
return fmt.Sprintf("%s and port not 443", strings.Join(hostsFilter, " or "))
}
func (m *PacketSourceManager) setBPFFilter(pods []v1.Pod) {
if len(pods) == 0 {
logger.Log.Info("No pods provided, skipping pcap bpf filter")
return
}
var expr string
if len(pods) > bpfFilterMaxPods {
logger.Log.Info("Too many pods for setting ebpf filter %d, setting just not 443", len(pods))
expr = "port not 443"
} else {
expr = buildBPFExpr(pods)
}
logger.Log.Infof("Setting pcap bpf filter %s", expr)
for pid, src := range m.sources {
if err := src.setBPFFilter(expr); err != nil {
logger.Log.Warningf("Error setting bpf filter for %s %v - %w", pid, src, err)
}
}
}

View File

@@ -98,6 +98,14 @@ func newTcpPacketSource(name, filename string, interfaceName string,
return result, nil
}
func (source *tcpPacketSource) String() string {
return source.name
}
func (source *tcpPacketSource) setBPFFilter(expr string) (err error) {
return source.handle.SetBPFFilter(expr)
}
func (source *tcpPacketSource) close() {
if source.handle != nil {
source.handle.Close()

View File

@@ -10,7 +10,7 @@ Copyright (C) UP9 Inc.
#define FLAGS_IS_CLIENT_BIT (1 << 0)
#define FLAGS_IS_READ_BIT (1 << 1)
// The same struct can be found in Chunk.go
// The same struct can be found in chunk.go
//
// Be careful when editing, alignment and padding should be exactly the same in go/c.
//

View File

@@ -8,20 +8,20 @@ import (
"github.com/go-errors/errors"
)
const FLAGS_IS_CLIENT_BIT int32 = (1 << 0)
const FLAGS_IS_READ_BIT int32 = (1 << 1)
const FLAGS_IS_CLIENT_BIT uint32 = (1 << 0)
const FLAGS_IS_READ_BIT uint32 = (1 << 1)
// The same struct can be found in maps.h
//
// Be careful when editing, alignment and padding should be exactly the same in go/c.
//
type tlsChunk struct {
Pid int32
Tgid int32
Len int32
Recorded int32
Fd int32
Flags int32
Pid uint32
Tgid uint32
Len uint32
Recorded uint32
Fd uint32
Flags uint32
Address [16]byte
Data [4096]byte
}
@@ -68,3 +68,7 @@ func (c *tlsChunk) isWrite() bool {
func (c *tlsChunk) getRecordedData() []byte {
return c.Data[:c.Recorded]
}
func (c *tlsChunk) isRequest() bool {
return (c.isClient() && c.isWrite()) || (c.isServer() && c.isRead())
}

View File

@@ -0,0 +1,102 @@
package tlstapper
import (
"fmt"
"io/ioutil"
"net"
"os"
"regexp"
"strconv"
"strings"
"github.com/go-errors/errors"
)
var socketInodeRegex = regexp.MustCompile(`socket:\[(\d+)\]`)
const (
SRC_ADDRESS_FILED_INDEX = 1
DST_ADDRESS_FILED_INDEX = 2
INODE_FILED_INDEX = 9
)
// This file helps to extract Ip and Port out of a Socket file descriptor.
//
// The equivalent bash commands are:
//
// > ls -l /proc/<pid>/fd/<fd>
// Output something like "socket:[1234]" for sockets - 1234 is the inode of the socket
// > cat /proc/<pid>/net/tcp | grep <inode>
// Output a line per ipv4 socket, the 9th field is the inode of the socket
// The 1st and 2nd fields are the source and dest ip and ports in a Hex format
// 0100007F:50 is 127.0.0.1:80
func getAddressBySockfd(procfs string, pid uint32, fd uint32, src bool) (net.IP, uint16, error) {
inode, err := getSocketInode(procfs, pid, fd)
if err != nil {
return nil, 0, err
}
tcppath := fmt.Sprintf("%s/%d/net/tcp", procfs, pid)
tcp, err := ioutil.ReadFile(tcppath)
if err != nil {
return nil, 0, errors.Wrap(err, 0)
}
for _, line := range strings.Split(string(tcp), "\n") {
parts := strings.Fields(line)
if len(parts) < 10 {
continue
}
if inode == parts[INODE_FILED_INDEX] {
if src {
return parseHexAddress(parts[SRC_ADDRESS_FILED_INDEX])
} else {
return parseHexAddress(parts[DST_ADDRESS_FILED_INDEX])
}
}
}
return nil, 0, errors.Errorf("address not found [pid: %d] [sockfd: %d] [inode: %s]", pid, fd, inode)
}
func getSocketInode(procfs string, pid uint32, fd uint32) (string, error) {
fdlinkPath := fmt.Sprintf("%s/%d/fd/%d", procfs, pid, fd)
fdlink, err := os.Readlink(fdlinkPath)
if err != nil {
return "", errors.Wrap(err, 0)
}
tokens := socketInodeRegex.FindStringSubmatch(fdlink)
if tokens == nil || len(tokens) < 1 {
return "", errors.Errorf("socket inode not found [pid: %d] [sockfd: %d] [link: %s]", pid, fd, fdlink)
}
return tokens[1], nil
}
// Format looks like 0100007F:50 for 127.0.0.1:80
//
func parseHexAddress(addr string) (net.IP, uint16, error) {
addrParts := strings.Split(addr, ":")
port, err := strconv.ParseUint(addrParts[1], 16, 16)
if err != nil {
return nil, 0, errors.Wrap(err, 0)
}
ip, err := strconv.ParseUint(addrParts[0], 16, 32)
if err != nil {
return nil, 0, errors.Wrap(err, 0)
}
return net.IP{uint8(ip), uint8(ip >> 8), uint8(ip >> 16), uint8(ip >> 24)}, uint16(port), nil
}

View File

@@ -2,7 +2,6 @@ package tlstapper
import (
"debug/elf"
"fmt"
"github.com/go-errors/errors"
"github.com/up9inc/mizu/shared/logger"
@@ -47,7 +46,7 @@ func findBaseAddress(sslElf *elf.File, sslLibraryPath string) (uint64, error) {
}
}
return 0, errors.New(fmt.Sprintf("Program header not found in %v", sslLibraryPath))
return 0, errors.Errorf("Program header not found in %v", sslLibraryPath)
}
func findSslOffsets(sslElf *elf.File, base uint64) (sslOffsets, error) {

View File

@@ -2,43 +2,64 @@ package tlstapper
import (
"bufio"
"bytes"
"fmt"
"net"
"encoding/binary"
"encoding/hex"
"os"
"strconv"
"strings"
"github.com/cilium/ebpf/perf"
"github.com/go-errors/errors"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
)
const UNKNOWN_PORT uint16 = 80
const UNKNOWN_HOST string = "127.0.0.1"
type tlsPoller struct {
tls *TlsTapper
readers map[string]*tlsReader
closedReaders chan string
reqResMatcher api.RequestResponseMatcher
chunksReader *perf.Reader
extension *api.Extension
procfs string
}
func NewTlsPoller(tls *TlsTapper, extension *api.Extension) *tlsPoller {
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller {
return &tlsPoller{
tls: tls,
readers: make(map[string]*tlsReader),
closedReaders: make(chan string, 100),
reqResMatcher: extension.Dissector.NewResponseRequestMatcher(),
extension: extension,
chunksReader: nil,
procfs: procfs,
}
}
func (p *tlsPoller) Poll(extension *api.Extension,
emitter api.Emitter, options *api.TrafficFilteringOptions) {
func (p *tlsPoller) init(bpfObjects *tlsTapperObjects, bufferSize int) error {
var err error
p.chunksReader, err = perf.NewReader(bpfObjects.ChunksBuffer, bufferSize)
if err != nil {
return errors.Wrap(err, 0)
}
return nil
}
func (p *tlsPoller) close() error {
return p.chunksReader.Close()
}
func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptions) {
chunks := make(chan *tlsChunk)
go p.tls.pollPerf(chunks)
go p.pollChunksPerfBuffer(chunks)
for {
select {
@@ -47,7 +68,7 @@ func (p *tlsPoller) Poll(extension *api.Extension,
return
}
if err := p.handleTlsChunk(chunk, extension, emitter, options); err != nil {
if err := p.handleTlsChunk(chunk, p.extension, emitter, options); err != nil {
LogError(err)
}
case key := <-p.closedReaders:
@@ -56,6 +77,41 @@ func (p *tlsPoller) Poll(extension *api.Extension,
}
}
func (p *tlsPoller) pollChunksPerfBuffer(chunks chan<- *tlsChunk) {
logger.Log.Infof("Start polling for tls events")
for {
record, err := p.chunksReader.Read()
if err != nil {
close(chunks)
if errors.Is(err, perf.ErrClosed) {
return
}
LogError(errors.Errorf("Error reading chunks from tls perf, aborting TLS! %v", err))
return
}
if record.LostSamples != 0 {
logger.Log.Infof("Buffer is full, dropped %d chunks", record.LostSamples)
continue
}
buffer := bytes.NewReader(record.RawSample)
var chunk tlsChunk
if err := binary.Read(buffer, binary.LittleEndian, &chunk); err != nil {
LogError(errors.Errorf("Error parsing chunk %v", err))
continue
}
chunks <- &chunk
}
}
func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
emitter api.Emitter, options *api.TrafficFilteringOptions) error {
ip, port, err := chunk.getAddress()
@@ -75,7 +131,7 @@ func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
reader.chunks <- chunk
if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" {
logTls(chunk, ip, port)
p.logTls(chunk, ip, port)
}
return nil
@@ -92,10 +148,9 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, k
},
}
isRequest := (chunk.isClient() && chunk.isWrite()) || (chunk.isServer() && chunk.isRead())
tcpid := buildTcpId(isRequest, ip, port)
tcpid := p.buildTcpId(chunk, ip, port)
go dissect(extension, reader, isRequest, &tcpid, emitter, options, p.reqResMatcher)
go dissect(extension, reader, chunk.isRequest(), &tcpid, emitter, options, p.reqResMatcher)
return reader
}
@@ -120,27 +175,36 @@ func buildTlsKey(chunk *tlsChunk, ip net.IP, port uint16) string {
return fmt.Sprintf("%v:%v-%v:%v", chunk.isClient(), chunk.isRead(), ip, port)
}
func buildTcpId(isRequest bool, ip net.IP, port uint16) api.TcpID {
if isRequest {
func (p *tlsPoller) buildTcpId(chunk *tlsChunk, ip net.IP, port uint16) api.TcpID {
myIp, myPort, err := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, chunk.isClient())
if err != nil {
// May happen if the socket already closed, very likely to happen for localhost
//
myIp = api.UnknownIp
myPort = api.UnknownPort
}
if chunk.isRequest() {
return api.TcpID{
SrcIP: UNKNOWN_HOST,
SrcIP: myIp.String(),
DstIP: ip.String(),
SrcPort: strconv.Itoa(int(UNKNOWN_PORT)),
DstPort: strconv.FormatInt(int64(port), 10),
SrcPort: strconv.FormatUint(uint64(myPort), 10),
DstPort: strconv.FormatUint(uint64(port), 10),
Ident: "",
}
} else {
return api.TcpID{
SrcIP: ip.String(),
DstIP: UNKNOWN_HOST,
SrcPort: strconv.FormatInt(int64(port), 10),
DstPort: strconv.Itoa(int(UNKNOWN_PORT)),
DstIP: myIp.String(),
SrcPort: strconv.FormatUint(uint64(port), 10),
DstPort: strconv.FormatUint(uint64(myPort), 10),
Ident: "",
}
}
}
func logTls(chunk *tlsChunk, ip net.IP, port uint16) {
func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) {
var flagsStr string
if chunk.isClient() {
@@ -155,8 +219,13 @@ func logTls(chunk *tlsChunk, ip net.IP, port uint16) {
flagsStr += "W"
}
srcIp, srcPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, true)
dstIp, dstPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, false)
str := strings.ReplaceAll(strings.ReplaceAll(string(chunk.Data[0:chunk.Recorded]), "\n", " "), "\r", "")
logger.Log.Infof("PID: %v (tid: %v) (fd: %v) (client: %v) (addr: %v:%v) (recorded %v out of %v) - %v - %v",
chunk.Pid, chunk.Tgid, chunk.Fd, flagsStr, ip, port, chunk.Recorded, chunk.Len, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
logger.Log.Infof("PID: %v (tid: %v) (fd: %v) (client: %v) (addr: %v:%v) (fdaddr %v:%v>%v:%v) (recorded %v out of %v) - %v - %v",
chunk.Pid, chunk.Tgid, chunk.Fd, flagsStr, ip, port,
srcIp, srcPort, dstIp, dstPort,
chunk.Recorded, chunk.Len, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
}

View File

@@ -132,8 +132,22 @@ func extractCgroup(lines []string) string {
return ""
}
// cgroup in the /proc/<pid>/cgroup may look something like
//
// /system.slice/docker-<ID>.scope
// /system.slice/containerd-<ID>.scope
// /kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod3beae8e0_164d_4689_a087_efd902d8c2ab.slice/docker-<ID>.scope
// /kubepods/besteffort/pod7709c1d5-447c-428f-bed9-8ddec35c93f4/<ID>
//
// This function extract the <ID> out of the cgroup path, the <ID> should match
// the "Container ID:" field when running kubectl describe pod <POD>
//
func normalizeCgroup(cgrouppath string) string {
basename := strings.TrimSpace(path.Base(cgrouppath))
if strings.Contains(basename, "-") {
basename = basename[strings.Index(basename, "-") + 1:]
}
if strings.Contains(basename, ".") {
return strings.TrimSuffix(basename, filepath.Ext(basename))

View File

@@ -1,13 +1,10 @@
package tlstapper
import (
"bytes"
"encoding/binary"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/rlimit"
"github.com/go-errors/errors"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
)
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go tlsTapper bpf/tls_tapper.c -- -O2 -g -D__TARGET_ARCH_x86
@@ -16,10 +13,10 @@ type TlsTapper struct {
bpfObjects tlsTapperObjects
syscallHooks syscallHooks
sslHooksStructs []sslHooks
reader *perf.Reader
poller *tlsPoller
}
func (t *TlsTapper) Init(bufferSize int) error {
func (t *TlsTapper) Init(bufferSize int, procfs string, extension *api.Extension) error {
logger.Log.Infof("Initializing tls tapper (bufferSize: %v)", bufferSize)
if err := setupRLimit(); err != nil {
@@ -27,55 +24,23 @@ func (t *TlsTapper) Init(bufferSize int) error {
}
t.bpfObjects = tlsTapperObjects{}
if err := loadTlsTapperObjects(&t.bpfObjects, nil); err != nil {
return errors.Wrap(err, 0)
}
t.syscallHooks = syscallHooks{}
if err := t.syscallHooks.installSyscallHooks(&t.bpfObjects); err != nil {
return err
}
t.sslHooksStructs = make([]sslHooks, 0)
return t.initChunksReader(bufferSize)
t.poller = newTlsPoller(t, extension, procfs)
return t.poller.init(&t.bpfObjects, bufferSize)
}
func (t *TlsTapper) pollPerf(chunks chan<- *tlsChunk) {
logger.Log.Infof("Start polling for tls events")
for {
record, err := t.reader.Read()
if err != nil {
close(chunks)
if errors.Is(err, perf.ErrClosed) {
return
}
LogError(errors.Errorf("Error reading chunks from tls perf, aborting TLS! %v", err))
return
}
if record.LostSamples != 0 {
logger.Log.Infof("Buffer is full, dropped %d chunks", record.LostSamples)
continue
}
buffer := bytes.NewReader(record.RawSample)
var chunk tlsChunk
if err := binary.Read(buffer, binary.LittleEndian, &chunk); err != nil {
LogError(errors.Errorf("Error parsing chunk %v", err))
continue
}
chunks <- &chunk
}
func (t *TlsTapper) Poll(emitter api.Emitter, options *api.TrafficFilteringOptions) {
t.poller.poll(emitter, options)
}
func (t *TlsTapper) GlobalTap(sslLibrary string) error {
@@ -118,7 +83,7 @@ func (t *TlsTapper) Close() []error {
errors = append(errors, sslHooks.close()...)
}
if err := t.reader.Close(); err != nil {
if err := t.poller.close(); err != nil {
errors = append(errors, err)
}
@@ -135,18 +100,6 @@ func setupRLimit() error {
return nil
}
func (t *TlsTapper) initChunksReader(bufferSize int) error {
var err error
t.reader, err = perf.NewReader(t.bpfObjects.ChunksBuffer, bufferSize)
if err != nil {
return errors.Wrap(err, 0)
}
return nil
}
func (t *TlsTapper) tapPid(pid uint32, sslLibrary string) error {
logger.Log.Infof("Tapping TLS (pid: %v) (sslLibrary: %v)", pid, sslLibrary)