Compare commits


24 Commits

Author SHA1 Message Date
David Levanon
01d6005a7b minor logging changes (#499)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2021-11-23 14:21:53 +02:00
Nimrod Gilboa Markevich
4c97316c02 Remove prevPodPhase (#497)
prevPodPhase does not take into account the fact that there may be more
than one tapper pod. Therefore it is not clear what its value
represents. It is only used in a debug print. It is not worth the effort
to fix for that one debug print.

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2021-11-23 10:03:36 +02:00
M. Mert Yıldıran
d66c7445e6 Remove SetHostname method in HTTP extension (#496) 2021-11-22 19:30:06 +03:00
M. Mert Yıldıran
12ca3d8779 Make the gRPC and HTTP/2 distinction (#492)
* Remove the extra negation on `nodefrag` flag's value

* Support IPv4 fragmentation and IPv6 at the same time

* Set `Method` and `StatusCode` fields correctly for `HTTP/2`

* Replace unnecessary `grpc` naming with `http2`

* Make the `gRPC` and `HTTP/2` distinction

* Fix the macros of `http` extension

* Fix the macros of other protocol extensions

* Update the method signature of `Represent`

* Fix the `HTTP/2` support

* Fix some minor issues

* Upgrade Basenine version from `0.2.10` to `0.2.11`

Sorts macros before expanding them and prioritize the long macros.

* Don't regex split the gRPC method name

* Re-enable `nodefrag` flag
2021-11-22 17:46:35 +03:00
M. Mert Yıldıran
02a125bb86 Disable IPv4 defragmentation and support IPv6 (#487)
* Remove the extra negation on `nodefrag` flag's value

* Support IPv4 fragmentation and IPv6 at the same time

* Re-enable `nodefrag` flag
2021-11-22 17:35:17 +03:00
M. Mert Yıldıran
08d7fa988e Remove tap/tester/ directory (#489)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2021-11-22 17:32:38 +03:00
Nimrod Gilboa Markevich
b1ad2efb96 Warn pods not starting (#493)
Print warning event related to mizu k8s resources.
In non-daemon print to CLI. In Daemon print to API-Server logs.
2021-11-22 15:30:10 +02:00
Alon Girmonsky
ed7b754eca Some changes to the doc (#494) 2021-11-22 09:02:33 +02:00
Igor Gov
c026656b5e Improving daemon documentation (#457) 2021-11-21 19:37:02 +02:00
David Levanon
6caa94f08f Add support to auto discover envoy processes (#459)
* discover envoy pids using cluster ips

* add istio flag to cli + rename mtls flag to istio

* add istio.md to docs

* Fixing typos

* Fix minor typos and grammer in docs

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>
2021-11-21 15:45:07 +02:00
RoyUP9
b77ea63f42 Add token validity check (#483) 2021-11-21 15:14:02 +02:00
gadotroee
2635964a28 Update README (#486) 2021-11-21 14:09:21 +02:00
M. Mert Yıldıran
a16faca5fb Ignore gob files (#488)
* Ignore gob files

* Remove `*.db` from `.gitignore`
2021-11-21 09:29:01 +03:00
M. Mert Yıldıran
8cf6f56a3c Remove unnecessary tcpdump dependency from Dockerfile (#491) 2021-11-21 09:12:51 +03:00
M. Mert Yıldıran
a849aae94c Upgrade Basenine version from 0.2.9 to 0.2.10 (#484)
* Upgrade Basenine version from `0.2.9` to `0.2.10`

Fixes the issues in `limit` and `rlimit` helpers that occur when they are on the left operand of a binary expression.

* Upgrade the client hash to latest
2021-11-19 18:57:19 +03:00
M. Mert Yıldıran
8118569460 Show the source and destination IP in the entry feed (#485) 2021-11-18 20:21:51 +03:00
Nimrod Gilboa Markevich
2e75834dd0 Refactor watch pods to allow reusing watch wrapper (#470)
Currently shared/kubernetes/watch.go:FilteredWatch only watches pods.
This PR makes it reusable for other types of resources.
This is done in preparation for watching k8s events.
2021-11-18 11:53:11 +02:00
M. Mert Yıldıran
dd53a36d5f Prevent the crash on client-side in case of text being undefined in FancyTextDisplay (#481)
* Prevent the crash on client-side in case of `text` being undefined in `FancyTextDisplay`

* Use `String(text)` instead
2021-11-17 18:50:09 +03:00
M. Mert Yıldıran
ad78f1dcd7 Clear focusedEntryId state in case of a filter is applied (#482) 2021-11-17 18:20:23 +03:00
M. Mert Yıldıran
a13fec3dae Sync entries in batches just as before (using uploadIntervalSec parameter) (#477)
* Sync entries in batches just as before (using `uploadIntervalSec` parameter)

* Replace `lastTimeSynced` value with `time.Time{}`

Since it will be overwritten by the very first iteration.
2021-11-17 15:16:49 +03:00
M. Mert Yıldıran
bb85312b9f Don't omit the key-value pair if the value is false in EntryTableSection (#478) 2021-11-17 15:02:23 +03:00
RamiBerm
18be46809e TRA-3903 minor daemon mode refactor (#479)
* Update common.go and tapRunner.go

* Update common.go
2021-11-17 11:18:08 +02:00
RamiBerm
b7f7daa05c TRA-3903 fix daemon mode in permission restricted configs (#473)
* Update tapRunner.go, permissions-all-namespaces-daemon.yaml, and 2 more files...

* Update tapRunner.go

* Update tapRunner.go and permissions-ns-daemon.yaml

* Update tapRunner.go

* Update tapRunner.go

* Update tapRunner.go
2021-11-17 11:14:43 +02:00
M. Mert Yıldıran
95d2a868e1 Update the UI screenshots (#476)
* Update the UI screenshots

* Update `mizu-ui.png`
2021-11-16 22:44:31 +03:00
59 changed files with 1141 additions and 502 deletions

.gitignore vendored (5 changed lines)
View File

@@ -15,7 +15,6 @@
# vendor/
.idea/
build
*.db
# Mac OS
.DS_Store
@@ -32,3 +31,7 @@ pprof/*
# Database Files
*.bin
*.gob
# Nohup Files - https://man7.org/linux/man-pages/man1/nohup.1p.html
nohup.*

View File

@@ -42,8 +42,8 @@ RUN go build -ldflags="-s -w \
-X 'mizuserver/pkg/version.SemVer=${SEM_VER}'" -o mizuagent .
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.2.11/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.11/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
RUN shasum -a 256 -c basenine_linux_amd64.sha256
RUN chmod +x ./basenine_linux_amd64
@@ -52,7 +52,7 @@ RUN cd .. && /bin/bash build_extensions.sh
FROM alpine:3.14
RUN apk add bash libpcap-dev tcpdump
RUN apk add bash libpcap-dev
WORKDIR /app

View File

@@ -4,16 +4,21 @@
A simple-yet-powerful API traffic viewer for Kubernetes, enabling you to view all API communication between microservices to help you debug and troubleshoot regressions.
Think TCPDump and Chrome Dev Tools combined.
Think TCPDump and Wireshark re-invented for Kubernetes.
![Simple UI](assets/mizu-ui.png)
## Features
- Simple and powerful CLI
- Real-time view of all HTTP requests, REST and gRPC API calls
- No installation or code instrumentation
- Works completely on premises
- Monitoring network traffic in real-time. Supported protocols:
- [HTTP/1.1](https://datatracker.ietf.org/doc/html/rfc2616) (REST, etc.)
- [HTTP/2](https://datatracker.ietf.org/doc/html/rfc7540) (gRPC)
- [AMQP](https://www.rabbitmq.com/amqp-0-9-1-reference.html) (RabbitMQ, Apache Qpid, etc.)
- [Apache Kafka](https://kafka.apache.org/protocol)
- [Redis](https://redis.io/topics/protocol)
- Works with Kubernetes APIs. No installation or code instrumentation
- Rich filtering
## Requirements
@@ -44,15 +49,6 @@ SHA256 checksums are available on the [Releases](https://github.com/up9inc/mizu/
### Development (unstable) Build
Pick one from the [Releases](https://github.com/up9inc/mizu/releases) page
## Kubeconfig & Permissions
While `mizu` most often works out of the box, you can influence its behavior:
1. [OPTIONAL] Set `KUBECONFIG` environment variable to your Kubernetes configuration. If this is not set, Mizu assumes that configuration is at `${HOME}/.kube/config`
2. `mizu` assumes the user running the command has permissions to create resources (such as pods, services, namespaces) on your Kubernetes cluster (no worries - `mizu` resources are cleaned up upon termination)
For detailed list of k8s permissions see [PERMISSIONS](docs/PERMISSIONS.md) document
## How to Run
1. Find the pods you'd like to tap in your Kubernetes cluster
@@ -111,45 +107,25 @@ To tap all pods in current namespace -
Web interface is now available at http://localhost:8899
^C
```
### To run mizu in daemon mode (detached from cli)
```bash
$ mizu tap "^ca.*" --daemon
Mizu will store up to 200MB of traffic, old traffic will be cleared once the limit is reached.
Tapping pods in namespaces "sock-shop"
Waiting for mizu to be ready... (may take a few minutes)
+carts-66c77f5fbb-fq65r
+catalogue-5f4cb7cf5-7zrmn
..
$ mizu view
Establishing connection to k8s cluster...
Mizu is available at http://localhost:8899
^C
..
$ mizu clean # mizu will continue running in cluster until clean is executed
Removing mizu resources
```
`mizu view` provides one way to access Mizu. For other options, see [Accessing Mizu Wiki Page](https://github.com/up9inc/mizu/wiki/Accessing-Mizu).
## Configuration
Mizu can work with config file which should be stored in ${HOME}/.mizu/config.yaml (macOS: ~/.mizu/config.yaml) <br />
In case no config file found, defaults will be used <br />
Mizu can optionally work with a config file that can be provided as a CLI argument (using `--set config-path=<PATH>`) or if not provided, will be stored at ${HOME}/.mizu/config.yaml
If only a partial configuration is defined, defaults are used for all other fields <br />
You can always override the defaults or config file with CLI flags
To get the default config params run `mizu config` <br />
To generate a new config file with default values use `mizu config -r`
### Telemetry
By default, mizu reports usage telemetry. It can be disabled by adding a line of `telemetry: false` in the `${HOME}/.mizu/config.yaml` file
## Advanced Usage
### Kubeconfig
It is possible to change the kubeconfig path using the `KUBECONFIG` environment variable or the command-line flag
with `--set kube-config-path=<PATH>`. <br />
If neither is set, Mizu assumes that the configuration is at `${HOME}/.kube/config`
### Namespace-Restricted Mode
Some users have permission to only manage resources in one particular namespace assigned to them
@@ -163,6 +139,8 @@ using the `--namespace` flag or by setting `tap.namespaces` in the config file
Setting `mizu-resources-namespace=mizu` resets Mizu to its default behavior
For detailed list of k8s permissions see [PERMISSIONS](docs/PERMISSIONS.md) document
### User agent filtering
User-agent filtering (e.g., for health checks) can be configured using command-line options:
@@ -203,14 +181,10 @@ and when changed it will support accessing by IP
### Run in daemon mode
Mizu can be ran detached from the cli using the daemon flag: `mizu tap --daemon`. This type of mizu instance will run indefinitely in the cluster.
Please note that daemon mode requires you to have RBAC creation permissions, see the [permissions](docs/PERMISSIONS.md) doc for more details.
In order to access a daemon mizu you will have to run `mizu view` after running the `tap --daemon` command.
To stop the detached mizu instance and clean all cluster side resources, run `mizu clean`
Mizu can be run detached from the cli using the daemon flag: `mizu tap --daemon`. This type of mizu instance will run
indefinitely in the cluster.
For more information please refer to [DAEMON MODE](docs/DAEMON_MODE.md)
## How to Run local UI

View File

@@ -16,7 +16,7 @@ require (
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5
github.com/up9inc/basenine/client/go v0.0.0-20211121072216-04366911881c
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0

View File

@@ -450,8 +450,8 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5 h1:JbLairDLEJpAC8bwmFuOAB+LYpY/oQbzGRSWRpkF7PQ=
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20211121072216-04366911881c h1:GJsCVhDKjV/k3mNG255VN7hAQ7fxyNgX5T+VJyzoOQ0=
github.com/up9inc/basenine/client/go v0.0.0-20211121072216-04366911881c/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=

View File

@@ -22,6 +22,7 @@ import (
"path"
"path/filepath"
"plugin"
"regexp"
"sort"
"syscall"
"time"
@@ -94,17 +95,17 @@ func main() {
panic("API server address must be provided with --api-server-address when using --tap")
}
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
tapTargets := getTapTargets()
if tapTargets != nil {
tap.SetFilterAuthorities(tapTargets)
logger.Log.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs())
tapOpts.FilterAuthorities = tapTargets
logger.Log.Infof("Filtering for the following authorities: %v", tapOpts.FilterAuthorities)
}
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteringOptions := getTrafficFilteringOptions()
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
socketConnection, err := dialSocketWithRetry(*apiServerAddress, socketConnectionRetries, socketConnectionRetryDelay)
if err != nil {
@@ -264,9 +265,16 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if _, err := startMizuTapperSyncer(ctx); err != nil {
kubernetesProvider, err := kubernetes.NewProviderInCluster()
if err != nil {
logger.Log.Fatalf("error creating k8s provider: %+v", err)
}
if _, err := startMizuTapperSyncer(ctx, kubernetesProvider); err != nil {
logger.Log.Fatalf("error initializing tapper syncer: %+v", err)
}
go watchMizuEvents(ctx, kubernetesProvider, cancel)
}
utils.StartServer(app)
@@ -426,12 +434,7 @@ func dialSocketWithRetry(socketAddress string, retryAmount int, retryDelay time.
return nil, lastErr
}
func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, error) {
provider, err := kubernetes.NewProviderInCluster()
if err != nil {
return nil, err
}
func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider) (*kubernetes.MizuTapperSyncer, error) {
tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{
TargetNamespaces: config.Config.TargetNamespaces,
PodFilterRegex: config.Config.TapTargetRegex.Regexp,
@@ -443,6 +446,7 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
IgnoredUserAgents: config.Config.IgnoredUserAgents,
MizuApiFilteringOptions: config.Config.MizuApiFilteringOptions,
MizuServiceAccountExists: true, //assume service account exists since daemon mode will not function without it anyway
Istio: config.Config.Istio,
})
if err != nil {
@@ -482,3 +486,86 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
return tapperSyncer, nil
}
func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
// Round down because k8s CreationTimestamp is given in 1 sec resolution.
startTime := time.Now().Truncate(time.Second)
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-added:
if !ok {
added = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource added event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource removed event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource modified event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Errorf("error in watch mizu resource events loop: %+v", err)
cancel()
case <-ctx.Done():
logger.Log.Debugf("watching Mizu resource events loop, ctx done")
return
}
}
}

View File

@@ -55,7 +55,7 @@ func GetEntry(c *gin.Context) {
}
extension := extensionsMap[entry.Protocol.Name]
protocol, representation, bodySize, _ := extension.Dissector.Represent(entry.Protocol, entry.Request, entry.Response)
representation, bodySize, _ := extension.Dissector.Represent(entry.Request, entry.Response)
var rules []map[string]interface{}
var isRulesEnabled bool
@@ -68,7 +68,7 @@ func GetEntry(c *gin.Context) {
}
c.JSON(http.StatusOK, tapApi.MizuEntryWrapper{
Protocol: protocol,
Protocol: entry.Protocol,
Representation: string(representation),
BodySize: bodySize,
Data: entry,

View File

@@ -227,6 +227,10 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
connection.Close()
}()
lastTimeSynced := time.Time{}
batch := make([]har.Entry, 0)
handleDataChannel := func(wg *sync.WaitGroup, connection *basenine.Connection, data chan []byte) {
defer wg.Done()
for {
@@ -239,7 +243,6 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
var dataMap map[string]interface{}
err = json.Unmarshal(dataBytes, &dataMap)
result := make([]har.Entry, 0)
var entry tapApi.MizuEntry
if err := json.Unmarshal([]byte(dataBytes), &entry); err != nil {
continue
@@ -261,14 +264,22 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
continue
}
result = append(result, *harEntry)
batch = append(batch, *harEntry)
body, jMarshalErr := json.Marshal(result)
now := time.Now()
if lastTimeSynced.Add(time.Duration(uploadIntervalSec) * time.Second).After(now) {
continue
}
lastTimeSynced = now
body, jMarshalErr := json.Marshal(batch)
batchSize := len(batch)
if jMarshalErr != nil {
analyzeInformation.Reset()
logger.Log.Infof("Stopping sync entries")
logger.Log.Fatal(jMarshalErr)
}
batch = make([]har.Entry, 0)
var in bytes.Buffer
w := zlib.NewWriter(&in)
@@ -293,7 +304,7 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
logger.Log.Info("Stopping sync entries")
logger.Log.Fatal(postErr)
}
analyzeInformation.SentCount += 1
analyzeInformation.SentCount += batchSize
if analyzeInformation.SentCount%SentCountLogInterval == 0 {
logger.Log.Infof("Uploaded %v entries until now", analyzeInformation.SentCount)

Binary file not shown (image updated: 491 KiB → 640 KiB)

Binary file not shown (image updated: 55 KiB → 110 KiB)

Binary file not shown (image updated: 43 KiB → 53 KiB)

View File

@@ -4,9 +4,15 @@ import (
"context"
"errors"
"fmt"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/telemetry"
"os"
"os/signal"
"path"
"syscall"
"time"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
@@ -64,3 +70,42 @@ func handleKubernetesProviderError(err error) {
logger.Log.Error(err)
}
}
func finishMizuExecution(kubernetesProvider *kubernetes.Provider, apiProvider *apiserver.Provider) {
telemetry.ReportAPICalls(apiProvider)
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
defer cancel()
dumpLogsIfNeeded(removalCtx, kubernetesProvider)
cleanUpMizuResources(removalCtx, cancel, kubernetesProvider)
}
func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provider) {
if !config.Config.DumpLogs {
return
}
mizuDir := mizu.GetMizuFolderPath()
filePath := path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
if err := fsUtils.DumpLogs(ctx, kubernetesProvider, filePath); err != nil {
logger.Log.Errorf("Failed dump logs %v", err)
}
}
func cleanUpMizuResources(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
logger.Log.Infof("\nRemoving mizu resources")
var leftoverResources []string
if config.Config.IsNsRestrictedMode() {
leftoverResources = cleanUpRestrictedMode(ctx, kubernetesProvider)
} else {
leftoverResources = cleanUpNonRestrictedMode(ctx, cancel, kubernetesProvider)
}
if len(leftoverResources) > 0 {
errMsg := fmt.Sprintf("Failed to remove the following resources, for more info check logs at %s:", fsUtils.GetLogFilePath())
for _, resource := range leftoverResources {
errMsg += "\n- " + resource
}
logger.Log.Errorf(uiUtils.Error, errMsg)
}
}

View File

@@ -3,6 +3,7 @@ package cmd
import (
"errors"
"fmt"
"github.com/up9inc/mizu/cli/up9"
"os"
"github.com/creasty/defaults"
@@ -62,6 +63,12 @@ Supported protocols are HTTP and gRPC.`,
logger.Log.Errorf("failed to log in, err: %v", err)
return nil
}
} else if isValidToken := up9.IsTokenValid(config.Config.Auth.Token, config.Config.Auth.EnvName); !isValidToken {
logger.Log.Errorf("Token is not valid, please log in again to continue")
if err := auth.Login(); err != nil {
logger.Log.Errorf("failed to log in, err: %v", err)
return nil
}
}
}
}
@@ -113,4 +120,5 @@ func init() {
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file path with policy rules")
tapCmd.Flags().String(configStructs.ContractFile, defaultTapConfig.ContractFile, "OAS/Swagger file to validate to monitor the contracts")
tapCmd.Flags().Bool(configStructs.DaemonModeTapName, defaultTapConfig.DaemonMode, "Run mizu in daemon mode, detached from the cli")
tapCmd.Flags().Bool(configStructs.IstioName, defaultTapConfig.Istio, "Record decrypted traffic if the cluster configured with istio and mtls")
}

View File

@@ -5,27 +5,24 @@ import (
"errors"
"fmt"
"io/ioutil"
"path"
"regexp"
"strings"
"time"
"github.com/up9inc/mizu/cli/cmd/goUtils"
"github.com/getkin/kin-openapi/openapi3"
"gopkg.in/yaml.v3"
core "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/getkin/kin-openapi/openapi3"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/cmd/goUtils"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage"
"gopkg.in/yaml.v3"
core "k8s.io/api/core/v1"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/kubernetes"
@@ -45,6 +42,8 @@ var state tapState
var apiProvider *apiserver.Provider
func RunMizuTap() {
startTime := time.Now()
mizuApiFilteringOptions, err := getMizuApiFilteringOptions()
apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err != nil {
@@ -153,6 +152,7 @@ func RunMizuTap() {
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchMizuEvents, ctx, kubernetesProvider, cancel, startTime)
// block until exit signal or error
waitForFinish(ctx, cancel)
@@ -185,7 +185,7 @@ func printTappedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.
if len(matchingPods) == 0 {
printNoPodsFoundSuggestion(namespaces)
}
logger.Log.Info("Pods that match regex at this instant:")
logger.Log.Info("Pods that match the provided criteria at this instant:")
for _, tappedPod := range matchingPods {
logger.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", tappedPod.Name))
}
@@ -217,6 +217,7 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider
IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents,
MizuApiFilteringOptions: mizuApiFilteringOptions,
MizuServiceAccountExists: state.mizuServiceAccountExists,
Istio: config.Config.Tap.Istio,
})
if err != nil {
@@ -378,22 +379,9 @@ func createMizuApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.
func createMizuApiServerDeployment(ctx context.Context, kubernetesProvider *kubernetes.Provider, opts *kubernetes.ApiServerOptions) error {
volumeClaimCreated := false
if !config.Config.Tap.NoPersistentVolumeClaim {
isDefaultStorageClassAvailable, err := kubernetesProvider.IsDefaultStorageProviderAvailable(ctx)
if err != nil {
return err
}
if isDefaultStorageClassAvailable {
if _, err = kubernetesProvider.CreatePersistentVolumeClaim(ctx, config.Config.MizuResourcesNamespace, kubernetes.PersistentVolumeClaimName, config.Config.Tap.MaxEntriesDBSizeBytes()+mizu.DaemonModePersistentVolumeSizeBufferBytes); err != nil {
logger.Log.Warningf(uiUtils.Yellow, "An error has occured while creating a persistent volume claim for mizu, this will mean that mizu's data will be lost on pod restart")
logger.Log.Debugf("error creating persistent volume claim: %v", err)
} else {
volumeClaimCreated = true
}
} else {
logger.Log.Warningf(uiUtils.Yellow, "Could not find default volume provider in this cluster, this will mean that mizu's data will be lost on pod restart")
}
volumeClaimCreated = TryToCreatePersistentVolumeClaim(ctx, kubernetesProvider)
}
pod, err := kubernetesProvider.GetMizuApiServerPodObject(opts, volumeClaimCreated, kubernetes.PersistentVolumeClaimName)
if err != nil {
return err
@@ -406,6 +394,26 @@ func createMizuApiServerDeployment(ctx context.Context, kubernetesProvider *kube
return nil
}
func TryToCreatePersistentVolumeClaim(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
isDefaultStorageClassAvailable, err := kubernetesProvider.IsDefaultStorageProviderAvailable(ctx)
if err != nil {
logger.Log.Warningf(uiUtils.Yellow, "An error occured when checking if a default storage provider exists in this cluster, this means mizu data will be lost on mizu-api-server pod restart")
logger.Log.Debugf("error checking if default storage class exists: %v", err)
return false
} else if !isDefaultStorageClassAvailable {
logger.Log.Warningf(uiUtils.Yellow, "Could not find default storage provider in this cluster, this means mizu data will be lost on mizu-api-server pod restart")
return false
}
if _, err = kubernetesProvider.CreatePersistentVolumeClaim(ctx, config.Config.MizuResourcesNamespace, kubernetes.PersistentVolumeClaimName, config.Config.Tap.MaxEntriesDBSizeBytes()+mizu.DaemonModePersistentVolumeSizeBufferBytes); err != nil {
logger.Log.Warningf(uiUtils.Yellow, "An error has occured while creating a persistent volume claim for mizu, this means mizu data will be lost on mizu-api-server pod restart")
logger.Log.Debugf("error creating persistent volume claim: %v", err)
return false
}
return true
}
func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
var compiledRegexSlice []*api.SerializableRegexp
@@ -440,45 +448,6 @@ func getSyncEntriesConfig() *shared.SyncEntriesConfig {
}
}
func finishMizuExecution(kubernetesProvider *kubernetes.Provider, apiProvider *apiserver.Provider) {
telemetry.ReportAPICalls(apiProvider)
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
defer cancel()
dumpLogsIfNeeded(removalCtx, kubernetesProvider)
cleanUpMizuResources(removalCtx, cancel, kubernetesProvider)
}
func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provider) {
if !config.Config.DumpLogs {
return
}
mizuDir := mizu.GetMizuFolderPath()
filePath := path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
if err := fsUtils.DumpLogs(ctx, kubernetesProvider, filePath); err != nil {
logger.Log.Errorf("Failed dump logs %v", err)
}
}
func cleanUpMizuResources(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
logger.Log.Infof("\nRemoving mizu resources")
var leftoverResources []string
if config.Config.IsNsRestrictedMode() {
leftoverResources = cleanUpRestrictedMode(ctx, kubernetesProvider)
} else {
leftoverResources = cleanUpNonRestrictedMode(ctx, cancel, kubernetesProvider)
}
if len(leftoverResources) > 0 {
errMsg := fmt.Sprintf("Failed to remove the following resources, for more info check logs at %s:", fsUtils.GetLogFilePath())
for _, resource := range leftoverResources {
errMsg += "\n- " + resource
}
logger.Log.Errorf(uiUtils.Error, errMsg)
}
}
func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.Provider) []string {
leftoverResources := make([]string, 0)
@@ -589,7 +558,8 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
isPodReady := false
timeAfter := time.After(25 * time.Second)
for {
@@ -610,12 +580,19 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
cancel()
return
case modifiedPod, ok := <-modified:
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodPending {
@@ -676,42 +653,59 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
var prevPodPhase core.PodPhase
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
for {
select {
case addedPod, ok := <-added:
case wEvent, ok := <-added:
if !ok {
added = nil
continue
}
addedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
case removedPod, ok := <-removed:
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
removedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
case modifiedPod, ok := <-modified:
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message))
cancel()
break
continue
}
podStatus := modifiedPod.Status
if podStatus.Phase == core.PodPending && prevPodPhase == podStatus.Phase {
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
continue
}
prevPodPhase = podStatus.Phase
if podStatus.Phase == core.PodRunning {
state := podStatus.ContainerStatuses[0].State
@@ -730,7 +724,7 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
continue
}
logger.Log.Errorf("[Error] Error in mizu tapper watch, err: %v", err)
logger.Log.Errorf("[Error] Error in mizu tapper pod watch, err: %v", err)
cancel()
case <-ctx.Done():
@@ -740,6 +734,89 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
}
}
func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, startTime time.Time) {
// Round down because k8s CreationTimestamp is given in 1 sec resolution.
startTime = startTime.Truncate(time.Second)
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-added:
if !ok {
added = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource added event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource removed event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource modified event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Errorf("error in watch mizu resource events loop: %+v", err)
cancel()
case <-ctx.Done():
logger.Log.Debugf("watching Mizu resource events loop, ctx done")
return
}
}
}
func getNamespaces(kubernetesProvider *kubernetes.Provider) []string {
if config.Config.Tap.AllNamespaces {
return []string{kubernetes.K8sAllNamespaces}

View File

@@ -402,6 +402,7 @@ func getMizuAgentConfig(targetNamespaces []string, mizuApiFilteringOptions *api.
MizuResourcesNamespace: Config.MizuResourcesNamespace,
MizuApiFilteringOptions: *mizuApiFilteringOptions,
AgentDatabasePath: shared.DataDirPath,
Istio: Config.Tap.Istio,
}
return &config, nil
}

View File

@@ -25,6 +25,7 @@ const (
EnforcePolicyFile = "traffic-validation-file"
ContractFile = "contract"
DaemonModeTapName = "daemon"
IstioName = "istio"
)
type TapConfig struct {
@@ -48,6 +49,7 @@ type TapConfig struct {
TapperResources shared.Resources `yaml:"tapper-resources"`
DaemonMode bool `yaml:"daemon" default:"false"`
NoPersistentVolumeClaim bool `yaml:"no-persistent-volume-claim" default:"false"`
Istio bool `yaml:"istio" default:"false"`
}
func (config *TapConfig) PodRegex() *regexp.Regexp {

cli/up9/provider.go (new file, 31 lines)
View File

@@ -0,0 +1,31 @@
package up9
import (
"fmt"
"net/http"
"net/url"
)
func IsTokenValid(tokenString string, envName string) bool {
whoAmIUrl, _ := url.Parse(fmt.Sprintf("https://trcc.%s/admin/whoami", envName))
req := &http.Request{
Method: http.MethodGet,
URL: whoAmIUrl,
Header: map[string][]string{
"Authorization": {fmt.Sprintf("bearer %s", tokenString)},
},
}
response, err := http.DefaultClient.Do(req)
if err != nil {
return false
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return false
}
return true
}

View File

@@ -37,8 +37,8 @@ COPY agent .
RUN go build -gcflags="all=-N -l" -o mizuagent .
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.2.11/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.11/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
RUN shasum -a 256 -c basenine_linux_amd64.sha256
RUN chmod +x ./basenine_linux_amd64
@@ -48,7 +48,7 @@ RUN cd .. && /bin/bash build_extensions_debug.sh
FROM golang:1.16-alpine
RUN apk add bash libpcap-dev tcpdump
RUN apk add bash libpcap-dev
WORKDIR /app

docs/DAEMON_MODE.md (new file, 80 lines)
View File

@@ -0,0 +1,80 @@
# Mizu daemon mode
Mizu can be run detached from the cli using the daemon flag: `mizu tap --daemon`. This type of mizu instance will run
indefinitely in the cluster.
Please note that daemon mode requires you to have RBAC creation permissions, see the [permissions](PERMISSIONS.md)
doc for more details.
```bash
$ mizu tap "^ca.*" --daemon
Mizu will store up to 200MB of traffic, old traffic will be cleared once the limit is reached.
Tapping pods in namespaces "sock-shop"
Waiting for mizu to be ready... (may take a few minutes)
+carts-66c77f5fbb-fq65r
+catalogue-5f4cb7cf5-7zrmn
..
```
## Stop mizu daemon
To stop the detached mizu instance and clean all cluster side resources, run `mizu clean`
```bash
$ mizu clean # mizu will continue running in cluster until clean is executed
Removing mizu resources
```
## Expose mizu web app
Mizu could be exposed at a later stage in any of the following ways:
### Using mizu view command
On a machine that can access both the cluster and a browser, you can run the `mizu view` command, which creates a proxy.
Besides that, all the regular ways to expose k8s pods are valid.
```bash
$ mizu view
Establishing connection to k8s cluster...
Mizu is available at http://localhost:8899
^C
..
```
### Port Forward
```bash
$ kubectl port-forward -n mizu deployment/mizu-api-server 8899:8899
```
### NodePort
```bash
$ kubectl expose -n mizu deployment mizu-api-server --name mizu-node-port --type NodePort --port 80 --target-port 8899
```
Mizu's IP is the IP of any node (get it with `kubectl get nodes -o wide`) and the port is the node port assigned to the new
service (check it with `kubectl get services -n mizu mizu-node-port`). Note that this method will expose Mizu to public access if your
nodes are public.
### LoadBalancer
```bash
$ kubectl expose deployment -n mizu --port 80 --target-port 8899 mizu-api-server --type=LoadBalancer --name=mizu-lb
service/mizu-lb exposed
..
$ kubectl get services -n mizu
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
mizu-api-server ClusterIP 10.107.200.100 <none> 80/TCP 5m5s
mizu-lb LoadBalancer 10.107.200.101 34.77.120.116 80:30141/TCP 76s
```
Note that `LoadBalancer` services only work on supported clusters (usually cloud providers) and might incur extra costs.
If you changed the `mizu-resources-namespace` value, make sure the `-n mizu` flag of the `kubectl expose` command is
changed to the value of `mizu-resources-namespace`.
Mizu will now be available either by running `mizu view` or by accessing the `EXTERNAL-IP` of the `mizu-lb` service
through your browser.

docs/ISTIO.md (new file, 46 lines)
View File

@@ -0,0 +1,46 @@
![Mizu: The API Traffic Viewer for Kubernetes](../assets/mizu-logo.svg)
# Istio mutual TLS (mTLS) with Mizu
This document describes how the Mizu tapper handles workloads configured with mTLS, which encrypts the internal traffic between services in a cluster.
Besides Istio, there are other service meshes that implement mTLS. However, as of now Istio is the most widely used one, which is why we focus on it.
In order to create an Istio setup for development, follow these steps:
1. Deploy a sample application to a Kubernetes cluster; the sample application needs to make internal service-to-service calls
2. SSH to one of the nodes, and run `tcpdump`
3. Make sure you see the internal service-to-service calls in plain text
4. Deploy Istio to the cluster - make sure it is attached to all pods of the sample application, and that it is configured with mTLS (the default)
5. Run `tcpdump` again, and make sure you no longer see the internal service-to-service calls in plain text
## The connection between Istio and Envoy
In order to implement its service mesh capabilities, [Istio](https://istio.io) uses an [Envoy](https://www.envoyproxy.io) sidecar in front of every pod in the cluster. Envoy is responsible for the mTLS communication, which is why we focus on the Envoy proxy.
In the future we might see more players in this field; then we'll have to either add support for each of them or go with a unified eBPF solution.
## Network namespaces
A [Linux network namespace](https://man7.org/linux/man-pages/man7/network_namespaces.7.html) is an isolation mechanism that limits a process's view of the network. In the container world it is used to isolate one container from another. In the Kubernetes world it is used to isolate one pod from another. That means that two containers running in the same pod share the same network namespace, and a container can reach another container in the same pod by accessing `localhost`.
An Envoy proxy configured with mTLS receives the inbound traffic directed to the pod, decrypts it and sends it via `localhost` to the target container.
## Tapping mTLS traffic
In order for Mizu to be able to see the decrypted traffic, it needs to listen in the same network namespace as the target pod. Multiple threads of the same process can have different network namespaces.
[gopacket](https://github.com/google/gopacket) uses [libpcap](https://github.com/the-tcpdump-group/libpcap) by default for capturing the traffic. Libpcap doesn't support network namespaces and we can't ask it to listen to traffic in a different namespace. However, we can change the network namespace of the calling thread and then start libpcap to see the traffic of a different namespace.
## Finding the network namespace of a running process
The network namespace of a running process can be found via the `/proc/PID/ns/net` link. Once we have this link, we can ask Linux to change the network namespace of a thread to this one.
This means that Mizu needs to have access to the `/proc` (procfs) of the running node.
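As a rough illustration of the two paragraphs above (not the actual tapper code), the Go sketch below locks the calling goroutine to an OS thread and switches that thread into another process's network namespace through its `/proc/<pid>/ns/net` link, using the `golang.org/x/sys/unix` package. The target PID is a placeholder and error handling is simplified; joining a namespace this way requires access to the node's procfs and `CAP_SYS_ADMIN`.

```go
package main

import (
	"fmt"
	"os"
	"runtime"

	"golang.org/x/sys/unix"
)

// enterNetNamespaceOf switches the current OS thread into the network
// namespace of the process identified by pid. A capture library started
// from this thread afterwards sees that namespace's traffic.
func enterNetNamespaceOf(pid int) error {
	// Keep this goroutine pinned to one OS thread; setns affects a single
	// thread, not the whole process.
	runtime.LockOSThread()

	nsPath := fmt.Sprintf("/proc/%d/ns/net", pid)
	fd, err := os.Open(nsPath) // requires access to the node's procfs
	if err != nil {
		return err
	}
	defer fd.Close()

	// Join the target network namespace.
	return unix.Setns(int(fd.Fd()), unix.CLONE_NEWNET)
}

func main() {
	if err := enterNetNamespaceOf(1); err != nil { // PID 1 is just a placeholder
		fmt.Fprintln(os.Stderr, "setns failed:", err)
	}
}
```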
## Finding the network namespace of a running pod
In order for Mizu to be able to listen to mTLS traffic, it needs to get the PIDs of the running pods, filter them according to the user filters and then start listening to the traffic in their network namespaces.
There is no official way in Kubernetes to get from a pod to a PID. The CRI implementation purposefully doesn't force a pod to be a process on the host; it can just as well be a virtual machine, as with [Kata Containers](https://katacontainers.io).
While we could provide a solution for various CRIs (like Docker, containerd and CRI-O), it's better to have a unified solution. In order to achieve that, Mizu scans all the processes on the host and finds the Envoy processes using their `/proc/PID/exe` link.
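A simplified sketch of that scan might look like the following: it walks `/proc`, resolves each process's `exe` symlink, and keeps the PIDs whose executable looks like Envoy. This illustrates the approach described above; the function name and the `"envoy"` substring check are assumptions, not the repository's implementation.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// findEnvoyPids scans procfs and returns the PIDs whose /proc/<pid>/exe link
// points to an Envoy binary.
func findEnvoyPids(procfs string) ([]int, error) {
	entries, err := os.ReadDir(procfs)
	if err != nil {
		return nil, err
	}

	var pids []int
	for _, entry := range entries {
		pid, err := strconv.Atoi(entry.Name())
		if err != nil {
			continue // not a process directory
		}
		exe, err := os.Readlink(filepath.Join(procfs, entry.Name(), "exe"))
		if err != nil {
			continue // process exited or permission denied
		}
		if strings.Contains(filepath.Base(exe), "envoy") {
			pids = append(pids, pid)
		}
	}
	return pids, nil
}

func main() {
	pids, err := findEnvoyPids("/proc")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	fmt.Println("envoy pids:", pids)
}
```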
Once Mizu detects an Envoy process, it needs to check whether this specific Envoy process is relevant according to the user filters. The user filters are a list of `CLUSTER_IPS`. The tapper gets them via the `TapOpts.FilterAuthorities` list.
Istio sets an `INSTANCE_IP` environment variable for every Envoy proxy process. By examining the Envoy process's environment variables we can see whether it's relevant or not. Examining a process's environment variables is done by reading the `/proc/PID/environ` file.
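For the `INSTANCE_IP` check, a minimal sketch of reading `/proc/<pid>/environ` (a NUL-separated list of `KEY=VALUE` pairs) and matching the value against the cluster IPs could look like this. The function name, the map type, and the placeholder PID are illustrative assumptions.

```go
package main

import (
	"bytes"
	"fmt"
	"os"
	"strings"
)

// envoyMatchesClusterIPs reads /proc/<pid>/environ, extracts the INSTANCE_IP
// variable that Istio injects into every Envoy sidecar, and reports whether
// it is one of the cluster IPs the tapper was asked to filter for.
func envoyMatchesClusterIPs(pid int, clusterIPs map[string]bool) (bool, error) {
	data, err := os.ReadFile(fmt.Sprintf("/proc/%d/environ", pid))
	if err != nil {
		return false, err
	}
	// environ is a sequence of NUL-terminated KEY=VALUE entries.
	for _, kv := range bytes.Split(data, []byte{0}) {
		pair := strings.SplitN(string(kv), "=", 2)
		if len(pair) == 2 && pair[0] == "INSTANCE_IP" {
			return clusterIPs[pair[1]], nil
		}
	}
	return false, nil
}

func main() {
	ips := map[string]bool{"10.0.0.12": true}
	ok, err := envoyMatchesClusterIPs(1234, ips) // 1234 is a placeholder PID
	fmt.Println(ok, err)
}
```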
## Edge cases
The method we use to find Envoy processes and correlate them to the cluster IPs may be inaccurate in certain situations. If, for example, a user runs an Envoy process manually and sets its `INSTANCE_IP` environment variable to one of the `CLUSTER_IPS` the tapper gets, then Mizu will capture traffic for it.

View File

@@ -7,15 +7,15 @@ rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch", "delete"]
- apiGroups: [ "" ]
- apiGroups: [ "apps" ]
resources: [ "deployments" ]
verbs: [ "create", "delete" ]
verbs: [ "get", "create", "delete" ]
- apiGroups: [""]
resources: ["services"]
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: ["apps"]
resources: ["daemonsets"]
verbs: ["create", "patch", "delete"]
verbs: ["get", "create", "patch", "delete", "list"]
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["get", "list", "watch", "create", "delete"]
@@ -49,6 +49,9 @@ rules:
- apiGroups: ["", "apps", "extensions"]
resources: ["endpoints"]
verbs: ["get", "list", "watch"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@@ -23,6 +23,9 @@ rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "create", "delete"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@@ -46,6 +46,9 @@ rules:
- apiGroups: ["", "apps", "extensions"]
resources: ["endpoints"]
verbs: ["get", "list", "watch"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@@ -8,7 +8,7 @@ rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch", "delete"]
- apiGroups: [ "" ]
- apiGroups: [ "apps" ]
resources: [ "deployments" ]
verbs: [ "get", "create", "delete" ]
- apiGroups: [""]
@@ -16,7 +16,7 @@ rules:
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: ["apps"]
resources: ["daemonsets"]
verbs: ["get", "create", "patch", "delete"]
verbs: ["get", "create", "patch", "delete", "list"]
- apiGroups: [""]
resources: ["services/proxy"]
verbs: ["get"]
@@ -32,7 +32,7 @@ rules:
- apiGroups: ["rbac.authorization.k8s.io"]
resources: ["rolebindings"]
verbs: ["get", "create", "delete"]
- apiGroups: ["apps", "extensions"]
- apiGroups: ["apps", "extensions", ""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
- apiGroups: ["apps", "extensions"]
@@ -41,6 +41,9 @@ rules:
- apiGroups: ["", "apps", "extensions"]
resources: ["endpoints"]
verbs: ["get", "list", "watch"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@@ -38,6 +38,9 @@ rules:
- apiGroups: ["", "apps", "extensions"]
resources: ["endpoints"]
verbs: ["get", "list", "watch"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@@ -20,6 +20,9 @@ rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "create", "delete"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@@ -38,6 +38,9 @@ rules:
- apiGroups: ["", "apps", "extensions"]
resources: ["endpoints"]
verbs: ["get", "list", "watch"]
- apiGroups: ["events.k8s.io"]
resources: ["events"]
verbs: ["list", "watch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1

View File

@@ -0,0 +1,45 @@
package kubernetes
import (
"context"
"regexp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)
type EventWatchHelper struct {
kubernetesProvider *Provider
NameRegexFilter *regexp.Regexp
}
func NewEventWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Regexp) *EventWatchHelper {
return &EventWatchHelper{
kubernetesProvider: kubernetesProvider,
NameRegexFilter: NameRegexFilter,
}
}
// Implements the EventFilterer Interface
func (wh *EventWatchHelper) Filter(wEvent *WatchEvent) (bool, error) {
event, err := wEvent.ToEvent()
if err != nil {
return false, nil
}
if !wh.NameRegexFilter.MatchString(event.Name) {
return false, nil
}
return true, nil
}
// Implements the WatchCreator Interface
func (wh *EventWatchHelper) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) {
watcher, err := wh.kubernetesProvider.clientSet.EventsV1().Events(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return nil, err
}
return watcher, nil
}

View File

@@ -43,6 +43,7 @@ type TapperSyncerConfig struct {
IgnoredUserAgents []string
MizuApiFilteringOptions api.TrafficFilteringOptions
MizuServiceAccountExists bool
Istio bool
}
func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig) (*MizuTapperSyncer, error) {
@@ -68,7 +69,8 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro
}
func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, tapperSyncer.kubernetesProvider, tapperSyncer.config.TargetNamespaces, &tapperSyncer.config.PodFilterRegex)
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex)
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
restartTappers := func() {
err, changeFound := tapperSyncer.updateCurrentlyTappedPods()
@@ -94,28 +96,48 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
for {
select {
case pod, ok := <-added:
case wEvent, ok := <-added:
if !ok {
added = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod, ok := <-removed:
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod, ok := <-modified:
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
@@ -132,12 +154,8 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
continue
}
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
restartTappersDebouncer.Cancel()
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
}
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
case <-tapperSyncer.context.Done():
logger.Log.Debugf("Watching pods loop, context done, stopping `restart tappers debouncer`")
@@ -148,6 +166,15 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
}
}
func (tapperSyncer *MizuTapperSyncer) handleErrorInWatchLoop(err error, restartTappersDebouncer *debounce.Debouncer) {
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
restartTappersDebouncer.Cancel()
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
}
}
func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, changesFound bool) {
if matchingPods, err := tapperSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(tapperSyncer.context, &tapperSyncer.config.PodFilterRegex, tapperSyncer.config.TargetNamespaces); err != nil {
return err, false
@@ -196,6 +223,7 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
tapperSyncer.config.ImagePullPolicy,
tapperSyncer.config.MizuApiFilteringOptions,
tapperSyncer.config.LogLevel,
tapperSyncer.config.Istio,
); err != nil {
return err
}

View File

@@ -0,0 +1,45 @@
package kubernetes
import (
"context"
"regexp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)
type PodWatchHelper struct {
kubernetesProvider *Provider
NameRegexFilter *regexp.Regexp
}
func NewPodWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Regexp) *PodWatchHelper {
return &PodWatchHelper{
kubernetesProvider: kubernetesProvider,
NameRegexFilter: NameRegexFilter,
}
}
// Implements the EventFilterer Interface
func (wh *PodWatchHelper) Filter(wEvent *WatchEvent) (bool, error) {
pod, err := wEvent.ToPod()
if err != nil {
return false, nil
}
if !wh.NameRegexFilter.MatchString(pod.Name) {
return false, nil
}
return true, nil
}
// Implements the WatchCreator Interface
func (wh *PodWatchHelper) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) {
watcher, err := wh.kubernetesProvider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return nil, err
}
return watcher, nil
}

View File

@@ -6,12 +6,16 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/url"
"path/filepath"
"regexp"
"github.com/op/go-logging"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver"
"github.com/up9inc/mizu/tap/api"
"io"
v1 "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
@@ -32,9 +36,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
watchtools "k8s.io/client-go/tools/watch"
"net/url"
"path/filepath"
"regexp"
)
type Provider struct {
@@ -153,14 +154,6 @@ func (provider *Provider) WaitUtilNamespaceDeleted(ctx context.Context, name str
return err
}
func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) watch.Interface {
watcher, err := provider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
panic(err.Error())
}
return watcher
}
func (provider *Provider) CreateNamespace(ctx context.Context, name string) (*core.Namespace, error) {
namespaceSpec := &core.Namespace{
ObjectMeta: metav1.ObjectMeta{
@@ -493,6 +486,11 @@ func (provider *Provider) CreateDaemonsetRBAC(ctx context.Context, namespace str
Resources: []string{"daemonsets"},
Verbs: []string{"patch", "get", "list", "create", "delete"},
},
{
APIGroups: []string{"events.k8s.io"},
Resources: []string{"events"},
Verbs: []string{"list", "watch"},
},
},
}
roleBinding := &rbac.RoleBinding{
@@ -579,6 +577,11 @@ func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string,
return provider.handleRemovalError(err)
}
func (provider *Provider) RemovePersistentVolumeClaim(ctx context.Context, namespace string, volumeClaimName string) error {
err := provider.clientSet.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, volumeClaimName, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) handleRemovalError(err error) error {
// Ignore NotFound - There is nothing to delete.
// Ignore Forbidden - Assume that a user could not have created the resource in the first place.
@@ -615,7 +618,7 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string,
return nil
}
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level) error {
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level, istio bool) error {
logger.Log.Debugf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName)
if len(nodeToTappedPodIPMap) == 0 {
@@ -638,7 +641,10 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
"--tap",
"--api-server-address", fmt.Sprintf("ws://%s/wsTapper", apiServerPodIp),
"--nodefrag",
"--procfs", procfsMountPath,
}
if istio {
mizuCmd = append(mizuCmd, "--procfs", procfsMountPath, "--istio")
}
agentContainer := applyconfcore.Container()
@@ -859,10 +865,6 @@ func (provider *Provider) CreatePersistentVolumeClaim(ctx context.Context, names
return provider.clientSet.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, volumeClaim, metav1.CreateOptions{})
}
func (provider *Provider) RemovePersistentVolumeClaim(ctx context.Context, namespace string, volumeClaimName string) error {
return provider.clientSet.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, volumeClaimName, metav1.DeleteOptions{})
}
func getClientSet(config *restclient.Config) (*kubernetes.Clientset, error) {
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
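
For reference, the widened ApplyMizuTapperDaemonSet signature now takes the istio flag as its trailing argument, matching the extra field passed by updateMizuTappers earlier in this diff. A caller sketch only; every literal below is a placeholder rather than a mizu default, and the options structs are left empty:

if err := provider.ApplyMizuTapperDaemonSet(
	ctx,
	"mizu",                   // namespace (placeholder)
	"mizu-tapper-daemon-set", // daemonSetName (placeholder)
	"up9inc/mizu:latest",     // podImage (placeholder)
	"mizu-tapper",            // tapperPodName (placeholder)
	"10.0.0.12",              // apiServerPodIp (placeholder)
	map[string][]string{"node-1": {"10.1.2.3"}}, // nodeToTappedPodIPMap (placeholder)
	"mizu-service-account",   // serviceAccountName (placeholder)
	shared.Resources{},
	core.PullAlways,
	api.TrafficFilteringOptions{},
	logging.INFO,
	true, // istio: the new trailing argument
); err != nil {
	return err
}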

View File

@@ -6,19 +6,24 @@ import (
"fmt"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
"regexp"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/watch"
)
func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetNamespaces []string, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) {
addedChan := make(chan *corev1.Pod)
modifiedChan := make(chan *corev1.Pod)
removedChan := make(chan *corev1.Pod)
type EventFilterer interface {
Filter(*WatchEvent) (bool, error)
}
type WatchCreator interface {
NewWatcher(ctx context.Context, namespace string) (watch.Interface, error)
}
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (chan *WatchEvent, chan *WatchEvent, chan *WatchEvent, chan error) {
addedChan := make(chan *WatchEvent)
modifiedChan := make(chan *WatchEvent)
removedChan := make(chan *WatchEvent)
errorChan := make(chan error)
var wg sync.WaitGroup
@@ -31,8 +36,13 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
watchRestartDebouncer := debounce.NewDebouncer(1 * time.Minute, func() {})
for {
watcher := kubernetesProvider.GetPodWatcher(ctx, targetNamespace)
err := startWatchLoop(ctx, watcher, podFilter, addedChan, modifiedChan, removedChan) // blocking
watcher, err := watcherCreator.NewWatcher(ctx, targetNamespace)
if err != nil {
errorChan <- fmt.Errorf("error in k8s watch: %v", err)
break
}
err = startWatchLoop(ctx, watcher, filterer, addedChan, modifiedChan, removedChan) // blocking
watcher.Stop()
select {
@@ -43,7 +53,7 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
}
if err != nil {
errorChan <- fmt.Errorf("error in k8 watch: %v", err)
errorChan <- fmt.Errorf("error in k8s watch: %v", err)
break
} else {
if !watchRestartDebouncer.IsOn() {
@@ -72,7 +82,7 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
return addedChan, modifiedChan, removedChan, errorChan
}
func startWatchLoop(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp, addedChan chan *corev1.Pod, modifiedChan chan *corev1.Pod, removedChan chan *corev1.Pod) error {
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, addedChan chan *WatchEvent, modifiedChan chan *WatchEvent, removedChan chan *WatchEvent) error {
resultChan := watcher.ResultChan()
for {
select {
@@ -81,26 +91,25 @@ func startWatchLoop(ctx context.Context, watcher watch.Interface, podFilter *reg
return nil
}
if e.Type == watch.Error {
return apierrors.FromObject(e.Object)
wEvent := WatchEvent(e)
if wEvent.Type == watch.Error {
return wEvent.ToError()
}
pod, ok := e.Object.(*corev1.Pod)
if !ok {
if pass, err := filterer.Filter(&wEvent); err != nil {
return err
} else if !pass {
continue
}
if !podFilter.MatchString(pod.Name) {
continue
}
switch e.Type {
switch wEvent.Type {
case watch.Added:
addedChan <- pod
addedChan <- &wEvent
case watch.Modified:
modifiedChan <- pod
modifiedChan <- &wEvent
case watch.Deleted:
removedChan <- pod
removedChan <- &wEvent
}
case <-ctx.Done():
return nil
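
Nothing in the wrapper is pod-specific anymore, so other resources can reuse it by supplying their own WatchCreator/EventFilterer pair. Below is a hypothetical EventWatchHelper for the events.k8s.io resources that the new RBAC rule earlier in this diff allows listing and watching; the type name, the EventsV1 client call, and the filter on the regarded object's name mirror PodWatchHelper and are assumptions, not code from this change (same package and imports as the pod helper above):

type EventWatchHelper struct {
	kubernetesProvider *Provider
	NameRegexFilter    *regexp.Regexp
}

// Implements the WatchCreator interface
func (wh *EventWatchHelper) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) {
	return wh.kubernetesProvider.clientSet.EventsV1().Events(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
}

// Implements the EventFilterer interface
func (wh *EventWatchHelper) Filter(wEvent *WatchEvent) (bool, error) {
	event, err := wEvent.ToEvent()
	if err != nil {
		return false, nil
	}
	return wh.NameRegexFilter.MatchString(event.Regarding.Name), nil
}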

View File

@@ -0,0 +1,44 @@
package kubernetes
import (
"fmt"
"reflect"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/watch"
)
type InvalidObjectType struct {
RequestedType reflect.Type
}
// Implements the error interface
func (iot *InvalidObjectType) Error() string {
return fmt.Sprintf("Cannot convert event to type %s", iot.RequestedType)
}
type WatchEvent watch.Event
func (we *WatchEvent) ToPod() (*corev1.Pod, error) {
pod, ok := we.Object.(*corev1.Pod)
if !ok {
return nil, &InvalidObjectType{RequestedType: reflect.TypeOf(pod)}
}
return pod, nil
}
func (we *WatchEvent) ToEvent() (*eventsv1.Event, error) {
event, ok := we.Object.(*eventsv1.Event)
if !ok {
return nil, &InvalidObjectType{RequestedType: reflect.TypeOf(event)}
}
return event, nil
}
func (we *WatchEvent) ToError() error {
return apierrors.FromObject(we.Object)
}

View File

@@ -9,7 +9,7 @@ import (
var Log = logging.MustGetLogger("mizu")
var format = logging.MustStringFormatter(
`%{time:2006-01-02T15:04:05.999Z-07:00} %{level:-5s} ▶ %{message} ▶ %{pid} %{shortfile} %{shortfunc}`,
`[%{time:2006-01-02T15:04:05.000-0700}] %{level:-5s} ▶ %{message} ▶ [%{pid} %{shortfile} %{shortfunc}]`,
)
func InitLogger(logPath string) {
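
Under the new format string, a rendered line looks roughly like the following; the timestamp, message, pid, and caller are illustrative values, only the bracketing and field order come from the format above:

[2021-11-23T14:21:53.532+0200] INFO  ▶ tapper started ▶ [1 passive_tapper.go:95 startPassiveTapper]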

View File

@@ -43,6 +43,7 @@ type MizuAgentConfig struct {
MizuResourcesNamespace string `json:"mizuResourceNamespace"`
MizuApiFilteringOptions api.TrafficFilteringOptions `json:"mizuApiFilteringOptions"`
AgentDatabasePath string `json:"agentDatabasePath"`
Istio bool `json:"istio"`
}
type WebSocketMessageMetadata struct {

View File

@@ -99,7 +99,7 @@ type Dissector interface {
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *MizuEntry
Summarize(entry *MizuEntry) *BaseEntryDetails
Represent(pIn Protocol, request map[string]interface{}, response map[string]interface{}) (pOut Protocol, object []byte, bodySize int64, err error)
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
Macros() map[string]string
}

View File

@@ -325,8 +325,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface{}, response map[string]interface{}) (protoOut api.Protocol, object []byte, bodySize int64, err error) {
protoOut = protocol
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
representation := make(map[string]interface{}, 0)
var repRequest []interface{}
@@ -363,7 +362,7 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
func (d dissecting) Macros() map[string]string {
return map[string]string{
`amqp`: fmt.Sprintf(`proto.abbr == "%s"`, protocol.Abbreviation),
`amqp`: fmt.Sprintf(`proto.name == "%s"`, protocol.Name),
}
}

View File

@@ -23,8 +23,8 @@ func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *ap
emitter.Emit(item)
}
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
streamID, messageHTTP1, err := grpcAssembler.readMessage()
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil {
return err
}
@@ -73,7 +73,11 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTime
}
if item != nil {
item.Protocol = http2Protocol
if isGrpc {
item.Protocol = grpcProtocol
} else {
item.Protocol = http2Protocol
}
filterAndEmit(item, emitter, options)
}

View File

@@ -10,6 +10,7 @@ import (
"math"
"net/http"
"net/url"
"strconv"
"strings"
"golang.org/x/net/http2"
@@ -27,6 +28,26 @@ const protoMinorHTTP2 = 0
var maxHTTP2DataLen = 1 * 1024 * 1024 // 1MB
var grpcStatusCodes = []string{
"OK",
"CANCELLED",
"UNKNOWN",
"INVALID_ARGUMENT",
"DEADLINE_EXCEEDED",
"NOT_FOUND",
"ALREADY_EXISTS",
"PERMISSION_DENIED",
"RESOURCE_EXHAUSTED",
"FAILED_PRECONDITION",
"ABORTED",
"OUT_OF_RANGE",
"UNIMPLEMENTED",
"INTERNAL",
"UNAVAILABLE",
"DATA_LOSS",
"UNAUTHENTICATED",
}
type messageFragment struct {
headers []hpack.HeaderField
data []byte
@@ -71,37 +92,38 @@ func (fbs *fragmentsByStream) pop(streamID uint32) ([]hpack.HeaderField, []byte)
return headers, data
}
func createGrpcAssembler(b *bufio.Reader) *GrpcAssembler {
func createHTTP2Assembler(b *bufio.Reader) *Http2Assembler {
var framerOutput bytes.Buffer
framer := http2.NewFramer(&framerOutput, b)
framer.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
return &GrpcAssembler{
return &Http2Assembler{
fragmentsByStream: make(fragmentsByStream),
framer: framer,
}
}
type GrpcAssembler struct {
type Http2Assembler struct {
fragmentsByStream fragmentsByStream
framer *http2.Framer
}
func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
func (ga *Http2Assembler) readMessage() (streamID uint32, messageHTTP1 interface{}, isGrpc bool, err error) {
// Exactly one Framer is used for each half connection.
// (Instead of creating a new Framer for each ReadFrame operation)
// This is needed in order to decompress the headers,
// because the compression context is updated with each request/response.
frame, err := ga.framer.ReadFrame()
if err != nil {
return 0, nil, err
return
}
streamID := frame.Header().StreamID
streamID = frame.Header().StreamID
ga.fragmentsByStream.appendFrame(streamID, frame)
if !(ga.isStreamEnd(frame)) {
return 0, nil, nil
streamID = 0
return
}
headers, data := ga.fragmentsByStream.pop(streamID)
@@ -115,13 +137,29 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
dataString := base64.StdEncoding.EncodeToString(data)
// Use http1 types only because they are expected in http_matcher.
// TODO: Create an interface that will be used by http_matcher:registerRequest and http_matcher:registerResponse
// to accept both HTTP/1.x and HTTP/2 requests and responses
var messageHTTP1 interface{}
if _, ok := headersHTTP1[":method"]; ok {
method := headersHTTP1.Get(":method")
status := headersHTTP1.Get(":status")
// gRPC detection
grpcStatus := headersHTTP1.Get("Grpc-Status")
if grpcStatus != "" {
isGrpc = true
status = grpcStatus
}
if strings.Contains(headersHTTP1.Get("Content-Type"), "application/grpc") {
isGrpc = true
grpcPath := headersHTTP1.Get(":path")
pathSegments := strings.Split(grpcPath, "/")
if len(pathSegments) > 0 {
method = pathSegments[len(pathSegments)-1]
}
}
if method != "" {
messageHTTP1 = http.Request{
URL: &url.URL{},
Method: "POST",
Method: method,
Header: headersHTTP1,
Proto: protoHTTP2,
ProtoMajor: protoMajorHTTP2,
@@ -129,8 +167,16 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
Body: io.NopCloser(strings.NewReader(dataString)),
ContentLength: int64(len(dataString)),
}
} else if _, ok := headersHTTP1[":status"]; ok {
} else if status != "" {
var statusCode int
statusCode, err = strconv.Atoi(status)
if err != nil {
return
}
messageHTTP1 = http.Response{
StatusCode: statusCode,
Header: headersHTTP1,
Proto: protoHTTP2,
ProtoMajor: protoMajorHTTP2,
@@ -139,13 +185,14 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
ContentLength: int64(len(dataString)),
}
} else {
return 0, nil, errors.New("failed to assemble stream: neither a request nor a message")
err = errors.New("failed to assemble stream: neither a request nor a message")
return
}
return streamID, messageHTTP1, nil
return
}
func (ga *GrpcAssembler) isStreamEnd(frame http2.Frame) bool {
func (ga *Http2Assembler) isStreamEnd(frame http2.Frame) bool {
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if frame.StreamEnded() {
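
The detection rules scattered through this hunk boil down to: treat the stream as gRPC when the response carries a Grpc-Status header or the Content-Type is application/grpc, and for gRPC requests take the method name from the last :path segment. A condensed, standalone sketch of just those rules, independent of the framer; the helper and the header values are illustrative, not part of this change:

package main

import (
	"fmt"
	"net/http"
	"strconv"
	"strings"
)

// The gRPC status-code names in numeric order, as introduced above.
var grpcStatusCodes = []string{
	"OK", "CANCELLED", "UNKNOWN", "INVALID_ARGUMENT", "DEADLINE_EXCEEDED",
	"NOT_FOUND", "ALREADY_EXISTS", "PERMISSION_DENIED", "RESOURCE_EXHAUSTED",
	"FAILED_PRECONDITION", "ABORTED", "OUT_OF_RANGE", "UNIMPLEMENTED",
	"INTERNAL", "UNAVAILABLE", "DATA_LOSS", "UNAUTHENTICATED",
}

// classify mirrors the detection above: Grpc-Status or an application/grpc
// Content-Type marks the stream as gRPC, and the method name comes from the
// last :path segment.
func classify(headers http.Header) (isGrpc bool, method, statusText string) {
	method = headers.Get(":method")
	if grpcStatus := headers.Get("Grpc-Status"); grpcStatus != "" {
		isGrpc = true
		if code, err := strconv.Atoi(grpcStatus); err == nil && code < len(grpcStatusCodes) {
			statusText = grpcStatusCodes[code]
		}
	}
	if strings.Contains(headers.Get("Content-Type"), "application/grpc") {
		isGrpc = true
		segments := strings.Split(headers.Get(":path"), "/")
		method = segments[len(segments)-1]
	}
	return
}

func main() {
	h := http.Header{}
	h.Set(":path", "/helloworld.Greeter/SayHello")
	h.Set("Content-Type", "application/grpc")
	h.Set("Grpc-Status", "0")
	fmt.Println(classify(h)) // prints: true SayHello OK
}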

View File

@@ -23,21 +23,35 @@ var protocol api.Protocol = api.Protocol{
ForegroundColor: "#ffffff",
FontSize: 12,
ReferenceLink: "https://datatracker.ietf.org/doc/html/rfc2616",
Ports: []string{"80", "8080", "50051"},
Ports: []string{"80", "443", "8080"},
Priority: 0,
}
var http2Protocol api.Protocol = api.Protocol{
Name: "http",
LongName: "Hypertext Transfer Protocol Version 2 (HTTP/2) (gRPC)",
LongName: "Hypertext Transfer Protocol Version 2 (HTTP/2)",
Abbreviation: "HTTP/2",
Macro: "grpc",
Macro: "http2",
Version: "2.0",
BackgroundColor: "#244c5a",
ForegroundColor: "#ffffff",
FontSize: 11,
ReferenceLink: "https://datatracker.ietf.org/doc/html/rfc7540",
Ports: []string{"80", "8080"},
Ports: []string{"80", "443", "8080"},
Priority: 0,
}
var grpcProtocol api.Protocol = api.Protocol{
Name: "http",
LongName: "Hypertext Transfer Protocol Version 2 (HTTP/2) [ gRPC over HTTP/2 ]",
Abbreviation: "gRPC",
Macro: "grpc",
Version: "2.0",
BackgroundColor: "#244c5a",
ForegroundColor: "#ffffff",
FontSize: 11,
ReferenceLink: "https://grpc.github.io/grpc/core/md_doc_statuscodes.html",
Ports: []string{"80", "443", "8080", "50051"},
Priority: 0,
}
@@ -64,10 +78,10 @@ func (d dissecting) Ping() {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
var grpcAssembler *GrpcAssembler
var http2Assembler *Http2Assembler
if isHTTP2 {
prepareHTTP2Connection(b, isClient)
grpcAssembler = createGrpcAssembler(b)
http2Assembler = createHTTP2Assembler(b)
}
dissected := false
@@ -77,7 +91,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isHTTP2 {
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter, options)
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -110,17 +124,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
return nil
}
func SetHostname(address, newHostname string) string {
replacedUrl, err := url.Parse(address)
if err != nil {
return address
}
replacedUrl.Host = newHostname
return replacedUrl.String()
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.MizuEntry {
var host, scheme, authority, path, service string
var host, authority, path, service string
request := item.Pair.Request.Payload.(map[string]interface{})
response := item.Pair.Response.Payload.(map[string]interface{})
@@ -135,9 +140,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
if h["name"] == ":authority" {
authority = h["value"].(string)
}
if h["name"] == ":scheme" {
scheme = h["value"].(string)
}
if h["name"] == ":path" {
path = h["value"].(string)
}
@@ -148,9 +150,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
if item.Protocol.Version == "2.0" {
service = fmt.Sprintf("%s://%s", scheme, authority)
service = authority
} else {
service = fmt.Sprintf("http://%s", host)
service = host
u, err := url.Parse(reqDetails["url"].(string))
if err != nil {
path = reqDetails["url"].(string)
@@ -178,9 +180,20 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
reqDetails["queryString"] = mapSliceRebuildAsMap(reqDetails["_queryString"].([]interface{}))
if resolvedDestination != "" {
service = SetHostname(service, resolvedDestination)
service = resolvedDestination
} else if resolvedSource != "" {
service = SetHostname(service, resolvedSource)
service = resolvedSource
}
method := reqDetails["method"].(string)
statusCode := int(resDetails["status"].(float64))
if item.Protocol.Abbreviation == "gRPC" {
resDetails["statusText"] = grpcStatusCodes[statusCode]
}
if item.Protocol.Version == "2.0" {
reqDetails["url"] = path
request["url"] = path
}
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
@@ -188,10 +201,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
elapsedTime = 0
}
httpPair, _ := json.Marshal(item.Pair)
_protocol := protocol
_protocol.Version = item.Protocol.Version
return &api.MizuEntry{
Protocol: _protocol,
Protocol: item.Protocol,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@@ -206,8 +217,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
Request: reqDetails,
Response: resDetails,
Url: fmt.Sprintf("%s%s", service, path),
Method: reqDetails["method"].(string),
Status: int(resDetails["status"].(float64)),
Method: method,
Status: statusCode,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
@@ -226,15 +237,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
var p api.Protocol
if entry.Protocol.Version == "2.0" {
p = http2Protocol
} else {
p = protocol
}
return &api.BaseEntryDetails{
Id: entry.Id,
Protocol: p,
Protocol: entry.Protocol,
Url: entry.Url,
RequestSenderIp: entry.RequestSenderIp,
Service: entry.Service,
@@ -408,12 +413,7 @@ func representResponse(response map[string]interface{}) (repResponse []interface
return
}
func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface{}, response map[string]interface{}) (protoOut api.Protocol, object []byte, bodySize int64, err error) {
if protoIn.Version == "2.0" {
protoOut = http2Protocol
} else {
protoOut = protocol
}
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
representation := make(map[string]interface{}, 0)
repRequest := representRequest(request)
repResponse, bodySize := representResponse(response)
@@ -425,9 +425,9 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
func (d dissecting) Macros() map[string]string {
return map[string]string{
`http`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, protocol.Version),
`grpc`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, http2Protocol.Version),
`http2`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, http2Protocol.Version),
`http`: fmt.Sprintf(`proto.name == "%s" and proto.version == "%s"`, protocol.Name, protocol.Version),
`http2`: fmt.Sprintf(`proto.name == "%s" and proto.version == "%s"`, protocol.Name, http2Protocol.Version),
`grpc`: fmt.Sprintf(`proto.name == "%s" and proto.version == "%s" and proto.macro == "%s"`, protocol.Name, grpcProtocol.Version, grpcProtocol.Macro),
}
}
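
Spelled out, the updated map expands a typed macro before the query is evaluated. Assuming the base protocol literal (not visible in this hunk) keeps Name "http" and Version "1.1", the expansions are:

http   =>  proto.name == "http" and proto.version == "1.1"
http2  =>  proto.name == "http" and proto.version == "2.0"
grpc   =>  proto.name == "http" and proto.version == "2.0" and proto.macro == "grpc"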

View File

@@ -210,8 +210,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface{}, response map[string]interface{}) (protoOut api.Protocol, object []byte, bodySize int64, err error) {
protoOut = _protocol
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
representation := make(map[string]interface{}, 0)
@@ -258,7 +257,7 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
func (d dissecting) Macros() map[string]string {
return map[string]string{
`kafka`: fmt.Sprintf(`proto.abbr == "%s"`, _protocol.Abbreviation),
`kafka`: fmt.Sprintf(`proto.name == "%s"`, _protocol.Name),
}
}

View File

@@ -146,8 +146,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface{}, response map[string]interface{}) (protoOut api.Protocol, object []byte, bodySize int64, err error) {
protoOut = protocol
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
representation := make(map[string]interface{}, 0)
repRequest := representGeneric(request, `request.`)
@@ -160,7 +159,7 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
func (d dissecting) Macros() map[string]string {
return map[string]string{
`redis`: fmt.Sprintf(`proto.abbr == "%s"`, protocol.Abbreviation),
`redis`: fmt.Sprintf(`proto.name == "%s"`, protocol.Name),
}
}

View File

@@ -50,14 +50,15 @@ var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
var pids = flag.String("pids", "", "A comma separated list of PIDs to capture their network namespaces")
var istio = flag.Bool("istio", false, "Record decrypted traffic if the cluster is configured with istio and mtls")
var memprofile = flag.String("memprofile", "", "Write memory profile")
type TapOpts struct {
HostMode bool
HostMode bool
FilterAuthorities []string
}
var hostMode bool // global
var extensions []*api.Extension // global
var filteringOptions *api.TrafficFilteringOptions // global
@@ -80,15 +81,18 @@ func inArrayString(arr []string, valueToCheck string) bool {
}
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
hostMode = opts.HostMode
extensions = extensionsRef
filteringOptions = options
if opts.FilterAuthorities == nil {
opts.FilterAuthorities = []string{}
}
if GetMemoryProfilingEnabled() {
diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds))
}
go startPassiveTapper(outputItems)
go startPassiveTapper(opts, outputItems)
}
func printPeriodicStats(cleaner *Cleaner) {
@@ -131,7 +135,7 @@ func printPeriodicStats(cleaner *Cleaner) {
}
}
func initializePacketSources() (*source.PacketSourceManager, error) {
func initializePacketSources(opts *TapOpts) (*source.PacketSourceManager, error) {
var bpffilter string
if len(flag.Args()) > 0 {
bpffilter = strings.Join(flag.Args(), " ")
@@ -146,17 +150,17 @@ func initializePacketSources() (*source.PacketSourceManager, error) {
BpfFilter: bpffilter,
}
return source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, behaviour)
return source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, *istio, opts.FilterAuthorities, behaviour)
}
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) {
streamsMap := NewTcpStreamMap()
go streamsMap.closeTimedoutTcpStreamChannels()
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
diagnose.InitializeTapperInternalStats()
sources, err := initializePacketSources()
sources, err := initializePacketSources(opts)
if err != nil {
logger.Log.Fatal(err)
@@ -169,7 +173,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
}
packets := make(chan source.TcpPacketInfo)
assembler := NewTcpAssembler(outputItems, streamsMap)
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
diagnose.AppStats.SetStartTime(time.Now())
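
A minimal caller sketch for the reworked entry point, following the import paths used elsewhere in this diff; the authority list and buffer size are placeholders, and extension loading is elided:

package main

import (
	"github.com/up9inc/mizu/tap"
	tapApi "github.com/up9inc/mizu/tap/api"
)

func main() {
	var extensions []*tapApi.Extension // protocol extensions would be loaded here

	opts := &tap.TapOpts{
		HostMode:          true,
		FilterAuthorities: []string{"10.1.2.3:8080", "10.1.2.4"}, // IP:port or bare IP entries
	}
	outputItems := make(chan *tapApi.OutputChannelItem, 1000)
	filteringOptions := &tapApi.TrafficFilteringOptions{}

	tap.StartPassiveTapper(opts, outputItems, extensions, filteringOptions)

	for item := range outputItems {
		_ = item // dissected request/response pairs arrive here
	}
}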

View File

@@ -18,24 +18,6 @@ const (
TcpStreamChannelTimeoutMsDefaultValue = 10000
)
type globalSettings struct {
filterAuthorities []string
}
var gSettings = &globalSettings{
filterAuthorities: []string{},
}
func SetFilterAuthorities(ipAddresses []string) {
gSettings.filterAuthorities = ipAddresses
}
func GetFilterIPs() []string {
addresses := make([]string, len(gSettings.filterAuthorities))
copy(addresses, gSettings.filterAuthorities)
return addresses
}
func GetMaxBufferedPagesTotal() int {
valueFromEnv, err := strconv.Atoi(os.Getenv(MaxBufferedPagesTotalEnvVarName))
if err != nil {

View File

@@ -0,0 +1,112 @@
package source
import (
"fmt"
"io/ioutil"
"os"
"regexp"
"strings"
"github.com/up9inc/mizu/shared/logger"
)
const envoyBinary = "/envoy"
var numberRegex = regexp.MustCompile("[0-9]+")
func discoverRelevantEnvoyPids(procfs string, clusterIps []string) ([]string, error) {
result := make([]string, 0)
pids, err := ioutil.ReadDir(procfs)
if err != nil {
return result, err
}
logger.Log.Infof("Starting envoy auto discoverer %v %v - scanning %v potential pids",
procfs, clusterIps, len(pids))
for _, pid := range pids {
if !pid.IsDir() {
continue
}
if !numberRegex.MatchString(pid.Name()) {
continue
}
if checkPid(procfs, pid.Name(), clusterIps) {
result = append(result, pid.Name())
}
}
logger.Log.Infof("Found %v relevant envoy processes - %v", len(result), result)
return result, nil
}
func checkPid(procfs string, pid string, clusterIps []string) bool {
execLink := fmt.Sprintf("%v/%v/exe", procfs, pid)
exec, err := os.Readlink(execLink)
if err != nil {
// Debug on purpose - it may happen for many reasons and we only care
// about it during troubleshooting
//
logger.Log.Debugf("Unable to read link %v - %v\n", execLink, err)
return false
}
if !strings.HasSuffix(exec, envoyBinary) {
return false
}
environmentFile := fmt.Sprintf("%v/%v/environ", procfs, pid)
clusterIp, err := readEnvironmentVariable(environmentFile, "INSTANCE_IP")
if err != nil {
return false
}
if clusterIp == "" {
logger.Log.Debugf("Found an envoy process without INSTANCE_IP variable %v\n", pid)
return false
}
logger.Log.Infof("Found envoy pid %v with cluster ip %v", pid, clusterIp)
for _, value := range clusterIps {
if value == clusterIp {
return true
}
}
return false
}
func readEnvironmentVariable(file string, name string) (string, error) {
bytes, err := ioutil.ReadFile(file)
if err != nil {
logger.Log.Warningf("Error reading environment file %v - %v", file, err)
return "", err
}
envs := strings.Split(string(bytes), string([]byte{0}))
for _, env := range envs {
if !strings.Contains(env, "=") {
continue
}
parts := strings.Split(env, "=")
varName := parts[0]
value := parts[1]
if name == varName {
return value, nil
}
}
return "", nil
}
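
For context on what readEnvironmentVariable is parsing: /proc/<pid>/environ is a NUL-separated list of NAME=value pairs. A standalone illustration with placeholder values (it uses SplitN so an '=' inside a value is preserved, a small deviation from the loop above):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// a fake environ buffer; real input comes from /proc/<pid>/environ
	environ := "PATH=/usr/local/bin\x00INSTANCE_IP=10.1.2.3\x00HOSTNAME=ratings-v1\x00"

	for _, env := range strings.Split(environ, string([]byte{0})) {
		if !strings.Contains(env, "=") {
			continue
		}
		parts := strings.SplitN(env, "=", 2)
		if parts[0] == "INSTANCE_IP" {
			fmt.Println("cluster IP:", parts[1]) // prints: cluster IP: 10.1.2.3
		}
	}
}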

View File

@@ -15,26 +15,63 @@ type PacketSourceManager struct {
}
func NewPacketSourceManager(procfs string, pids string, filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
istio bool, clusterIps []string, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
sources := make([]*tcpPacketSource, 0)
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
sources, err := createHostSource(sources, filename, interfaceName, behaviour)
if err != nil {
return nil, err
}
sources = append(sources, hostSource)
if pids != "" {
netnsSources := newNetnsPacketSources(procfs, pids, interfaceName, behaviour)
sources = append(sources, netnsSources...)
}
sources = createSourcesFromPids(sources, procfs, pids, interfaceName, behaviour)
sources = createSourcesFromEnvoy(sources, istio, procfs, clusterIps, interfaceName, behaviour)
return &PacketSourceManager{
sources: sources,
}, nil
}
func createHostSource(sources []*tcpPacketSource, filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) ([]*tcpPacketSource, error) {
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
if err != nil {
return sources, err
}
return append(sources, hostSource), nil
}
func createSourcesFromPids(sources []*tcpPacketSource, procfs string, pids string,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if pids == "" {
return sources
}
netnsSources := newNetnsPacketSources(procfs, strings.Split(pids, ","), interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
}
func createSourcesFromEnvoy(sources []*tcpPacketSource, istio bool, procfs string, clusterIps []string,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if !istio {
return sources
}
envoyPids, err := discoverRelevantEnvoyPids(procfs, clusterIps)
if err != nil {
logger.Log.Warningf("Unable to discover envoy pids - %v", err)
return sources
}
netnsSources := newNetnsPacketSources(procfs, envoyPids, interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
}
func newHostPacketSource(filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
var name string
@@ -54,11 +91,11 @@ func newHostPacketSource(filename string, interfaceName string,
return source, nil
}
func newNetnsPacketSources(procfs string, pids string, interfaceName string,
func newNetnsPacketSources(procfs string, pids []string, interfaceName string,
behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
result := make([]*tcpPacketSource, 0)
for _, pidstr := range strings.Split(pids, ",") {
for _, pidstr := range pids {
pid, err := strconv.Atoi(pidstr)
if err != nil {
@@ -100,9 +137,9 @@ func newNetnsPacketSource(pid int, nsh netns.NsHandle, interfaceName string,
//
runtime.LockOSThread()
defer runtime.UnlockOSThread()
oldnetns, err := netns.Get()
if err != nil {
logger.Log.Errorf("Unable to get netns of current thread %v", err)
errors <- err
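
A caller sketch for the widened constructor; every literal is a placeholder, the behaviour struct is left at its zero value, and the assumption that an empty filename selects live capture on the interface mirrors the host-source path above:

package main

import (
	"github.com/up9inc/mizu/tap/source"
)

func main() {
	var behaviour source.TcpPacketSourceBehaviour // zero value; BpfFilter etc. elided

	sources, err := source.NewPacketSourceManager(
		"/hostproc",          // procfs mount inside the tapper container (placeholder)
		"",                   // explicit pid list (unused here)
		"",                   // pcap filename (assumed: empty means live capture)
		"any",                // interface name (placeholder)
		true,                 // istio: also tap discovered envoy sidecar namespaces
		[]string{"10.1.2.3"}, // cluster IPs of the tapped pods (placeholder)
		behaviour,
	)
	if err != nil {
		panic(err)
	}
	_ = sources // ReadPackets and further wiring elided
}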

View File

@@ -121,29 +121,27 @@ func (source *tcpPacketSource) readPackets(ipdefrag bool, packets chan<- TcpPack
}
// defrag the IPv4 packet if required
if !ipdefrag {
ip4Layer := packet.Layer(layers.LayerTypeIPv4)
if ip4Layer == nil {
continue
}
ip4 := ip4Layer.(*layers.IPv4)
l := ip4.Length
newip4, err := source.defragger.DefragIPv4(ip4)
if err != nil {
logger.Log.Fatal("Error while de-fragmenting", err)
} else if newip4 == nil {
logger.Log.Debugf("Fragment...")
continue // packet fragment, we don't have whole packet yet.
}
if newip4.Length != l {
diagnose.InternalStats.Ipdefrag++
logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType())
pb, ok := packet.(gopacket.PacketBuilder)
if !ok {
logger.Log.Panic("Not a PacketBuilder")
if ipdefrag {
if ip4Layer := packet.Layer(layers.LayerTypeIPv4); ip4Layer != nil {
ip4 := ip4Layer.(*layers.IPv4)
l := ip4.Length
newip4, err := source.defragger.DefragIPv4(ip4)
if err != nil {
logger.Log.Fatal("Error while de-fragmenting", err)
} else if newip4 == nil {
logger.Log.Debugf("Fragment...")
continue // packet fragment, we don't have whole packet yet.
}
if newip4.Length != l {
diagnose.InternalStats.Ipdefrag++
logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType())
pb, ok := packet.(gopacket.PacketBuilder)
if !ok {
logger.Log.Panic("Not a PacketBuilder")
}
nextDecoder := newip4.NextLayerType()
_ = nextDecoder.Decode(newip4.Payload, pb)
}
nextDecoder := newip4.NextLayerType()
_ = nextDecoder.Decode(newip4.Payload, pb)
}
}

View File

@@ -16,6 +16,8 @@ import (
"github.com/up9inc/mizu/tap/source"
)
const PACKETS_SEEN_LOG_THRESHOLD = 1000
type tcpAssembler struct {
*reassembly.Assembler
streamPool *reassembly.StreamPool
@@ -33,13 +35,13 @@ func (c *context) GetCaptureInfo() gopacket.CaptureInfo {
return c.CaptureInfo
}
func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStreamMap) *tcpAssembler {
func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStreamMap, opts *TapOpts) *tcpAssembler {
var emitter api.Emitter = &api.Emitting{
AppStats: &diagnose.AppStats,
OutputChannel: outputItems,
}
streamFactory := NewTcpStreamFactory(emitter, streamsMap)
streamFactory := NewTcpStreamFactory(emitter, streamsMap, opts)
streamPool := reassembly.NewStreamPool(streamFactory)
assembler := reassembly.NewAssembler(streamPool)
@@ -63,7 +65,11 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
for packetInfo := range packets {
packetsCount := diagnose.AppStats.IncPacketsCount()
logger.Log.Debugf("PACKET #%d", packetsCount)
if packetsCount % PACKETS_SEEN_LOG_THRESHOLD == 0 {
logger.Log.Debugf("Packets seen: #%d", packetsCount)
}
packet := packetInfo.Packet
data := packet.Data()
diagnose.AppStats.UpdateProcessedBytes(uint64(len(data)))
@@ -85,7 +91,7 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
CaptureInfo: packet.Metadata().CaptureInfo,
}
diagnose.InternalStats.Totalsz += len(tcp.Payload)
logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
logger.Log.Debugf("%s:%v -> %s:%v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
a.assemblerMutex.Lock()
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
a.assemblerMutex.Unlock()

View File

@@ -24,6 +24,7 @@ type tcpStreamFactory struct {
Emitter api.Emitter
streamsMap *tcpStreamMap
ownIps []string
opts *TapOpts
}
type tcpStreamWrapper struct {
@@ -31,7 +32,7 @@ type tcpStreamWrapper struct {
createdAt time.Time
}
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap) *tcpStreamFactory {
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory {
var ownIps []string
if localhostIPs, err := getLocalhostIPs(); err != nil {
@@ -47,6 +48,7 @@ func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap) *tcpStre
Emitter: emitter,
streamsMap: streamsMap,
ownIps: ownIps,
opts: opts,
}
}
@@ -139,17 +141,17 @@ func (factory *tcpStreamFactory) WaitGoRoutines() {
}
func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, dstIP string, dstPort string) *streamProps {
if hostMode {
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
if factory.opts.HostMode {
if inArrayString(factory.opts.FilterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort))
return &streamProps{isTapTarget: true, isOutgoing: false}
} else if inArrayString(gSettings.filterAuthorities, dstIP) {
} else if inArrayString(factory.opts.FilterAuthorities, dstIP) {
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP))
return &streamProps{isTapTarget: true, isOutgoing: false}
} else if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
} else if inArrayString(factory.opts.FilterAuthorities, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s:%s", srcIP, srcPort))
return &streamProps{isTapTarget: true, isOutgoing: true}
} else if inArrayString(gSettings.filterAuthorities, srcIP) {
} else if inArrayString(factory.opts.FilterAuthorities, srcIP) {
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host4 %s", srcIP))
return &streamProps{isTapTarget: true, isOutgoing: true}
}

View File

@@ -1 +0,0 @@
tester

View File

@@ -1,12 +0,0 @@
This tester is used to launch the passive-tapper locally, without a Docker or Kubernetes environment.
It's good for testing purposes.
# How to run
From the `tap` folder run:
`./tester/launch.sh`
The tester accepts the same arguments as the passive_tapper; run with `--help` to get a complete list of options.
`./tester/launch.sh --help`

View File

@@ -1,10 +0,0 @@
#!/bin/bash
set -e
echo "Building extensions..."
pushd .. && ./devops/build_extensions.sh && popd
go build -o tester tester/tester.go
sudo ./tester/tester "$@"

View File

@@ -1,114 +0,0 @@
package main
import (
"bufio"
"io/ioutil"
"os"
"path"
"plugin"
"sort"
"strings"
"github.com/op/go-logging"
"github.com/go-errors/errors"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
)
func loadExtensions() ([]*tapApi.Extension, error) {
extensionsDir := "./extensions"
files, err := ioutil.ReadDir(extensionsDir)
if err != nil {
return nil, errors.Wrap(err, 0)
}
extensions := make([]*tapApi.Extension, 0)
for _, file := range files {
filename := file.Name()
if !strings.HasSuffix(filename, ".so") {
continue
}
logger.Log.Infof("Loading extension: %s", filename)
extension := &tapApi.Extension{
Path: path.Join(extensionsDir, filename),
}
plug, err := plugin.Open(extension.Path)
if err != nil {
return nil, errors.Wrap(err, 0)
}
extension.Plug = plug
symDissector, err := plug.Lookup("Dissector")
if err != nil {
return nil, errors.Wrap(err, 0)
}
dissector, ok := symDissector.(tapApi.Dissector)
if !ok {
return nil, errors.Errorf("Symbol Dissector type error: %v %T", file, symDissector)
}
dissector.Register(extension)
extension.Dissector = dissector
extensions = append(extensions, extension)
}
sort.Slice(extensions, func(i, j int) bool {
return extensions[i].Protocol.Priority < extensions[j].Protocol.Priority
})
for _, extension := range extensions {
logger.Log.Infof("Extension Properties: %+v", extension)
}
return extensions, nil
}
func internalRun() error {
logger.InitLoggerStderrOnly(logging.DEBUG)
opts := tap.TapOpts{
HostMode: false,
}
outputItems := make(chan *tapApi.OutputChannelItem, 1000)
extenssions, err := loadExtensions()
if err != nil {
return err
}
tapOpts := tapApi.TrafficFilteringOptions{}
tap.StartPassiveTapper(&opts, outputItems, extenssions, &tapOpts)
logger.Log.Infof("Tapping, press enter to exit...")
reader := bufio.NewReader(os.Stdin)
reader.ReadLine()
return nil
}
func main() {
err := internalRun()
if err != nil {
switch err := err.(type) {
case *errors.Error:
logger.Log.Errorf("Error: %v", err.ErrorStack())
default:
logger.Log.Errorf("Error: %v", err)
}
os.Exit(1)
}
}

View File

@@ -15,7 +15,7 @@ interface EntryViewLineProps {
}
const EntryViewLine: React.FC<EntryViewLineProps> = ({label, value, updateQuery, selector, overrideQueryValue}) => {
return (label && value && <tr className={styles.dataLine}>
return (label && <tr className={styles.dataLine}>
<td
className={`queryable ${styles.dataKey}`}
onClick={() => {

View File

@@ -84,7 +84,14 @@
padding: 4px
padding-left: 12px
.port
.tcpInfo
font-size: 12px
color: $secondary-font-color
margin: 5px
margin-top: 5px
margin-bottom: 5px
.port
margin-right: 5px
.ip
margin-left: 5px

View File

@@ -171,7 +171,17 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, style
}
<div className={styles.separatorRight}>
<span
className={`queryable ${styles.port}`}
className={`queryable ${styles.tcpInfo} ${styles.ip}`}
title="Source IP"
onClick={() => {
updateQuery(`src.ip == "${entry.sourceIp}"`)
}}
>
{entry.sourceIp}
</span>
<span className={`${styles.tcpInfo}`}>:</span>
<span
className={`queryable ${styles.tcpInfo} ${styles.port}`}
title="Source Port"
onClick={() => {
updateQuery(`src.port == "${entry.sourcePort}"`)
@@ -199,7 +209,17 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, style
/>
}
<span
className={`queryable ${styles.port}`}
className={`queryable ${styles.tcpInfo} ${styles.ip}`}
title="Destination IP"
onClick={() => {
updateQuery(`dst.ip == "${entry.destinationIp}"`)
}}
>
{entry.destinationIp}
</span>
<span className={`${styles.tcpInfo}`}>:</span>
<span
className={`queryable ${styles.tcpInfo} ${styles.port}`}
title="Destination Port"
onClick={() => {
updateQuery(`dst.port == "${entry.destinationPort}"`)

View File

@@ -101,6 +101,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
const listEntry = useRef(null);
const openWebSocket = (query) => {
setFocusedEntryId(null);
setEntries([]);
setEntriesBuffer([]);
ws.current = new WebSocket(MizuWebsocketURL);

View File

@@ -17,7 +17,7 @@ interface Props {
const FancyTextDisplay: React.FC<Props> = ({text, className, isPossibleToCopy = true, applyTextEllipsis = true, flipped = false, useTooltip= false, displayIconOnMouseOver = false, buttonOnly = false}) => {
const [showCopiedNotification, setCopied] = useState(false);
const [showTooltip, setShowTooltip] = useState(false);
const displayText = text || '';
text = String(text);
const onCopy = () => {
setCopied(true)
@@ -33,12 +33,12 @@ const FancyTextDisplay: React.FC<Props> = ({text, className, isPossibleToCopy =
return () => clearTimeout(timer);
}, [showCopiedNotification]);
const textElement = <span className={'FancyTextDisplay-Text'}>{displayText}</span>;
const textElement = <span className={'FancyTextDisplay-Text'}>{text}</span>;
const copyButton = isPossibleToCopy && displayText ? <CopyToClipboard text={displayText} onCopy={onCopy}>
const copyButton = isPossibleToCopy && text ? <CopyToClipboard text={text} onCopy={onCopy}>
<span
className={`FancyTextDisplay-Icon`}
title={`Copy "${displayText}" value to clipboard`}
title={`Copy "${text}" value to clipboard`}
>
<img src={duplicateImg} alt="Duplicate full value"/>
{showCopiedNotification && <span className={'FancyTextDisplay-CopyNotifier'}>Copied</span>}
@@ -48,14 +48,14 @@ const FancyTextDisplay: React.FC<Props> = ({text, className, isPossibleToCopy =
return (
<p
className={`FancyTextDisplay-Container ${className ? className : ''} ${displayIconOnMouseOver ? 'displayIconOnMouseOver ' : ''} ${applyTextEllipsis ? ' FancyTextDisplay-ContainerEllipsis' : ''}`}
title={displayText.toString()}
title={text}
onMouseOver={ e => setShowTooltip(true)}
onMouseLeave={ e => setShowTooltip(false)}
>
{!buttonOnly && flipped && textElement}
{copyButton}
{!buttonOnly && !flipped && textElement}
{useTooltip && showTooltip && <span className={'FancyTextDisplay-CopyNotifier'}>{displayText}</span>}
{useTooltip && showTooltip && <span className={'FancyTextDisplay-CopyNotifier'}>{text}</span>}
</p>
);
};

View File

@@ -5,7 +5,6 @@
border: solid 1px $secondary-font-color
margin-left: 4px
padding: 2px 5px
text-transform: uppercase
font-family: "Source Sans Pro", sans-serif
font-size: 11px
font-weight: bold