mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-19 20:40:17 +00:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c026656b5e | ||
|
|
6caa94f08f | ||
|
|
b77ea63f42 | ||
|
|
2635964a28 | ||
|
|
a16faca5fb | ||
|
|
8cf6f56a3c | ||
|
|
a849aae94c |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -15,7 +15,6 @@
|
||||
# vendor/
|
||||
.idea/
|
||||
build
|
||||
*.db
|
||||
|
||||
# Mac OS
|
||||
.DS_Store
|
||||
@@ -32,3 +31,4 @@ pprof/*
|
||||
|
||||
# Database Files
|
||||
*.bin
|
||||
*.gob
|
||||
|
||||
@@ -42,8 +42,8 @@ RUN go build -ldflags="-s -w \
|
||||
-X 'mizuserver/pkg/version.SemVer=${SEM_VER}'" -o mizuagent .
|
||||
|
||||
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64 ./basenine_linux_amd64
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64 ./basenine_linux_amd64
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
|
||||
RUN shasum -a 256 -c basenine_linux_amd64.sha256
|
||||
RUN chmod +x ./basenine_linux_amd64
|
||||
|
||||
@@ -52,7 +52,7 @@ RUN cd .. && /bin/bash build_extensions.sh
|
||||
|
||||
FROM alpine:3.14
|
||||
|
||||
RUN apk add bash libpcap-dev tcpdump
|
||||
RUN apk add bash libpcap-dev
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
|
||||
62
README.md
62
README.md
@@ -11,7 +11,12 @@ Think TCPDump and Chrome Dev Tools combined.
|
||||
## Features
|
||||
|
||||
- Simple and powerful CLI
|
||||
- Real-time view of all HTTP requests, REST and gRPC API calls
|
||||
- Monitoring network traffic in real-time. Supported protocols:
|
||||
- [HTTP/1.1](https://datatracker.ietf.org/doc/html/rfc2616)
|
||||
- [HTTP/2](https://datatracker.ietf.org/doc/html/rfc7540) (gRPC)
|
||||
- [AMQP](https://www.rabbitmq.com/amqp-0-9-1-reference.html) (RabbitMQ, Apache Qpid, etc.)
|
||||
- [Apache Kafka](https://kafka.apache.org/protocol)
|
||||
- [Redis](https://redis.io/topics/protocol)
|
||||
- No installation or code instrumentation
|
||||
- Works completely on premises
|
||||
|
||||
@@ -44,15 +49,6 @@ SHA256 checksums are available on the [Releases](https://github.com/up9inc/mizu/
|
||||
### Development (unstable) Build
|
||||
Pick one from the [Releases](https://github.com/up9inc/mizu/releases) page
|
||||
|
||||
## Kubeconfig & Permissions
|
||||
While `mizu`most often works out of the box, you can influence its behavior:
|
||||
|
||||
1. [OPTIONAL] Set `KUBECONFIG` environment variable to your Kubernetes configuration. If this is not set, Mizu assumes that configuration is at `${HOME}/.kube/config`
|
||||
2. `mizu` assumes user running the command has permissions to create resources (such as pods, services, namespaces) on your Kubernetes cluster (no worries - `mizu` resources are cleaned up upon termination)
|
||||
|
||||
For detailed list of k8s permissions see [PERMISSIONS](docs/PERMISSIONS.md) document
|
||||
|
||||
|
||||
## How to Run
|
||||
|
||||
1. Find pods you'd like to tap to in your Kubernetes cluster
|
||||
@@ -111,45 +107,25 @@ To tap all pods in current namespace -
|
||||
Web interface is now available at http://localhost:8899
|
||||
^C
|
||||
```
|
||||
### To run mizu mizu daemon mode (detached from cli)
|
||||
```bash
|
||||
$ mizu tap "^ca.*" --daemon
|
||||
Mizu will store up to 200MB of traffic, old traffic will be cleared once the limit is reached.
|
||||
Tapping pods in namespaces "sock-shop"
|
||||
Waiting for mizu to be ready... (may take a few minutes)
|
||||
+carts-66c77f5fbb-fq65r
|
||||
+catalogue-5f4cb7cf5-7zrmn
|
||||
..
|
||||
|
||||
$ mizu view
|
||||
Establishing connection to k8s cluster...
|
||||
Mizu is available at http://localhost:8899
|
||||
^C
|
||||
..
|
||||
|
||||
$ mizu clean # mizu will continue running in cluster until clean is executed
|
||||
Removing mizu resources
|
||||
```
|
||||
|
||||
`mizu view` provides one way to access Mizu. For other options, see [Accessing Mizu Wiki Page](https://github.com/up9inc/mizu/wiki/Accessing-Mizu).
|
||||
|
||||
## Configuration
|
||||
|
||||
Mizu can work with config file which should be stored in ${HOME}/.mizu/config.yaml (macOS: ~/.mizu/config.yaml) <br />
|
||||
In case no config file found, defaults will be used <br />
|
||||
Mizu can optionally work with a config file that can be provided as a CLI argument (using `--set config-path=<PATH>`) or if not provided, will be stored at ${HOME}/.mizu/config.yaml
|
||||
In case of partial configuration defined, all other fields will be used with defaults <br />
|
||||
You can always override the defaults or config file with CLI flags
|
||||
|
||||
To get the default config params run `mizu config` <br />
|
||||
To generate a new config file with default values use `mizu config -r`
|
||||
|
||||
### Telemetry
|
||||
|
||||
By default, mizu reports usage telemetry. It can be disabled by adding a line of `telemetry: false` in the `${HOME}/.mizu/config.yaml` file
|
||||
|
||||
|
||||
## Advanced Usage
|
||||
|
||||
### Kubeconfig
|
||||
|
||||
It is possible to change the kubeconfig path using `KUBECONFIG` environment variable or the command like flag
|
||||
with `--set kube-config-path=<PATH>`. </br >
|
||||
If both are not set - Mizu assumes that configuration is at `${HOME}/.kube/config`
|
||||
|
||||
### Namespace-Restricted Mode
|
||||
|
||||
Some users have permission to only manage resources in one particular namespace assigned to them
|
||||
@@ -163,6 +139,8 @@ using the `--namespace` flag or by setting `tap.namespaces` in the config file
|
||||
|
||||
Setting `mizu-resources-namespace=mizu` resets Mizu to its default behavior
|
||||
|
||||
For detailed list of k8s permissions see [PERMISSIONS](docs/PERMISSIONS.md) document
|
||||
|
||||
### User agent filtering
|
||||
|
||||
User-agent filtering (like health checks) - can be configured using command-line options:
|
||||
@@ -203,14 +181,10 @@ and when changed it will support accessing by IP
|
||||
|
||||
### Run in daemon mode
|
||||
|
||||
Mizu can be ran detached from the cli using the daemon flag: `mizu tap --daemon`. This type of mizu instance will run indefinitely in the cluster.
|
||||
|
||||
Please note that daemon mode requires you to have RBAC creation permissions, see the [permissions](docs/PERMISSIONS.md) doc for more details.
|
||||
|
||||
In order to access a daemon mizu you will have to run `mizu view` after running the `tap --daemon` command.
|
||||
|
||||
To stop the detached mizu instance and clean all cluster side resources, run `mizu clean`
|
||||
Mizu can be run detached from the cli using the daemon flag: `mizu tap --daemon`. This type of mizu instance will run
|
||||
indefinitely in the cluster.
|
||||
|
||||
For more information please refer to [DAEMON MODE](docs/DAEMON_MODE.md)
|
||||
|
||||
## How to Run local UI
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ require (
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
|
||||
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73
|
||||
github.com/up9inc/mizu/shared v0.0.0
|
||||
github.com/up9inc/mizu/tap v0.0.0
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
|
||||
@@ -450,8 +450,8 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
|
||||
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
|
||||
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
|
||||
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5 h1:JbLairDLEJpAC8bwmFuOAB+LYpY/oQbzGRSWRpkF7PQ=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73 h1:FJUM7w7E0jRGFPcSMa7cVy+jr5zcpbyT6qA30dEtGGI=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
|
||||
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA=
|
||||
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
|
||||
|
||||
@@ -94,17 +94,17 @@ func main() {
|
||||
panic("API server address must be provided with --api-server-address when using --tap")
|
||||
}
|
||||
|
||||
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
|
||||
tapOpts := &tap.TapOpts{HostMode: hostMode}
|
||||
tapTargets := getTapTargets()
|
||||
if tapTargets != nil {
|
||||
tap.SetFilterAuthorities(tapTargets)
|
||||
logger.Log.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs())
|
||||
tapOpts.FilterAuthorities = tapTargets
|
||||
logger.Log.Infof("Filtering for the following authorities: %v", tapOpts.FilterAuthorities)
|
||||
}
|
||||
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
|
||||
filteringOptions := getTrafficFilteringOptions()
|
||||
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
|
||||
tapOpts := &tap.TapOpts{HostMode: hostMode}
|
||||
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
|
||||
socketConnection, err := dialSocketWithRetry(*apiServerAddress, socketConnectionRetries, socketConnectionRetryDelay)
|
||||
if err != nil {
|
||||
@@ -443,6 +443,7 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
|
||||
IgnoredUserAgents: config.Config.IgnoredUserAgents,
|
||||
MizuApiFilteringOptions: config.Config.MizuApiFilteringOptions,
|
||||
MizuServiceAccountExists: true, //assume service account exists since daemon mode will not function without it anyway
|
||||
Istio: config.Config.Istio,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -3,6 +3,7 @@ package cmd
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/up9"
|
||||
"os"
|
||||
|
||||
"github.com/creasty/defaults"
|
||||
@@ -62,6 +63,12 @@ Supported protocols are HTTP and gRPC.`,
|
||||
logger.Log.Errorf("failed to log in, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
} else if isValidToken := up9.IsTokenValid(config.Config.Auth.Token, config.Config.Auth.EnvName); !isValidToken {
|
||||
logger.Log.Errorf("Token is not valid, please log in again to continue")
|
||||
if err := auth.Login(); err != nil {
|
||||
logger.Log.Errorf("failed to log in, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -113,4 +120,5 @@ func init() {
|
||||
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file path with policy rules")
|
||||
tapCmd.Flags().String(configStructs.ContractFile, defaultTapConfig.ContractFile, "OAS/Swagger file to validate to monitor the contracts")
|
||||
tapCmd.Flags().Bool(configStructs.DaemonModeTapName, defaultTapConfig.DaemonMode, "Run mizu in daemon mode, detached from the cli")
|
||||
tapCmd.Flags().Bool(configStructs.IstioName, defaultTapConfig.Istio, "Record decrypted traffic if the cluster configured with istio and mtls")
|
||||
}
|
||||
|
||||
@@ -214,6 +214,7 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider
|
||||
IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents,
|
||||
MizuApiFilteringOptions: mizuApiFilteringOptions,
|
||||
MizuServiceAccountExists: state.mizuServiceAccountExists,
|
||||
Istio: config.Config.Tap.Istio,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -402,6 +402,7 @@ func getMizuAgentConfig(targetNamespaces []string, mizuApiFilteringOptions *api.
|
||||
MizuResourcesNamespace: Config.MizuResourcesNamespace,
|
||||
MizuApiFilteringOptions: *mizuApiFilteringOptions,
|
||||
AgentDatabasePath: shared.DataDirPath,
|
||||
Istio: Config.Tap.Istio,
|
||||
}
|
||||
return &config, nil
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ const (
|
||||
EnforcePolicyFile = "traffic-validation-file"
|
||||
ContractFile = "contract"
|
||||
DaemonModeTapName = "daemon"
|
||||
IstioName = "istio"
|
||||
)
|
||||
|
||||
type TapConfig struct {
|
||||
@@ -48,6 +49,7 @@ type TapConfig struct {
|
||||
TapperResources shared.Resources `yaml:"tapper-resources"`
|
||||
DaemonMode bool `yaml:"daemon" default:"false"`
|
||||
NoPersistentVolumeClaim bool `yaml:"no-persistent-volume-claim" default:"false"`
|
||||
Istio bool `yaml:"istio" default:"false"`
|
||||
}
|
||||
|
||||
func (config *TapConfig) PodRegex() *regexp.Regexp {
|
||||
|
||||
31
cli/up9/provider.go
Normal file
31
cli/up9/provider.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package up9
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
func IsTokenValid(tokenString string, envName string) bool {
|
||||
whoAmIUrl, _ := url.Parse(fmt.Sprintf("https://trcc.%s/admin/whoami", envName))
|
||||
|
||||
req := &http.Request{
|
||||
Method: http.MethodGet,
|
||||
URL: whoAmIUrl,
|
||||
Header: map[string][]string{
|
||||
"Authorization": {fmt.Sprintf("bearer %s", tokenString)},
|
||||
},
|
||||
}
|
||||
|
||||
response, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
if response.StatusCode != http.StatusOK {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
@@ -37,8 +37,8 @@ COPY agent .
|
||||
RUN go build -gcflags="all=-N -l" -o mizuagent .
|
||||
|
||||
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64 ./basenine_linux_amd64
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64 ./basenine_linux_amd64
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
|
||||
RUN shasum -a 256 -c basenine_linux_amd64.sha256
|
||||
RUN chmod +x ./basenine_linux_amd64
|
||||
|
||||
@@ -48,7 +48,7 @@ RUN cd .. && /bin/bash build_extensions_debug.sh
|
||||
|
||||
FROM golang:1.16-alpine
|
||||
|
||||
RUN apk add bash libpcap-dev tcpdump
|
||||
RUN apk add bash libpcap-dev
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
|
||||
80
docs/DAEMON_MODE.md
Normal file
80
docs/DAEMON_MODE.md
Normal file
@@ -0,0 +1,80 @@
|
||||
# Mizu daemon mode
|
||||
|
||||
Mizu can be run detached from the cli using the daemon flag: `mizu tap --daemon`. This type of mizu instance will run
|
||||
indefinitely in the cluster.
|
||||
|
||||
Please note that daemon mode requires you to have RBAC creation permissions, see the [permissions](PERMISSIONS.md)
|
||||
doc for more details.
|
||||
|
||||
```bash
|
||||
$ mizu tap "^ca.*" --daemon
|
||||
Mizu will store up to 200MB of traffic, old traffic will be cleared once the limit is reached.
|
||||
Tapping pods in namespaces "sock-shop"
|
||||
Waiting for mizu to be ready... (may take a few minutes)
|
||||
+carts-66c77f5fbb-fq65r
|
||||
+catalogue-5f4cb7cf5-7zrmn
|
||||
..
|
||||
```
|
||||
|
||||
## Stop mizu daemon
|
||||
|
||||
To stop the detached mizu instance and clean all cluster side resources, run `mizu clean`
|
||||
|
||||
```bash
|
||||
$ mizu clean # mizu will continue running in cluster until clean is executed
|
||||
Removing mizu resources
|
||||
```
|
||||
|
||||
## Expose mizu web app
|
||||
|
||||
Mizu could be exposed at a later stage in any of the following ways:
|
||||
|
||||
### Using mizu view command
|
||||
|
||||
In a machine that can access both the cluster and a browser, you can run `mizu view` command which creates a proxy.
|
||||
Besides that, all the regular ways to expose k8s pods are valid.
|
||||
|
||||
```bash
|
||||
$ mizu view
|
||||
Establishing connection to k8s cluster...
|
||||
Mizu is available at http://localhost:8899
|
||||
^C
|
||||
..
|
||||
```
|
||||
|
||||
### Port Forward
|
||||
|
||||
```bash
|
||||
$ kubectl port-forward -n mizu deployment/mizu-api-server 8899:8899
|
||||
```
|
||||
|
||||
### NodePort
|
||||
|
||||
```bash
|
||||
$ kubectl expose -n mizu deployment mizu-api-server --name mizu-node-port --type NodePort --port 80 --target-port 8899
|
||||
```
|
||||
|
||||
Mizu's IP is the IP of any node (get the IP with `kubectl get nodes -o wide`) and the port is the target port of the new
|
||||
service (`kubectl get services -n mizu mizu-node-port`). Note that this method will expose Mizu to public access if your
|
||||
nodes are public.
|
||||
|
||||
### LoadBalancer
|
||||
|
||||
```bash
|
||||
$ kubectl expose deployment -n mizu --port 80 --target-port 8899 mizu-api-server --type=LoadBalancer --name=mizu-lb
|
||||
service/mizu-lb exposed
|
||||
..
|
||||
|
||||
$ kubectl get services -n mizu
|
||||
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
|
||||
mizu-api-server ClusterIP 10.107.200.100 <none> 80/TCP 5m5s
|
||||
mizu-lb LoadBalancer 10.107.200.101 34.77.120.116 80:30141/TCP 76s
|
||||
```
|
||||
|
||||
Note that `LoadBalancer` services only work on supported clusters (usually cloud providers) and might incur extra costs
|
||||
|
||||
If you changed the `mizu-resources-namespace` value, make sure the `-n mizu` flag of the `kubectl expose` command is
|
||||
changed to the value of `mizu-resources-namespace`
|
||||
|
||||
mizu will now be available both by running `mizu view` or by accessing the `EXTERNAL-IP` of the `mizu-lb` service
|
||||
through your browser.
|
||||
46
docs/ISTIO.md
Normal file
46
docs/ISTIO.md
Normal file
@@ -0,0 +1,46 @@
|
||||

|
||||
# Istio mutual tls (mtls) with Mizu
|
||||
This document describe how Mizu tapper handles workloads configured with mtls, making the internal traffic between services in a cluster to be encrypted.
|
||||
|
||||
Besides Istio there are other service meshes that implement mtls. However, as of now Istio is the most used one, and this is why we are focusing on it.
|
||||
|
||||
In order to create an Istio setup for development, follow those steps:
|
||||
1. Deploy a sample application to a Kubernetes cluster, the sample application needs to make internal service to service calls
|
||||
2. SSH to one of the nodes, and run `tcpdump`
|
||||
3. Make sure you see the internal service to service calls in a plain text
|
||||
4. Deploy Istio to the cluster - make sure it is attached to all pods of the sample application, and that it is configured with mtls (default)
|
||||
5. Run `tcpdump` again, make sure you don't see the internal service to service calls in a plain text
|
||||
|
||||
## The connection between Istio and Envoy
|
||||
In order to implement its service mesh capabilities, [Istio](https://istio.io) use an [Envoy](https://www.envoyproxy.io) sidecar in front of every pod in the cluster. The Envoy is responsible for the mtls communication, and that's why we are focusing on Envoy proxy.
|
||||
|
||||
In the future we might see more players in that field, then we'll have to either add support for each of them or go with a unified eBPF solution.
|
||||
|
||||
## Network namespaces
|
||||
A [linux network namespace](https://man7.org/linux/man-pages/man7/network_namespaces.7.html) is an isolation that limit the process view of the network. In the container world it used to isolate one container from another. In the Kubernetes world it used to isolate a pod from another. That means that two containers running on the same pod share the same network namespace. A container can reach a container in the same pod by accessing `localhost`.
|
||||
|
||||
An Envoy proxy configured with mtls receives the inbound traffic directed to the pod, decrypts it and sends it via `localhost` to the target container.
|
||||
|
||||
## Tapping mtls traffic
|
||||
In order for Mizu to be able to see the decrypted traffic it needs to listen on the same network namespace of the target pod. Multiple threads of the same process can have different network namespaces.
|
||||
|
||||
[gopacket](https://github.com/google/gopacket) uses [libpacp](https://github.com/the-tcpdump-group/libpcap) by default for capturing the traffic. Libpacap doesn't support network namespaces and we can't ask it to listen to traffic on a different namespace. However, we can change the network namespace of the calling thread and then start libpcap to see the traffic on a different namespace.
|
||||
|
||||
## Finding the network namespace of a running process
|
||||
The network namespace of a running process can be found in `/proc/PID/ns/net` link. Once we have this link, we can ask Linux to change the network namespace of a thread to this one.
|
||||
|
||||
This mean that Mizu needs to have access to the `/proc` (procfs) of the running node.
|
||||
|
||||
## Finding the network namespace of a running pod
|
||||
In order for Mizu to be able to listen to mtls traffic, it needs to get the PIDs of the the running pods, filter them according to the user filters and then start listen to their internal network namespace traffic.
|
||||
|
||||
There is no official way in Kubernetes to get from pod to PID. The CRI implementation purposefully doesn't force a pod to be a processes on the host. It can be a Virtual Machine as well like [Kata containers](https://katacontainers.io)
|
||||
|
||||
While we can provide a solution for various CRIs (like Docker, Containerd and CRI-O) it's better to have a unified solution. In order to achieve that, Mizu scans all the processes in the host, and finds the Envoy processes using their `/proc/PID/exe` link.
|
||||
|
||||
Once Mizu detects an Envoy process, it need to check whether this specific Envoy process is relevant according the user filters. The user filters are a list of `CLUSTER_IPS`. The tapper gets them via the `TapOpts.FilterAuthorities` list.
|
||||
|
||||
Istio sends an `INSTANCE_IP` environment variable to every Envoy proxy process. By examining the Envoy process's environment variables we can see whether it's relevant or not. Examining a process environment variables is done by reading the `/proc/PID/envion` file.
|
||||
|
||||
## Edge cases
|
||||
The method we use to find Envoy processes and correlate them to the cluster IPs may be inaccurate in certain situations. If, for example, a user runs an Envoy process manually, and set its `INSTANCE_IP` environment variable to one of the `CLUSTER_IPS` the tapper gets, then Mizu will capture traffic for it.
|
||||
@@ -43,6 +43,7 @@ type TapperSyncerConfig struct {
|
||||
IgnoredUserAgents []string
|
||||
MizuApiFilteringOptions api.TrafficFilteringOptions
|
||||
MizuServiceAccountExists bool
|
||||
Istio bool
|
||||
}
|
||||
|
||||
func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig) (*MizuTapperSyncer, error) {
|
||||
@@ -222,6 +223,7 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
|
||||
tapperSyncer.config.ImagePullPolicy,
|
||||
tapperSyncer.config.MizuApiFilteringOptions,
|
||||
tapperSyncer.config.LogLevel,
|
||||
tapperSyncer.config.Istio,
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -612,7 +612,7 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level) error {
|
||||
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level, istio bool) error {
|
||||
logger.Log.Debugf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName)
|
||||
|
||||
if len(nodeToTappedPodIPMap) == 0 {
|
||||
@@ -635,7 +635,10 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
"--tap",
|
||||
"--api-server-address", fmt.Sprintf("ws://%s/wsTapper", apiServerPodIp),
|
||||
"--nodefrag",
|
||||
"--procfs", procfsMountPath,
|
||||
}
|
||||
|
||||
if istio {
|
||||
mizuCmd = append(mizuCmd, "--procfs", procfsMountPath, "--istio")
|
||||
}
|
||||
|
||||
agentContainer := applyconfcore.Container()
|
||||
|
||||
@@ -43,6 +43,7 @@ type MizuAgentConfig struct {
|
||||
MizuResourcesNamespace string `json:"mizuResourceNamespace"`
|
||||
MizuApiFilteringOptions api.TrafficFilteringOptions `json:"mizuApiFilteringOptions"`
|
||||
AgentDatabasePath string `json:"agentDatabasePath"`
|
||||
Istio bool `json:"istio"`
|
||||
}
|
||||
|
||||
type WebSocketMessageMetadata struct {
|
||||
|
||||
@@ -50,14 +50,15 @@ var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
|
||||
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
|
||||
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
|
||||
var pids = flag.String("pids", "", "A comma separated list of PIDs to capture their network namespaces")
|
||||
var istio = flag.Bool("istio", false, "Record decrypted traffic if the cluster configured with istio and mtls")
|
||||
|
||||
var memprofile = flag.String("memprofile", "", "Write memory profile")
|
||||
|
||||
type TapOpts struct {
|
||||
HostMode bool
|
||||
HostMode bool
|
||||
FilterAuthorities []string
|
||||
}
|
||||
|
||||
var hostMode bool // global
|
||||
var extensions []*api.Extension // global
|
||||
var filteringOptions *api.TrafficFilteringOptions // global
|
||||
|
||||
@@ -80,15 +81,18 @@ func inArrayString(arr []string, valueToCheck string) bool {
|
||||
}
|
||||
|
||||
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
|
||||
hostMode = opts.HostMode
|
||||
extensions = extensionsRef
|
||||
filteringOptions = options
|
||||
|
||||
if opts.FilterAuthorities == nil {
|
||||
opts.FilterAuthorities = []string{}
|
||||
}
|
||||
|
||||
if GetMemoryProfilingEnabled() {
|
||||
diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds))
|
||||
}
|
||||
|
||||
go startPassiveTapper(outputItems)
|
||||
go startPassiveTapper(opts, outputItems)
|
||||
}
|
||||
|
||||
func printPeriodicStats(cleaner *Cleaner) {
|
||||
@@ -131,7 +135,7 @@ func printPeriodicStats(cleaner *Cleaner) {
|
||||
}
|
||||
}
|
||||
|
||||
func initializePacketSources() (*source.PacketSourceManager, error) {
|
||||
func initializePacketSources(opts *TapOpts) (*source.PacketSourceManager, error) {
|
||||
var bpffilter string
|
||||
if len(flag.Args()) > 0 {
|
||||
bpffilter = strings.Join(flag.Args(), " ")
|
||||
@@ -146,17 +150,17 @@ func initializePacketSources() (*source.PacketSourceManager, error) {
|
||||
BpfFilter: bpffilter,
|
||||
}
|
||||
|
||||
return source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, behaviour)
|
||||
return source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, *istio, opts.FilterAuthorities, behaviour)
|
||||
}
|
||||
|
||||
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) {
|
||||
streamsMap := NewTcpStreamMap()
|
||||
go streamsMap.closeTimedoutTcpStreamChannels()
|
||||
|
||||
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
|
||||
diagnose.InitializeTapperInternalStats()
|
||||
|
||||
sources, err := initializePacketSources()
|
||||
sources, err := initializePacketSources(opts)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Fatal(err)
|
||||
@@ -169,7 +173,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
}
|
||||
|
||||
packets := make(chan source.TcpPacketInfo)
|
||||
assembler := NewTcpAssembler(outputItems, streamsMap)
|
||||
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
|
||||
|
||||
diagnose.AppStats.SetStartTime(time.Now())
|
||||
|
||||
|
||||
@@ -18,24 +18,6 @@ const (
|
||||
TcpStreamChannelTimeoutMsDefaultValue = 10000
|
||||
)
|
||||
|
||||
type globalSettings struct {
|
||||
filterAuthorities []string
|
||||
}
|
||||
|
||||
var gSettings = &globalSettings{
|
||||
filterAuthorities: []string{},
|
||||
}
|
||||
|
||||
func SetFilterAuthorities(ipAddresses []string) {
|
||||
gSettings.filterAuthorities = ipAddresses
|
||||
}
|
||||
|
||||
func GetFilterIPs() []string {
|
||||
addresses := make([]string, len(gSettings.filterAuthorities))
|
||||
copy(addresses, gSettings.filterAuthorities)
|
||||
return addresses
|
||||
}
|
||||
|
||||
func GetMaxBufferedPagesTotal() int {
|
||||
valueFromEnv, err := strconv.Atoi(os.Getenv(MaxBufferedPagesTotalEnvVarName))
|
||||
if err != nil {
|
||||
|
||||
112
tap/source/envoy_discoverer.go
Normal file
112
tap/source/envoy_discoverer.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package source
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
const envoyBinary = "/envoy"
|
||||
|
||||
var numberRegex = regexp.MustCompile("[0-9]+")
|
||||
|
||||
func discoverRelevantEnvoyPids(procfs string, clusterIps []string) ([]string, error) {
|
||||
result := make([]string, 0)
|
||||
|
||||
pids, err := ioutil.ReadDir(procfs)
|
||||
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
||||
logger.Log.Infof("Starting envoy auto discoverer %v %v - scanning %v potential pids",
|
||||
procfs, clusterIps, len(pids))
|
||||
|
||||
for _, pid := range pids {
|
||||
if !pid.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
if !numberRegex.MatchString(pid.Name()) {
|
||||
continue
|
||||
}
|
||||
|
||||
if checkPid(procfs, pid.Name(), clusterIps) {
|
||||
result = append(result, pid.Name())
|
||||
}
|
||||
}
|
||||
|
||||
logger.Log.Infof("Found %v relevant envoy processes - %v", len(result), result)
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func checkPid(procfs string, pid string, clusterIps []string) bool {
|
||||
execLink := fmt.Sprintf("%v/%v/exe", procfs, pid)
|
||||
exec, err := os.Readlink(execLink)
|
||||
|
||||
if err != nil {
|
||||
// Debug on purpose - it may happen due to many reasons and we only care
|
||||
// for it during troubleshooting
|
||||
//
|
||||
logger.Log.Debugf("Unable to read link %v - %v\n", execLink, err)
|
||||
return false
|
||||
}
|
||||
|
||||
if !strings.HasSuffix(exec, envoyBinary) {
|
||||
return false
|
||||
}
|
||||
|
||||
environmentFile := fmt.Sprintf("%v/%v/environ", procfs, pid)
|
||||
clusterIp, err := readEnvironmentVariable(environmentFile, "INSTANCE_IP")
|
||||
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if clusterIp == "" {
|
||||
logger.Log.Debugf("Found an envoy process without INSTANCE_IP variable %v\n", pid)
|
||||
return false
|
||||
}
|
||||
|
||||
logger.Log.Infof("Found envoy pid %v with cluster ip %v", pid, clusterIp)
|
||||
|
||||
for _, value := range clusterIps {
|
||||
if value == clusterIp {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func readEnvironmentVariable(file string, name string) (string, error) {
|
||||
bytes, err := ioutil.ReadFile(file)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Warningf("Error reading environment file %v - %v", file, err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
envs := strings.Split(string(bytes), string([]byte{0}))
|
||||
|
||||
for _, env := range envs {
|
||||
if !strings.Contains(env, "=") {
|
||||
continue
|
||||
}
|
||||
|
||||
parts := strings.Split(env, "=")
|
||||
varName := parts[0]
|
||||
value := parts[1]
|
||||
|
||||
if name == varName {
|
||||
return value, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", nil
|
||||
}
|
||||
@@ -15,26 +15,63 @@ type PacketSourceManager struct {
|
||||
}
|
||||
|
||||
func NewPacketSourceManager(procfs string, pids string, filename string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
|
||||
istio bool, clusterIps []string, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
|
||||
sources := make([]*tcpPacketSource, 0)
|
||||
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
|
||||
sources, err := createHostSource(sources, filename, interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sources = append(sources, hostSource)
|
||||
|
||||
if pids != "" {
|
||||
netnsSources := newNetnsPacketSources(procfs, pids, interfaceName, behaviour)
|
||||
sources = append(sources, netnsSources...)
|
||||
}
|
||||
sources = createSourcesFromPids(sources, procfs, pids, interfaceName, behaviour)
|
||||
sources = createSourcesFromEnvoy(sources, istio, procfs, clusterIps, interfaceName, behaviour)
|
||||
|
||||
return &PacketSourceManager{
|
||||
sources: sources,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func createHostSource(sources []*tcpPacketSource, filename string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) ([]*tcpPacketSource, error) {
|
||||
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
return sources, err
|
||||
}
|
||||
|
||||
return append(sources, hostSource), nil
|
||||
}
|
||||
|
||||
func createSourcesFromPids(sources []*tcpPacketSource, procfs string, pids string,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
if pids == "" {
|
||||
return sources
|
||||
}
|
||||
|
||||
netnsSources := newNetnsPacketSources(procfs, strings.Split(pids, ","), interfaceName, behaviour)
|
||||
sources = append(sources, netnsSources...)
|
||||
return sources
|
||||
}
|
||||
|
||||
func createSourcesFromEnvoy(sources []*tcpPacketSource, istio bool, procfs string, clusterIps []string,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
if !istio {
|
||||
return sources
|
||||
}
|
||||
|
||||
envoyPids, err := discoverRelevantEnvoyPids(procfs, clusterIps)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Warningf("Unable to discover envoy pids - %v", err)
|
||||
return sources
|
||||
}
|
||||
|
||||
netnsSources := newNetnsPacketSources(procfs, envoyPids, interfaceName, behaviour)
|
||||
sources = append(sources, netnsSources...)
|
||||
|
||||
return sources
|
||||
}
|
||||
|
||||
func newHostPacketSource(filename string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
||||
var name string
|
||||
@@ -54,11 +91,11 @@ func newHostPacketSource(filename string, interfaceName string,
|
||||
return source, nil
|
||||
}
|
||||
|
||||
func newNetnsPacketSources(procfs string, pids string, interfaceName string,
|
||||
func newNetnsPacketSources(procfs string, pids []string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
result := make([]*tcpPacketSource, 0)
|
||||
|
||||
for _, pidstr := range strings.Split(pids, ",") {
|
||||
for _, pidstr := range pids {
|
||||
pid, err := strconv.Atoi(pidstr)
|
||||
|
||||
if err != nil {
|
||||
@@ -100,9 +137,9 @@ func newNetnsPacketSource(pid int, nsh netns.NsHandle, interfaceName string,
|
||||
//
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
|
||||
oldnetns, err := netns.Get()
|
||||
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Unable to get netns of current thread %v", err)
|
||||
errors <- err
|
||||
|
||||
@@ -33,13 +33,13 @@ func (c *context) GetCaptureInfo() gopacket.CaptureInfo {
|
||||
return c.CaptureInfo
|
||||
}
|
||||
|
||||
func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStreamMap) *tcpAssembler {
|
||||
func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStreamMap, opts *TapOpts) *tcpAssembler {
|
||||
var emitter api.Emitter = &api.Emitting{
|
||||
AppStats: &diagnose.AppStats,
|
||||
OutputChannel: outputItems,
|
||||
}
|
||||
|
||||
streamFactory := NewTcpStreamFactory(emitter, streamsMap)
|
||||
streamFactory := NewTcpStreamFactory(emitter, streamsMap, opts)
|
||||
streamPool := reassembly.NewStreamPool(streamFactory)
|
||||
assembler := reassembly.NewAssembler(streamPool)
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ type tcpStreamFactory struct {
|
||||
Emitter api.Emitter
|
||||
streamsMap *tcpStreamMap
|
||||
ownIps []string
|
||||
opts *TapOpts
|
||||
}
|
||||
|
||||
type tcpStreamWrapper struct {
|
||||
@@ -31,7 +32,7 @@ type tcpStreamWrapper struct {
|
||||
createdAt time.Time
|
||||
}
|
||||
|
||||
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap) *tcpStreamFactory {
|
||||
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory {
|
||||
var ownIps []string
|
||||
|
||||
if localhostIPs, err := getLocalhostIPs(); err != nil {
|
||||
@@ -47,6 +48,7 @@ func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap) *tcpStre
|
||||
Emitter: emitter,
|
||||
streamsMap: streamsMap,
|
||||
ownIps: ownIps,
|
||||
opts: opts,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,17 +141,17 @@ func (factory *tcpStreamFactory) WaitGoRoutines() {
|
||||
}
|
||||
|
||||
func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, dstIP string, dstPort string) *streamProps {
|
||||
if hostMode {
|
||||
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
|
||||
if factory.opts.HostMode {
|
||||
if inArrayString(factory.opts.FilterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
|
||||
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort))
|
||||
return &streamProps{isTapTarget: true, isOutgoing: false}
|
||||
} else if inArrayString(gSettings.filterAuthorities, dstIP) {
|
||||
} else if inArrayString(factory.opts.FilterAuthorities, dstIP) {
|
||||
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP))
|
||||
return &streamProps{isTapTarget: true, isOutgoing: false}
|
||||
} else if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
|
||||
} else if inArrayString(factory.opts.FilterAuthorities, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
|
||||
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s:%s", srcIP, srcPort))
|
||||
return &streamProps{isTapTarget: true, isOutgoing: true}
|
||||
} else if inArrayString(gSettings.filterAuthorities, srcIP) {
|
||||
} else if inArrayString(factory.opts.FilterAuthorities, srcIP) {
|
||||
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host4 %s", srcIP))
|
||||
return &streamProps{isTapTarget: true, isOutgoing: true}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user