Compare commits

..

34 Commits

Author SHA1 Message Date
Igor Gov
c026656b5e Improving daemon documentation (#457) 2021-11-21 19:37:02 +02:00
David Levanon
6caa94f08f Add support to auto discover envoy processes (#459)
* discover envoy pids using cluster ips

* add istio flag to cli + rename mtls flag to istio

* add istio.md to docs

* Fixing typos

* Fix minor typos and grammer in docs

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>
2021-11-21 15:45:07 +02:00
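
A minimal sketch of the envoy auto-discovery idea described in the commit above, assuming discovery works by scanning each process's sockets in `/proc/<pid>/net/tcp` and flagging PIDs whose local addresses match known cluster IPs; the function names and the exact matching strategy are illustrative, not the PR's actual code:

```go
package discovery

import (
	"encoding/hex"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"strings"
)

// parseHexIPv4 decodes the local_address field of /proc/<pid>/net/tcp,
// e.g. "0100007F:1F90" -> 127.0.0.1 (the kernel stores the IPv4 bytes reversed).
func parseHexIPv4(field string) (net.IP, error) {
	raw, err := hex.DecodeString(strings.Split(field, ":")[0])
	if err != nil || len(raw) != 4 {
		return nil, fmt.Errorf("bad address %q", field)
	}
	return net.IPv4(raw[3], raw[2], raw[1], raw[0]), nil
}

// DiscoverPidsByClusterIPs (hypothetical helper) returns PIDs that own at least
// one TCP socket whose local address is one of the given cluster IPs.
func DiscoverPidsByClusterIPs(clusterIPs map[string]bool) ([]string, error) {
	procDirs, err := filepath.Glob("/proc/[0-9]*")
	if err != nil {
		return nil, err
	}
	var pids []string
	for _, dir := range procDirs {
		data, err := os.ReadFile(filepath.Join(dir, "net", "tcp"))
		if err != nil {
			continue // the process may have exited or be inaccessible
		}
		lines := strings.Split(string(data), "\n")
		for _, line := range lines[1:] { // the first line is a header
			fields := strings.Fields(line)
			if len(fields) < 2 {
				continue
			}
			ip, err := parseHexIPv4(fields[1]) // fields[1] is local_address
			if err != nil {
				continue
			}
			if clusterIPs[ip.String()] {
				pids = append(pids, filepath.Base(dir))
				break
			}
		}
	}
	return pids, nil
}
```

In the PR, discovery along these lines presumably feeds the tapper when the new `--istio` flag is set.
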
RoyUP9
b77ea63f42 Add token validity check (#483) 2021-11-21 15:14:02 +02:00
gadotroee
2635964a28 Update README (#486) 2021-11-21 14:09:21 +02:00
M. Mert Yıldıran
a16faca5fb Ignore gob files (#488)
* Ignore gob files

* Remove `*.db` from `.gitignore`
2021-11-21 09:29:01 +03:00
M. Mert Yıldıran
8cf6f56a3c Remove unnecessary tcpdump dependency from Dockerfile (#491) 2021-11-21 09:12:51 +03:00
M. Mert Yıldıran
a849aae94c Upgrade Basenine version from 0.2.9 to 0.2.10 (#484)
* Upgrade Basenine version from `0.2.9` to `0.2.10`

Fixes the issues in `limit` and `rlimit` helpers that occur when they are on the left operand of a binary expression.

* Upgrade the client hash to latest
2021-11-19 18:57:19 +03:00
M. Mert Yıldıran
8118569460 Show the source and destination IP in the entry feed (#485) 2021-11-18 20:21:51 +03:00
Nimrod Gilboa Markevich
2e75834dd0 Refactor watch pods to allow reusing watch wrapper (#470)
Currently shared/kubernetes/watch.go:FilteredWatch only watches pods.
This PR makes it reusable for other types of resources.
This is done in preparation for watching k8s events.
2021-11-18 11:53:11 +02:00
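
The actual shape of the refactor shows up further down in the `tapRunner.go` diff (a `PodWatchHelper` passed into `FilteredWatch`, with `wEvent.ToPod()` at the call sites). A condensed sketch of the idea, with illustrative names and signatures rather than the repository's exact API:

```go
package kubernetes

import (
	"context"

	"k8s.io/apimachinery/pkg/watch"
)

// EventWatchHelper abstracts "what to watch" and "which events matter",
// so the filtered loop no longer has to know it is dealing with pods.
type EventWatchHelper interface {
	// NewWatcher opens a watch on a single namespace.
	NewWatcher(ctx context.Context, namespace string) (watch.Interface, error)
	// Filter decides whether a raw event is interesting (e.g. the name matches a regex).
	Filter(event *watch.Event) bool
}

// FilteredWatch fans events from all namespaces into one channel,
// keeping only those the helper accepts.
func FilteredWatch(ctx context.Context, helper EventWatchHelper, namespaces []string) (<-chan watch.Event, <-chan error) {
	eventsChan := make(chan watch.Event)
	errChan := make(chan error, len(namespaces))
	for _, ns := range namespaces {
		go func(namespace string) {
			watcher, err := helper.NewWatcher(ctx, namespace)
			if err != nil {
				errChan <- err
				return
			}
			defer watcher.Stop()
			for {
				select {
				case event, ok := <-watcher.ResultChan():
					if !ok {
						return
					}
					if helper.Filter(&event) {
						eventsChan <- event
					}
				case <-ctx.Done():
					return
				}
			}
		}(ns)
	}
	return eventsChan, errChan
}
```

With the resource-specific knowledge behind the interface, a future k8s-events watcher only needs its own `NewWatcher`/`Filter` implementation, which is the reuse this PR prepares for.
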
M. Mert Yıldıran
dd53a36d5f Prevent the crash on client-side in case of text being undefined in FancyTextDisplay (#481)
* Prevent the crash on client-side in case of `text` being undefined in `FancyTextDisplay`

* Use `String(text)` instead
2021-11-17 18:50:09 +03:00
M. Mert Yıldıran
ad78f1dcd7 Clear focusedEntryId state in case of a filter is applied (#482) 2021-11-17 18:20:23 +03:00
M. Mert Yıldıran
a13fec3dae Sync entries in batches just as before (using uploadIntervalSec parameter) (#477)
* Sync entries in batches just as before (using `uploadIntervalSec` parameter)

* Replace `lastTimeSynced` value with `time.Time{}`

Since it will be overwritten by the very first iteration.
2021-11-17 15:16:49 +03:00
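
The full change is visible in the `syncEntriesImpl` diff further down. A condensed sketch of the batching logic (accumulate entries and flush at most once per `uploadIntervalSec`; the zero `time.Time{}` guarantees that the very first entry triggers a flush), using a hypothetical `upload` callback in place of the real HTTP POST:

```go
package batching

import "time"

// batcher collects items and uploads them as one batch, at most once per interval.
type batcher struct {
	interval       time.Duration
	lastTimeSynced time.Time // zero value: the first item added always flushes
	batch          []interface{}
	upload         func(batch []interface{}) error // stand-in for the real POST
}

func (b *batcher) add(item interface{}) error {
	b.batch = append(b.batch, item)

	now := time.Now()
	// Not enough time has passed since the previous flush: keep accumulating.
	if b.lastTimeSynced.Add(b.interval).After(now) {
		return nil
	}
	b.lastTimeSynced = now

	toSend := b.batch
	b.batch = nil
	return b.upload(toSend)
}
```
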
M. Mert Yıldıran
bb85312b9f Don't omit the key-value pair if the value is false in EntryTableSection (#478) 2021-11-17 15:02:23 +03:00
RamiBerm
18be46809e TRA-3903 minor daemon mode refactor (#479)
* Update common.go and tapRunner.go

* Update common.go
2021-11-17 11:18:08 +02:00
RamiBerm
b7f7daa05c TRA-3903 fix daemon mode in permission restricted configs (#473)
* Update tapRunner.go, permissions-all-namespaces-daemon.yaml, and 2 more files...

* Update tapRunner.go

* Update tapRunner.go and permissions-ns-daemon.yaml

* Update tapRunner.go

* Update tapRunner.go

* Update tapRunner.go
2021-11-17 11:14:43 +02:00
M. Mert Yıldıran
95d2a868e1 Update the UI screenshots (#476)
* Update the UI screenshots

* Update `mizu-ui.png`
2021-11-16 22:44:31 +03:00
RamiBerm
36077a9985 TRA-3903 - display targetted pods before waiting for all daemon resources to be created (#475)
* WIP

* Update tapRunner.go

* Update tapRunner.go
2021-11-16 17:53:38 +02:00
RamiBerm
51e0dd8ba9 TRA-3903 add flag to disable pvc creation for daemon mode (#474)
* Update tapRunner.go and tapConfig.go

* Update tapConfig.go

* Revert "Update tapConfig.go"

This reverts commit 5c7c02c4ab.
2021-11-16 17:11:47 +02:00
M. Mert Yıldıran
7f265dc4c5 Return 404 instead of 500 if the entry could not be found and display a toast message (#464) 2021-11-16 17:13:07 +03:00
RoyUP9
1c75ce314b fixed redact acceptance test (#472) 2021-11-16 15:49:08 +02:00
RamiBerm
89836d8d75 TRA-3903 better health endpoint for daemon mode (#471)
* Update main.go, status_controller.go, and 2 more files...

* Update status_controller.go and mizuTapperSyncer.go
2021-11-16 15:44:27 +02:00
RoyUP9
763f72a640 remove newline in logs, fixed logs time format (#469) 2021-11-16 12:07:48 +02:00
Igor Gov
a6ec246dd1 Stop reduction of user agent header (#468) 2021-11-16 11:33:31 +02:00
RoyUP9
3e30815fb4 changes log format to be more readable (#463) 2021-11-16 11:01:40 +02:00
M. Mert Yıldıran
a6bf39fad5 Prevent elapsedTime to be negative (#467)
Also fix the `elapsedTime` for Redis.
2021-11-16 02:52:48 +03:00
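
A minimal sketch of the clamping such a fix amounts to, assuming the negative values come from request/response capture timestamps taken out of order; the function name is illustrative, not the agent's actual code:

```go
package utils

import "time"

// clampElapsedTime returns the request/response delta in milliseconds,
// never letting out-of-order capture timestamps produce a negative value.
func clampElapsedTime(requestTime, responseTime time.Time) int64 {
	elapsed := responseTime.Sub(requestTime).Milliseconds()
	if elapsed < 0 {
		elapsed = 0
	}
	return elapsed
}
```
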
M. Mert Yıldıran
58a1eac247 Set response.bodySize to 0 if it's negative (#466) 2021-11-16 01:58:22 +03:00
M. Mert Yıldıran
ad574956df Upgrade Basenine version from 0.2.8 to 0.2.9 (#465)
Fixes `limit` helper being not finished because of lack of meta updates.
2021-11-16 00:53:29 +03:00
M. Mert Yıldıran
618cb3a409 Optimize UI entry feed performance (#452)
* Optimize the React code for feeding the entries

By building `EntryItem` only once and updating the `entries` state on meta query messages.

* Upgrade `react-scrollable-feed-virtualized` version from `1.4.3` to `1.4.8`

* Fix the `isSelected` state

* Set the query text before deciding the background to prevent lags while typing

* Upgrade Basenine version from `0.2.6` to `0.2.7`

* Set the query background color only if the query is same after the HTTP request and use `useEffect` instead

* Upgrade Basenine version from `0.2.7` to `0.2.8`

* Use `CancelToken` of `axios` instead of trying to check the query state

* Turn `updateQuery` function into a state hook

* Update the macro for `http`

* Do the `source.cancel()` call in `axios.CancelToken`

* Reduce client-side logging
2021-11-15 17:32:05 +03:00
M. Mert Yıldıran
2582b7a65c Ignore SNYK-JS-JSONSCHEMA-1920922 (#462)
Dependency tree:
`node-sass@5.0.0 > node-gyp@7.1.2 > request@2.88.2 > http-signature@1.2.0 > jsprim@1.4.1 > json-schema@0.2.3`

`node-sass` should fix it first.
2021-11-15 17:29:20 +03:00
RoyUP9
4641ee7c54 fixed acceptance test go sum (#458) 2021-11-14 13:54:10 +02:00
RoyUP9
14a5fe11e7 changed logger debug mode to log level (#456) 2021-11-14 12:21:48 +02:00
Nimrod Gilboa Markevich
6909e6e657 Add link to exposing mizu wiki page in README (#455) 2021-11-11 16:31:47 +02:00
RoyUP9
3e132905ce extend cleanup timeout to solve context timeout problem in dump logs (#453) 2021-11-11 14:30:35 +02:00
RoyUP9
ea0b3fb34e moved headless to root config, use headless in view (#450) 2021-11-11 12:11:02 +02:00
71 changed files with 1058 additions and 487 deletions

.gitignore
View File

@@ -15,7 +15,6 @@
# vendor/
.idea/
build
*.db
# Mac OS
.DS_Store
@@ -32,3 +31,4 @@ pprof/*
# Database Files
*.bin
*.gob

View File

@@ -42,8 +42,8 @@ RUN go build -ldflags="-s -w \
-X 'mizuserver/pkg/version.SemVer=${SEM_VER}'" -o mizuagent .
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
ADD https://github.com/up9inc/basenine/releases/download/v0.2.6/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.6/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
RUN shasum -a 256 -c basenine_linux_amd64.sha256
RUN chmod +x ./basenine_linux_amd64
@@ -52,7 +52,7 @@ RUN cd .. && /bin/bash build_extensions.sh
FROM alpine:3.14
RUN apk add bash libpcap-dev tcpdump
RUN apk add bash libpcap-dev
WORKDIR /app

View File

@@ -11,7 +11,12 @@ Think TCPDump and Chrome Dev Tools combined.
## Features
- Simple and powerful CLI
- Real-time view of all HTTP requests, REST and gRPC API calls
- Monitoring network traffic in real-time. Supported protocols:
- [HTTP/1.1](https://datatracker.ietf.org/doc/html/rfc2616)
- [HTTP/2](https://datatracker.ietf.org/doc/html/rfc7540) (gRPC)
- [AMQP](https://www.rabbitmq.com/amqp-0-9-1-reference.html) (RabbitMQ, Apache Qpid, etc.)
- [Apache Kafka](https://kafka.apache.org/protocol)
- [Redis](https://redis.io/topics/protocol)
- No installation or code instrumentation
- Works completely on premises
@@ -44,15 +49,6 @@ SHA256 checksums are available on the [Releases](https://github.com/up9inc/mizu/
### Development (unstable) Build
Pick one from the [Releases](https://github.com/up9inc/mizu/releases) page
## Kubeconfig & Permissions
While `mizu` most often works out of the box, you can influence its behavior:
1. [OPTIONAL] Set `KUBECONFIG` environment variable to your Kubernetes configuration. If this is not set, Mizu assumes that configuration is at `${HOME}/.kube/config`
2. `mizu` assumes user running the command has permissions to create resources (such as pods, services, namespaces) on your Kubernetes cluster (no worries - `mizu` resources are cleaned up upon termination)
For detailed list of k8s permissions see [PERMISSIONS](docs/PERMISSIONS.md) document
## How to Run
1. Find pods you'd like to tap to in your Kubernetes cluster
@@ -111,68 +107,25 @@ To tap all pods in current namespace -
Web interface is now available at http://localhost:8899
^C
```
### To run mizu in daemon mode (detached from cli)
```bash
$ mizu tap "^ca.*" --daemon
Mizu will store up to 200MB of traffic, old traffic will be cleared once the limit is reached.
Tapping pods in namespaces "sock-shop"
Waiting for mizu to be ready... (may take a few minutes)
+carts-66c77f5fbb-fq65r
+catalogue-5f4cb7cf5-7zrmn
..
$ mizu view
Establishing connection to k8s cluster...
Mizu is available at http://localhost:8899
^C
..
$ mizu clean # mizu will continue running in cluster until clean is executed
Removing mizu resources
```
### To run mizu daemon mode with LoadBalancer kubernetes service
```bash
$ mizu tap "^ca.*" --daemon
Mizu will store up to 200MB of traffic, old traffic will be cleared once the limit is reached.
Tapping pods in namespaces "sock-shop"
Waiting for mizu to be ready... (may take a few minutes)
..
$ kubectl expose deployment -n mizu --port 80 --target-port 8899 mizu-api-server --type=LoadBalancer --name=mizu-lb
service/mizu-lb exposed
..
$ kubectl get services -n mizu
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
mizu-api-server ClusterIP 10.107.200.100 <none> 80/TCP 5m5s
mizu-lb LoadBalancer 10.107.200.101 34.77.120.116 80:30141/TCP 76s
```
Note that `LoadBalancer` services only work on supported clusters (usually cloud providers) and might incur extra costs
If you changed the `mizu-resources-namespace` value, make sure the `-n mizu` flag of the `kubectl expose` command is changed to the value of `mizu-resources-namespace`
mizu will now be available either by running `mizu view` or by accessing the `EXTERNAL-IP` of the `mizu-lb` service through your browser.
## Configuration
Mizu can work with config file which should be stored in ${HOME}/.mizu/config.yaml (macOS: ~/.mizu/config.yaml) <br />
In case no config file found, defaults will be used <br />
Mizu can optionally work with a config file that can be provided as a CLI argument (using `--set config-path=<PATH>`) or if not provided, will be stored at ${HOME}/.mizu/config.yaml
In case of partial configuration defined, all other fields will be used with defaults <br />
You can always override the defaults or config file with CLI flags
To get the default config params run `mizu config` <br />
To generate a new config file with default values use `mizu config -r`
### Telemetry
By default, mizu reports usage telemetry. It can be disabled by adding a line of `telemetry: false` in the `${HOME}/.mizu/config.yaml` file
## Advanced Usage
### Kubeconfig
It is possible to change the kubeconfig path using the `KUBECONFIG` environment variable or the command line flag
with `--set kube-config-path=<PATH>`. <br />
If both are not set - Mizu assumes that configuration is at `${HOME}/.kube/config`
### Namespace-Restricted Mode
Some users have permission to only manage resources in one particular namespace assigned to them
@@ -186,6 +139,8 @@ using the `--namespace` flag or by setting `tap.namespaces` in the config file
Setting `mizu-resources-namespace=mizu` resets Mizu to its default behavior
For detailed list of k8s permissions see [PERMISSIONS](docs/PERMISSIONS.md) document
### User agent filtering
User-agent filtering (like health checks) - can be configured using command-line options:
@@ -226,14 +181,10 @@ and when changed it will support accessing by IP
### Run in daemon mode
Mizu can be ran detached from the cli using the daemon flag: `mizu tap --daemon`. This type of mizu instance will run indefinitely in the cluster.
Please note that daemon mode requires you to have RBAC creation permissions, see the [permissions](docs/PERMISSIONS.md) doc for more details.
In order to access a daemon mizu you will have to run `mizu view` after running the `tap --daemon` command.
To stop the detached mizu instance and clean all cluster side resources, run `mizu clean`
Mizu can be run detached from the cli using the daemon flag: `mizu tap --daemon`. This type of mizu instance will run
indefinitely in the cluster.
For more information please refer to [DAEMON MODE](docs/DAEMON_MODE.md)
## How to Run local UI

View File

@@ -304,6 +304,7 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=

View File

@@ -427,9 +427,10 @@ func TestTapRedact(t *testing.T) {
}
proxyUrl := getProxyUrl(defaultNamespaceName, defaultServiceName)
requestHeaders := map[string]string{"User-Header": "Mizu"}
requestBody := map[string]string{"User": "Mizu"}
for i := 0; i < defaultEntriesCount; i++ {
if _, requestErr := executeHttpPostRequest(fmt.Sprintf("%v/post", proxyUrl), requestBody); requestErr != nil {
if _, requestErr := executeHttpPostRequestWithHeaders(fmt.Sprintf("%v/post", proxyUrl), requestHeaders, requestBody); requestErr != nil {
t.Errorf("failed to send proxy request, err: %v", requestErr)
return
}
@@ -460,12 +461,12 @@ func TestTapRedact(t *testing.T) {
headers := request["_headers"].([]interface{})
for _, headerInterface := range headers {
header := headerInterface.(map[string]interface{})
if header["name"].(string) != "User-Agent" {
if header["name"].(string) != "User-Header" {
continue
}
userAgent := header["value"].(string)
if userAgent != "[REDACTED]" {
userHeader := header["value"].(string)
if userHeader != "[REDACTED]" {
return fmt.Errorf("unexpected result - user agent is not redacted")
}
}
@@ -530,9 +531,10 @@ func TestTapNoRedact(t *testing.T) {
}
proxyUrl := getProxyUrl(defaultNamespaceName, defaultServiceName)
requestHeaders := map[string]string{"User-Header": "Mizu"}
requestBody := map[string]string{"User": "Mizu"}
for i := 0; i < defaultEntriesCount; i++ {
if _, requestErr := executeHttpPostRequest(fmt.Sprintf("%v/post", proxyUrl), requestBody); requestErr != nil {
if _, requestErr := executeHttpPostRequestWithHeaders(fmt.Sprintf("%v/post", proxyUrl), requestHeaders, requestBody); requestErr != nil {
t.Errorf("failed to send proxy request, err: %v", requestErr)
return
}
@@ -563,12 +565,12 @@ func TestTapNoRedact(t *testing.T) {
headers := request["_headers"].([]interface{})
for _, headerInterface := range headers {
header := headerInterface.(map[string]interface{})
if header["name"].(string) != "User-Agent" {
if header["name"].(string) != "User-Header" {
continue
}
userAgent := header["value"].(string)
if userAgent == "[REDACTED]" {
userHeader := header["value"].(string)
if userHeader == "[REDACTED]" {
return fmt.Errorf("unexpected result - user agent is redacted")
}
}
@@ -827,21 +829,21 @@ func TestTapDumpLogs(t *testing.T) {
return
}
var dumpsLogsPath string
var dumpLogsPath string
for _, file := range files {
fileName := file.Name()
if strings.Contains(fileName, "mizu_logs") {
dumpsLogsPath = path.Join(mizuFolderPath, fileName)
dumpLogsPath = path.Join(mizuFolderPath, fileName)
break
}
}
if dumpsLogsPath == "" {
if dumpLogsPath == "" {
t.Errorf("dump logs file not found")
return
}
zipReader, zipError := zip.OpenReader(dumpsLogsPath)
zipReader, zipError := zip.OpenReader(dumpLogsPath)
if zipError != nil {
t.Errorf("failed to get zip reader, err: %v", zipError)
return

View File

@@ -93,16 +93,16 @@ func getDefaultCommandArgs() []string {
telemetry := "telemetry=false"
agentImage := "agent-image=gcr.io/up9-docker-hub/mizu/ci:0.0.0"
imagePullPolicy := "image-pull-policy=Never"
headless := "headless=true"
return []string{setFlag, telemetry, setFlag, agentImage, setFlag, imagePullPolicy}
return []string{setFlag, telemetry, setFlag, agentImage, setFlag, imagePullPolicy, setFlag, headless}
}
func getDefaultTapCommandArgs() []string {
headless := "--headless"
tapCommand := "tap"
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{tapCommand, headless}, defaultCmdArgs...)
return append([]string{tapCommand}, defaultCmdArgs...)
}
func getDefaultTapCommandArgsWithDaemonMode() []string {
@@ -135,11 +135,17 @@ func getDefaultConfigCommandArgs() []string {
}
func getDefaultCleanCommandArgs() []string {
return []string{"clean"}
cleanCommand := "clean"
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{cleanCommand}, defaultCmdArgs...)
}
func getDefaultViewCommandArgs() []string {
return []string{"view"}
viewCommand := "view"
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{viewCommand}, defaultCmdArgs...)
}
func retriesExecute(retriesCount int, executeFunc func() error) error {
@@ -234,13 +240,24 @@ func executeHttpGetRequest(url string) (interface{}, error) {
return executeHttpRequest(response, requestErr)
}
func executeHttpPostRequest(url string, body interface{}) (interface{}, error) {
func executeHttpPostRequestWithHeaders(url string, headers map[string]string, body interface{}) (interface{}, error) {
requestBody, jsonErr := json.Marshal(body)
if jsonErr != nil {
return nil, jsonErr
}
response, requestErr := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
request, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(requestBody))
if err != nil {
return nil, err
}
request.Header.Add("Content-Type", "application/json")
for headerKey, headerValue := range headers {
request.Header.Add(headerKey, headerValue)
}
client := &http.Client{}
response, requestErr := client.Do(request)
return executeHttpRequest(response, requestErr)
}

View File

@@ -16,7 +16,7 @@ require (
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/up9inc/basenine/client/go v0.0.0-20211109233221-12b405471084
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0

View File

@@ -450,8 +450,8 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/up9inc/basenine/client/go v0.0.0-20211109233221-12b405471084 h1:gLoP7AyS/c6pYuBQOgALWpzzc5/aSrq98Lr49JRfmfs=
github.com/up9inc/basenine/client/go v0.0.0-20211109233221-12b405471084/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73 h1:FJUM7w7E0jRGFPcSMa7cVy+jr5zcpbyT6qA30dEtGGI=
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=

View File

@@ -94,17 +94,17 @@ func main() {
panic("API server address must be provided with --api-server-address when using --tap")
}
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
tapTargets := getTapTargets()
if tapTargets != nil {
tap.SetFilterAuthorities(tapTargets)
logger.Log.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs())
tapOpts.FilterAuthorities = tapTargets
logger.Log.Infof("Filtering for the following authorities: %v", tapOpts.FilterAuthorities)
}
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteringOptions := getTrafficFilteringOptions()
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
socketConnection, err := dialSocketWithRetry(*apiServerAddress, socketConnectionRetries, socketConnectionRetryDelay)
if err != nil {
@@ -207,7 +207,7 @@ func loadExtensions() {
extensionsMap = make(map[string]*tapApi.Extension)
for i, file := range files {
filename := file.Name()
logger.Log.Infof("Loading extension: %s\n", filename)
logger.Log.Infof("Loading extension: %s", filename)
extension := &tapApi.Extension{
Path: path.Join(extensionsDir, filename),
}
@@ -219,7 +219,7 @@ func loadExtensions() {
var ok bool
dissector, ok = symDissector.(tapApi.Dissector)
if err != nil || !ok {
panic(fmt.Sprintf("Failed to load the extension: %s\n", extension.Path))
panic(fmt.Sprintf("Failed to load the extension: %s", extension.Path))
}
dissector.Register(extension)
extension.Dissector = dissector
@@ -232,7 +232,7 @@ func loadExtensions() {
})
for _, extension := range extensions {
logger.Log.Infof("Extension Properties: %+v\n", extension)
logger.Log.Infof("Extension Properties: %+v", extension)
}
controllers.InitExtensionsMap(extensionsMap)
@@ -398,10 +398,11 @@ func getSyncEntriesConfig() *shared.SyncEntriesConfig {
}
func determineLogLevel() (logLevel logging.Level) {
logLevel = logging.INFO
if os.Getenv(shared.DebugModeEnvVar) == "1" {
logLevel = logging.DEBUG
logLevel, err := logging.LogLevel(os.Getenv(shared.LogLevelEnvVar))
if err != nil {
logLevel = logging.INFO
}
return
}
@@ -438,10 +439,11 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
AgentImage: config.Config.AgentImage,
TapperResources: config.Config.TapperResources,
ImagePullPolicy: v1.PullPolicy(config.Config.PullPolicy),
DumpLogs: config.Config.DumpLogs,
LogLevel: config.Config.LogLevel,
IgnoredUserAgents: config.Config.IgnoredUserAgents,
MizuApiFilteringOptions: config.Config.MizuApiFilteringOptions,
MizuServiceAccountExists: true, //assume service account exists since daemon mode will not function without it anyway
Istio: config.Config.Istio,
})
if err != nil {
@@ -458,7 +460,7 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
return
}
logger.Log.Fatalf("fatal tap syncer error: %v", syncerErr)
case _, ok := <-tapperSyncer.TapPodChangesOut:
case tapPodChangeEvent, ok := <-tapperSyncer.TapPodChangesOut:
if !ok {
logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop")
return
@@ -471,6 +473,7 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
}
api.BroadcastToBrowserClients(serializedTapStatus)
providers.TapStatus.Pods = tapStatus.Pods
providers.ExpectedTapperAmount = tapPodChangeEvent.ExpectedTapperAmount
case <-ctx.Done():
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
return

View File

@@ -76,7 +76,7 @@ func startReadingFiles(workingDir string) {
sort.Sort(utils.ByModTime(harFiles))
if len(harFiles) == 0 {
logger.Log.Infof("Waiting for new files\n")
logger.Log.Infof("Waiting for new files")
time.Sleep(3 * time.Second)
continue
}
@@ -109,7 +109,7 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
ctx := context.Background()
doc, contractContent, router, err := loadOAS(ctx)
if err != nil {
logger.Log.Infof("Disabled OAS validation: %s\n", err.Error())
logger.Log.Infof("Disabled OAS validation: %s", err.Error())
disableOASValidation = true
}
@@ -154,7 +154,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
unresolvedSource := connectionInfo.ClientIP
resolvedSource = k8sResolver.Resolve(unresolvedSource)
if resolvedSource == "" {
logger.Log.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource)
logger.Log.Debugf("Cannot find resolved name to source: %s", unresolvedSource)
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
return
}
@@ -162,7 +162,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
if resolvedDestination == "" {
logger.Log.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination)
logger.Log.Debugf("Cannot find resolved name to dest: %s", unresolvedDestination)
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
return
}

View File

@@ -146,7 +146,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
var metadata *basenine.Metadata
err = json.Unmarshal(bytes, &metadata)
if err != nil {
logger.Log.Debugf("Error recieving metadata: %v\n", err.Error())
logger.Log.Debugf("Error recieving metadata: %v", err.Error())
}
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
@@ -167,7 +167,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
func socketCleanup(socketId int, socketConnection *SocketConnection) {
err := socketConnection.connection.Close()
if err != nil {
logger.Log.Errorf("Error closing socket connection for socket id %d: %v\n", socketId, err)
logger.Log.Errorf("Error closing socket connection for socket id %d: %v", socketId, err)
}
websocketIdsLock.Lock()

View File

@@ -65,14 +65,14 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
var socketMessageBase shared.WebSocketMessageMetadata
err := json.Unmarshal(message, &socketMessageBase)
if err != nil {
logger.Log.Infof("Could not unmarshal websocket message %v\n", err)
logger.Log.Infof("Could not unmarshal websocket message %v", err)
} else {
switch socketMessageBase.MessageType {
case shared.WebSocketMessageTypeTappedEntry:
var tappedEntryMessage models.WebSocketTappedEntryMessage
err := json.Unmarshal(message, &tappedEntryMessage)
if err != nil {
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
} else {
// NOTE: This is where the message comes back from the intermediate WebSocket to code.
h.SocketOutChannel <- tappedEntryMessage.Data
@@ -81,7 +81,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
var statusMessage shared.WebSocketStatusMessage
err := json.Unmarshal(message, &statusMessage)
if err != nil {
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
} else {
providers.TapStatus.Pods = statusMessage.TappingStatus.Pods
BroadcastToBrowserClients(message)
@@ -90,7 +90,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
var outboundLinkMessage models.WebsocketOutboundLinkMessage
err := json.Unmarshal(message, &outboundLinkMessage)
if err != nil {
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
} else {
handleTLSLink(outboundLinkMessage)
}

View File

@@ -25,7 +25,12 @@ func Error(c *gin.Context, err error) bool {
if err != nil {
logger.Log.Errorf("Error getting entry: %v", err)
c.Error(err)
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": true, "msg": err.Error()})
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"error": true,
"type": "error",
"autoClose": "5000",
"msg": err.Error(),
})
return true // signal that there was an error and the caller should return
}
return false // no error, can continue
@@ -39,7 +44,13 @@ func GetEntry(c *gin.Context) {
return // exit
}
err = json.Unmarshal(bytes, &entry)
if Error(c, err) {
if err != nil {
c.JSON(http.StatusNotFound, gin.H{
"error": true,
"type": "error",
"autoClose": "5000",
"msg": string(bytes),
})
return // exit
}

View File

@@ -2,7 +2,9 @@ package controllers
import (
"encoding/json"
"fmt"
"mizuserver/pkg/api"
"mizuserver/pkg/config"
"mizuserver/pkg/holder"
"mizuserver/pkg/providers"
"mizuserver/pkg/up9"
@@ -15,6 +17,13 @@ import (
)
func HealthCheck(c *gin.Context) {
if config.Config.DaemonMode {
if providers.ExpectedTapperAmount != providers.TappersCount {
c.JSON(http.StatusInternalServerError, fmt.Sprintf("expecting more tappers than are actually connected (%d expected, %d connected)", providers.ExpectedTapperAmount, providers.TappersCount))
return
}
}
response := shared.HealthResponse{
TapStatus: providers.TapStatus,
TappersCount: providers.TappersCount,
@@ -37,7 +46,7 @@ func PostTappedPods(c *gin.Context) {
providers.TapStatus.Pods = tapStatus.Pods
message := shared.CreateWebSocketStatusMessage(*tapStatus)
if jsonBytes, err := json.Marshal(message); err != nil {
logger.Log.Errorf("Could not Marshal message %v\n", err)
logger.Log.Errorf("Could not Marshal message %v", err)
} else {
api.BroadcastToBrowserClients(jsonBytes)
}

View File

@@ -19,7 +19,7 @@ var (
TapStatus shared.TapStatus
authStatus *models.AuthStatus
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
ExpectedTapperAmount = -1 //only relevant in daemon mode as cli manages tappers otherwise
tappersCountLock = sync.Mutex{}
)

View File

@@ -164,10 +164,10 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.nameMap.Remove(key)
logger.Log.Infof("setting %s=nil\n", key)
logger.Log.Infof("setting %s=nil", key)
} else {
resolver.nameMap.Set(key, resolved)
logger.Log.Infof("setting %s=%s\n", key, resolved)
logger.Log.Infof("setting %s=%s", key, resolved)
}
}
@@ -188,7 +188,7 @@ func (resolver *Resolver) infiniteErrorHandleRetryFunc(ctx context.Context, fun
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) {
if statusError.ErrStatus.Reason == metav1.StatusReasonForbidden {
logger.Log.Infof("Resolver loop encountered permission error, aborting event listening - %v\n", err)
logger.Log.Infof("Resolver loop encountered permission error, aborting event listening - %v", err)
return
}
}

View File

@@ -112,14 +112,14 @@ func GetAnalyzeInfo() *shared.AnalyzeStatus {
}
func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
logger.Log.Infof("Sync entries - started\n")
logger.Log.Infof("Sync entries - started")
var (
token, model string
guestMode bool
)
if syncEntriesConfig.Token == "" {
logger.Log.Infof("Sync entries - creating anonymous token. env %s\n", syncEntriesConfig.Env)
logger.Log.Infof("Sync entries - creating anonymous token. env %s", syncEntriesConfig.Env)
guestToken, err := createAnonymousToken(syncEntriesConfig.Env)
if err != nil {
return fmt.Errorf("failed creating anonymous token, err: %v", err)
@@ -133,7 +133,7 @@ func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
model = syncEntriesConfig.Workspace
guestMode = false
logger.Log.Infof("Sync entries - upserting model. env %s, model %s\n", syncEntriesConfig.Env, model)
logger.Log.Infof("Sync entries - upserting model. env %s, model %s", syncEntriesConfig.Env, model)
if err := upsertModel(token, model, syncEntriesConfig.Env); err != nil {
return fmt.Errorf("failed upserting model, err: %v", err)
}
@@ -144,7 +144,7 @@ func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
return fmt.Errorf("invalid model name, model name: %s", model)
}
logger.Log.Infof("Sync entries - syncing. token: %s, model: %s, guest mode: %v\n", token, model, guestMode)
logger.Log.Infof("Sync entries - syncing. token: %s, model: %s, guest mode: %v", token, model, guestMode)
go syncEntriesImpl(token, model, syncEntriesConfig.Env, syncEntriesConfig.UploadIntervalSec, guestMode)
return nil
@@ -209,7 +209,7 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
// "http or grpc" filter indicates that we're only interested in HTTP and gRPC entries
query := "http or grpc"
logger.Log.Infof("Getting entries from the database\n")
logger.Log.Infof("Getting entries from the database")
var connection *basenine.Connection
var err error
@@ -227,6 +227,10 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
connection.Close()
}()
lastTimeSynced := time.Time{}
batch := make([]har.Entry, 0)
handleDataChannel := func(wg *sync.WaitGroup, connection *basenine.Connection, data chan []byte) {
defer wg.Done()
for {
@@ -239,7 +243,6 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
var dataMap map[string]interface{}
err = json.Unmarshal(dataBytes, &dataMap)
result := make([]har.Entry, 0)
var entry tapApi.MizuEntry
if err := json.Unmarshal([]byte(dataBytes), &entry); err != nil {
continue
@@ -261,14 +264,22 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
continue
}
result = append(result, *harEntry)
batch = append(batch, *harEntry)
body, jMarshalErr := json.Marshal(result)
now := time.Now()
if lastTimeSynced.Add(time.Duration(uploadIntervalSec) * time.Second).After(now) {
continue
}
lastTimeSynced = now
body, jMarshalErr := json.Marshal(batch)
batchSize := len(batch)
if jMarshalErr != nil {
analyzeInformation.Reset()
logger.Log.Infof("Stopping sync entries")
logger.Log.Fatal(jMarshalErr)
}
batch = make([]har.Entry, 0)
var in bytes.Buffer
w := zlib.NewWriter(&in)
@@ -293,7 +304,7 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
logger.Log.Info("Stopping sync entries")
logger.Log.Fatal(postErr)
}
analyzeInformation.SentCount += 1
analyzeInformation.SentCount += batchSize
if analyzeInformation.SentCount%SentCountLogInterval == 0 {
logger.Log.Infof("Uploaded %v entries until now", analyzeInformation.SentCount)

Binary image updated (size: 491 KiB → 640 KiB)

Binary image updated (size: 55 KiB → 110 KiB)

Binary image updated (size: 43 KiB → 53 KiB)

View File

@@ -4,9 +4,15 @@ import (
"context"
"errors"
"fmt"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/telemetry"
"os"
"os/signal"
"path"
"syscall"
"time"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
@@ -64,3 +70,42 @@ func handleKubernetesProviderError(err error) {
logger.Log.Error(err)
}
}
func finishMizuExecution(kubernetesProvider *kubernetes.Provider, apiProvider *apiserver.Provider) {
telemetry.ReportAPICalls(apiProvider)
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
defer cancel()
dumpLogsIfNeeded(removalCtx, kubernetesProvider)
cleanUpMizuResources(removalCtx, cancel, kubernetesProvider)
}
func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provider) {
if !config.Config.DumpLogs {
return
}
mizuDir := mizu.GetMizuFolderPath()
filePath := path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
if err := fsUtils.DumpLogs(ctx, kubernetesProvider, filePath); err != nil {
logger.Log.Errorf("Failed dump logs %v", err)
}
}
func cleanUpMizuResources(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
logger.Log.Infof("\nRemoving mizu resources")
var leftoverResources []string
if config.Config.IsNsRestrictedMode() {
leftoverResources = cleanUpRestrictedMode(ctx, kubernetesProvider)
} else {
leftoverResources = cleanUpNonRestrictedMode(ctx, cancel, kubernetesProvider)
}
if len(leftoverResources) > 0 {
errMsg := fmt.Sprintf("Failed to remove the following resources, for more info check logs at %s:", fsUtils.GetLogFilePath())
for _, resource := range leftoverResources {
errMsg += "\n- " + resource
}
logger.Log.Errorf(uiUtils.Error, errMsg)
}
}

View File

@@ -23,6 +23,7 @@ Further info is available at https://github.com/up9inc/mizu`,
if err := config.InitConfig(cmd); err != nil {
logger.Log.Fatal(err)
}
return nil
},
}

View File

@@ -3,6 +3,7 @@ package cmd
import (
"errors"
"fmt"
"github.com/up9inc/mizu/cli/up9"
"os"
"github.com/creasty/defaults"
@@ -62,6 +63,12 @@ Supported protocols are HTTP and gRPC.`,
logger.Log.Errorf("failed to log in, err: %v", err)
return nil
}
} else if isValidToken := up9.IsTokenValid(config.Config.Auth.Token, config.Config.Auth.EnvName); !isValidToken {
logger.Log.Errorf("Token is not valid, please log in again to continue")
if err := auth.Login(); err != nil {
logger.Log.Errorf("failed to log in, err: %v", err)
return nil
}
}
}
}
@@ -113,5 +120,5 @@ func init() {
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file path with policy rules")
tapCmd.Flags().String(configStructs.ContractFile, defaultTapConfig.ContractFile, "OAS/Swagger file to validate to monitor the contracts")
tapCmd.Flags().Bool(configStructs.DaemonModeTapName, defaultTapConfig.DaemonMode, "Run mizu in daemon mode, detached from the cli")
tapCmd.Flags().Bool(configStructs.HeadlessMode, defaultTapConfig.HeadlessMode, "Enable headless mode.")
tapCmd.Flags().Bool(configStructs.IstioName, defaultTapConfig.Istio, "Record decrypted traffic if the cluster configured with istio and mtls")
}

View File

@@ -5,27 +5,24 @@ import (
"errors"
"fmt"
"io/ioutil"
"path"
"regexp"
"strings"
"time"
"github.com/up9inc/mizu/cli/cmd/goUtils"
"github.com/getkin/kin-openapi/openapi3"
"gopkg.in/yaml.v3"
core "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/getkin/kin-openapi/openapi3"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/cmd/goUtils"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage"
"gopkg.in/yaml.v3"
core "k8s.io/api/core/v1"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/kubernetes"
@@ -119,6 +116,9 @@ func RunMizuTap() {
logger.Log.Infof("Tapping pods in %s", namespacesStr)
if config.Config.Tap.DryRun {
if err := printTappedPodsPreview(ctx, kubernetesProvider, targetNamespaces); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error listing pods: %v", errormessage.FormatError(err)))
}
return
}
@@ -134,7 +134,7 @@ func RunMizuTap() {
return
}
if config.Config.Tap.DaemonMode {
if err := handleDaemonModePostCreation(cancel, kubernetesProvider); err != nil {
if err := handleDaemonModePostCreation(ctx, cancel, kubernetesProvider, targetNamespaces); err != nil {
defer finishMizuExecution(kubernetesProvider, apiProvider)
cancel()
} else {
@@ -156,28 +156,38 @@ func RunMizuTap() {
}
}
func handleDaemonModePostCreation(cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) error {
func handleDaemonModePostCreation(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
if err := printTappedPodsPreview(ctx, kubernetesProvider, namespaces); err != nil {
return err
}
apiProvider := apiserver.NewProvider(GetApiServerUrl(), 90, 1*time.Second)
if err := waitForDaemonModeToBeReady(cancel, kubernetesProvider, apiProvider); err != nil {
return err
}
if err := printDaemonModeTappedPods(apiProvider); err != nil {
return err
}
return nil
}
func printDaemonModeTappedPods(apiProvider *apiserver.Provider) error {
if healthStatus, err := apiProvider.GetHealthStatus(); err != nil {
/*
this function is a bit problematic as it might be detached from the actual pods the mizu api server will tap.
The alternative would be to wait for api server to be ready and then query it for the pods it listens to, this has
the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any.
*/
func printTappedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), namespaces); err != nil {
return err
} else {
for _, tappedPod := range healthStatus.TapStatus.Pods {
if len(matchingPods) == 0 {
printNoPodsFoundSuggestion(namespaces)
}
logger.Log.Info("Pods that match the provided criteria at this instant:")
for _, tappedPod := range matchingPods {
logger.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", tappedPod.Name))
}
return nil
}
return nil
}
func waitForDaemonModeToBeReady(cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider, apiProvider *apiserver.Provider) error {
@@ -200,10 +210,11 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider
AgentImage: config.Config.AgentImage,
TapperResources: config.Config.Tap.TapperResources,
ImagePullPolicy: config.Config.ImagePullPolicy(),
DumpLogs: config.Config.DumpLogs,
LogLevel: config.Config.LogLevel(),
IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents,
MizuApiFilteringOptions: mizuApiFilteringOptions,
MizuServiceAccountExists: state.mizuServiceAccountExists,
Istio: config.Config.Tap.Istio,
})
if err != nil {
@@ -215,11 +226,7 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider
}
if len(tapperSyncer.CurrentlyTappedPods) == 0 {
var suggestionStr string
if !shared.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) {
suggestionStr = ". Select a different namespace with -n or tap all namespaces with -A"
}
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any pods matching the regex argument%s", suggestionStr))
printNoPodsFoundSuggestion(targetNamespaces)
}
go func() {
@@ -252,6 +259,14 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider
return nil
}
func printNoPodsFoundSuggestion(targetNamespaces []string) {
var suggestionStr string
if !shared.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) {
suggestionStr = ". You can also try selecting a different namespace with -n or tap all namespaces with -A"
}
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any currently running pods that match the regex argument, mizu will automatically tap matching pods if any are created later%s", suggestionStr))
}
func getErrorDisplayTextForK8sTapManagerError(err kubernetes.K8sTapManagerError) string {
switch err.TapManagerReason {
case kubernetes.TapManagerPodListError:
@@ -282,7 +297,7 @@ func createMizuResources(ctx context.Context, cancel context.CancelFunc, kuberne
}
if err := createMizuConfigmap(ctx, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err)))
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v", errormessage.FormatError(err)))
}
var err error
@@ -310,6 +325,7 @@ func createMizuResources(ctx context.Context, cancel context.CancelFunc, kuberne
MaxEntriesDBSizeBytes: config.Config.Tap.MaxEntriesDBSizeBytes(),
Resources: config.Config.Tap.ApiServerResources,
ImagePullPolicy: config.Config.ImagePullPolicy(),
LogLevel: config.Config.LogLevel(),
}
if config.Config.Tap.DaemonMode {
@@ -358,20 +374,9 @@ func createMizuApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.
}
func createMizuApiServerDeployment(ctx context.Context, kubernetesProvider *kubernetes.Provider, opts *kubernetes.ApiServerOptions) error {
isDefaultStorageClassAvailable, err := kubernetesProvider.IsDefaultStorageProviderAvailable(ctx)
volumeClaimCreated := false
if err != nil {
return err
}
if isDefaultStorageClassAvailable {
if _, err = kubernetesProvider.CreatePersistentVolumeClaim(ctx, config.Config.MizuResourcesNamespace, kubernetes.PersistentVolumeClaimName, config.Config.Tap.MaxEntriesDBSizeBytes()+mizu.DaemonModePersistentVolumeSizeBufferBytes); err != nil {
logger.Log.Warningf(uiUtils.Yellow, "An error has occured while creating a persistent volume claim for mizu, this will mean that mizu's data will be lost on pod restart")
logger.Log.Debugf("error creating persistent volume claim: %v", err)
} else {
volumeClaimCreated = true
}
} else {
logger.Log.Warningf(uiUtils.Yellow, "Could not find default volume provider in this cluster, this will mean that mizu's data will be lost on pod restart")
if !config.Config.Tap.NoPersistentVolumeClaim {
volumeClaimCreated = TryToCreatePersistentVolumeClaim(ctx, kubernetesProvider)
}
pod, err := kubernetesProvider.GetMizuApiServerPodObject(opts, volumeClaimCreated, kubernetes.PersistentVolumeClaimName)
@@ -386,6 +391,26 @@ func createMizuApiServerDeployment(ctx context.Context, kubernetesProvider *kube
return nil
}
func TryToCreatePersistentVolumeClaim(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
isDefaultStorageClassAvailable, err := kubernetesProvider.IsDefaultStorageProviderAvailable(ctx)
if err != nil {
logger.Log.Warningf(uiUtils.Yellow, "An error occured when checking if a default storage provider exists in this cluster, this means mizu data will be lost on mizu-api-server pod restart")
logger.Log.Debugf("error checking if default storage class exists: %v", err)
return false
} else if !isDefaultStorageClassAvailable {
logger.Log.Warningf(uiUtils.Yellow, "Could not find default storage provider in this cluster, this means mizu data will be lost on mizu-api-server pod restart")
return false
}
if _, err = kubernetesProvider.CreatePersistentVolumeClaim(ctx, config.Config.MizuResourcesNamespace, kubernetes.PersistentVolumeClaimName, config.Config.Tap.MaxEntriesDBSizeBytes()+mizu.DaemonModePersistentVolumeSizeBufferBytes); err != nil {
logger.Log.Warningf(uiUtils.Yellow, "An error has occured while creating a persistent volume claim for mizu, this means mizu data will be lost on mizu-api-server pod restart")
logger.Log.Debugf("error creating persistent volume claim: %v", err)
return false
}
return true
}
func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
var compiledRegexSlice []*api.SerializableRegexp
@@ -420,45 +445,6 @@ func getSyncEntriesConfig() *shared.SyncEntriesConfig {
}
}
func finishMizuExecution(kubernetesProvider *kubernetes.Provider, apiProvider *apiserver.Provider) {
telemetry.ReportAPICalls(apiProvider)
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
defer cancel()
dumpLogsIfNeeded(removalCtx, kubernetesProvider)
cleanUpMizuResources(removalCtx, cancel, kubernetesProvider)
}
func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provider) {
if !config.Config.DumpLogs {
return
}
mizuDir := mizu.GetMizuFolderPath()
filePath := path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
if err := fsUtils.DumpLogs(ctx, kubernetesProvider, filePath); err != nil {
logger.Log.Errorf("Failed dump logs %v", err)
}
}
func cleanUpMizuResources(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
logger.Log.Infof("\nRemoving mizu resources\n")
var leftoverResources []string
if config.Config.IsNsRestrictedMode() {
leftoverResources = cleanUpRestrictedMode(ctx, kubernetesProvider)
} else {
leftoverResources = cleanUpNonRestrictedMode(ctx, cancel, kubernetesProvider)
}
if len(leftoverResources) > 0 {
errMsg := fmt.Sprintf("Failed to remove the following resources, for more info check logs at %s:", fsUtils.GetLogFilePath())
for _, resource := range leftoverResources {
errMsg += "\n- " + resource
}
logger.Log.Errorf(uiUtils.Error, errMsg)
}
}
func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.Provider) []string {
leftoverResources := make([]string, 0)
@@ -569,7 +555,8 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
isPodReady := false
timeAfter := time.After(25 * time.Second)
for {
@@ -590,12 +577,19 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
cancel()
return
case modifiedPod, ok := <-modified:
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodPending {
@@ -625,8 +619,8 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
break
}
logger.Log.Infof("Mizu is available at %s\n", url)
if !config.Config.Tap.HeadlessMode {
logger.Log.Infof("Mizu is available at %s", url)
if !config.Config.HeadlessMode {
uiUtils.OpenBrowser(url)
}
if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil {
@@ -656,34 +650,57 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
var prevPodPhase core.PodPhase
for {
select {
case addedPod, ok := <-added:
case wEvent, ok := <-added:
if !ok {
added = nil
continue
}
addedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
case removedPod, ok := <-removed:
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
removedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
case modifiedPod, ok := <-modified:
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message))
cancel()
break
continue
}
podStatus := modifiedPod.Status

View File

@@ -56,9 +56,11 @@ func runMizuView() {
return
}
logger.Log.Infof("Mizu is available at %s\n", url)
logger.Log.Infof("Mizu is available at %s", url)
uiUtils.OpenBrowser(url)
if !config.Config.HeadlessMode {
uiUtils.OpenBrowser(url)
}
if isCompatible, err := version.CheckVersionCompatibility(apiServerProvider); err != nil {
logger.Log.Errorf("Failed to check versions compatibility %v", err)

View File

@@ -52,6 +52,10 @@ func InitConfig(cmd *cobra.Command) error {
cmd.Flags().Visit(initFlag)
if err := Config.validate(); err != nil {
return fmt.Errorf("config validation failed, err: %v", err)
}
finalConfigPrettified, _ := uiUtils.PrettyJson(Config)
logger.Log.Debugf("Init config finished\n Final config: %v", finalConfigPrettified)
@@ -392,12 +396,13 @@ func getMizuAgentConfig(targetNamespaces []string, mizuApiFilteringOptions *api.
TargetNamespaces: targetNamespaces,
AgentImage: Config.AgentImage,
PullPolicy: Config.ImagePullPolicyStr,
DumpLogs: Config.DumpLogs,
LogLevel: Config.LogLevel(),
IgnoredUserAgents: Config.Tap.IgnoredUserAgents,
TapperResources: Config.Tap.TapperResources,
MizuResourcesNamespace: Config.MizuResourcesNamespace,
MizuApiFilteringOptions: *mizuApiFilteringOptions,
AgentDatabasePath: shared.DataDirPath,
Istio: Config.Tap.Istio,
}
return &config, nil
}

View File

@@ -2,6 +2,7 @@ package config
import (
"fmt"
"github.com/op/go-logging"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/mizu"
v1 "k8s.io/api/core/v1"
@@ -31,6 +32,16 @@ type ConfigStruct struct {
DumpLogs bool `yaml:"dump-logs" default:"false"`
KubeConfigPathStr string `yaml:"kube-config-path"`
ConfigFilePath string `yaml:"config-path,omitempty" readonly:""`
HeadlessMode bool `yaml:"headless" default:"false"`
LogLevelStr string `yaml:"log-level,omitempty" default:"INFO" readonly:""`
}
func(config *ConfigStruct) validate() error {
if _, err := logging.LogLevel(config.LogLevelStr); err != nil {
return fmt.Errorf("%s is not a valid log level, err: %v", config.LogLevelStr, err)
}
return nil
}
func (config *ConfigStruct) SetDefaults() {
@@ -59,3 +70,8 @@ func (config *ConfigStruct) KubeConfigPath() string {
home := homedir.HomeDir()
return filepath.Join(home, ".kube", "config")
}
func (config *ConfigStruct) LogLevel() logging.Level {
logLevel, _ := logging.LogLevel(config.LogLevelStr)
return logLevel
}

View File

@@ -3,6 +3,8 @@ package configStructs
import (
"errors"
"fmt"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger"
"regexp"
"github.com/up9inc/mizu/shared"
@@ -23,30 +25,31 @@ const (
EnforcePolicyFile = "traffic-validation-file"
ContractFile = "contract"
DaemonModeTapName = "daemon"
HeadlessMode = "headless"
IstioName = "istio"
)
type TapConfig struct {
UploadIntervalSec int `yaml:"upload-interval" default:"10"`
PodRegexStr string `yaml:"regex" default:".*"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
ProxyHost string `yaml:"proxy-host" default:"127.0.0.1"`
Namespaces []string `yaml:"namespaces"`
Analysis bool `yaml:"analysis" default:"false"`
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
PlainTextFilterRegexes []string `yaml:"regex-masking"`
IgnoredUserAgents []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
DryRun bool `yaml:"dry-run" default:"false"`
Workspace string `yaml:"workspace"`
EnforcePolicyFile string `yaml:"traffic-validation-file"`
ContractFile string `yaml:"contract"`
AskUploadConfirmation bool `yaml:"ask-upload-confirmation" default:"true"`
ApiServerResources shared.Resources `yaml:"api-server-resources"`
TapperResources shared.Resources `yaml:"tapper-resources"`
DaemonMode bool `yaml:"daemon" default:"false"`
HeadlessMode bool `yaml:"headless" default:"false"`
UploadIntervalSec int `yaml:"upload-interval" default:"10"`
PodRegexStr string `yaml:"regex" default:".*"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
ProxyHost string `yaml:"proxy-host" default:"127.0.0.1"`
Namespaces []string `yaml:"namespaces"`
Analysis bool `yaml:"analysis" default:"false"`
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
PlainTextFilterRegexes []string `yaml:"regex-masking"`
IgnoredUserAgents []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
DryRun bool `yaml:"dry-run" default:"false"`
Workspace string `yaml:"workspace"`
EnforcePolicyFile string `yaml:"traffic-validation-file"`
ContractFile string `yaml:"contract"`
AskUploadConfirmation bool `yaml:"ask-upload-confirmation" default:"true"`
ApiServerResources shared.Resources `yaml:"api-server-resources"`
TapperResources shared.Resources `yaml:"tapper-resources"`
DaemonMode bool `yaml:"daemon" default:"false"`
NoPersistentVolumeClaim bool `yaml:"no-persistent-volume-claim" default:"false"`
Istio bool `yaml:"istio" default:"false"`
}
func (config *TapConfig) PodRegex() *regexp.Regexp {
@@ -81,5 +84,9 @@ func (config *TapConfig) Validate() error {
return errors.New(fmt.Sprintf("Can't run with both --%s and --%s flags", AnalysisTapName, WorkspaceTapName))
}
if config.NoPersistentVolumeClaim && !config.DaemonMode {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("the --set tap.no-persistent-volume-claim=true flag has no effect without the --%s flag, the claim will not be created anyway.", DaemonModeTapName))
}
return nil
}

View File

@@ -8,6 +8,7 @@ require (
github.com/getkin/kin-openapi v0.79.0
github.com/google/go-github/v37 v37.0.0
github.com/google/uuid v1.1.2
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/up9inc/mizu/shared v0.0.0

View File

@@ -78,6 +78,6 @@ func DumpLogs(ctx context.Context, provider *kubernetes.Provider, filePath strin
logger.Log.Debugf("Successfully added file %s", GetLogFilePath())
}
logger.Log.Infof("You can find the zip file with all logs in %s\n", filePath)
logger.Log.Infof("You can find the zip file with all logs in %s", filePath)
return nil
}

31
cli/up9/provider.go Normal file
View File

@@ -0,0 +1,31 @@
package up9
import (
"fmt"
"net/http"
"net/url"
)
func IsTokenValid(tokenString string, envName string) bool {
whoAmIUrl, _ := url.Parse(fmt.Sprintf("https://trcc.%s/admin/whoami", envName))
req := &http.Request{
Method: http.MethodGet,
URL: whoAmIUrl,
Header: map[string][]string{
"Authorization": {fmt.Sprintf("bearer %s", tokenString)},
},
}
response, err := http.DefaultClient.Do(req)
if err != nil {
return false
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return false
}
return true
}

View File

@@ -37,8 +37,8 @@ COPY agent .
RUN go build -gcflags="all=-N -l" -o mizuagent .
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
ADD https://github.com/up9inc/basenine/releases/download/v0.2.6/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.6/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
RUN shasum -a 256 -c basenine_linux_amd64.sha256
RUN chmod +x ./basenine_linux_amd64
@@ -48,7 +48,7 @@ RUN cd .. && /bin/bash build_extensions_debug.sh
FROM golang:1.16-alpine
RUN apk add bash libpcap-dev tcpdump
RUN apk add bash libpcap-dev
WORKDIR /app

80
docs/DAEMON_MODE.md Normal file
View File

@@ -0,0 +1,80 @@
# Mizu daemon mode
Mizu can be run detached from the CLI using the daemon flag: `mizu tap --daemon`. This type of Mizu instance will run
indefinitely in the cluster.
Please note that daemon mode requires RBAC creation permissions; see the [permissions](PERMISSIONS.md)
doc for more details.
```bash
$ mizu tap "^ca.*" --daemon
Mizu will store up to 200MB of traffic, old traffic will be cleared once the limit is reached.
Tapping pods in namespaces "sock-shop"
Waiting for mizu to be ready... (may take a few minutes)
+carts-66c77f5fbb-fq65r
+catalogue-5f4cb7cf5-7zrmn
..
```
## Stop mizu daemon
To stop the detached mizu instance and clean up all cluster-side resources, run `mizu clean`:
```bash
$ mizu clean # mizu will continue running in cluster until clean is executed
Removing mizu resources
```
## Expose mizu web app
Mizu can be exposed at a later stage in any of the following ways:
### Using mizu view command
On a machine that can access both the cluster and a browser, you can run the `mizu view` command, which creates a proxy.
Besides that, all the regular ways to expose k8s pods are valid.
```bash
$ mizu view
Establishing connection to k8s cluster...
Mizu is available at http://localhost:8899
^C
..
```
### Port Forward
```bash
$ kubectl port-forward -n mizu deployment/mizu-api-server 8899:8899
```
### NodePort
```bash
$ kubectl expose -n mizu deployment mizu-api-server --name mizu-node-port --type NodePort --port 80 --target-port 8899
```
Mizu's IP is the IP of any node (get it with `kubectl get nodes -o wide`) and the port is the node port allocated to the new
service (shown by `kubectl get services -n mizu mizu-node-port`). Note that this method will expose Mizu to public access if your
nodes are public.
### LoadBalancer
```bash
$ kubectl expose deployment -n mizu --port 80 --target-port 8899 mizu-api-server --type=LoadBalancer --name=mizu-lb
service/mizu-lb exposed
..
$ kubectl get services -n mizu
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
mizu-api-server ClusterIP 10.107.200.100 <none> 80/TCP 5m5s
mizu-lb LoadBalancer 10.107.200.101 34.77.120.116 80:30141/TCP 76s
```
Note that `LoadBalancer` services only work on supported clusters (usually cloud providers) and might incur extra costs.
If you changed the `mizu-resources-namespace` value, make sure the `-n mizu` flag of the `kubectl expose` command is
changed to the value of `mizu-resources-namespace`.
Mizu will now be available both by running `mizu view` and by accessing the `EXTERNAL-IP` of the `mizu-lb` service
through your browser.

46
docs/ISTIO.md Normal file
View File

@@ -0,0 +1,46 @@
![Mizu: The API Traffic Viewer for Kubernetes](../assets/mizu-logo.svg)
# Istio mutual TLS (mTLS) with Mizu
This document describes how the Mizu tapper handles workloads configured with mTLS, which encrypts the internal traffic between services in a cluster.
Besides Istio, other service meshes implement mTLS as well. However, Istio is currently the most widely used one, which is why we focus on it.
In order to create an Istio setup for development, follow these steps:
1. Deploy a sample application to a Kubernetes cluster; the sample application needs to make internal service-to-service calls
2. SSH to one of the nodes and run `tcpdump`
3. Make sure you see the internal service-to-service calls in plain text
4. Deploy Istio to the cluster - make sure it is attached to all pods of the sample application and that it is configured with mTLS (the default)
5. Run `tcpdump` again and make sure you no longer see the internal service-to-service calls in plain text
## The connection between Istio and Envoy
In order to implement its service mesh capabilities, [Istio](https://istio.io) uses an [Envoy](https://www.envoyproxy.io) sidecar in front of every pod in the cluster. Envoy is responsible for the mTLS communication, which is why we focus on the Envoy proxy.
In the future we might see more players in this field; then we will have to either add support for each of them or go with a unified eBPF solution.
## Network namespaces
A [Linux network namespace](https://man7.org/linux/man-pages/man7/network_namespaces.7.html) is an isolation mechanism that limits a process's view of the network. In the container world it is used to isolate one container from another. In the Kubernetes world it is used to isolate one pod from another. That means that two containers running in the same pod share the same network namespace, and a container can reach another container in the same pod by accessing `localhost`.
An Envoy proxy configured with mTLS receives the inbound traffic directed to the pod, decrypts it and sends it via `localhost` to the target container.
## Tapping mTLS traffic
In order for Mizu to see the decrypted traffic, it needs to listen inside the network namespace of the target pod. Multiple threads of the same process can have different network namespaces.
[gopacket](https://github.com/google/gopacket) uses [libpcap](https://github.com/the-tcpdump-group/libpcap) by default for capturing traffic. Libpcap is not aware of network namespaces, so we can't ask it to listen to traffic in a different namespace. However, we can change the network namespace of the calling thread and then start libpcap, so it captures the traffic of that namespace.
## Finding the network namespace of a running process
The network namespace of a running process can be found via the `/proc/PID/ns/net` link. Once we have this link, we can ask Linux to switch a thread's network namespace to the one it points to.
This means that Mizu needs access to the `/proc` filesystem (procfs) of the node it runs on.
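To make this concrete, here is a minimal, hypothetical Go sketch of the technique (not the tapper's actual code), assuming the `github.com/vishvananda/netns` and `github.com/google/gopacket/pcap` packages; the `captureInPodNamespace` helper, the PID and the interface name are made up for illustration:
```go
package main

import (
	"fmt"
	"runtime"

	"github.com/google/gopacket"
	"github.com/google/gopacket/pcap"
	"github.com/vishvananda/netns"
)

// captureInPodNamespace opens a libpcap handle inside the network namespace of
// the given PID. Hypothetical helper, shown only to illustrate the technique.
func captureInPodNamespace(pid int, iface string) (*pcap.Handle, error) {
	// setns(2) affects only the calling thread, so pin this goroutine to its
	// OS thread while switching namespaces.
	runtime.LockOSThread()
	defer runtime.UnlockOSThread()

	origNs, err := netns.Get()
	if err != nil {
		return nil, fmt.Errorf("get current netns: %w", err)
	}
	defer origNs.Close()
	defer netns.Set(origNs) // restore the original namespace on the way out

	// This is the /proc/PID/ns/net link described above; it requires access
	// to the node's procfs.
	podNs, err := netns.GetFromPath(fmt.Sprintf("/proc/%d/ns/net", pid))
	if err != nil {
		return nil, fmt.Errorf("get netns of pid %d: %w", pid, err)
	}
	defer podNs.Close()

	if err := netns.Set(podNs); err != nil {
		return nil, fmt.Errorf("enter netns of pid %d: %w", pid, err)
	}

	// The handle is created inside the pod's namespace and stays bound to it,
	// so it keeps seeing the decrypted localhost traffic between Envoy and
	// the application container even after the thread switches back.
	return pcap.OpenLive(iface, 65536, true, pcap.BlockForever)
}

func main() {
	handle, err := captureInPodNamespace(12345, "lo") // hypothetical Envoy PID
	if err != nil {
		fmt.Println(err)
		return
	}
	defer handle.Close()

	for packet := range gopacket.NewPacketSource(handle, handle.LinkType()).Packets() {
		fmt.Println(packet)
	}
}
```
Only the creation of the capture handle has to happen inside the pod's namespace; once it is open, the thread can switch back immediately.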
## Finding the network namespace of a running pod
In order for Mizu to listen to mTLS traffic, it needs to get the PIDs of the running pods, filter them according to the user filters, and then start listening to traffic in their network namespaces.
There is no official way in Kubernetes to get from a pod to a PID. The CRI implementation purposefully doesn't force a pod to be a process on the host; it can just as well be a virtual machine, as with [Kata Containers](https://katacontainers.io).
While we could provide a solution for each CRI (like Docker, containerd and CRI-O), it's better to have a unified one. To achieve that, Mizu scans all the processes on the host and finds the Envoy processes by following their `/proc/PID/exe` link.
Once Mizu detects an Envoy process, it needs to check whether this specific Envoy process is relevant according to the user filters. The user filters are a list of cluster IPs (`CLUSTER_IPS`), which the tapper gets via the `TapOpts.FilterAuthorities` list.
Istio passes an `INSTANCE_IP` environment variable to every Envoy proxy process. By examining an Envoy process's environment variables, we can tell whether it is relevant or not. A process's environment variables can be read from its `/proc/PID/environ` file.
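The environment check boils down to parsing `/proc/PID/environ`. Below is a condensed, illustrative Go sketch; `readInstanceIP` and the PID are hypothetical, and the tapper's actual discovery code in this change set additionally verifies that `/proc/PID/exe` points at an Envoy binary before reading the environment:
```go
package main

import (
	"fmt"
	"io/ioutil"
	"strings"
)

// readInstanceIP returns the INSTANCE_IP environment variable of a process,
// or "" if it is not set.
func readInstanceIP(procfs string, pid string) (string, error) {
	// /proc/PID/environ is a NUL-separated list of KEY=VALUE pairs.
	data, err := ioutil.ReadFile(fmt.Sprintf("%s/%s/environ", procfs, pid))
	if err != nil {
		return "", err
	}
	for _, env := range strings.Split(string(data), "\x00") {
		parts := strings.SplitN(env, "=", 2)
		if len(parts) == 2 && parts[0] == "INSTANCE_IP" {
			return parts[1], nil
		}
	}
	return "", nil
}

func main() {
	// Hypothetical PID; in the tapper this comes from scanning procfs.
	ip, err := readInstanceIP("/proc", "12345")
	if err != nil {
		fmt.Println("could not read environ:", err)
		return
	}
	fmt.Println("INSTANCE_IP:", ip)
}
```
If the value matches one of the cluster IPs in `TapOpts.FilterAuthorities`, the process is considered relevant and its network namespace is tapped.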
## Edge cases
The method we use to find Envoy processes and correlate them to the cluster IPs may be inaccurate in certain situations. If, for example, a user runs an Envoy process manually and sets its `INSTANCE_IP` environment variable to one of the `CLUSTER_IPS` the tapper gets, then Mizu will capture its traffic as well.

View File

@@ -7,15 +7,15 @@ rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch", "delete"]
- apiGroups: [ "" ]
- apiGroups: [ "apps" ]
resources: [ "deployments" ]
verbs: [ "create", "delete" ]
verbs: [ "get", "create", "delete" ]
- apiGroups: [""]
resources: ["services"]
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: ["apps"]
resources: ["daemonsets"]
verbs: ["create", "patch", "delete"]
verbs: ["get", "create", "patch", "delete", "list"]
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["get", "list", "watch", "create", "delete"]

View File

@@ -8,7 +8,7 @@ rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch", "delete"]
- apiGroups: [ "" ]
- apiGroups: [ "apps" ]
resources: [ "deployments" ]
verbs: [ "get", "create", "delete" ]
- apiGroups: [""]
@@ -16,7 +16,7 @@ rules:
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: ["apps"]
resources: ["daemonsets"]
verbs: ["get", "create", "patch", "delete"]
verbs: ["get", "create", "patch", "delete", "list"]
- apiGroups: [""]
resources: ["services/proxy"]
verbs: ["get"]
@@ -32,7 +32,7 @@ rules:
- apiGroups: ["rbac.authorization.k8s.io"]
resources: ["rolebindings"]
verbs: ["get", "create", "delete"]
- apiGroups: ["apps", "extensions"]
- apiGroups: ["apps", "extensions", ""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
- apiGroups: ["apps", "extensions"]

View File

@@ -13,7 +13,7 @@ const (
ConfigFileName = "mizu-config.json"
GoGCEnvVar = "GOGC"
DefaultApiServerPort = 8899
DebugModeEnvVar = "MIZU_DEBUG"
LogLevelEnvVar = "LOG_LEVEL"
BasenineHost = "localhost"
BaseninePort = "9099"
)

View File

@@ -3,6 +3,7 @@ package kubernetes
import (
"context"
"fmt"
"github.com/op/go-logging"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
@@ -15,18 +16,20 @@ import (
const updateTappersDelay = 5 * time.Second
type TappedPodChangeEvent struct {
Added []core.Pod
Removed []core.Pod
Added []core.Pod
Removed []core.Pod
ExpectedTapperAmount int
}
// MizuTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created
type MizuTapperSyncer struct {
context context.Context
CurrentlyTappedPods []core.Pod
config TapperSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TappedPodChangeEvent
ErrorOut chan K8sTapManagerError
context context.Context
CurrentlyTappedPods []core.Pod
config TapperSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TappedPodChangeEvent
ErrorOut chan K8sTapManagerError
nodeToTappedPodIPMap map[string][]string
}
type TapperSyncerConfig struct {
@@ -36,10 +39,11 @@ type TapperSyncerConfig struct {
AgentImage string
TapperResources shared.Resources
ImagePullPolicy core.PullPolicy
DumpLogs bool
LogLevel logging.Level
IgnoredUserAgents []string
MizuApiFilteringOptions api.TrafficFilteringOptions
MizuServiceAccountExists bool
Istio bool
}
func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig) (*MizuTapperSyncer, error) {
@@ -65,7 +69,8 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro
}
func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, tapperSyncer.kubernetesProvider, tapperSyncer.config.TargetNamespaces, &tapperSyncer.config.PodFilterRegex)
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex)
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
restartTappers := func() {
err, changeFound := tapperSyncer.updateCurrentlyTappedPods()
@@ -91,28 +96,48 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
for {
select {
case pod, ok := <-added:
case wEvent, ok := <-added:
if !ok {
added = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod, ok := <-removed:
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod, ok := <-modified:
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
@@ -129,12 +154,8 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
continue
}
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
restartTappersDebouncer.Cancel()
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
}
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
case <-tapperSyncer.context.Done():
logger.Log.Debugf("Watching pods loop, context done, stopping `restart tappers debouncer`")
@@ -145,6 +166,15 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
}
}
func (tapperSyncer *MizuTapperSyncer) handleErrorInWatchLoop(err error, restartTappersDebouncer *debounce.Debouncer) {
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
restartTappersDebouncer.Cancel()
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
}
}
func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, changesFound bool) {
if matchingPods, err := tapperSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(tapperSyncer.context, &tapperSyncer.config.PodFilterRegex, tapperSyncer.config.TargetNamespaces); err != nil {
return err, false
@@ -159,9 +189,11 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch
}
if len(addedPods) > 0 || len(removedPods) > 0 {
tapperSyncer.CurrentlyTappedPods = podsToTap
tapperSyncer.nodeToTappedPodIPMap = GetNodeHostToTappedPodIpsMap(tapperSyncer.CurrentlyTappedPods)
tapperSyncer.TapPodChangesOut <- TappedPodChangeEvent{
Added: addedPods,
Removed: removedPods,
Added: addedPods,
Removed: removedPods,
ExpectedTapperAmount: len(tapperSyncer.nodeToTappedPodIPMap),
}
return nil, true
}
@@ -170,9 +202,7 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch
}
func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
nodeToTappedPodIPMap := GetNodeHostToTappedPodIpsMap(tapperSyncer.CurrentlyTappedPods)
if len(nodeToTappedPodIPMap) > 0 {
if len(tapperSyncer.nodeToTappedPodIPMap) > 0 {
var serviceAccountName string
if tapperSyncer.config.MizuServiceAccountExists {
serviceAccountName = ServiceAccountName
@@ -187,16 +217,17 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
tapperSyncer.config.AgentImage,
TapperPodName,
fmt.Sprintf("%s.%s.svc.cluster.local", ApiServerPodName, tapperSyncer.config.MizuResourcesNamespace),
nodeToTappedPodIPMap,
tapperSyncer.nodeToTappedPodIPMap,
serviceAccountName,
tapperSyncer.config.TapperResources,
tapperSyncer.config.ImagePullPolicy,
tapperSyncer.config.MizuApiFilteringOptions,
tapperSyncer.config.DumpLogs,
tapperSyncer.config.LogLevel,
tapperSyncer.config.Istio,
); err != nil {
return err
}
logger.Log.Debugf("Successfully created %v tappers", len(nodeToTappedPodIPMap))
logger.Log.Debugf("Successfully created %v tappers", len(tapperSyncer.nodeToTappedPodIPMap))
} else {
if err := tapperSyncer.kubernetesProvider.RemoveDaemonSet(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, TapperDaemonSetName); err != nil {
return err

View File

@@ -0,0 +1,45 @@
package kubernetes
import (
"context"
"regexp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)
type PodWatchHelper struct {
kubernetesProvider *Provider
NameRegexFilter *regexp.Regexp
}
func NewPodWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Regexp) *PodWatchHelper {
return &PodWatchHelper{
kubernetesProvider: kubernetesProvider,
NameRegexFilter: NameRegexFilter,
}
}
// Implements the EventFilterer Interface
func (pwh *PodWatchHelper) Filter(wEvent *WatchEvent) (bool, error) {
pod, err := wEvent.ToPod()
if err != nil {
return false, nil
}
if !pwh.NameRegexFilter.MatchString(pod.Name) {
return false, nil
}
return true, nil
}
// Implements the WatchCreator Interface
func (pwh *PodWatchHelper) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) {
watcher, err := pwh.kubernetesProvider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return nil, err
}
return watcher, nil
}

View File

@@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/op/go-logging"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver"
@@ -152,14 +153,6 @@ func (provider *Provider) WaitUtilNamespaceDeleted(ctx context.Context, name str
return err
}
func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) watch.Interface {
watcher, err := provider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
panic(err.Error())
}
return watcher
}
func (provider *Provider) CreateNamespace(ctx context.Context, name string) (*core.Namespace, error) {
namespaceSpec := &core.Namespace{
ObjectMeta: metav1.ObjectMeta{
@@ -179,7 +172,7 @@ type ApiServerOptions struct {
MaxEntriesDBSizeBytes int64
Resources shared.Resources
ImagePullPolicy core.PullPolicy
DumpLogs bool
LogLevel logging.Level
}
func (provider *Provider) GetMizuApiServerPodObject(opts *ApiServerOptions, mountVolumeClaim bool, volumeClaimName string) (*core.Pod, error) {
@@ -248,11 +241,6 @@ func (provider *Provider) GetMizuApiServerPodObject(opts *ApiServerOptions, moun
port := intstr.FromInt(shared.DefaultApiServerPort)
debugMode := ""
if opts.DumpLogs {
debugMode = "1"
}
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: opts.PodName,
@@ -272,8 +260,8 @@ func (provider *Provider) GetMizuApiServerPodObject(opts *ApiServerOptions, moun
Value: string(marshaledSyncEntriesConfig),
},
{
Name: shared.DebugModeEnvVar,
Value: debugMode,
Name: shared.LogLevelEnvVar,
Value: opts.LogLevel.String(),
},
},
Resources: core.ResourceRequirements{
@@ -583,6 +571,11 @@ func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string,
return provider.handleRemovalError(err)
}
func (provider *Provider) RemovePersistentVolumeClaim(ctx context.Context, namespace string, volumeClaimName string) error {
err := provider.clientSet.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, volumeClaimName, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) handleRemovalError(err error) error {
// Ignore NotFound - There is nothing to delete.
// Ignore Forbidden - Assume that a user could not have created the resource in the first place.
@@ -619,7 +612,7 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string,
return nil
}
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, dumpLogs bool) error {
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level, istio bool) error {
logger.Log.Debugf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName)
if len(nodeToTappedPodIPMap) == 0 {
@@ -642,12 +635,10 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
"--tap",
"--api-server-address", fmt.Sprintf("ws://%s/wsTapper", apiServerPodIp),
"--nodefrag",
"--procfs", procfsMountPath,
}
debugMode := ""
if dumpLogs {
debugMode = "1"
if istio {
mizuCmd = append(mizuCmd, "--procfs", procfsMountPath, "--istio")
}
agentContainer := applyconfcore.Container()
@@ -657,7 +648,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(true))
agentContainer.WithCommand(mizuCmd...)
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.DebugModeEnvVar).WithValue(debugMode),
applyconfcore.EnvVar().WithName(shared.LogLevelEnvVar).WithValue(logLevel.String()),
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
@@ -868,10 +859,6 @@ func (provider *Provider) CreatePersistentVolumeClaim(ctx context.Context, names
return provider.clientSet.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, volumeClaim, metav1.CreateOptions{})
}
func (provider *Provider) RemovePersistentVolumeClaim(ctx context.Context, namespace string, volumeClaimName string) error {
return provider.clientSet.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, volumeClaimName, metav1.DeleteOptions{})
}
func getClientSet(config *restclient.Config) (*kubernetes.Clientset, error) {
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {

View File

@@ -6,19 +6,25 @@ import (
"fmt"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
"regexp"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/watch"
)
func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetNamespaces []string, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) {
addedChan := make(chan *corev1.Pod)
modifiedChan := make(chan *corev1.Pod)
removedChan := make(chan *corev1.Pod)
type EventFilterer interface {
Filter(*WatchEvent) (bool, error)
}
type WatchCreator interface {
NewWatcher(ctx context.Context, namespace string) (watch.Interface, error)
}
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (chan *WatchEvent, chan *WatchEvent, chan *WatchEvent, chan error) {
addedChan := make(chan *WatchEvent)
modifiedChan := make(chan *WatchEvent)
removedChan := make(chan *WatchEvent)
errorChan := make(chan error)
var wg sync.WaitGroup
@@ -31,8 +37,13 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
watchRestartDebouncer := debounce.NewDebouncer(1 * time.Minute, func() {})
for {
watcher := kubernetesProvider.GetPodWatcher(ctx, targetNamespace)
err := startWatchLoop(ctx, watcher, podFilter, addedChan, modifiedChan, removedChan) // blocking
watcher, err := watcherCreator.NewWatcher(ctx, targetNamespace)
if err != nil {
errorChan <- fmt.Errorf("error in k8 watch: %v", err)
break
}
err = startWatchLoop(ctx, watcher, filterer, addedChan, modifiedChan, removedChan) // blocking
watcher.Stop()
select {
@@ -72,7 +83,7 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
return addedChan, modifiedChan, removedChan, errorChan
}
func startWatchLoop(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp, addedChan chan *corev1.Pod, modifiedChan chan *corev1.Pod, removedChan chan *corev1.Pod) error {
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, addedChan chan *WatchEvent, modifiedChan chan *WatchEvent, removedChan chan *WatchEvent) error {
resultChan := watcher.ResultChan()
for {
select {
@@ -81,26 +92,25 @@ func startWatchLoop(ctx context.Context, watcher watch.Interface, podFilter *reg
return nil
}
if e.Type == watch.Error {
return apierrors.FromObject(e.Object)
wEvent := WatchEvent(e)
if wEvent.Type == watch.Error {
return apierrors.FromObject(wEvent.Object)
}
pod, ok := e.Object.(*corev1.Pod)
if !ok {
if pass, err := filterer.Filter(&wEvent); err != nil {
return err
} else if !pass {
continue
}
if !podFilter.MatchString(pod.Name) {
continue
}
switch e.Type {
switch wEvent.Type {
case watch.Added:
addedChan <- pod
addedChan <- &wEvent
case watch.Modified:
modifiedChan <- pod
modifiedChan <- &wEvent
case watch.Deleted:
removedChan <- pod
removedChan <- &wEvent
}
case <-ctx.Done():
return nil

View File

@@ -0,0 +1,18 @@
package kubernetes
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
)
type WatchEvent watch.Event
func (we *WatchEvent) ToPod() (*corev1.Pod, error) {
pod, ok := we.Object.(*corev1.Pod)
if !ok {
return nil, fmt.Errorf("Invalid object type on pod event stream")
}
return pod, nil
}

View File

@@ -9,7 +9,7 @@ import (
var Log = logging.MustGetLogger("mizu")
var format = logging.MustStringFormatter(
`%{time} %{level:.5s} ▶ %{pid} %{shortfile} %{shortfunc} ▶ %{message}`,
`%{time:2006-01-02T15:04:05.999Z-07:00} %{level:-5s} ▶ %{message} ▶ %{pid} %{shortfile} %{shortfunc}`,
)
func InitLogger(logPath string) {

View File

@@ -1,9 +1,10 @@
package shared
import (
"github.com/op/go-logging"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
"io/ioutil"
"log"
"strings"
"gopkg.in/yaml.v3"
@@ -36,12 +37,13 @@ type MizuAgentConfig struct {
TargetNamespaces []string `json:"targetNamespaces"`
AgentImage string `json:"agentImage"`
PullPolicy string `json:"pullPolicy"`
DumpLogs bool `json:"dumpLogs"`
LogLevel logging.Level `json:"logLevel"`
IgnoredUserAgents []string `json:"ignoredUserAgents"`
TapperResources Resources `json:"tapperResources"`
MizuResourcesNamespace string `json:"mizuResourceNamespace"`
MizuApiFilteringOptions api.TrafficFilteringOptions `json:"mizuApiFilteringOptions"`
AgentDatabasePath string `json:"agentDatabasePath"`
Istio bool `json:"istio"`
}
type WebSocketMessageMetadata struct {
@@ -140,14 +142,12 @@ func (r *RulePolicy) validateType() bool {
permitedTypes := []string{"json", "header", "slo"}
_, found := Find(permitedTypes, r.Type)
if !found {
log.Printf("Error: %s. ", r.Name)
log.Printf("Only json, header and slo types are supported on rule definition. This rule will be ignored\n")
logger.Log.Errorf("Only json, header and slo types are supported on rule definition. This rule will be ignored. rule name: %s", r.Name)
found = false
}
if strings.ToLower(r.Type) == "slo" {
if r.ResponseTime <= 0 {
log.Printf("Error: %s. ", r.Name)
log.Printf("When type=slo, the field response-time should be specified and have a value >= 1\n\n")
logger.Log.Errorf("When rule type is slo, the field response-time should be specified and have a value >= 1. rule name: %s", r.Name)
found = false
}
}

View File

@@ -287,7 +287,7 @@ func (h HTTPPayload) MarshalJSON() ([]byte, error) {
RawResponse: &HTTPResponseWrapper{Response: h.Data.(*http.Response)},
})
default:
panic(fmt.Sprintf("HTTP payload cannot be marshaled: %s\n", h.Type))
panic(fmt.Sprintf("HTTP payload cannot be marshaled: %s", h.Type))
}
}

View File

@@ -39,7 +39,7 @@ func StartMemoryProfiler(envDumpPath string, envTimeInterval string) {
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
logger.Log.Infof("Writing memory profile to %s\n", filename)
logger.Log.Infof("Writing memory profile to %s", filename)
f, err := os.Create(filename)
if err != nil {

View File

@@ -37,7 +37,7 @@ func (d dissecting) Register(extension *api.Extension) {
}
func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
log.Printf("pong %s", protocol.Name)
}
const amqpRequest string = "amqp_request"
@@ -218,7 +218,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
default:
// log.Printf("unexpected frame: %+v\n", f)
// log.Printf("unexpected frame: %+v", f)
}
}
}

View File

@@ -58,7 +58,7 @@ func (d dissecting) Register(extension *api.Extension) {
}
func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
log.Printf("pong %s", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
@@ -143,6 +143,10 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
}
if resDetails["bodySize"].(float64) < 0 {
resDetails["bodySize"] = 0
}
if item.Protocol.Version == "2.0" {
service = fmt.Sprintf("%s://%s", scheme, authority)
} else {
@@ -180,6 +184,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if elapsedTime < 0 {
elapsedTime = 0
}
httpPair, _ := json.Marshal(item.Pair)
_protocol := protocol
_protocol.Version = item.Protocol.Version
@@ -418,7 +425,7 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
func (d dissecting) Macros() map[string]string {
return map[string]string{
`http`: fmt.Sprintf(`proto.abbr == "%s"`, protocol.Abbreviation),
`http`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, protocol.Version),
`grpc`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, http2Protocol.Version),
`http2`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, http2Protocol.Version),
}

View File

@@ -16,6 +16,7 @@ import (
)
const maskedFieldPlaceholderValue = "[REDACTED]"
const userAgent = "user-agent"
//these values MUST be all lower case and contain no `-` or `_` characters
var personallyIdentifiableDataFields = []string{"token", "authorization", "authentication", "cookie", "userid", "password",
@@ -32,7 +33,7 @@ func IsIgnoredUserAgent(item *api.OutputChannelItem, options *api.TrafficFilteri
request := item.Pair.Request.Payload.(api.HTTPPayload).Data.(*http.Request)
for headerKey, headerValues := range request.Header {
if strings.ToLower(headerKey) == "user-agent" {
if strings.ToLower(headerKey) == userAgent {
for _, userAgent := range options.IgnoredUserAgents {
for _, headerValue := range headerValues {
if strings.Contains(strings.ToLower(headerValue), strings.ToLower(userAgent)) {
@@ -89,6 +90,10 @@ func filterResponseBody(response *http.Response, options *api.TrafficFilteringOp
func filterHeaders(headers *http.Header) {
for key, _ := range *headers {
if strings.ToLower(key) == userAgent {
continue
}
if strings.ToLower(key) == "cookie" {
headers.Del(key)
} else if isFieldNameSensitive(key) {

View File

@@ -37,7 +37,7 @@ func (d dissecting) Register(extension *api.Extension) {
}
func (d dissecting) Ping() {
log.Printf("pong %s\n", _protocol.Name)
log.Printf("pong %s", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
@@ -149,6 +149,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
request["url"] = summary
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if elapsedTime < 0 {
elapsedTime = 0
}
return &api.MizuEntry{
Protocol: _protocol,
Source: &api.TCP{

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/up9inc/mizu/tap/api"
)
@@ -35,7 +36,7 @@ func (d dissecting) Register(extension *api.Extension) {
}
func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
log.Printf("pong %s", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
@@ -82,6 +83,10 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
request["url"] = summary
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if elapsedTime < 0 {
elapsedTime = 0
}
return &api.MizuEntry{
Protocol: protocol,
Source: &api.TCP{
@@ -104,7 +109,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
Service: service,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: 0,
ElapsedTime: elapsedTime,
Summary: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,

View File

@@ -313,7 +313,7 @@ func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
packet.Value = fmt.Sprintf("%s]", packet.Value)
}
default:
msg := fmt.Sprintf("Unrecognized element in Redis array: %v\n", reflect.TypeOf(array[0]))
msg := fmt.Sprintf("Unrecognized element in Redis array: %v", reflect.TypeOf(array[0]))
err = errors.New(msg)
return
}
@@ -333,7 +333,7 @@ func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
case int64:
packet.Value = fmt.Sprintf("%d", x.(int64))
default:
msg := fmt.Sprintf("Unrecognized Redis data type: %v\n", reflect.TypeOf(x))
msg := fmt.Sprintf("Unrecognized Redis data type: %v", reflect.TypeOf(x))
err = errors.New(msg)
return
}

View File

@@ -50,14 +50,15 @@ var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
var pids = flag.String("pids", "", "A comma separated list of PIDs to capture their network namespaces")
var istio = flag.Bool("istio", false, "Record decrypted traffic if the cluster configured with istio and mtls")
var memprofile = flag.String("memprofile", "", "Write memory profile")
type TapOpts struct {
HostMode bool
HostMode bool
FilterAuthorities []string
}
var hostMode bool // global
var extensions []*api.Extension // global
var filteringOptions *api.TrafficFilteringOptions // global
@@ -80,15 +81,18 @@ func inArrayString(arr []string, valueToCheck string) bool {
}
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
hostMode = opts.HostMode
extensions = extensionsRef
filteringOptions = options
if opts.FilterAuthorities == nil {
opts.FilterAuthorities = []string{}
}
if GetMemoryProfilingEnabled() {
diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds))
}
go startPassiveTapper(outputItems)
go startPassiveTapper(opts, outputItems)
}
func printPeriodicStats(cleaner *Cleaner) {
@@ -131,7 +135,7 @@ func printPeriodicStats(cleaner *Cleaner) {
}
}
func initializePacketSources() (*source.PacketSourceManager, error) {
func initializePacketSources(opts *TapOpts) (*source.PacketSourceManager, error) {
var bpffilter string
if len(flag.Args()) > 0 {
bpffilter = strings.Join(flag.Args(), " ")
@@ -146,17 +150,17 @@ func initializePacketSources() (*source.PacketSourceManager, error) {
BpfFilter: bpffilter,
}
return source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, behaviour)
return source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, *istio, opts.FilterAuthorities, behaviour)
}
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) {
streamsMap := NewTcpStreamMap()
go streamsMap.closeTimedoutTcpStreamChannels()
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
diagnose.InitializeTapperInternalStats()
sources, err := initializePacketSources()
sources, err := initializePacketSources(opts)
if err != nil {
logger.Log.Fatal(err)
@@ -169,7 +173,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
}
packets := make(chan source.TcpPacketInfo)
assembler := NewTcpAssembler(outputItems, streamsMap)
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
diagnose.AppStats.SetStartTime(time.Now())
@@ -193,7 +197,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
}
if err := diagnose.DumpMemoryProfile(*memprofile); err != nil {
logger.Log.Errorf("Error dumping memory profile %v\n", err)
logger.Log.Errorf("Error dumping memory profile %v", err)
}
assembler.waitAndDump()

View File

@@ -18,24 +18,6 @@ const (
TcpStreamChannelTimeoutMsDefaultValue = 10000
)
type globalSettings struct {
filterAuthorities []string
}
var gSettings = &globalSettings{
filterAuthorities: []string{},
}
func SetFilterAuthorities(ipAddresses []string) {
gSettings.filterAuthorities = ipAddresses
}
func GetFilterIPs() []string {
addresses := make([]string, len(gSettings.filterAuthorities))
copy(addresses, gSettings.filterAuthorities)
return addresses
}
func GetMaxBufferedPagesTotal() int {
valueFromEnv, err := strconv.Atoi(os.Getenv(MaxBufferedPagesTotalEnvVarName))
if err != nil {

View File

@@ -0,0 +1,112 @@
package source
import (
"fmt"
"io/ioutil"
"os"
"regexp"
"strings"
"github.com/up9inc/mizu/shared/logger"
)
const envoyBinary = "/envoy"
var numberRegex = regexp.MustCompile("[0-9]+")
func discoverRelevantEnvoyPids(procfs string, clusterIps []string) ([]string, error) {
result := make([]string, 0)
pids, err := ioutil.ReadDir(procfs)
if err != nil {
return result, err
}
logger.Log.Infof("Starting envoy auto discoverer %v %v - scanning %v potential pids",
procfs, clusterIps, len(pids))
for _, pid := range pids {
if !pid.IsDir() {
continue
}
if !numberRegex.MatchString(pid.Name()) {
continue
}
if checkPid(procfs, pid.Name(), clusterIps) {
result = append(result, pid.Name())
}
}
logger.Log.Infof("Found %v relevant envoy processes - %v", len(result), result)
return result, nil
}
func checkPid(procfs string, pid string, clusterIps []string) bool {
execLink := fmt.Sprintf("%v/%v/exe", procfs, pid)
exec, err := os.Readlink(execLink)
if err != nil {
// Debug on purpose - it may happen due to many reasons and we only care
// for it during troubleshooting
//
logger.Log.Debugf("Unable to read link %v - %v\n", execLink, err)
return false
}
if !strings.HasSuffix(exec, envoyBinary) {
return false
}
environmentFile := fmt.Sprintf("%v/%v/environ", procfs, pid)
clusterIp, err := readEnvironmentVariable(environmentFile, "INSTANCE_IP")
if err != nil {
return false
}
if clusterIp == "" {
logger.Log.Debugf("Found an envoy process without INSTANCE_IP variable %v\n", pid)
return false
}
logger.Log.Infof("Found envoy pid %v with cluster ip %v", pid, clusterIp)
for _, value := range clusterIps {
if value == clusterIp {
return true
}
}
return false
}
func readEnvironmentVariable(file string, name string) (string, error) {
bytes, err := ioutil.ReadFile(file)
if err != nil {
logger.Log.Warningf("Error reading environment file %v - %v", file, err)
return "", err
}
envs := strings.Split(string(bytes), string([]byte{0}))
for _, env := range envs {
if !strings.Contains(env, "=") {
continue
}
parts := strings.Split(env, "=")
varName := parts[0]
value := parts[1]
if name == varName {
return value, nil
}
}
return "", nil
}

View File

@@ -15,26 +15,63 @@ type PacketSourceManager struct {
}
func NewPacketSourceManager(procfs string, pids string, filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
istio bool, clusterIps []string, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
sources := make([]*tcpPacketSource, 0)
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
sources, err := createHostSource(sources, filename, interfaceName, behaviour)
if err != nil {
return nil, err
}
sources = append(sources, hostSource)
if pids != "" {
netnsSources := newNetnsPacketSources(procfs, pids, interfaceName, behaviour)
sources = append(sources, netnsSources...)
}
sources = createSourcesFromPids(sources, procfs, pids, interfaceName, behaviour)
sources = createSourcesFromEnvoy(sources, istio, procfs, clusterIps, interfaceName, behaviour)
return &PacketSourceManager{
sources: sources,
}, nil
}
func createHostSource(sources []*tcpPacketSource, filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) ([]*tcpPacketSource, error) {
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
if err != nil {
return sources, err
}
return append(sources, hostSource), nil
}
func createSourcesFromPids(sources []*tcpPacketSource, procfs string, pids string,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if pids == "" {
return sources
}
netnsSources := newNetnsPacketSources(procfs, strings.Split(pids, ","), interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
}
func createSourcesFromEnvoy(sources []*tcpPacketSource, istio bool, procfs string, clusterIps []string,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if !istio {
return sources
}
envoyPids, err := discoverRelevantEnvoyPids(procfs, clusterIps)
if err != nil {
logger.Log.Warningf("Unable to discover envoy pids - %v", err)
return sources
}
netnsSources := newNetnsPacketSources(procfs, envoyPids, interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
}
func newHostPacketSource(filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
var name string
@@ -54,11 +91,11 @@ func newHostPacketSource(filename string, interfaceName string,
return source, nil
}
func newNetnsPacketSources(procfs string, pids string, interfaceName string,
func newNetnsPacketSources(procfs string, pids []string, interfaceName string,
behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
result := make([]*tcpPacketSource, 0)
for _, pidstr := range strings.Split(pids, ",") {
for _, pidstr := range pids {
pid, err := strconv.Atoi(pidstr)
if err != nil {
@@ -100,9 +137,9 @@ func newNetnsPacketSource(pid int, nsh netns.NsHandle, interfaceName string,
//
runtime.LockOSThread()
defer runtime.UnlockOSThread()
oldnetns, err := netns.Get()
if err != nil {
logger.Log.Errorf("Unable to get netns of current thread %v", err)
errors <- err

View File

@@ -33,13 +33,13 @@ func (c *context) GetCaptureInfo() gopacket.CaptureInfo {
return c.CaptureInfo
}
func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStreamMap) *tcpAssembler {
func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStreamMap, opts *TapOpts) *tcpAssembler {
var emitter api.Emitter = &api.Emitting{
AppStats: &diagnose.AppStats,
OutputChannel: outputItems,
}
streamFactory := NewTcpStreamFactory(emitter, streamsMap)
streamFactory := NewTcpStreamFactory(emitter, streamsMap, opts)
streamPool := reassembly.NewStreamPool(streamFactory)
assembler := reassembly.NewAssembler(streamPool)
@@ -78,7 +78,7 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
if *checksum {
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
if err != nil {
logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err)
logger.Log.Fatalf("Failed to set network layer for checksum: %s", err)
}
}
c := context{

View File

@@ -66,7 +66,7 @@ func (h *tcpReader) Read(p []byte) (int, error) {
clientHello := tlsx.ClientHello{}
err := clientHello.Unmarshall(msg.bytes)
if err == nil {
logger.Log.Debugf("Detected TLS client hello with SNI %s\n", clientHello.SNI)
logger.Log.Debugf("Detected TLS client hello with SNI %s", clientHello.SNI)
// TODO: Throws `panic: runtime error: invalid memory address or nil pointer dereference` error.
// numericPort, _ := strconv.Atoi(h.tcpID.DstPort)
// h.outboundLinkWriter.WriteOutboundLink(h.tcpID.SrcIP, h.tcpID.DstIP, numericPort, clientHello.SNI, TLSProtocol)

View File

@@ -24,6 +24,7 @@ type tcpStreamFactory struct {
Emitter api.Emitter
streamsMap *tcpStreamMap
ownIps []string
opts *TapOpts
}
type tcpStreamWrapper struct {
@@ -31,7 +32,7 @@ type tcpStreamWrapper struct {
createdAt time.Time
}
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap) *tcpStreamFactory {
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory {
var ownIps []string
if localhostIPs, err := getLocalhostIPs(); err != nil {
@@ -47,6 +48,7 @@ func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap) *tcpStre
Emitter: emitter,
streamsMap: streamsMap,
ownIps: ownIps,
opts: opts,
}
}
@@ -139,17 +141,17 @@ func (factory *tcpStreamFactory) WaitGoRoutines() {
}
func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, dstIP string, dstPort string) *streamProps {
if hostMode {
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
if factory.opts.HostMode {
if inArrayString(factory.opts.FilterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort))
return &streamProps{isTapTarget: true, isOutgoing: false}
} else if inArrayString(gSettings.filterAuthorities, dstIP) {
} else if inArrayString(factory.opts.FilterAuthorities, dstIP) {
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP))
return &streamProps{isTapTarget: true, isOutgoing: false}
} else if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
} else if inArrayString(factory.opts.FilterAuthorities, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s:%s", srcIP, srcPort))
return &streamProps{isTapTarget: true, isOutgoing: true}
} else if inArrayString(gSettings.filterAuthorities, srcIP) {
} else if inArrayString(factory.opts.FilterAuthorities, srcIP) {
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host4 %s", srcIP))
return &streamProps{isTapTarget: true, isOutgoing: true}
}

View File

@@ -46,7 +46,7 @@ func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() {
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(tcpStreamChannelTimeout)) {
stream.Close()
diagnose.AppStats.IncDroppedTcpStreams()
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n",
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d",
diagnose.AppStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000)
}
} else {

View File

@@ -33,7 +33,7 @@ func loadExtensions() ([]*tapApi.Extension, error) {
continue
}
logger.Log.Infof("Loading extension: %s\n", filename)
logger.Log.Infof("Loading extension: %s", filename)
extension := &tapApi.Extension{
Path: path.Join(extensionsDir, filename),
@@ -55,7 +55,7 @@ func loadExtensions() ([]*tapApi.Extension, error) {
dissector, ok := symDissector.(tapApi.Dissector)
if !ok {
return nil, errors.Errorf("Symbol Dissector type error: %v %T\n", file, symDissector)
return nil, errors.Errorf("Symbol Dissector type error: %v %T", file, symDissector)
}
dissector.Register(extension)
@@ -68,7 +68,7 @@ func loadExtensions() ([]*tapApi.Extension, error) {
})
for _, extension := range extensions {
logger.Log.Infof("Extension Properties: %+v\n", extension)
logger.Log.Infof("Extension Properties: %+v", extension)
}
return extensions, nil
@@ -92,7 +92,7 @@ func internalRun() error {
tap.StartPassiveTapper(&opts, outputItems, extenssions, &tapOpts)
logger.Log.Infof("Tapping, press enter to exit...\n")
logger.Log.Infof("Tapping, press enter to exit...")
reader := bufio.NewReader(os.Stdin)
reader.ReadLine()
return nil
@@ -104,9 +104,9 @@ func main() {
if err != nil {
switch err := err.(type) {
case *errors.Error:
logger.Log.Errorf("Error: %v\n", err.ErrorStack())
logger.Log.Errorf("Error: %v", err.ErrorStack())
default:
logger.Log.Errorf("Error: %v\n", err)
logger.Log.Errorf("Error: %v", err)
}
os.Exit(1)

View File

@@ -133,3 +133,6 @@ ignore:
SNYK-JS-WS-1296835:
- '*':
reason: Non given
SNYK-JS-JSONSCHEMA-1920922:
- '*':
reason: Non given

6
ui/package-lock.json generated
View File

@@ -13644,9 +13644,9 @@
}
},
"react-scrollable-feed-virtualized": {
"version": "1.4.3",
"resolved": "https://registry.npmjs.org/react-scrollable-feed-virtualized/-/react-scrollable-feed-virtualized-1.4.3.tgz",
"integrity": "sha512-M9WgJKr57jCyWKNCksc3oi+xhtO0YbL9d7Ll8Sdc5ZWOIstNvdNbNX0k4Nq6kXUVaHCJ9qE8omdSI/CxT3MLAQ=="
"version": "1.4.8",
"resolved": "https://registry.npmjs.org/react-scrollable-feed-virtualized/-/react-scrollable-feed-virtualized-1.4.8.tgz",
"integrity": "sha512-zsSO/9QB+4V6HEk39lxeMEUA6JFSZjfV4stw7RF17+vZdlVhyATsTBCzsj8hZywY4F29cBfH+3/GKrMhwmhAsw=="
},
"react-syntax-highlighter": {
"version": "15.4.3",

View File

@@ -23,7 +23,7 @@
"react-copy-to-clipboard": "^5.0.3",
"react-dom": "^17.0.2",
"react-scripts": "4.0.3",
"react-scrollable-feed-virtualized": "^1.4.3",
"react-scrollable-feed-virtualized": "^1.4.8",
"react-syntax-highlighter": "^15.4.3",
"react-toastify": "^8.0.3",
"typescript": "^4.2.4",

View File

@@ -1,4 +1,3 @@
import {EntryItem} from "./EntryListItem/EntryListItem";
import React, {useRef} from "react";
import styles from './style/EntriesList.module.sass';
import ScrollableFeedVirtualized from "react-scrollable-feed-virtualized";
@@ -6,40 +5,32 @@ import down from "./assets/downImg.svg";
interface EntriesListProps {
entries: any[];
setEntries: (entries: any[]) => void;
focusedEntryId: string;
setFocusedEntryId: (id: string) => void;
listEntryREF: any;
onScrollEvent: (isAtBottom:boolean) => void;
scrollableList: boolean;
ws: any
openWebSocket: any;
query: string;
updateQuery: any;
onSnapBrokenEvent: () => void;
isSnappedToBottom: boolean;
setIsSnappedToBottom: any;
queriedCurrent: number;
queriedTotal: number;
startTime: number;
}
export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, focusedEntryId, setFocusedEntryId, listEntryREF, onScrollEvent, scrollableList, ws, openWebSocket, query, updateQuery, queriedCurrent, queriedTotal, startTime}) => {
export const EntriesList: React.FC<EntriesListProps> = ({entries, listEntryREF, onSnapBrokenEvent, isSnappedToBottom, setIsSnappedToBottom, queriedCurrent, queriedTotal, startTime}) => {
const scrollableRef = useRef(null);
return <>
<div className={styles.list}>
<div id="list" ref={listEntryREF} className={styles.list}>
<ScrollableFeedVirtualized ref={scrollableRef} itemHeight={48} marginTop={10} onScroll={(isAtBottom) => onScrollEvent(isAtBottom)}>
<ScrollableFeedVirtualized ref={scrollableRef} itemHeight={48} marginTop={10} onSnapBroken={onSnapBrokenEvent}>
{false /* TODO: why there is a need for something here (not necessarily false)? */}
{entries.map(entry => <EntryItem key={entry.id}
entry={entry}
setFocusedEntryId={setFocusedEntryId}
isSelected={focusedEntryId === entry.id.toString()}
style={{}}
updateQuery={updateQuery}/>)}
{entries}
</ScrollableFeedVirtualized>
<button type="button"
className={`${styles.btnLive} ${scrollableList ? styles.showButton : styles.hideButton}`}
onClick={(_) => scrollableRef.current.jumpToBottom()}>
className={`${styles.btnLive} ${isSnappedToBottom ? styles.hideButton : styles.showButton}`}
onClick={(_) => {
scrollableRef.current.jumpToBottom();
setIsSnappedToBottom(true);
}}>
<img alt="down" src={down} />
</button>
</div>

View File

@@ -15,7 +15,7 @@ interface EntryViewLineProps {
}
const EntryViewLine: React.FC<EntryViewLineProps> = ({label, value, updateQuery, selector, overrideQueryValue}) => {
return (label && value && <tr className={styles.dataLine}>
return (label && <tr className={styles.dataLine}>
<td
className={`queryable ${styles.dataKey}`}
onClick={() => {

View File

@@ -84,7 +84,14 @@
padding: 4px
padding-left: 12px
.port
.tcpInfo
font-size: 12px
color: $secondary-font-color
margin: 5px
margin-top: 5px
margin-bottom: 5px
.port
margin-right: 5px
.ip
margin-left: 5px

View File

@@ -1,4 +1,4 @@
import React from "react";
import React, {useState} from "react";
import styles from './EntryListItem.module.sass';
import StatusCode, {getClassification, StatusCodeClassification} from "../UI/StatusCode";
import Protocol, {ProtocolInterface} from "../UI/Protocol"
@@ -38,12 +38,14 @@ interface Rules {
interface EntryProps {
entry: Entry;
setFocusedEntryId: (id: string) => void;
isSelected?: boolean;
style: object;
updateQuery: any;
}
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSelected, style, updateQuery}) => {
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, style, updateQuery}) => {
const [isSelected, setIsSelected] = useState(false);
const classification = getClassification(entry.statusCode)
const numberOfRules = entry.rules.numberOfRules
let ingoingIcon;
@@ -119,7 +121,10 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
id={entry.id.toString()}
className={`${styles.row}
${isSelected && !rule && !contractEnabled ? styles.rowSelected : additionalRulesProperties}`}
onClick={() => setFocusedEntryId(entry.id.toString())}
onClick={() => {
setIsSelected(!isSelected);
setFocusedEntryId(entry.id.toString());
}}
style={{
border: isSelected ? `1px ${entry.protocol.backgroundColor} solid` : "1px transparent solid",
position: "absolute",
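The hunk above replaces the isSelected prop with local state toggled on click, while still reporting the clicked id upward. A stripped-down version of that pattern, with illustrative names:

    import React, { useState } from "react";

    interface RowProps {
        id: string;
        onFocus: (id: string) => void; // lifted: tells the page which entry to load
    }

    // Illustrative: the row keeps its own highlight state and toggles it on click,
    // while the parent still learns which entry was focused.
    const SelectableRow: React.FC<RowProps> = ({ id, onFocus }) => {
        const [isSelected, setIsSelected] = useState(false);
        return (
            <div
                style={{ border: isSelected ? "1px solid #205cf5" : "1px solid transparent" }}
                onClick={() => {
                    setIsSelected(prev => !prev);
                    onFocus(id);
                }}
            >
                entry {id}
            </div>
        );
    };

One consequence of purely local selection state is that a previously clicked row keeps its highlight until it is clicked again; the diff alone does not show whether that is intended.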
@@ -166,7 +171,17 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
}
<div className={styles.separatorRight}>
<span
className={`queryable ${styles.port}`}
className={`queryable ${styles.tcpInfo} ${styles.ip}`}
title="Source IP"
onClick={() => {
updateQuery(`src.ip == "${entry.sourceIp}"`)
}}
>
{entry.sourceIp}
</span>
<span className={`${styles.tcpInfo}`}>:</span>
<span
className={`queryable ${styles.tcpInfo} ${styles.port}`}
title="Source Port"
onClick={() => {
updateQuery(`src.port == "${entry.sourcePort}"`)
@@ -194,7 +209,17 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
/>
}
<span
className={`queryable ${styles.port}`}
className={`queryable ${styles.tcpInfo} ${styles.ip}`}
title="Destination IP"
onClick={() => {
updateQuery(`dst.ip == "${entry.destinationIp}"`)
}}
>
{entry.destinationIp}
</span>
<span className={`${styles.tcpInfo}`}>:</span>
<span
className={`queryable ${styles.tcpInfo} ${styles.port}`}
title="Destination Port"
onClick={() => {
updateQuery(`dst.port == "${entry.destinationPort}"`)
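The source and destination spans added in these hunks repeat the same shape four times (IP and port, src and dst): a queryable value that, when clicked, appends a filter to the query. A hedged sketch of a helper that could capture that shape; it is not part of this PR and all names are illustrative:

    import React from "react";

    interface QueryableTcpInfoProps {
        value: string | number;
        title: string;                          // e.g. "Source IP"
        selector: string;                       // e.g. "src.ip" or "dst.port"
        updateQuery: (addition: string) => void;
        className?: string;
    }

    // Clicking the value appends a filter such as `src.ip == "10.0.0.7"` to the query.
    const QueryableTcpInfo: React.FC<QueryableTcpInfoProps> = ({ value, title, selector, updateQuery, className }) => (
        <span
            className={`queryable ${className ?? ""}`}
            title={title}
            onClick={() => updateQuery(`${selector} == "${value}"`)}
        >
            {value}
        </span>
    );

    // Usage (values are placeholders):
    // <QueryableTcpInfo value={entry.sourceIp} title="Source IP" selector="src.ip" updateQuery={updateQuery}/>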


@@ -1,6 +1,7 @@
import React, {useEffect, useRef, useState} from "react";
import {Filters} from "./Filters";
import {EntriesList} from "./EntriesList";
import {EntryItem} from "./EntryListItem/EntryListItem";
import {makeStyles} from "@material-ui/core";
import "./style/TrafficPage.sass";
import styles from './style/EntriesList.module.sass';
@@ -50,50 +51,59 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
const classes = useLayoutStyles();
const [entries, setEntries] = useState([] as any);
const [entriesBuffer, setEntriesBuffer] = useState([] as any);
const [focusedEntryId, setFocusedEntryId] = useState(null);
const [selectedEntryData, setSelectedEntryData] = useState(null);
const [connection, setConnection] = useState(ConnectionStatus.Closed);
const [tappingStatus, setTappingStatus] = useState(null);
const [disableScrollList, setDisableScrollList] = useState(false);
const [isSnappedToBottom, setIsSnappedToBottom] = useState(true);
const [query, setQueryDefault] = useState("");
const [query, setQuery] = useState("");
const [queryBackgroundColor, setQueryBackgroundColor] = useState("#f5f5f5");
const [addition, updateQuery] = useState("");
const [queriedCurrent, setQueriedCurrent] = useState(0);
const [queriedTotal, setQueriedTotal] = useState(0);
const [startTime, setStartTime] = useState(0);
const setQuery = async (query) => {
if (!query) {
setQueryBackgroundColor("#f5f5f5")
} else {
const data = await api.validateQuery(query);
if (data.valid) {
setQueryBackgroundColor("#d2fad2")
useEffect(() => {
(async function() {
if (!query) {
setQueryBackgroundColor("#f5f5f5")
} else {
setQueryBackgroundColor("#fad6dc")
const data = await api.validateQuery(query);
if (!data) {
return;
}
if (data.valid) {
setQueryBackgroundColor("#d2fad2");
} else {
setQueryBackgroundColor("#fad6dc");
}
}
}
setQueryDefault(query)
}
})();
}, [query]);
const updateQuery = (addition) => {
useEffect(() => {
if (query) {
setQuery(`${query} and ${addition}`)
setQuery(`${query} and ${addition}`);
} else {
setQuery(addition)
setQuery(addition);
}
}
// eslint-disable-next-line
}, [addition]);
const ws = useRef(null);
const listEntry = useRef(null);
const openWebSocket = (query) => {
setEntries([])
setFocusedEntryId(null);
setEntries([]);
setEntriesBuffer([]);
ws.current = new WebSocket(MizuWebsocketURL);
ws.current.onopen = () => {
ws.current.send(query)
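A distilled version of the two effects introduced in this hunk (validate the query whenever it changes; AND a clicked addition onto the current query), written as a standalone hook. The hook name is made up; the api.validateQuery result shape ({ valid }) and the background colours are taken from the diff:

    import { useEffect, useState } from "react";

    interface ValidationResult { valid: boolean; }
    type ValidateFn = (query: string) => Promise<ValidationResult | null>;

    export function useQueryField(validate: ValidateFn) {
        const [query, setQuery] = useState("");
        const [addition, updateQuery] = useState("");
        const [background, setBackground] = useState("#f5f5f5");

        // Re-validate whenever the query text changes; null means the request was cancelled.
        useEffect(() => {
            (async () => {
                if (!query) { setBackground("#f5f5f5"); return; }
                const data = await validate(query);
                if (!data) return;
                setBackground(data.valid ? "#d2fad2" : "#fad6dc");
            })();
        }, [query, validate]);

        // Clicking a queryable value sets `addition`, which is ANDed onto the current query.
        useEffect(() => {
            if (!addition) return;
            setQuery(prev => (prev ? `${prev} and ${addition}` : addition));
        }, [addition]);

        return { query, setQuery, updateQuery, background };
    }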
@@ -108,15 +118,18 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
const message = JSON.parse(e.data);
switch (message.messageType) {
case "entry":
const entry = message.data
if (!focusedEntryId) setFocusedEntryId(entry.id.toString())
let newEntries = [...entries];
setEntries([...newEntries, entry])
if(listEntry.current) {
if(isScrollable(listEntry.current.firstChild)) {
setDisableScrollList(true)
}
}
const entry = message.data;
if (!focusedEntryId) setFocusedEntryId(entry.id.toString());
setEntriesBuffer([
...entriesBuffer,
<EntryItem
key={entry.id}
entry={entry}
setFocusedEntryId={setFocusedEntryId}
style={{}}
updateQuery={updateQuery}
/>
]);
break
case "status":
setTappingStatus(message.tappingStatus);
@@ -140,8 +153,9 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
});
break;
case "queryMetadata":
setQueriedCurrent(message.data.current)
setQueriedTotal(message.data.total)
setQueriedCurrent(message.data.current);
setQueriedTotal(message.data.total);
setEntries(entriesBuffer);
break;
case "startTime":
setStartTime(message.data);
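The handler in the two hunks above buffers incoming entry messages and commits them to state only when a queryMetadata message arrives, so the list re-renders once per batch rather than once per entry. A reduced sketch of that flow; the message shapes are inferred from the diff, and this sketch keeps the buffer in a ref rather than in component state:

    import { useRef, useState } from "react";

    interface Entry { id: number; }
    type WsMessage =
        | { messageType: "entry"; data: Entry }
        | { messageType: "queryMetadata"; data: { current: number; total: number } };

    // Illustrative: entries accumulate in a ref-backed buffer and are flushed in one
    // setState call per batch.
    export function useBatchedEntries() {
        const buffer = useRef<Entry[]>([]);
        const [entries, setEntries] = useState<Entry[]>([]);

        const onMessage = (e: MessageEvent) => {
            const message: WsMessage = JSON.parse(e.data);
            switch (message.messageType) {
                case "entry":
                    buffer.current = [...buffer.current, message.data];
                    break;
                case "queryMetadata":
                    setEntries(buffer.current); // flush the whole batch at once
                    break;
            }
        };

        return { entries, onMessage };
    }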
@@ -176,6 +190,16 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
const entryData = await api.getEntry(focusedEntryId);
setSelectedEntryData(entryData);
} catch (error) {
toast[error.response.data.type](`Entry[${focusedEntryId}]: ${error.response.data.msg}`, {
position: "bottom-right",
theme: "colored",
autoClose: error.response.data.autoClose,
hideProgressBar: false,
closeOnClick: true,
pauseOnHover: true,
draggable: true,
progress: undefined,
});
console.error(error);
}
})()
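The catch block added above surfaces a failed entry fetch as a toast whose severity comes from the type field of the error payload. A minimal standalone version, assuming a react-toastify setup and the payload fields (type, msg, autoClose) used in the diff:

    import { toast, TypeOptions } from "react-toastify";

    interface ApiErrorPayload { type: TypeOptions; msg: string; autoClose?: number; }

    // Illustrative: map a failed request to a toast of the severity chosen by the server.
    export function showEntryError(entryId: string, error: { response?: { data?: ApiErrorPayload } }) {
        const payload = error.response?.data;
        if (!payload) return;
        toast(`Entry[${entryId}]: ${payload.msg}`, {
            type: payload.type,              // "error", "warning", "info" or "success"
            position: "bottom-right",
            theme: "colored",
            autoClose: payload.autoClose,
        });
    }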
@@ -209,21 +233,17 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
}
}
const onScrollEvent = (isAtBottom) => {
isAtBottom ? setDisableScrollList(false) : setDisableScrollList(true)
const onSnapBrokenEvent = () => {
setIsSnappedToBottom(false)
}
const isScrollable = (element) => {
return element.scrollHeight > element.clientHeight;
};
return (
<div className="TrafficPage">
<div className="TrafficPageHeader">
<img className="playPauseIcon" style={{visibility: connection === ConnectionStatus.Connected ? "visible" : "hidden"}} alt="pause"
src={pauseIcon} onClick={toggleConnection}/>
<img className="playPauseIcon" style={{position: "absolute", visibility: connection === ConnectionStatus.Connected ? "hidden" : "visible"}} alt="play"
src={playIcon} onClick={toggleConnection}/>
<div className="connectionText">
{getConnectionTitle()}
<div className={"indicatorContainer " + getConnectionStatusClass(true)}>
@@ -243,16 +263,10 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
<div className={styles.container}>
<EntriesList
entries={entries}
setEntries={setEntries}
focusedEntryId={focusedEntryId}
setFocusedEntryId={setFocusedEntryId}
listEntryREF={listEntry}
onScrollEvent={onScrollEvent}
scrollableList={disableScrollList}
ws={ws.current}
openWebSocket={openWebSocket}
query={query}
updateQuery={updateQuery}
onSnapBrokenEvent={onSnapBrokenEvent}
isSnappedToBottom={isSnappedToBottom}
setIsSnappedToBottom={setIsSnappedToBottom}
queriedCurrent={queriedCurrent}
queriedTotal={queriedTotal}
startTime={startTime}


@@ -17,7 +17,7 @@ interface Props {
const FancyTextDisplay: React.FC<Props> = ({text, className, isPossibleToCopy = true, applyTextEllipsis = true, flipped = false, useTooltip= false, displayIconOnMouseOver = false, buttonOnly = false}) => {
const [showCopiedNotification, setCopied] = useState(false);
const [showTooltip, setShowTooltip] = useState(false);
const displayText = text || '';
text = String(text);
const onCopy = () => {
setCopied(true)
@@ -33,12 +33,12 @@ const FancyTextDisplay: React.FC<Props> = ({text, className, isPossibleToCopy =
return () => clearTimeout(timer);
}, [showCopiedNotification]);
const textElement = <span className={'FancyTextDisplay-Text'}>{displayText}</span>;
const textElement = <span className={'FancyTextDisplay-Text'}>{text}</span>;
const copyButton = isPossibleToCopy && displayText ? <CopyToClipboard text={displayText} onCopy={onCopy}>
const copyButton = isPossibleToCopy && text ? <CopyToClipboard text={text} onCopy={onCopy}>
<span
className={`FancyTextDisplay-Icon`}
title={`Copy "${displayText}" value to clipboard`}
title={`Copy "${text}" value to clipboard`}
>
<img src={duplicateImg} alt="Duplicate full value"/>
{showCopiedNotification && <span className={'FancyTextDisplay-CopyNotifier'}>Copied</span>}
@@ -48,14 +48,14 @@ const FancyTextDisplay: React.FC<Props> = ({text, className, isPossibleToCopy =
return (
<p
className={`FancyTextDisplay-Container ${className ? className : ''} ${displayIconOnMouseOver ? 'displayIconOnMouseOver ' : ''} ${applyTextEllipsis ? ' FancyTextDisplay-ContainerEllipsis' : ''}`}
title={displayText.toString()}
title={text}
onMouseOver={ e => setShowTooltip(true)}
onMouseLeave={ e => setShowTooltip(false)}
>
{!buttonOnly && flipped && textElement}
{copyButton}
{!buttonOnly && !flipped && textElement}
{useTooltip && showTooltip && <span className={'FancyTextDisplay-CopyNotifier'}>{displayText}</span>}
{useTooltip && showTooltip && <span className={'FancyTextDisplay-CopyNotifier'}>{text}</span>}
</p>
);
};
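Replacing `text || ''` with `String(text)` trades silent fallback for explicit coercion: no method is ever called on undefined, and falsy values render as their textual form instead of vanishing. The behaviour in isolation, runnable in any TS/JS runtime:

    console.log(String(undefined));      // "undefined" – no TypeError, unlike (undefined as any).toString()
    console.log(String(null));           // "null"
    console.log(String(0));              // "0"     – falsy values keep their textual form
    console.log(String(false));          // "false"
    console.log((false as any) || '');   // ""      – the previous `text || ''` pattern hid falsy values entirely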


@@ -3,6 +3,8 @@ import * as axios from "axios";
// When working locally cp `cp .env.example .env`
export const MizuWebsocketURL = process.env.REACT_APP_OVERRIDE_WS_URL ? process.env.REACT_APP_OVERRIDE_WS_URL : `ws://${window.location.host}/ws`;
const CancelToken = axios.CancelToken;
export default class Api {
constructor() {
@@ -17,6 +19,8 @@ export default class Api {
Accept: "application/json",
}
});
this.source = null;
}
tapStatus = async () => {
@@ -45,9 +49,25 @@ export default class Api {
}
validateQuery = async (query) => {
if (this.source) {
this.source.cancel();
}
this.source = CancelToken.source();
const form = new FormData();
form.append('query', query)
const response = await this.client.post(`/query/validate`, form);
const response = await this.client.post(`/query/validate`, form, {
cancelToken: this.source.token
}).catch(function (thrown) {
if (!axios.isCancel(thrown)) {
console.error('Validate error', thrown.message);
}
});
if (!response) {
return null;
}
return response.data;
}
}
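The Api change above keeps at most one validation request in flight by cancelling the previous one before issuing the next. A condensed sketch of that pattern: the /query/validate path and the response handling mirror the diff, the class name is illustrative, and newer axios releases would use AbortController rather than the since-deprecated CancelToken.

    import axios, { CancelTokenSource } from "axios";

    // Illustrative: each call cancels the previous in-flight validation before starting a new one.
    class QueryValidator {
        private source: CancelTokenSource | null = null;

        async validate(query: string): Promise<{ valid: boolean } | null> {
            if (this.source) {
                this.source.cancel();                   // abort the previous request, if any
            }
            this.source = axios.CancelToken.source();

            const form = new FormData();
            form.append("query", query);

            try {
                const response = await axios.post("/query/validate", form, {
                    cancelToken: this.source.token,
                });
                return response.data;
            } catch (thrown) {
                if (!axios.isCancel(thrown)) {
                    console.error("Validate error", (thrown as Error).message);
                }
                return null;                            // cancelled or failed – the caller ignores it
            }
        }
    }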