Compare commits

..

11 Commits

Author SHA1 Message Date
RoyUP9
842d95c836 fixed redact and regex masking tests (#284) 2021-09-19 11:52:11 +03:00
M. Mert Yıldıran
9fa9b67328 Bring back the sensitive data filtering feature (#280)
* Bring back the sensitive data filtering feature

* Add `// global` comment
2021-09-18 20:13:59 +03:00
Selton Fiuza
6337b75f0e [TRA-3659] Fix rules (#271)
* Fix rules

* Not reay, error on running

* Empty dissector Rules()

* almost working

* Finally, fixed

* undo changes on agent/pkg/utils/har.go

* fix not showing service on rules detail

* Update tap/api/api.go

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Update agent/pkg/controllers/entries_controller.go

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Update agent/pkg/controllers/entries_controller.go

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* unwrap Data

* Fix bug off using more than one latency rule that always get the first.

* fix json type, decoding base64 before unmarshal

* Run `go mod tidy` on `cli/go.sum`

* Fix the linting issues

* Remove a `FIXME` comment

* Remove a CSS rule

* Adapt `ruleNumberText` CSS class to the design language of the UI

* Fix an issue in the UI related to `rule.Latency` slipping out

* Removed unecessary codes.

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: M. Mert Yildiran <mehmet@up9.com>
2021-09-18 14:02:18 -03:00
Igor Gov
b9d2e671c7 Move all docs to docs folder and clean project root (#278) 2021-09-15 11:53:23 +03:00
RoyUP9
0840642c98 testing guidelines (#276) 2021-09-14 16:57:39 +03:00
RoyUP9
d5b01347df fixed crash when channel is closed (#273) 2021-09-14 11:53:47 +03:00
M. Mert Yıldıran
7dca1ad889 Move stats_tracker.go into the extension API and increment MatchedPairs from inside the Emit method (#272)
* Move `stats_tracker.go` into the extension API and increment `MatchedPairs` from inside the `Emit` method

* Replace multiple `sync.Mutex`(es) with low-level atomic memory primitives
2021-09-13 16:22:16 +03:00
M. Mert Yıldıran
616eccb2cf Make ScrollableFeed virtualized by replacing react-scrollable-feed with react-scrollable-feed-virtualized (#268)
* Make `ScrollableFeed` virtualized by replacing `react-scrollable-feed` with `react-scrollable-feed-virtualized`

* fix get new entries button

* Fix the not populated `Protocol` struct in case of `GetEntries` endpoint is called

Co-authored-by: Liraz Yehezkel <lirazy@up9.com>
2021-09-13 16:18:43 +03:00
M. Mert Yıldıran
30f07479cb Fix the JSON style to camelCase and rename CONTRIBUTE.md to CONTRIBUTING.md (#274)
* Fix the JSON style to camelCase and rename `CONTRIBUTE.md` to `CONTRIBUTING.md`

* Move `CONTRIBUTING.md` to `.github/` directory
2021-09-13 10:55:46 +03:00
M. Mert Yıldıran
7f880417e9 Add CODE_OF_CONDUCT.md (#275) 2021-09-13 10:28:28 +03:00
RoyUP9
6b52458642 removed exit if browser not supported (#270) 2021-09-12 16:09:04 +03:00
50 changed files with 907 additions and 23249 deletions

View File

@@ -39,7 +39,7 @@ RUN go build -ldflags="-s -w \
-X 'mizuserver/pkg/version.BuildTimestamp=${BUILD_TIMESTAMP}' \
-X 'mizuserver/pkg/version.SemVer=${SEM_VER}'" -o mizuagent .
COPY build_extensions.sh ..
COPY devops/build_extensions.sh ..
RUN cd .. && /bin/bash build_extensions.sh
FROM alpine:3.13.5

View File

@@ -44,11 +44,11 @@ push: push-docker push-cli ## Build and publish agent docker image & CLI.
push-docker: ## Build and publish agent docker image.
@echo "publishing Docker image .. "
./build-push-featurebranch.sh
devops/build-push-featurebranch.sh
build-docker-ci: ## Build agent docker image for CI.
@echo "building docker image for ci"
./build-agent-ci.sh
devops/build-agent-ci.sh
push-cli: ## Build and publish CLI.
@echo "publishing CLI .. "
@@ -73,7 +73,7 @@ clean-docker:
@(echo "DOCKER cleanup - NOT IMPLEMENTED YET " )
extensions:
./build_extensions.sh
devops/build_extensions.sh
test-cli:
@echo "running cli tests"; cd cli && $(MAKE) test

View File

@@ -46,7 +46,7 @@ While `mizu`most often works out of the box, you can influence its behavior:
1. [OPTIONAL] Set `KUBECONFIG` environment variable to your Kubernetes configuration. If this is not set, Mizu assumes that configuration is at `${HOME}/.kube/config`
2. `mizu` assumes user running the command has permissions to create resources (such as pods, services, namespaces) on your Kubernetes cluster (no worries - `mizu` resources are cleaned up upon termination)
For detailed list of k8s permissions see [PERMISSIONS](PERMISSIONS.md) document
For detailed list of k8s permissions see [PERMISSIONS](docs/PERMISSIONS.md) document
## How to Run

View File

@@ -1,15 +0,0 @@
![Mizu: The API Traffic Viewer for Kubernetes](assets/mizu-logo.svg)
# TESTING
Testing guidelines for Mizu project
## Unit-tests
* TBD
* TBD
* TBD
## System tests
* TBD
* TBD
* TBD

View File

@@ -502,10 +502,19 @@ func TestTapRedact(t *testing.T) {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
}
entry := requestResult.(map[string]interface{})["entry"].(map[string]interface{})
entryRequest := entry["request"].(map[string]interface{})
data := requestResult.(map[string]interface{})["data"].(map[string]interface{})
entryJson := data["entry"].(string)
headers := entryRequest["headers"].([]interface{})
var entry map[string]interface{}
if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil {
return fmt.Errorf("failed to parse entry, err: %v", parseErr)
}
entryRequest := entry["request"].(map[string]interface{})
entryPayload := entryRequest["payload"].(map[string]interface{})
entryDetails := entryPayload["details"].(map[string]interface{})
headers := entryDetails["headers"].([]interface{})
for _, headerInterface := range headers {
header := headerInterface.(map[string]interface{})
if header["name"].(string) != "User-Agent" {
@@ -518,8 +527,8 @@ func TestTapRedact(t *testing.T) {
}
}
data := entryRequest["postData"].(map[string]interface{})
textDataStr := data["text"].(string)
postData := entryDetails["postData"].(map[string]interface{})
textDataStr := postData["text"].(string)
var textData map[string]string
if parseErr := json.Unmarshal([]byte(textDataStr), &textData); parseErr != nil {
@@ -608,10 +617,19 @@ func TestTapNoRedact(t *testing.T) {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
}
entry := requestResult.(map[string]interface{})["entry"].(map[string]interface{})
entryRequest := entry["request"].(map[string]interface{})
data := requestResult.(map[string]interface{})["data"].(map[string]interface{})
entryJson := data["entry"].(string)
headers := entryRequest["headers"].([]interface{})
var entry map[string]interface{}
if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil {
return fmt.Errorf("failed to parse entry, err: %v", parseErr)
}
entryRequest := entry["request"].(map[string]interface{})
entryPayload := entryRequest["payload"].(map[string]interface{})
entryDetails := entryPayload["details"].(map[string]interface{})
headers := entryDetails["headers"].([]interface{})
for _, headerInterface := range headers {
header := headerInterface.(map[string]interface{})
if header["name"].(string) != "User-Agent" {
@@ -624,8 +642,8 @@ func TestTapNoRedact(t *testing.T) {
}
}
data := entryRequest["postData"].(map[string]interface{})
textDataStr := data["text"].(string)
postData := entryDetails["postData"].(map[string]interface{})
textDataStr := postData["text"].(string)
var textData map[string]string
if parseErr := json.Unmarshal([]byte(textDataStr), &textData); parseErr != nil {
@@ -714,11 +732,20 @@ func TestTapRegexMasking(t *testing.T) {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
}
entry := requestResult.(map[string]interface{})["entry"].(map[string]interface{})
entryRequest := entry["request"].(map[string]interface{})
data := requestResult.(map[string]interface{})["data"].(map[string]interface{})
entryJson := data["entry"].(string)
data := entryRequest["postData"].(map[string]interface{})
textData := data["text"].(string)
var entry map[string]interface{}
if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil {
return fmt.Errorf("failed to parse entry, err: %v", parseErr)
}
entryRequest := entry["request"].(map[string]interface{})
entryPayload := entryRequest["payload"].(map[string]interface{})
entryDetails := entryPayload["details"].(map[string]interface{})
postData := entryDetails["postData"].(map[string]interface{})
textData := postData["text"].(string)
if textData != "[REDACTED]" {
return fmt.Errorf("unexpected result - body is not redacted")

View File

@@ -50,14 +50,16 @@ func main() {
panic("One of the flags --tap, --api or --standalone or --hars-read must be provided")
}
filteringOptions := getTrafficFilteringOptions()
if *standaloneMode {
api.StartResolving(*namespace)
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions)
tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions, filteringOptions)
go filterItems(outputItemsChannel, filteredOutputItemsChannel, getTrafficFilteringOptions())
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
// go api.StartReadingOutbound(outboundLinkOutputChannel)
@@ -73,9 +75,8 @@ func main() {
rlog.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs())
}
// harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions)
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
socketConnection, err := shared.ConnectToSocketServer(*apiServerAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false)
if err != nil {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
@@ -89,7 +90,7 @@ func main() {
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
go filterItems(outputItemsChannel, filteredOutputItemsChannel, getTrafficFilteringOptions())
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
hostApi(outputItemsChannel)
@@ -97,7 +98,7 @@ func main() {
outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000)
filteredHarChannel := make(chan *tapApi.OutputChannelItem)
go filterItems(outputItemsChannel, filteredHarChannel, getTrafficFilteringOptions())
go filterItems(outputItemsChannel, filteredHarChannel, filteringOptions)
go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap)
hostApi(nil)
}
@@ -225,23 +226,23 @@ func getTapTargets() []string {
return tappedAddressesPerNodeDict[nodeName]
}
func getTrafficFilteringOptions() *shared.TrafficFilteringOptions {
func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions {
filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar)
if filteringOptionsJson == "" {
return &shared.TrafficFilteringOptions{
return &tapApi.TrafficFilteringOptions{
HealthChecksUserAgentHeaders: []string{},
}
}
var filteringOptions shared.TrafficFilteringOptions
var filteringOptions tapApi.TrafficFilteringOptions
err := json.Unmarshal([]byte(filteringOptionsJson), &filteringOptions)
if err != nil {
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.TrafficFilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err))
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the api.TrafficFilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err))
}
return &filteringOptions
}
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *tapApi.TrafficFilteringOptions) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
@@ -251,10 +252,6 @@ func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *ta
continue
}
// if !filterOptions.DisableRedaction {
// sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
// }
outChannel <- message
}
}

View File

@@ -116,6 +116,15 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
baseEntry := extension.Dissector.Summarize(mizuEntry)
mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry)
database.CreateEntry(mizuEntry)
if extension.Protocol.Name == "http" {
var pair tapApi.RequestResponsePair
json.Unmarshal([]byte(mizuEntry.Entry), &pair)
harEntry, _ := utils.NewEntry(&pair)
rules, _ := models.RunValidationRulesState(*harEntry, mizuEntry.Service)
baseEntry.Rules = rules
baseEntry.Latency = mizuEntry.ElapsedTime
}
baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(baseEntry)
BroadcastToBrowserClients(baseEntryBytes)
}

View File

@@ -3,7 +3,6 @@ package controllers
import (
"encoding/json"
"fmt"
"github.com/google/martian/har"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
@@ -13,6 +12,8 @@ import (
"net/http"
"time"
"github.com/google/martian/har"
"github.com/gin-gonic/gin"
"github.com/romana/rlog"
@@ -140,11 +141,23 @@ func GetEntry(c *gin.Context) {
extension := extensionsMap[entryData.ProtocolName]
protocol, representation, bodySize, _ := extension.Dissector.Represent(&entryData)
var rules []map[string]interface{}
if entryData.ProtocolName == "http" {
var pair tapApi.RequestResponsePair
json.Unmarshal([]byte(entryData.Entry), &pair)
harEntry, _ := utils.NewEntry(&pair)
_, rulesMatched := models.RunValidationRulesState(*harEntry, entryData.Service)
inrec, _ := json.Marshal(rulesMatched)
json.Unmarshal(inrec, &rules)
}
c.JSON(http.StatusOK, tapApi.MizuEntryWrapper{
Protocol: protocol,
Representation: string(representation),
BodySize: bodySize,
Data: entryData,
Rules: rules,
})
}

View File

@@ -2,9 +2,10 @@ package models
import (
"encoding/json"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/rules"
"mizuserver/pkg/utils"
"github.com/google/martian/har"
"github.com/up9inc/mizu/shared"
@@ -15,15 +16,6 @@ func GetEntry(r *tapApi.MizuEntry, v tapApi.DataUnmarshaler) error {
return v.UnmarshalData(r)
}
// TODO: until we fixed the Rules feature
//func NewApplicableRules(status bool, latency int64, number int) tapApi.ApplicableRules {
// ar := tapApi.ApplicableRules{}
// ar.Status = status
// ar.Latency = latency
// ar.NumberOfRules = number
// return ar
//}
type EntriesFilter struct {
Limit int `form:"limit" validate:"required,min=1,max=200"`
Operator string `form:"operator" validate:"required,oneof='lt' 'gt'"`
@@ -105,33 +97,8 @@ type ExtendedCreator struct {
Source *string `json:"_source"`
}
type FullEntryWithPolicy struct {
RulesMatched []rules.RulesMatched `json:"rulesMatched,omitempty"`
Entry har.Entry `json:"entry"`
Service string `json:"service"`
func RunValidationRulesState(harEntry har.Entry, service string) (tapApi.ApplicableRules, []rules.RulesMatched) {
resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend)
return tapApi.ApplicableRules{Status: statusPolicyToSend, Latency: latency, NumberOfRules: numberOfRules}, resultPolicyToSend
}
func (fewp *FullEntryWithPolicy) UnmarshalData(entry *tapApi.MizuEntry) error {
var pair tapApi.RequestResponsePair
if err := json.Unmarshal([]byte(entry.Entry), &pair); err != nil {
return err
}
harEntry, err := utils.NewEntry(&pair)
if err != nil {
return err
}
fewp.Entry = *harEntry
_, resultPolicyToSend := rules.MatchRequestPolicy(fewp.Entry, entry.Service)
fewp.RulesMatched = resultPolicyToSend
fewp.Service = entry.Service
return nil
}
// TODO: until we fixed the Rules feature
//func RunValidationRulesState(harEntry har.Entry, service string) tapApi.ApplicableRules {
// numberOfRules, resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
// statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend, numberOfRules)
// ar := NewApplicableRules(statusPolicyToSend, latency, numberOfRules)
// return ar
//}

View File

@@ -1,6 +1,7 @@
package rules
import (
"encoding/base64"
"encoding/json"
"fmt"
"reflect"
@@ -41,7 +42,7 @@ func ValidateService(serviceFromRule string, service string) bool {
return true
}
func MatchRequestPolicy(harEntry har.Entry, service string) (int, []RulesMatched) {
func MatchRequestPolicy(harEntry har.Entry, service string) []RulesMatched {
enforcePolicy, _ := shared.DecodeEnforcePolicy(fmt.Sprintf("%s/%s", shared.RulePolicyPath, shared.RulePolicyFileName))
var resultPolicyToSend []RulesMatched
for _, rule := range enforcePolicy.Rules {
@@ -50,7 +51,8 @@ func MatchRequestPolicy(harEntry har.Entry, service string) (int, []RulesMatched
}
if rule.Type == "json" {
var bodyJsonMap interface{}
if err := json.Unmarshal(harEntry.Response.Content.Text, &bodyJsonMap); err != nil {
contentTextDecoded, _ := base64.StdEncoding.DecodeString(string(harEntry.Response.Content.Text))
if err := json.Unmarshal(contentTextDecoded, &bodyJsonMap); err != nil {
continue
}
out, err := jsonpath.Read(bodyJsonMap, rule.Key)
@@ -63,6 +65,7 @@ func MatchRequestPolicy(harEntry har.Entry, service string) (int, []RulesMatched
if err != nil {
continue
}
fmt.Println(matchValue, rule.Value)
} else {
val := fmt.Sprint(out)
matchValue, err = regexp.MatchString(rule.Value, val)
@@ -89,22 +92,28 @@ func MatchRequestPolicy(harEntry har.Entry, service string) (int, []RulesMatched
resultPolicyToSend = appendRulesMatched(resultPolicyToSend, true, rule)
}
}
return len(enforcePolicy.Rules), resultPolicyToSend
return resultPolicyToSend
}
func PassedValidationRules(rulesMatched []RulesMatched, numberOfRules int) (bool, int64, int) {
if len(rulesMatched) == 0 {
return false, 0, 0
func PassedValidationRules(rulesMatched []RulesMatched) (bool, int64, int) {
var numberOfRulesMatched = len(rulesMatched)
var latency int64 = -1
if numberOfRulesMatched == 0 {
return false, 0, numberOfRulesMatched
}
for _, rule := range rulesMatched {
if rule.Matched == false {
return false, -1, len(rulesMatched)
return false, latency, numberOfRulesMatched
} else {
if strings.ToLower(rule.Rule.Type) == "latency" {
if rule.Rule.Latency < latency || latency == -1 {
latency = rule.Rule.Latency
}
}
}
}
for _, rule := range rulesMatched {
if strings.ToLower(rule.Rule.Type) == "latency" {
return true, rule.Rule.Latency, len(rulesMatched)
}
}
return true, -1, len(rulesMatched)
return true, latency, numberOfRulesMatched
}

View File

@@ -3,7 +3,6 @@ package cmd
import (
"context"
"fmt"
"log"
"os"
"os/exec"
"os/signal"
@@ -63,8 +62,8 @@ func openBrowser(url string) {
default:
err = fmt.Errorf("unsupported platform")
}
if err != nil {
log.Fatal(err)
}
if err != nil {
logger.Log.Errorf("error while opening browser, %v", err)
}
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/tap/api"
yaml "gopkg.in/yaml.v3"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
@@ -123,7 +124,7 @@ func readValidationRules(file string) (string, error) {
return string(newContent), nil
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *shared.TrafficFilteringOptions, mizuValidationRules string) error {
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
if !config.Config.IsNsRestrictedMode() {
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
return err
@@ -158,7 +159,7 @@ func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Pro
return err
}
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
var err error
state.mizuServiceAccountExists, err = createRBACIfNecessary(ctx, kubernetesProvider)
@@ -199,13 +200,13 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
return nil
}
func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
var compiledRegexSlice []*shared.SerializableRegexp
func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
var compiledRegexSlice []*api.SerializableRegexp
if config.Config.Tap.PlainTextFilterRegexes != nil && len(config.Config.Tap.PlainTextFilterRegexes) > 0 {
compiledRegexSlice = make([]*shared.SerializableRegexp, 0)
compiledRegexSlice = make([]*api.SerializableRegexp, 0)
for _, regexStr := range config.Config.Tap.PlainTextFilterRegexes {
compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr)
compiledRegex, err := api.CompileRegexToSerializableRegexp(regexStr)
if err != nil {
return nil, err
}
@@ -213,7 +214,7 @@ func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
}
}
return &shared.TrafficFilteringOptions{
return &api.TrafficFilteringOptions{
PlainTextMaskingRegexes: compiledRegexSlice,
HealthChecksUserAgentHeaders: config.Config.Tap.HealthChecksUserAgentHeaders,
DisableRedaction: config.Config.Tap.DisableRedaction,
@@ -379,13 +380,28 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
for {
select {
case pod := <-added:
case pod, ok := <-added:
if !ok {
added = nil
continue
}
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod := <-removed:
case pod, ok := <-removed:
if !ok {
removed = nil
continue
}
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod := <-modified:
case pod, ok := <-modified:
if !ok {
modified = nil
continue
}
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
@@ -396,8 +412,12 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
if pod.Status.PodIP != "" {
restartTappersDebouncer.SetOn()
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
case err := <-errorChan:
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
restartTappersDebouncer.Cancel()
// TODO: Does this also perform cleanup?
@@ -477,21 +497,28 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
timeAfter := time.After(25 * time.Second)
for {
select {
case <-ctx.Done():
logger.Log.Debugf("Watching API Server pod loop, ctx done")
return
case <-added:
case _, ok := <-added:
if !ok {
added = nil
continue
}
logger.Log.Debugf("Watching API Server pod loop, added")
continue
case <-removed:
case _, ok := <-removed:
if !ok {
removed = nil
continue
}
logger.Log.Infof("%s removed", mizu.ApiServerPodName)
cancel()
return
case modifiedPod := <-modified:
if modifiedPod == nil {
logger.Log.Debugf("Watching API Server pod loop, modifiedPod with nil")
case modifiedPod, ok := <-modified:
if !ok {
modified = nil
continue
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
@@ -510,14 +537,23 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
}
case _, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
cancel()
case <-timeAfter:
if !isPodReady {
logger.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time")
cancel()
}
case <-errorChan:
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
cancel()
case <-ctx.Done():
logger.Log.Debugf("Watching API Server pod loop, ctx done")
return
}
}
}

View File

@@ -11,6 +11,7 @@ require (
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
k8s.io/api v0.21.2
k8s.io/apimachinery v0.21.2
@@ -19,3 +20,5 @@ require (
)
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api

View File

@@ -18,6 +18,7 @@ import (
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap/api"
core "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -150,7 +151,7 @@ type ApiServerOptions struct {
PodImage string
ServiceAccountName string
IsNamespaceRestricted bool
MizuApiFilteringOptions *shared.TrafficFilteringOptions
MizuApiFilteringOptions *api.TrafficFilteringOptions
MaxEntriesDBSizeBytes int64
Resources configStructs.Resources
ImagePullPolicy core.PullPolicy

76
docs/CODE_OF_CONDUCT.md Normal file
View File

@@ -0,0 +1,76 @@
# Contributor Covenant Code of Conduct
## Our Pledge
In the interest of fostering an open and welcoming environment, we as
contributors and maintainers pledge to making participation in our project and
our community a harassment-free experience for everyone, regardless of age, body
size, disability, ethnicity, sex characteristics, gender identity and expression,
level of experience, education, socio-economic status, nationality, personal
appearance, race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment
include:
* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or
advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic
address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct, or to ban temporarily or
permanently any contributor for other behaviors that they deem inappropriate,
threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces
when an individual is representing the project or its community. Examples of
representing a project or community include using an official project e-mail
address, posting via an official social media account, or acting as an appointed
representative at an online or offline event. Representation of a project may be
further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team at mizu@up9.com. All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.
Further details of specific enforcement policies may be posted separately.
Project maintainers who do not follow or enforce the Code of Conduct in good
faith may face temporary or permanent repercussions as determined by other
members of the project's leadership.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
[homepage]: https://www.contributor-covenant.org
For answers to common questions about this code of conduct, see
https://www.contributor-covenant.org/faq

View File

@@ -1,18 +1,20 @@
![Mizu: The API Traffic Viewer for Kubernetes](assets/mizu-logo.svg)
# CONTRIBUTE
![Mizu: The API Traffic Viewer for Kubernetes](../assets/mizu-logo.svg)
# Contributing to Mizu
We welcome code contributions from the community.
Please read and follow the guidelines below.
## Communication
* Before starting work on a major feature, please reach out to us via [GitHub](https://github.com/up9inc/mizu), [Slack](https://join.slack.com/share/zt-u6bbs3pg-X1zhQOXOH0yEoqILgH~csw), [email](mailto:mizu@up9.com), etc. We will make sure no one else is already working on it. A _major feature_ is defined as any change that is > 100 LOC altered (not including tests), or changes any user-facing behavior
* Small patches and bug fixes don't need prior communication.
## Contribution requirements
## Contribution Requirements
* Code style - most of the code is written in Go, please follow [these guidelines](https://golang.org/doc/effective_go)
* Go-tools compatible (`go get`, `go test`, etc)
* Unit-test coverage cant go down ..
* Go-tools compatible (`go get`, `go test`, etc.)
* Code coverage for unit tests must not decrease.
* Code must be usefully commented. Not only for developers on the project, but also for external users of these packages
* When reviewing PRs, you are encouraged to use Golang's [code review comments page](https://github.com/golang/go/wiki/CodeReviewComments)
* Project follows [Google JSON Style Guide](https://google.github.io/styleguide/jsoncstyleguide.xml) for the REST APIs that are provided.

View File

@@ -1,4 +1,4 @@
![Mizu: The API Traffic Viewer for Kubernetes](assets/mizu-logo.svg)
![Mizu: The API Traffic Viewer for Kubernetes](../assets/mizu-logo.svg)
# Kubernetes permissions for MIZU
This document describes in details all permissions required for full and correct operation of Mizu

33
docs/TESTING.md Normal file
View File

@@ -0,0 +1,33 @@
![Mizu: The API Traffic Viewer for Kubernetes](../assets/mizu-logo.svg)
# Testing guidelines
## Generic guidelines
* Use "[testing](https://pkg.go.dev/testing)" package
* Write [Table-driven tests using subtests](https://go.dev/blog/subtests)
* Use cleanup in test/subtest in order to clean up resources
* Name the test func "Test<tested_func_name><tested_case>"
## Unit tests
* Position the test file inside the folder of the tested package
* In case of internal func testing
* Name the test file "<tested_file_name>_internal_test.go"
* Name the test package same as the package being tested
* Example - [Config](../cli/config/config_internal_test.go)
* In case of exported func testing
* Name the test file "<tested_file_name>_test.go"
* Name the test package "<tested_package>_test"
* Example - [Slice Utils](../cli/mizu/sliceUtils_test.go)
* Make sure to run test coverage to make sure you covered all the cases and lines in the func
## Acceptance tests
* Position the test file inside the [acceptance tests folder](../acceptanceTests)
* Name the file "<tested_command>_test.go"
* Name the package "acceptanceTests"
* Do not run as part of the short tests
* Use/Create generic test utils func in acceptanceTests/testsUtils
* Don't use sleep inside the tests - active check
* Running acceptance tests locally
* Switch to the branch that is being tested
* Run acceptanceTests/setup.sh
* Run tests (make acceptance-test)
* Example - [Tap](../acceptanceTests/tap_test.go)

View File

@@ -74,12 +74,6 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc
}
}
type TrafficFilteringOptions struct {
HealthChecksUserAgentHeaders []string
PlainTextMaskingRegexes []*SerializableRegexp
DisableRedaction bool
}
type VersionResponse struct {
SemVer string `json:"semver"`
}
@@ -99,6 +93,11 @@ type RulePolicy struct {
Name string `yaml:"name"`
}
type RulesMatched struct {
Matched bool `json:"matched"`
Rule RulePolicy `json:"rule"`
}
func (r *RulePolicy) validateType() bool {
permitedTypes := []string{"json", "header", "latency"}
_, found := Find(permitedTypes, r.Type)

View File

@@ -9,13 +9,13 @@ import (
type Protocol struct {
Name string `json:"name"`
LongName string `json:"long_name"`
LongName string `json:"longName"`
Abbreviation string `json:"abbreviation"`
Version string `json:"version"`
BackgroundColor string `json:"background_color"`
ForegroundColor string `json:"foreground_color"`
FontSize int8 `json:"font_size"`
ReferenceLink string `json:"reference_link"`
BackgroundColor string `json:"backgroundColor"`
ForegroundColor string `json:"foregroundColor"`
FontSize int8 `json:"fontSize"`
ReferenceLink string `json:"referenceLink"`
Ports []string `json:"ports"`
Priority uint8 `json:"priority"`
}
@@ -50,8 +50,8 @@ type CounterPair struct {
}
type GenericMessage struct {
IsRequest bool `json:"is_request"`
CaptureTime time.Time `json:"capture_time"`
IsRequest bool `json:"isRequest"`
CaptureTime time.Time `json:"captureTime"`
Payload interface{} `json:"payload"`
}
@@ -80,13 +80,14 @@ type SuperIdentifier struct {
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter) error
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error
Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry
Summarize(entry *MizuEntry) *BaseEntryDetails
Represent(entry *MizuEntry) (protocol Protocol, object []byte, bodySize int64, err error)
}
type Emitting struct {
AppStats *AppStats
OutputChannel chan *OutputChannelItem
}
@@ -96,56 +97,64 @@ type Emitter interface {
func (e *Emitting) Emit(item *OutputChannelItem) {
e.OutputChannel <- item
e.AppStats.IncMatchedPairs()
}
type MizuEntry struct {
ID uint `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
ProtocolName string `json:"protocol_key" gorm:"column:protocolKey"`
ProtocolVersion string `json:"protocol_version" gorm:"column:protocolVersion"`
Entry string `json:"entry,omitempty" gorm:"column:entry"`
EntryId string `json:"entryId" gorm:"column:entryId"`
Url string `json:"url" gorm:"column:url"`
Method string `json:"method" gorm:"column:method"`
Status int `json:"status" gorm:"column:status"`
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
Service string `json:"service" gorm:"column:service"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
ElapsedTime int64 `json:"elapsedTime" gorm:"column:elapsedTime"`
Path string `json:"path" gorm:"column:path"`
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
SourceIp string `json:"sourceIp,omitempty" gorm:"column:sourceIp"`
DestinationIp string `json:"destinationIp,omitempty" gorm:"column:destinationIp"`
SourcePort string `json:"sourcePort,omitempty" gorm:"column:sourcePort"`
DestinationPort string `json:"destinationPort,omitempty" gorm:"column:destinationPort"`
IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"`
EstimatedSizeBytes int `json:"-" gorm:"column:estimatedSizeBytes"`
ID uint `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
ProtocolName string `json:"protocolName" gorm:"column:protocolName"`
ProtocolLongName string `json:"protocolLongName" gorm:"column:protocolLongName"`
ProtocolAbbreviation string `json:"protocolAbbreviation" gorm:"column:protocolVersion"`
ProtocolVersion string `json:"protocolVersion" gorm:"column:protocolVersion"`
ProtocolBackgroundColor string `json:"protocolBackgroundColor" gorm:"column:protocolBackgroundColor"`
ProtocolForegroundColor string `json:"protocolForegroundColor" gorm:"column:protocolForegroundColor"`
ProtocolFontSize int8 `json:"protocolFontSize" gorm:"column:protocolFontSize"`
ProtocolReferenceLink string `json:"protocolReferenceLink" gorm:"column:protocolReferenceLink"`
Entry string `json:"entry,omitempty" gorm:"column:entry"`
EntryId string `json:"entryId" gorm:"column:entryId"`
Url string `json:"url" gorm:"column:url"`
Method string `json:"method" gorm:"column:method"`
Status int `json:"status" gorm:"column:status"`
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
Service string `json:"service" gorm:"column:service"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
ElapsedTime int64 `json:"elapsedTime" gorm:"column:elapsedTime"`
Path string `json:"path" gorm:"column:path"`
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
SourceIp string `json:"sourceIp,omitempty" gorm:"column:sourceIp"`
DestinationIp string `json:"destinationIp,omitempty" gorm:"column:destinationIp"`
SourcePort string `json:"sourcePort,omitempty" gorm:"column:sourcePort"`
DestinationPort string `json:"destinationPort,omitempty" gorm:"column:destinationPort"`
IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"`
EstimatedSizeBytes int `json:"-" gorm:"column:estimatedSizeBytes"`
}
type MizuEntryWrapper struct {
Protocol Protocol `json:"protocol"`
Representation string `json:"representation"`
BodySize int64 `json:"bodySize"`
Data MizuEntry `json:"data"`
Protocol Protocol `json:"protocol"`
Representation string `json:"representation"`
BodySize int64 `json:"bodySize"`
Data MizuEntry `json:"data"`
Rules []map[string]interface{} `json:"rulesMatched,omitempty"`
}
type BaseEntryDetails struct {
Id string `json:"id,omitempty"`
Protocol Protocol `json:"protocol,omitempty"`
Url string `json:"url,omitempty"`
RequestSenderIp string `json:"request_sender_ip,omitempty"`
RequestSenderIp string `json:"requestSenderIp,omitempty"`
Service string `json:"service,omitempty"`
Path string `json:"path,omitempty"`
Summary string `json:"summary,omitempty"`
StatusCode int `json:"status_code"`
StatusCode int `json:"statusCode"`
Method string `json:"method,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
SourceIp string `json:"source_ip,omitempty"`
DestinationIp string `json:"destination_ip,omitempty"`
SourcePort string `json:"source_port,omitempty"`
DestinationPort string `json:"destination_port,omitempty"`
SourceIp string `json:"sourceIp,omitempty"`
DestinationIp string `json:"destinationIp,omitempty"`
SourcePort string `json:"sourcePort,omitempty"`
DestinationPort string `json:"destinationPort,omitempty"`
IsOutgoing bool `json:"isOutgoing,omitempty"`
Latency int64 `json:"latency,omitempty"`
Rules ApplicableRules `json:"rules,omitempty"`
@@ -162,11 +171,19 @@ type DataUnmarshaler interface {
}
func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error {
entryUrl := entry.Url
service := entry.Service
bed.Protocol = Protocol{
Name: entry.ProtocolName,
LongName: entry.ProtocolLongName,
Abbreviation: entry.ProtocolAbbreviation,
Version: entry.ProtocolVersion,
BackgroundColor: entry.ProtocolBackgroundColor,
ForegroundColor: entry.ProtocolForegroundColor,
FontSize: entry.ProtocolFontSize,
ReferenceLink: entry.ProtocolReferenceLink,
}
bed.Id = entry.EntryId
bed.Url = entryUrl
bed.Service = service
bed.Url = entry.Url
bed.Service = entry.Service
bed.Summary = entry.Path
bed.StatusCode = entry.Status
bed.Method = entry.Method

7
tap/api/options.go Normal file
View File

@@ -0,0 +1,7 @@
package api
type TrafficFilteringOptions struct {
HealthChecksUserAgentHeaders []string
PlainTextMaskingRegexes []*SerializableRegexp
DisableRedaction bool
}

View File

@@ -1,4 +1,4 @@
package shared
package api
import "regexp"

70
tap/api/stats_tracker.go Normal file
View File

@@ -0,0 +1,70 @@
package api
import (
"sync/atomic"
"time"
)
type AppStats struct {
StartTime time.Time `json:"-"`
ProcessedBytes uint64 `json:"processedBytes"`
PacketsCount uint64 `json:"packetsCount"`
TcpPacketsCount uint64 `json:"tcpPacketsCount"`
ReassembledTcpPayloadsCount uint64 `json:"reassembledTcpPayloadsCount"`
TlsConnectionsCount uint64 `json:"tlsConnectionsCount"`
MatchedPairs uint64 `json:"matchedPairs"`
DroppedTcpStreams uint64 `json:"droppedTcpStreams"`
}
func (as *AppStats) IncMatchedPairs() {
atomic.AddUint64(&as.MatchedPairs, 1)
}
func (as *AppStats) IncDroppedTcpStreams() {
atomic.AddUint64(&as.DroppedTcpStreams, 1)
}
func (as *AppStats) IncPacketsCount() uint64 {
atomic.AddUint64(&as.PacketsCount, 1)
return as.PacketsCount
}
func (as *AppStats) IncTcpPacketsCount() {
atomic.AddUint64(&as.TcpPacketsCount, 1)
}
func (as *AppStats) IncReassembledTcpPayloadsCount() {
atomic.AddUint64(&as.ReassembledTcpPayloadsCount, 1)
}
func (as *AppStats) IncTlsConnectionsCount() {
atomic.AddUint64(&as.TlsConnectionsCount, 1)
}
func (as *AppStats) UpdateProcessedBytes(size uint64) {
atomic.AddUint64(&as.ProcessedBytes, size)
}
func (as *AppStats) SetStartTime(startTime time.Time) {
as.StartTime = startTime
}
func (as *AppStats) DumpStats() *AppStats {
currentAppStats := &AppStats{StartTime: as.StartTime}
currentAppStats.ProcessedBytes = resetUint64(&as.ProcessedBytes)
currentAppStats.PacketsCount = resetUint64(&as.PacketsCount)
currentAppStats.TcpPacketsCount = resetUint64(&as.TcpPacketsCount)
currentAppStats.ReassembledTcpPayloadsCount = resetUint64(&as.ReassembledTcpPayloadsCount)
currentAppStats.TlsConnectionsCount = resetUint64(&as.TlsConnectionsCount)
currentAppStats.MatchedPairs = resetUint64(&as.MatchedPairs)
currentAppStats.DroppedTcpStreams = resetUint64(&as.DroppedTcpStreams)
return currentAppStats
}
func resetUint64(ref *uint64) (val uint64) {
val = atomic.LoadUint64(ref)
atomic.StoreUint64(ref, 0)
return
}

View File

@@ -41,7 +41,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
r := AmqpReader{b}
var remaining int
@@ -267,25 +267,31 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
request["url"] = summary
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: protocol.Name,
ProtocolVersion: protocol.Version,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: request["method"].(string),
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: 0,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
ProtocolName: protocol.Name,
ProtocolLongName: protocol.LongName,
ProtocolAbbreviation: protocol.Abbreviation,
ProtocolVersion: protocol.Version,
ProtocolBackgroundColor: protocol.BackgroundColor,
ProtocolForegroundColor: protocol.ForegroundColor,
ProtocolFontSize: protocol.FontSize,
ProtocolReferenceLink: protocol.ReferenceLink,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: request["method"].(string),
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: 0,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}

View File

@@ -3,6 +3,7 @@ module github.com/up9inc/mizu/tap/extensions/http
go 1.16
require (
github.com/beevik/etree v1.1.0 // indirect
github.com/google/martian v2.1.0+incompatible
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7
github.com/up9inc/mizu/tap/api v0.0.0

View File

@@ -1,3 +1,5 @@
github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs=
github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7 h1:jkvpcEatpwuMF5O5LVxTnehj6YZ/aEZN4NWD/Xml4pI=

View File

@@ -13,7 +13,12 @@ import (
"github.com/up9inc/mizu/tap/api"
)
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) error {
func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *api.TrafficFilteringOptions) {
FilterSensitiveData(item, options)
emitter.Emit(item)
}
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
streamID, messageHTTP1, err := grpcAssembler.readMessage()
if err != nil {
return err
@@ -64,13 +69,13 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTime
if item != nil {
item.Protocol = http2Protocol
emitter.Emit(item)
filterAndEmit(item, emitter, options)
}
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
req, err := http.ReadRequest(b)
if err != nil {
// log.Println("Error reading stream:", err)
@@ -107,12 +112,12 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
emitter.Emit(item)
filterAndEmit(item, emitter, options)
}
return nil
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
res, err := http.ReadResponse(b, nil)
if err != nil {
// log.Println("Error reading stream:", err)
@@ -157,7 +162,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
ServerPort: tcpID.SrcPort,
IsOutgoing: false,
}
emitter.Emit(item)
filterAndEmit(item, emitter, options)
}
return nil
}

View File

@@ -61,7 +61,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
if err != nil {
@@ -85,7 +85,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isHTTP2 {
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter)
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter, options)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -94,7 +94,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
dissected = true
} else if isClient {
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter)
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -103,7 +103,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
dissected = true
} else {
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter)
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -172,25 +172,31 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: protocol.Name,
ProtocolVersion: item.Protocol.Version,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, path),
Method: reqDetails["method"].(string),
Status: int(resDetails["status"].(float64)),
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: path,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
ProtocolName: protocol.Name,
ProtocolLongName: protocol.LongName,
ProtocolAbbreviation: protocol.Abbreviation,
ProtocolVersion: item.Protocol.Version,
ProtocolBackgroundColor: protocol.BackgroundColor,
ProtocolForegroundColor: protocol.ForegroundColor,
ProtocolFontSize: protocol.FontSize,
ProtocolReferenceLink: protocol.ReferenceLink,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, path),
Method: reqDetails["method"].(string),
Status: int(resDetails["status"].(float64)),
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: path,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}

View File

@@ -0,0 +1,209 @@
package main
import (
"bytes"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"github.com/beevik/etree"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api"
)
const maskedFieldPlaceholderValue = "[REDACTED]"
//these values MUST be all lower case and contain no `-` or `_` characters
var personallyIdentifiableDataFields = []string{"token", "authorization", "authentication", "cookie", "userid", "password",
"username", "user", "key", "passcode", "pass", "auth", "authtoken", "jwt",
"bearer", "clientid", "clientsecret", "redirecturi", "phonenumber",
"zip", "zipcode", "address", "country", "firstname", "lastname",
"middlename", "fname", "lname", "birthdate"}
func FilterSensitiveData(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) {
request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request)
response := item.Pair.Response.Payload.(HTTPPayload).Data.(*http.Response)
filterHeaders(&request.Header)
filterHeaders(&response.Header)
filterUrl(request.URL)
filterRequestBody(request, options)
filterResponseBody(response, options)
}
func filterRequestBody(request *http.Request, options *api.TrafficFilteringOptions) {
contenType := getContentTypeHeaderValue(request.Header)
body, err := ioutil.ReadAll(request.Body)
if err != nil {
rlog.Debugf("Filtering error reading body: %v", err)
return
}
filteredBody, err := filterHttpBody([]byte(body), contenType, options)
if err == nil {
request.Body = ioutil.NopCloser(bytes.NewBuffer(filteredBody))
} else {
request.Body = ioutil.NopCloser(bytes.NewBuffer(body))
}
}
func filterResponseBody(response *http.Response, options *api.TrafficFilteringOptions) {
contentType := getContentTypeHeaderValue(response.Header)
body, err := ioutil.ReadAll(response.Body)
if err != nil {
rlog.Debugf("Filtering error reading body: %v", err)
return
}
filteredBody, err := filterHttpBody([]byte(body), contentType, options)
if err == nil {
response.Body = ioutil.NopCloser(bytes.NewBuffer(filteredBody))
} else {
response.Body = ioutil.NopCloser(bytes.NewBuffer(body))
}
}
func filterHeaders(headers *http.Header) {
for key, _ := range *headers {
if strings.ToLower(key) == "cookie" {
headers.Del(key)
} else if isFieldNameSensitive(key) {
headers.Set(key, maskedFieldPlaceholderValue)
}
}
}
func getContentTypeHeaderValue(headers http.Header) string {
for key, _ := range headers {
if strings.ToLower(key) == "content-type" {
return headers.Get(key)
}
}
return ""
}
func isFieldNameSensitive(fieldName string) bool {
name := strings.ToLower(fieldName)
name = strings.ReplaceAll(name, "_", "")
name = strings.ReplaceAll(name, "-", "")
name = strings.ReplaceAll(name, " ", "")
for _, sensitiveField := range personallyIdentifiableDataFields {
if strings.Contains(name, sensitiveField) {
return true
}
}
return false
}
func filterHttpBody(bytes []byte, contentType string, options *api.TrafficFilteringOptions) ([]byte, error) {
mimeType := strings.Split(contentType, ";")[0]
switch strings.ToLower(mimeType) {
case "application/json":
return filterJsonBody(bytes)
case "text/html":
fallthrough
case "application/xhtml+xml":
fallthrough
case "text/xml":
fallthrough
case "application/xml":
return filterXmlEtree(bytes)
case "text/plain":
if options != nil && options.PlainTextMaskingRegexes != nil {
return filterPlainText(bytes, options), nil
}
}
return bytes, nil
}
func filterPlainText(bytes []byte, options *api.TrafficFilteringOptions) []byte {
for _, regex := range options.PlainTextMaskingRegexes {
bytes = regex.ReplaceAll(bytes, []byte(maskedFieldPlaceholderValue))
}
return bytes
}
func filterXmlEtree(bytes []byte) ([]byte, error) {
if !IsValidXML(bytes) {
return nil, errors.New("Invalid XML")
}
xmlDoc := etree.NewDocument()
err := xmlDoc.ReadFromBytes(bytes)
if err != nil {
return nil, err
} else {
filterXmlElement(xmlDoc.Root())
}
return xmlDoc.WriteToBytes()
}
func IsValidXML(data []byte) bool {
return xml.Unmarshal(data, new(interface{})) == nil
}
func filterXmlElement(element *etree.Element) {
for i, attribute := range element.Attr {
if isFieldNameSensitive(attribute.Key) {
element.Attr[i].Value = maskedFieldPlaceholderValue
}
}
if element.ChildElements() == nil || len(element.ChildElements()) == 0 {
if isFieldNameSensitive(element.Tag) {
element.SetText(maskedFieldPlaceholderValue)
}
} else {
for _, element := range element.ChildElements() {
filterXmlElement(element)
}
}
}
func filterJsonBody(bytes []byte) ([]byte, error) {
var bodyJsonMap map[string]interface{}
err := json.Unmarshal(bytes, &bodyJsonMap)
if err != nil {
return nil, err
}
filterJsonMap(bodyJsonMap)
return json.Marshal(bodyJsonMap)
}
func filterJsonMap(jsonMap map[string]interface{}) {
for key, value := range jsonMap {
// Do not replace nil values with maskedFieldPlaceholderValue
if value == nil {
continue
}
nestedMap, isNested := value.(map[string]interface{})
if isNested {
filterJsonMap(nestedMap)
} else {
if isFieldNameSensitive(key) {
jsonMap[key] = maskedFieldPlaceholderValue
}
}
}
}
func filterUrl(url *url.URL) {
if len(url.RawQuery) > 0 {
newQueryArgs := make([]string, 0)
for urlQueryParamName, urlQueryParamValues := range url.Query() {
newValues := urlQueryParamValues
if isFieldNameSensitive(urlQueryParamName) {
newValues = []string{maskedFieldPlaceholderValue}
}
for _, paramValue := range newValues {
newQueryArgs = append(newQueryArgs, fmt.Sprintf("%s=%s", urlQueryParamName, paramValue))
}
}
url.RawQuery = strings.Join(newQueryArgs, "&")
}
}

View File

@@ -39,7 +39,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
return errors.New("Identified by another protocol")
@@ -142,25 +142,31 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: _protocol.Name,
ProtocolVersion: _protocol.Version,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: apiNames[apiKey],
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
ProtocolName: _protocol.Name,
ProtocolLongName: _protocol.LongName,
ProtocolAbbreviation: _protocol.Abbreviation,
ProtocolVersion: _protocol.Version,
ProtocolBackgroundColor: _protocol.BackgroundColor,
ProtocolForegroundColor: _protocol.ForegroundColor,
ProtocolFontSize: _protocol.FontSize,
ProtocolReferenceLink: _protocol.ReferenceLink,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: apiNames[apiKey],
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}

View File

@@ -63,7 +63,7 @@ var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to k
var memprofile = flag.String("memprofile", "", "Write memory profile")
var statsTracker = StatsTracker{}
var appStats = api.AppStats{}
// global
var stats struct {
@@ -92,9 +92,10 @@ var outputLevel int
var errorsMap map[string]uint
var errorsMapMutex sync.Mutex
var nErrors uint
var ownIps []string // global
var hostMode bool // global
var extensions []*api.Extension // global
var ownIps []string // global
var hostMode bool // global
var extensions []*api.Extension // global
var filteringOptions *api.TrafficFilteringOptions // global
const baseStreamChannelTimeoutMs int = 5000 * 100
@@ -152,17 +153,18 @@ type Context struct {
CaptureInfo gopacket.CaptureInfo
}
func GetStats() AppStats {
return statsTracker.appStats
func GetStats() api.AppStats {
return appStats
}
func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
return c.CaptureInfo
}
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension) {
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
hostMode = opts.HostMode
extensions = extensionsRef
filteringOptions = options
if GetMemoryProfilingEnabled() {
startMemoryProfiler()
@@ -225,8 +227,8 @@ func closeTimedoutTcpStreamChannels() {
if stream.superIdentifier.Protocol == nil {
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
stream.Close()
statsTracker.incDroppedTcpStreams()
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
appStats.IncDroppedTcpStreams()
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
}
} else {
if !stream.superIdentifier.IsClosedOthers {
@@ -328,10 +330,11 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
source.Lazy = *lazy
source.NoCopy = true
rlog.Info("Starting to read packets")
statsTracker.setStartTime(time.Now())
appStats.SetStartTime(time.Now())
defragger := ip4defrag.NewIPv4Defragmenter()
var emitter api.Emitter = &api.Emitting{
AppStats: &appStats,
OutputChannel: outputItems,
}
@@ -374,7 +377,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
errorsSummery := fmt.Sprintf("%v", errorsMap)
errorsMapMutex.Unlock()
log.Printf("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
time.Since(statsTracker.appStats.StartTime),
time.Since(appStats.StartTime),
nErrors,
errorMapLen,
errorsSummery,
@@ -397,7 +400,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
cleanStats.closed,
cleanStats.deleted,
)
currentAppStats := statsTracker.dumpStats()
currentAppStats := appStats.DumpStats()
appStatsJSON, _ := json.Marshal(currentAppStats)
log.Printf("app stats - %v", string(appStatsJSON))
}
@@ -415,10 +418,10 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
rlog.Debugf("Error:", err)
continue
}
packetsCount := statsTracker.incPacketsCount()
packetsCount := appStats.IncPacketsCount()
rlog.Debugf("PACKET #%d", packetsCount)
data := packet.Data()
statsTracker.updateProcessedBytes(int64(len(data)))
appStats.UpdateProcessedBytes(uint64(len(data)))
if *hexdumppkt {
rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
}
@@ -452,7 +455,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
tcp := packet.Layer(layers.LayerTypeTCP)
if tcp != nil {
statsTracker.incTcpPacketsCount()
appStats.IncTcpPacketsCount()
tcp := tcp.(*layers.TCP)
if *checksum {
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
@@ -470,15 +473,15 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
assemblerMutex.Unlock()
}
done := *maxcount > 0 && statsTracker.appStats.PacketsCount >= *maxcount
done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount
if done {
errorsMapMutex.Lock()
errorMapLen := len(errorsMap)
errorsMapMutex.Unlock()
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
statsTracker.appStats.PacketsCount,
statsTracker.appStats.ProcessedBytes,
time.Since(statsTracker.appStats.StartTime),
appStats.PacketsCount,
appStats.ProcessedBytes,
time.Since(appStats.StartTime),
nErrors,
errorMapLen)
}

View File

@@ -1,117 +0,0 @@
package tap
import (
"sync"
"time"
)
type AppStats struct {
StartTime time.Time `json:"-"`
ProcessedBytes int64 `json:"processedBytes"`
PacketsCount int64 `json:"packetsCount"`
TcpPacketsCount int64 `json:"tcpPacketsCount"`
ReassembledTcpPayloadsCount int64 `json:"reassembledTcpPayloadsCount"`
TlsConnectionsCount int64 `json:"tlsConnectionsCount"`
MatchedPairs int64 `json:"matchedPairs"`
DroppedTcpStreams int64 `json:"droppedTcpStreams"`
}
type StatsTracker struct {
appStats AppStats
processedBytesMutex sync.Mutex
packetsCountMutex sync.Mutex
tcpPacketsCountMutex sync.Mutex
reassembledTcpPayloadsCountMutex sync.Mutex
tlsConnectionsCountMutex sync.Mutex
matchedPairsMutex sync.Mutex
droppedTcpStreamsMutex sync.Mutex
}
func (st *StatsTracker) incMatchedPairs() {
st.matchedPairsMutex.Lock()
st.appStats.MatchedPairs++
st.matchedPairsMutex.Unlock()
}
func (st *StatsTracker) incDroppedTcpStreams() {
st.droppedTcpStreamsMutex.Lock()
st.appStats.DroppedTcpStreams++
st.droppedTcpStreamsMutex.Unlock()
}
func (st *StatsTracker) incPacketsCount() int64 {
st.packetsCountMutex.Lock()
st.appStats.PacketsCount++
currentPacketsCount := st.appStats.PacketsCount
st.packetsCountMutex.Unlock()
return currentPacketsCount
}
func (st *StatsTracker) incTcpPacketsCount() {
st.tcpPacketsCountMutex.Lock()
st.appStats.TcpPacketsCount++
st.tcpPacketsCountMutex.Unlock()
}
func (st *StatsTracker) incReassembledTcpPayloadsCount() {
st.reassembledTcpPayloadsCountMutex.Lock()
st.appStats.ReassembledTcpPayloadsCount++
st.reassembledTcpPayloadsCountMutex.Unlock()
}
func (st *StatsTracker) incTlsConnectionsCount() {
st.tlsConnectionsCountMutex.Lock()
st.appStats.TlsConnectionsCount++
st.tlsConnectionsCountMutex.Unlock()
}
func (st *StatsTracker) updateProcessedBytes(size int64) {
st.processedBytesMutex.Lock()
st.appStats.ProcessedBytes += size
st.processedBytesMutex.Unlock()
}
func (st *StatsTracker) setStartTime(startTime time.Time) {
st.appStats.StartTime = startTime
}
func (st *StatsTracker) dumpStats() *AppStats {
currentAppStats := &AppStats{StartTime: st.appStats.StartTime}
st.processedBytesMutex.Lock()
currentAppStats.ProcessedBytes = st.appStats.ProcessedBytes
st.appStats.ProcessedBytes = 0
st.processedBytesMutex.Unlock()
st.packetsCountMutex.Lock()
currentAppStats.PacketsCount = st.appStats.PacketsCount
st.appStats.PacketsCount = 0
st.packetsCountMutex.Unlock()
st.tcpPacketsCountMutex.Lock()
currentAppStats.TcpPacketsCount = st.appStats.TcpPacketsCount
st.appStats.TcpPacketsCount = 0
st.tcpPacketsCountMutex.Unlock()
st.reassembledTcpPayloadsCountMutex.Lock()
currentAppStats.ReassembledTcpPayloadsCount = st.appStats.ReassembledTcpPayloadsCount
st.appStats.ReassembledTcpPayloadsCount = 0
st.reassembledTcpPayloadsCountMutex.Unlock()
st.tlsConnectionsCountMutex.Lock()
currentAppStats.TlsConnectionsCount = st.appStats.TlsConnectionsCount
st.appStats.TlsConnectionsCount = 0
st.tlsConnectionsCountMutex.Unlock()
st.matchedPairsMutex.Lock()
currentAppStats.MatchedPairs = st.appStats.MatchedPairs
st.appStats.MatchedPairs = 0
st.matchedPairsMutex.Unlock()
st.droppedTcpStreamsMutex.Lock()
currentAppStats.DroppedTcpStreams = st.appStats.DroppedTcpStreams
st.appStats.DroppedTcpStreams = 0
st.droppedTcpStreamsMutex.Unlock()
return currentAppStats
}

View File

@@ -107,7 +107,7 @@ func (h *tcpReader) Close() {
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions)
if err != nil {
io.Copy(ioutil.Discard, b)
}

View File

@@ -147,7 +147,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
if length > 0 {
// This is where we pass the reassembled information onwards
// This channel is read by an tcpReader object
statsTracker.incReassembledTcpPayloadsCount()
appStats.IncReassembledTcpPayloadsCount()
timestamp := ac.GetCaptureInfo().Timestamp
if dir == reassembly.TCPDirClientToServer {
for i := range t.clients {

View File

@@ -62,8 +62,8 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
}
if stream.isTapTarget {
if runtime.NumGoroutine() > maxNumberOfGoroutines {
statsTracker.incDroppedTcpStreams()
rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine())
appStats.IncDroppedTcpStreams()
rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine())
return stream
}
streamId++

22792
ui/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -12,8 +12,8 @@
"@types/node": "^12.20.10",
"@types/react": "^17.0.3",
"@types/react-dom": "^17.0.3",
"jsonpath": "^1.1.1",
"axios": "^0.21.1",
"jsonpath": "^1.1.1",
"node-sass": "^5.0.0",
"numeral": "^2.0.6",
"protobuf-decoder": "^0.1.0",
@@ -21,7 +21,7 @@
"react-copy-to-clipboard": "^5.0.3",
"react-dom": "^17.0.2",
"react-scripts": "4.0.3",
"react-scrollable-feed": "^1.3.0",
"react-scrollable-feed-virtualized": "^1.4.2",
"react-syntax-highlighter": "^15.4.3",
"typescript": "^4.2.4",
"web-vitals": "^1.1.1"

View File

@@ -2,7 +2,7 @@ import {EntryItem} from "./EntryListItem/EntryListItem";
import React, {useCallback, useEffect, useMemo, useRef, useState} from "react";
import styles from './style/EntriesList.module.sass';
import spinner from './assets/spinner.svg';
import ScrollableFeed from "react-scrollable-feed";
import ScrollableFeedVirtualized from "react-scrollable-feed-virtualized";
import {StatusType} from "./Filters";
import Api from "../helpers/api";
import down from "./assets/downImg.svg";
@@ -54,8 +54,8 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
const filterEntries = useCallback((entry) => {
if(methodsFilter.length > 0 && !methodsFilter.includes(entry.method.toLowerCase())) return;
if(pathFilter && entry.path?.toLowerCase()?.indexOf(pathFilter) === -1) return;
if(statusFilter.includes(StatusType.SUCCESS) && entry.status_code >= 400) return;
if(statusFilter.includes(StatusType.ERROR) && entry.status_code < 400) return;
if(statusFilter.includes(StatusType.SUCCESS) && entry.statusCode >= 400) return;
if(statusFilter.includes(StatusType.ERROR) && entry.statusCode < 400) return;
return entry;
},[methodsFilter, pathFilter, statusFilter])
@@ -77,9 +77,6 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
}
setIsLoadingTop(false);
const newEntries = [...data, ...entries];
if(newEntries.length >= 1000) {
newEntries.splice(1000);
}
setEntries(newEntries);
if(scrollTo) {
@@ -100,10 +97,6 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
}
scrollTo = document.getElementById(filteredEntries?.[filteredEntries.length -1]?.id);
let newEntries = [...entries, ...data];
if(newEntries.length >= 1000) {
setNoMoreDataTop(false);
newEntries = newEntries.slice(-1000);
}
setEntries(newEntries);
if(scrollTo) {
scrollTo.scrollIntoView({behavior: "smooth"});
@@ -116,19 +109,20 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
{isLoadingTop && <div className={styles.spinnerContainer}>
<img alt="spinner" src={spinner} style={{height: 25}}/>
</div>}
<ScrollableFeed ref={scrollableRef} onScroll={(isAtBottom) => onScrollEvent(isAtBottom)}>
<ScrollableFeedVirtualized ref={scrollableRef} itemHeight={48} marginTop={10} onScroll={(isAtBottom) => onScrollEvent(isAtBottom)}>
{noMoreDataTop && !connectionOpen && <div id="noMoreDataTop" className={styles.noMoreDataAvailable}>No more data available</div>}
{filteredEntries.map(entry => <EntryItem key={entry.id}
entry={entry}
setFocusedEntryId={setFocusedEntryId}
isSelected={focusedEntryId === entry.id}/>)}
{!connectionOpen && !noMoreDataBottom && <div className={styles.fetchButtonContainer}>
<div className={styles.styledButton} onClick={() => getNewEntries()}>Fetch more entries</div>
</div>}
</ScrollableFeed>
<button type="button"
className={`${styles.btnLive} ${scrollableList ? styles.showButton : styles.hideButton}`}
onClick={(_) => scrollableRef.current.scrollToBottom()}>
isSelected={focusedEntryId === entry.id}
style={{}}/>)}
</ScrollableFeedVirtualized>
{!connectionOpen && !noMoreDataBottom && <div className={styles.fetchButtonContainer}>
<div className={styles.styledButton} onClick={() => getNewEntries()}>Fetch more entries</div>
</div>}
<button type="button"
className={`${styles.btnLive} ${scrollableList ? styles.showButton : styles.hideButton}`}
onClick={(_) => scrollableRef.current.jumpToBottom()}>
<img alt="down" src={down} />
</button>
</div>

View File

@@ -42,7 +42,6 @@ const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
<div style={{right: "30px", position: "absolute", display: "flex"}}>
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(bodySize)}</div>}
<div style={{marginRight: 18, opacity: 0.5}}>{Math.round(elapsedTime)}ms</div>
<div style={{opacity: 0.5}}>{'rulesMatched' in data ? data.rulesMatched?.length : '0'} Rules Applied</div>
</div>
</div>;
};
@@ -72,7 +71,7 @@ export const EntryDetailed: React.FC<EntryDetailedProps> = ({entryData}) => {
/>
{entryData.data && <EntrySummary data={entryData.data}/>}
<>
{entryData.data && <EntryViewer representation={entryData.representation} color={entryData.protocol.background_color}/>}
{entryData.data && <EntryViewer representation={entryData.representation} rulesMatched={entryData.rulesMatched} elapsedTime={entryData.data.elapsedTime} color={entryData.protocol.backgroundColor}/>}
</>
</>
};

View File

@@ -215,10 +215,10 @@ export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({serv
<>
{
rule.Key &&
<tr className={styles.dataValue}><td><b>Key</b>:</td><td>{rule.Key}</td></tr>
<tr className={styles.dataValue}><td><b>Key:</b></td> <td>{rule.Key}</td></tr>
}
{
rule.Latency &&
rule.Latency !== 0 &&
<tr className={styles.dataValue}><td><b>Latency:</b></td> <td>{rule.Latency}</td></tr>
}
{
@@ -231,7 +231,7 @@ export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({serv
}
{
rule.Service &&
<tr className={styles.dataValue}><td><b>Service:</b></td> <td>{service}</td></tr>
<tr className={styles.dataValue}><td><b>Service:</b></td> <td>{rule.Service}</td></tr>
}
{
rule.Type &&
@@ -251,7 +251,7 @@ export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({serv
</tbody>
</table>
</EntrySectionContainer>
</> : <span/>
</> : <span className={styles.noRules}>No rules could be applied to this request.</span>
}
</React.Fragment>
}

View File

@@ -33,8 +33,7 @@ const SectionsRepresentation: React.FC<any> = ({data, color}) => {
return <>{sections}</>;
}
const AutoRepresentation: React.FC<any> = ({representation, color}) => {
const rulesMatched = []
const AutoRepresentation: React.FC<any> = ({representation, rulesMatched, elapsedTime, color}) => {
const TABS = [
{
tab: 'request'
@@ -68,8 +67,7 @@ const AutoRepresentation: React.FC<any> = ({representation, color}) => {
<SectionsRepresentation data={response} color={color}/>
</React.Fragment>}
{currentTab === TABS[2].tab && <React.Fragment>
{// FIXME: Fix here
<EntryTablePolicySection service={representation.service} title={'Rule'} color={color} latency={0} response={response} arrayToIterate={rulesMatched ? rulesMatched : []}/>}
<EntryTablePolicySection service={representation.service} title={'Rule'} color={color} latency={elapsedTime} response={response} arrayToIterate={rulesMatched ? rulesMatched : []}/>
</React.Fragment>}
</div>}
</div>;
@@ -77,11 +75,13 @@ const AutoRepresentation: React.FC<any> = ({representation, color}) => {
interface Props {
representation: any;
color: string,
rulesMatched: any;
color: string;
elapsedTime: number;
}
const EntryViewer: React.FC<Props> = ({representation, color}) => {
return <AutoRepresentation representation={representation} color={color}/>
const EntryViewer: React.FC<Props> = ({representation, rulesMatched, elapsedTime, color}) => {
return <AutoRepresentation representation={representation} rulesMatched={rulesMatched} elapsedTime={elapsedTime} color={color}/>
};
export default EntryViewer;

View File

@@ -19,20 +19,31 @@
.rowSelected
border: 1px $blue-color solid
// border-left: 5px $blue-color solid
margin-left: 10px
margin-right: 3px
.ruleSuccessRow
border: 1px $success-color solid
// border-left: 5px $success-color solid
background: #E8FFF1
.ruleSuccessRowSelected
border: 1px #6FCF97 solid
border-left: 5px #6FCF97 solid
.ruleFailureRow
background: #FFE9EF
.ruleFailureRowSelected
border: 1px $failure-color solid
// border-left: 5px $failure-color solid
border-left: 5px $failure-color solid
.ruleNumberText
font-size: 12px;
font-style: italic;
.ruleNumberTextFailure
color: #DB2156
.ruleNumberTextSuccess
color: #219653
.service
text-overflow: ellipsis

View File

@@ -16,14 +16,14 @@ interface Entry {
summary: string,
service: string,
id: string,
status_code?: number;
statusCode?: number;
url?: string;
timestamp: Date;
source_ip: string,
source_port: string,
destination_ip: string,
destination_port: string,
isOutgoing?: boolean;
sourceIp: string,
sourcePort: string,
destinationIp: string,
destinationPort: string,
isOutgoing?: boolean;
latency: number;
rules: Rules;
}
@@ -38,10 +38,12 @@ interface EntryProps {
entry: Entry;
setFocusedEntryId: (id: string) => void;
isSelected?: boolean;
style: object;
}
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSelected}) => {
const classification = getClassification(entry.status_code)
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSelected, style}) => {
const classification = getClassification(entry.statusCode)
const numberOfRules = entry.rules.numberOfRules
let ingoingIcon;
let outgoingIcon;
switch(classification) {
@@ -61,53 +63,51 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
break;
}
}
// let additionalRulesProperties = "";
// let ruleSuccess: boolean;
let additionalRulesProperties = "";
let ruleSuccess: boolean;
let rule = 'latency' in entry.rules
if (rule) {
if (entry.rules.latency !== -1) {
if (entry.rules.latency >= entry.latency) {
// additionalRulesProperties = styles.ruleSuccessRow
// ruleSuccess = true
additionalRulesProperties = styles.ruleSuccessRow
ruleSuccess = true
} else {
// additionalRulesProperties = styles.ruleFailureRow
// ruleSuccess = false
additionalRulesProperties = styles.ruleFailureRow
ruleSuccess = false
}
if (isSelected) {
// additionalRulesProperties += ` ${entry.rules.latency >= entry.latency ? styles.ruleSuccessRowSelected : styles.ruleFailureRowSelected}`
additionalRulesProperties += ` ${entry.rules.latency >= entry.latency ? styles.ruleSuccessRowSelected : styles.ruleFailureRowSelected}`
}
} else {
if (entry.rules.status) {
// additionalRulesProperties = styles.ruleSuccessRow
// ruleSuccess = true
additionalRulesProperties = styles.ruleSuccessRow
ruleSuccess = true
} else {
// additionalRulesProperties = styles.ruleFailureRow
// ruleSuccess = false
additionalRulesProperties = styles.ruleFailureRow
ruleSuccess = false
}
if (isSelected) {
// additionalRulesProperties += ` ${entry.rules.status ? styles.ruleSuccessRowSelected : styles.ruleFailureRowSelected}`
additionalRulesProperties += ` ${entry.rules.status ? styles.ruleSuccessRowSelected : styles.ruleFailureRowSelected}`
}
}
}
let backgroundColor = "";
if ('latency' in entry.rules) {
if (entry.rules.latency !== -1) {
backgroundColor = entry.rules.latency >= entry.latency ? styles.ruleSuccessRow : styles.ruleFailureRow
} else {
backgroundColor = entry.rules.status ? styles.ruleSuccessRow : styles.ruleFailureRow
}
}
return <>
<div
id={entry.id}
className={`${styles.row}
${isSelected ? styles.rowSelected : backgroundColor}`}
${isSelected && !rule ? styles.rowSelected : additionalRulesProperties}`}
onClick={() => setFocusedEntryId(entry.id)}
style={{border: isSelected ? `1px ${entry.protocol.background_color} solid` : "1px transparent solid"}}
style={{
border: isSelected ? `1px ${entry.protocol.backgroundColor} solid` : "1px transparent solid",
position: "absolute",
top: style['top'],
marginTop: style['marginTop'],
width: "calc(100% - 25px)",
}}
>
<Protocol protocol={entry.protocol} horizontal={false}/>
{((entry.protocol.name === "http" && "status_code" in entry) || entry.status_code !== 0) && <div>
<StatusCode statusCode={entry.status_code}/>
{((entry.protocol.name === "http" && "statusCode" in entry) || entry.statusCode !== 0) && <div>
<StatusCode statusCode={entry.statusCode}/>
</div>}
<div className={styles.endpointServiceContainer}>
<EndpointPath method={entry.method} path={entry.summary}/>
@@ -115,14 +115,21 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
<span title="Service Name">{entry.service}</span>
</div>
</div>
{
rule ?
<div className={`${styles.ruleNumberText} ${ruleSuccess ? styles.ruleNumberTextSuccess : styles.ruleNumberTextFailure}`}>
{`Rules (${numberOfRules})`}
</div>
: ""
}
<div className={styles.directionContainer}>
<span className={styles.port} title="Source Port">{entry.source_port}</span>
<span className={styles.port} title="Source Port">{entry.sourcePort}</span>
{entry.isOutgoing ?
<img src={outgoingIcon} alt="Ingoing traffic" title="Ingoing"/>
:
<img src={ingoingIcon} alt="Outgoing traffic" title="Outgoing"/>
}
<span className={styles.port} title="Destination Port">{entry.destination_port}</span>
<span className={styles.port} title="Destination Port">{entry.destinationPort}</span>
</div>
<div className={styles.timestamp}>
<span title="Timestamp">
@@ -131,4 +138,5 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
</div>
</div>
</>
};
}

View File

@@ -86,10 +86,6 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
}
if (!focusedEntryId) setFocusedEntryId(entry.id)
let newEntries = [...entries];
if (entries.length === 1000) {
newEntries = newEntries.splice(1);
setNoMoreDataTop(false);
}
setEntries([...newEntries, entry])
if(listEntry.current) {
if(isScrollable(listEntry.current.firstChild)) {

View File

@@ -3,12 +3,12 @@ import styles from './style/Protocol.module.sass';
export interface ProtocolInterface {
name: string
long_name: string
longName: string
abbreviation: string
background_color: string
foreground_color: string
font_size: number
reference_link: string
backgroundColor: string
foregroundColor: string
fontSize: number
referenceLink: string
ports: string[]
inbound_ports: string
}
@@ -20,29 +20,29 @@ interface ProtocolProps {
const Protocol: React.FC<ProtocolProps> = ({protocol, horizontal}) => {
if (horizontal) {
return <a target="_blank" rel="noopener noreferrer" href={protocol.reference_link}>
return <a target="_blank" rel="noopener noreferrer" href={protocol.referenceLink}>
<span
className={`${styles.base} ${styles.horizontal}`}
style={{
backgroundColor: protocol.background_color,
color: protocol.foreground_color,
backgroundColor: protocol.backgroundColor,
color: protocol.foregroundColor,
fontSize: 13,
}}
title={protocol.abbreviation}
>
{protocol.long_name}
{protocol.longName}
</span>
</a>
} else {
return <a target="_blank" rel="noopener noreferrer" href={protocol.reference_link}>
return <a target="_blank" rel="noopener noreferrer" href={protocol.referenceLink}>
<span
className={`${styles.base} ${styles.vertical}`}
style={{
backgroundColor: protocol.background_color,
color: protocol.foreground_color,
fontSize: protocol.font_size,
backgroundColor: protocol.backgroundColor,
color: protocol.foregroundColor,
fontSize: protocol.fontSize,
}}
title={protocol.long_name}
title={protocol.longName}
>
{protocol.abbreviation}
</span>

View File

@@ -14,7 +14,6 @@
flex-direction: column
overflow: hidden
flex-grow: 1
padding-top: 20px
.footer
display: flex