Compare commits

...

11 Commits

Author SHA1 Message Date
lirazyehezkel
15021daa2e service path filter (#327) 2021-10-07 10:51:30 +03:00
RoyUP9
f83e565cd4 fixed policy rules readme (#321) 2021-10-07 09:11:24 +03:00
RoyUP9
8636a4731e fixed ignored user agents (#322) 2021-10-06 17:16:47 +03:00
lirazyehezkel
aa3510e936 service filter (#324) 2021-10-06 16:22:08 +03:00
Igor Gov
fd48cc6d87 Renaming ignored user agents var (#320) 2021-10-06 13:52:30 +03:00
RoyUP9
111d000c12 added interface conversion check (#318) 2021-10-06 13:38:32 +03:00
RoyUP9
9c98a4c2b1 Revert "Connecting Mizu to the application (#313)" (#316) 2021-10-06 10:41:23 +03:00
RoyUP9
d2d4ed5aee Connecting Mizu to the application (#313) 2021-10-05 16:35:16 +03:00
Igor Gov
30fce5d765 Supporting Mizu view from given url (#312)
* Supporting Mizu view from given url
2021-10-05 12:24:50 +03:00
Igor Gov
90040798b8 Adding additional error handling to api server watch (#311) 2021-09-30 11:49:31 +03:00
M. Mert Yıldıran
9eecddddd5 Start the tapper after the API server is ready (#309) 2021-09-30 11:22:07 +03:00
17 changed files with 300 additions and 177 deletions

View File

@@ -762,6 +762,116 @@ func TestTapRegexMasking(t *testing.T) {
}
}
func TestTapIgnoredUserAgents(t *testing.T) {
if testing.Short() {
t.Skip("ignored acceptance test")
}
cliPath, cliPathErr := getCliPath()
if cliPathErr != nil {
t.Errorf("failed to get cli path, err: %v", cliPathErr)
return
}
tapCmdArgs := getDefaultTapCommandArgs()
tapNamespace := getDefaultTapNamespace()
tapCmdArgs = append(tapCmdArgs, tapNamespace...)
ignoredUserAgentValue := "ignore"
tapCmdArgs = append(tapCmdArgs, "--set", fmt.Sprintf("tap.ignored-user-agents=%v", ignoredUserAgentValue))
tapCmd := exec.Command(cliPath, tapCmdArgs...)
t.Logf("running command: %v", tapCmd.String())
t.Cleanup(func() {
if err := cleanupCommand(tapCmd); err != nil {
t.Logf("failed to cleanup tap command, err: %v", err)
}
})
if err := tapCmd.Start(); err != nil {
t.Errorf("failed to start tap command, err: %v", err)
return
}
apiServerUrl := getApiServerUrl(defaultApiServerPort)
if err := waitTapPodsReady(apiServerUrl); err != nil {
t.Errorf("failed to start tap pods on time, err: %v", err)
return
}
proxyUrl := getProxyUrl(defaultNamespaceName, defaultServiceName)
ignoredUserAgentCustomHeader := "Ignored-User-Agent"
headers := map[string]string {"User-Agent": ignoredUserAgentValue, ignoredUserAgentCustomHeader: ""}
for i := 0; i < defaultEntriesCount; i++ {
if _, requestErr := executeHttpGetRequestWithHeaders(fmt.Sprintf("%v/get", proxyUrl), headers); requestErr != nil {
t.Errorf("failed to send proxy request, err: %v", requestErr)
return
}
}
for i := 0; i < defaultEntriesCount; i++ {
if _, requestErr := executeHttpGetRequest(fmt.Sprintf("%v/get", proxyUrl)); requestErr != nil {
t.Errorf("failed to send proxy request, err: %v", requestErr)
return
}
}
ignoredUserAgentsCheckFunc := func() error {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
entriesUrl := fmt.Sprintf("%v/api/entries?limit=%v&operator=lt&timestamp=%v", apiServerUrl, defaultEntriesCount * 2, timestamp)
requestResult, requestErr := executeHttpGetRequest(entriesUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entries, err: %v", requestErr)
}
entries := requestResult.([]interface{})
if len(entries) == 0 {
return fmt.Errorf("unexpected entries result - Expected more than 0 entries")
}
for _, entryInterface := range entries {
entryUrl := fmt.Sprintf("%v/api/entries/%v", apiServerUrl, entryInterface.(map[string]interface{})["id"])
requestResult, requestErr = executeHttpGetRequest(entryUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
}
data := requestResult.(map[string]interface{})["data"].(map[string]interface{})
entryJson := data["entry"].(string)
var entry map[string]interface{}
if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil {
return fmt.Errorf("failed to parse entry, err: %v", parseErr)
}
entryRequest := entry["request"].(map[string]interface{})
entryPayload := entryRequest["payload"].(map[string]interface{})
entryDetails := entryPayload["details"].(map[string]interface{})
entryHeaders := entryDetails["headers"].([]interface{})
for _, headerInterface := range entryHeaders {
header := headerInterface.(map[string]interface{})
if header["name"].(string) != ignoredUserAgentCustomHeader {
continue
}
return fmt.Errorf("unexpected result - user agent is not ignored")
}
}
return nil
}
if err := retriesExecute(shortRetriesCount, ignoredUserAgentsCheckFunc); err != nil {
t.Errorf("%v", err)
return
}
}
func TestTapDumpLogs(t *testing.T) {
if testing.Short() {
t.Skip("ignored acceptance test")

View File

@@ -172,6 +172,21 @@ func executeHttpRequest(response *http.Response, requestErr error) (interface{},
return jsonBytesToInterface(data)
}
func executeHttpGetRequestWithHeaders(url string, headers map[string]string) (interface{}, error) {
request, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
for headerKey, headerValue := range headers {
request.Header.Add(headerKey, headerValue)
}
client := &http.Client{}
response, requestErr := client.Do(request)
return executeHttpRequest(response, requestErr)
}
func executeHttpGetRequest(url string) (interface{}, error) {
response, requestErr := http.Get(url)
return executeHttpRequest(response, requestErr)

View File

@@ -4,6 +4,13 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
"io/ioutil"
"log"
"mizuserver/pkg/api"
@@ -18,15 +25,6 @@ import (
"path/filepath"
"plugin"
"sort"
"strings"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
)
var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API")
@@ -59,7 +57,7 @@ func main() {
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions, filteringOptions)
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
go filterItems(outputItemsChannel, filteredOutputItemsChannel)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
hostApi(nil)
@@ -77,7 +75,7 @@ func main() {
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
socketConnection, err := utils.ConnectToSocketServer(*apiServerAddress)
socketConnection, _, err := websocket.DefaultDialer.Dial(*apiServerAddress, nil)
if err != nil {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
}
@@ -90,7 +88,7 @@ func main() {
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
go filterItems(outputItemsChannel, filteredOutputItemsChannel)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
hostApi(outputItemsChannel)
@@ -98,7 +96,7 @@ func main() {
outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000)
filteredHarChannel := make(chan *tapApi.OutputChannelItem)
go filterItems(outputItemsChannel, filteredHarChannel, filteringOptions)
go filterItems(outputItemsChannel, filteredHarChannel)
go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap)
hostApi(nil)
}
@@ -230,7 +228,7 @@ func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions {
filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar)
if filteringOptionsJson == "" {
return &tapApi.TrafficFilteringOptions{
HealthChecksUserAgentHeaders: []string{},
IgnoredUserAgents: []string{},
}
}
var filteringOptions tapApi.TrafficFilteringOptions
@@ -242,42 +240,16 @@ func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions {
return &filteringOptions
}
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *tapApi.TrafficFilteringOptions) {
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
}
// TODO: move this to tappers https://up9.atlassian.net/browse/TRA-3441
if isHealthCheckByUserAgent(message, filterOptions.HealthChecksUserAgentHeaders) {
continue
}
outChannel <- message
}
}
func isHealthCheckByUserAgent(item *tapApi.OutputChannelItem, userAgentsToIgnore []string) bool {
if item.Protocol.Name != "http" {
return false
}
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
for _, header := range reqDetails["headers"].([]interface{}) {
h := header.(map[string]interface{})
if strings.ToLower(h["name"].(string)) == "user-agent" {
for _, userAgent := range userAgentsToIgnore {
if strings.Contains(strings.ToLower(h["value"].(string)), strings.ToLower(userAgent)) {
return true
}
}
return false
}
}
return false
}
func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tapApi.OutputChannelItem) {
if connection == nil {
panic("Websocket connection is nil")

View File

@@ -1,38 +0,0 @@
package utils
import (
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"time"
)
const (
DEFAULT_SOCKET_RETRIES = 3
DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10
)
func ConnectToSocketServer(address string) (*websocket.Conn, error) {
var err error
var connection *websocket.Conn
try := 0
// Connection to server fails if client pod is up before server.
// Retries solve this issue.
for try < DEFAULT_SOCKET_RETRIES {
rlog.Infof("Trying to connect to websocket: %s, attempt: %v/%v", address, try, DEFAULT_SOCKET_RETRIES)
connection, _, err = websocket.DefaultDialer.Dial(address, nil)
if err != nil {
rlog.Warnf("Failed connecting to websocket: %s, attempt: %v/%v, err: %s, (%v,%+v)", address, try, DEFAULT_SOCKET_RETRIES, err, err, err)
try++
} else {
break
}
time.Sleep(DEFAULT_SOCKET_RETRY_SLEEP_TIME)
}
if err != nil {
return nil, err
}
return connection, nil
}

View File

@@ -109,19 +109,17 @@ func RunMizuTap() {
return
}
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
defer finishMizuExecution(kubernetesProvider)
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions, mizuValidationRules); err != nil {
if err := createMizuResources(ctx, kubernetesProvider, mizuApiFilteringOptions, mizuValidationRules); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
return
}
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel, nodeToTappedPodIPMap)
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel, mizuApiFilteringOptions)
go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchPodsForTapping, ctx, kubernetesProvider, targetNamespaces, cancel, mizuApiFilteringOptions)
//block until exit signal or error
// block until exit signal or error
waitForFinish(ctx, cancel)
}
@@ -134,7 +132,7 @@ func readValidationRules(file string) (string, error) {
return string(newContent), nil
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
if !config.Config.IsNsRestrictedMode() {
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
return err
@@ -145,10 +143,6 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
return err
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil {
return err
}
if err := createMizuConfigmap(ctx, kubernetesProvider, mizuValidationRules); err != nil {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err)))
}
@@ -222,13 +216,15 @@ func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
}
return &api.TrafficFilteringOptions{
PlainTextMaskingRegexes: compiledRegexSlice,
HealthChecksUserAgentHeaders: config.Config.Tap.HealthChecksUserAgentHeaders,
DisableRedaction: config.Config.Tap.DisableRedaction,
PlainTextMaskingRegexes: compiledRegexSlice,
IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents,
DisableRedaction: config.Config.Tap.DisableRedaction,
}, nil
}
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
if len(nodeToTappedPodIPMap) > 0 {
var serviceAccountName string
if state.mizuServiceAccountExists {
@@ -407,13 +403,8 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error building node to ips map: %v", errormessage.FormatError(err)))
cancel()
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating daemonset: %v", errormessage.FormatError(err)))
if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating tappers: %v", errormessage.FormatError(err)))
cancel()
}
}
@@ -531,7 +522,7 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
return missingPods
}
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, mizuApiFilteringOptions *api.TrafficFilteringOptions) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
isPodReady := false
@@ -561,6 +552,23 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodPending {
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", logger.GetLogFilePath()))
cancel()
break
}
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", logger.GetLogFilePath()))
cancel()
break
}
}
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
@@ -571,6 +579,10 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
cancel()
break
}
if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating tappers: %v", errormessage.FormatError(err)))
cancel()
}
logger.Log.Infof("Mizu is available at %s\n", url)
openBrowser(url)
@@ -579,13 +591,13 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
}
case _, ok := <-errorChan:
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace, error: %v", config.Config.MizuResourcesNamespace, err)
cancel()
case <-timeAfter:
@@ -600,14 +612,10 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
}
}
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, nodeToTappedPodIPMap map[string][]string) {
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", mizu.TapperDaemonSetName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
var prevPodPhase core.PodPhase
var appendMetaname bool
if len(nodeToTappedPodIPMap) > 1 {
appendMetaname = true
}
for {
select {
case addedPod, ok := <-added:
@@ -616,22 +624,14 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
continue
}
if appendMetaname {
logger.Log.Debugf("Tapper is created [%s]", addedPod.ObjectMeta.Name)
} else {
logger.Log.Debugf("Tapper is created")
}
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
case removedPod, ok := <-removed:
if !ok {
removed = nil
continue
}
if appendMetaname {
logger.Log.Debugf("Tapper is removed [%s]", removedPod.ObjectMeta.Name)
} else {
logger.Log.Debugf("Tapper is removed")
}
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
case modifiedPod, ok := <-modified:
if !ok {
modified = nil
@@ -639,13 +639,14 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
}
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, "Cannot deploy the tapper. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Infof(uiUtils.Red, "Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message)
cancel()
break
}
podStatus := modifiedPod.Status
if podStatus.Phase == core.PodPending && prevPodPhase == podStatus.Phase {
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
continue
}
prevPodPhase = podStatus.Phase
@@ -655,22 +656,19 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
if state.Terminated != nil {
switch state.Terminated.Reason {
case "OOMKilled":
logger.Log.Infof(uiUtils.Red, "Tapper is terminated! OOMKilled. Increase pod resources.")
logger.Log.Infof(uiUtils.Red, "Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name)
}
} else {
logger.Log.Debugf("Tapper is %s", strings.ToLower(string(podStatus.Phase)))
}
} else {
logger.Log.Debugf("Tapper is %s", strings.ToLower(string(podStatus.Phase)))
}
case _, ok := <-errorChan:
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Errorf("[ERROR] Tapper creation, watching %v namespace", config.Config.MizuResourcesNamespace)
logger.Log.Debugf("[Error] Error in mizu tapper watch, err: %v", err)
cancel()
case <-ctx.Done():

View File

@@ -25,4 +25,7 @@ func init() {
defaults.Set(&defaultViewConfig)
viewCmd.Flags().Uint16P(configStructs.GuiPortViewName, "p", defaultViewConfig.GuiPort, "Provide a custom port for the web interface webserver")
viewCmd.Flags().StringP(configStructs.UrlViewName, "u", defaultViewConfig.Url, "Provide a custom host")
viewCmd.Flags().MarkHidden(configStructs.UrlViewName)
}

View File

@@ -24,34 +24,39 @@ func runMizuView() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
exists, err := kubernetesProvider.DoesServicesExist(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
if err != nil {
logger.Log.Errorf("Failed to found mizu service %v", err)
cancel()
return
}
if !exists {
logger.Log.Infof("%s service not found, you should run `mizu tap` command first", mizu.ApiServerPodName)
cancel()
return
}
url := config.Config.View.Url
url := GetApiServerUrl()
if url == "" {
exists, err := kubernetesProvider.DoesServicesExist(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
if err != nil {
logger.Log.Errorf("Failed to found mizu service %v", err)
cancel()
return
}
if !exists {
logger.Log.Infof("%s service not found, you should run `mizu tap` command first", mizu.ApiServerPodName)
cancel()
return
}
response, err := http.Get(fmt.Sprintf("%s/", url))
if err == nil && response.StatusCode == 200 {
logger.Log.Infof("Found a running service %s and open port %d", mizu.ApiServerPodName, config.Config.View.GuiPort)
return
}
logger.Log.Infof("Establishing connection to k8s cluster...")
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
url = GetApiServerUrl()
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", logger.GetLogFilePath()))
return
response, err := http.Get(fmt.Sprintf("%s/", url))
if err == nil && response.StatusCode == 200 {
logger.Log.Infof("Found a running service %s and open port %d", mizu.ApiServerPodName, config.Config.View.GuiPort)
return
}
logger.Log.Infof("Establishing connection to k8s cluster...")
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", logger.GetLogFilePath()))
return
}
}
logger.Log.Infof("Mizu is available at %s\n", url)
openBrowser(url)
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
logger.Log.Errorf("Failed to check versions compatibility %v", err)

View File

@@ -21,22 +21,22 @@ const (
)
type TapConfig struct {
AnalysisDestination string `yaml:"dest" default:"up9.app"`
SleepIntervalSec int `yaml:"upload-interval" default:"10"`
PodRegexStr string `yaml:"regex" default:".*"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
Namespaces []string `yaml:"namespaces"`
Analysis bool `yaml:"analysis" default:"false"`
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
PlainTextFilterRegexes []string `yaml:"regex-masking"`
HealthChecksUserAgentHeaders []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
DryRun bool `yaml:"dry-run" default:"false"`
EnforcePolicyFile string `yaml:"traffic-validation-file"`
EnforcePolicyFileDeprecated string `yaml:"test-rules,omitempty" readonly:""`
ApiServerResources Resources `yaml:"api-server-resources"`
TapperResources Resources `yaml:"tapper-resources"`
AnalysisDestination string `yaml:"dest" default:"up9.app"`
SleepIntervalSec int `yaml:"upload-interval" default:"10"`
PodRegexStr string `yaml:"regex" default:".*"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
Namespaces []string `yaml:"namespaces"`
Analysis bool `yaml:"analysis" default:"false"`
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
PlainTextFilterRegexes []string `yaml:"regex-masking"`
IgnoredUserAgents []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
DryRun bool `yaml:"dry-run" default:"false"`
EnforcePolicyFile string `yaml:"traffic-validation-file"`
EnforcePolicyFileDeprecated string `yaml:"test-rules,omitempty" readonly:""`
ApiServerResources Resources `yaml:"api-server-resources"`
TapperResources Resources `yaml:"tapper-resources"`
}
type Resources struct {

View File

@@ -2,8 +2,10 @@ package configStructs
const (
GuiPortViewName = "gui-port"
UrlViewName = "url"
)
type ViewConfig struct {
GuiPort uint16 `yaml:"gui-port" default:"8899"`
Url string `yaml:"url,omitempty" readonly:""`
}

View File

@@ -33,7 +33,10 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
return
}
pod := e.Object.(*corev1.Pod)
pod, ok := e.Object.(*corev1.Pod)
if !ok {
continue
}
if !podFilter.MatchString(pod.Name) {
continue

View File

@@ -31,12 +31,12 @@ mizu tap --traffic-validation-file rules.yaml
The structure of the traffic-validation-file is:
* `name`: string, name of the rule
* `type`: string, type of the rule, must be `json` or `header` or `latency`
* `type`: string, type of the rule, must be `json` or `header` or `slo`
* `key`: string, [jsonpath](https://code.google.com/archive/p/jsonpath/wikis/Javascript.wiki) used only in `json` or `header` type
* `value`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) used only in `json` or `header` type
* `service`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) service name to filter
* `path`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) URL path to filter
* `latency`: integer, time in ms of the expected latency.
* `response-time`: integer, time in ms of the expected latency.
### For example:
@@ -54,8 +54,8 @@ rules:
key: "Content-Le.*"
value: "(\\d+(?:\\.\\d+)?)"
- name: latency-test
type: latency
latency: 1
type: slo
response-time: 1
service: "carts.*"
```

View File

@@ -1,7 +1,7 @@
package api
type TrafficFilteringOptions struct {
HealthChecksUserAgentHeaders []string
PlainTextMaskingRegexes []*SerializableRegexp
DisableRedaction bool
IgnoredUserAgents []string
PlainTextMaskingRegexes []*SerializableRegexp
DisableRedaction bool
}

View File

@@ -14,9 +14,14 @@ import (
)
func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *api.TrafficFilteringOptions) {
if IsIgnoredUserAgent(item, options) {
return
}
if !options.DisableRedaction {
FilterSensitiveData(item, options)
}
emitter.Emit(item)
}

View File

@@ -25,6 +25,30 @@ var personallyIdentifiableDataFields = []string{"token", "authorization", "authe
"zip", "zipcode", "address", "country", "firstname", "lastname",
"middlename", "fname", "lname", "birthdate"}
func IsIgnoredUserAgent(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) bool {
if item.Protocol.Name != "http" {
return false
}
request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request)
for headerKey, headerValues := range request.Header {
if strings.ToLower(headerKey) == "user-agent" {
for _, userAgent := range options.IgnoredUserAgents {
for _, headerValue := range headerValues {
if strings.Contains(strings.ToLower(headerValue), strings.ToLower(userAgent)) {
return true
}
}
}
return false
}
}
return false
}
func FilterSensitiveData(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) {
request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request)
response := item.Pair.Response.Payload.(HTTPPayload).Data.(*http.Response)

View File

@@ -19,7 +19,8 @@ interface EntriesListProps {
setNoMoreDataBottom: (flag: boolean) => void;
methodsFilter: Array<string>;
statusFilter: Array<string>;
pathFilter: string
pathFilter: string;
serviceFilter: string;
listEntryREF: any;
onScrollEvent: (isAtBottom:boolean) => void;
scrollableList: boolean;
@@ -32,7 +33,7 @@ enum FetchOperator {
const api = new Api();
export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, focusedEntryId, setFocusedEntryId, connectionOpen, noMoreDataTop, setNoMoreDataTop, noMoreDataBottom, setNoMoreDataBottom, methodsFilter, statusFilter, pathFilter, listEntryREF, onScrollEvent, scrollableList}) => {
export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, focusedEntryId, setFocusedEntryId, connectionOpen, noMoreDataTop, setNoMoreDataTop, noMoreDataBottom, setNoMoreDataBottom, methodsFilter, statusFilter, pathFilter, serviceFilter, listEntryREF, onScrollEvent, scrollableList}) => {
const [loadMoreTop, setLoadMoreTop] = useState(false);
const [isLoadingTop, setIsLoadingTop] = useState(false);
@@ -54,10 +55,11 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
const filterEntries = useCallback((entry) => {
if(methodsFilter.length > 0 && !methodsFilter.includes(entry.method.toLowerCase())) return;
if(pathFilter && entry.path?.toLowerCase()?.indexOf(pathFilter) === -1) return;
if(serviceFilter && entry.service?.toLowerCase()?.indexOf(serviceFilter) === -1) return;
if(statusFilter.includes(StatusType.SUCCESS) && entry.statusCode >= 400) return;
if(statusFilter.includes(StatusType.ERROR) && entry.statusCode < 400) return;
return entry;
},[methodsFilter, pathFilter, statusFilter])
},[methodsFilter, pathFilter, statusFilter, serviceFilter])
const filteredEntries = useMemo(() => {
return entries.filter(filterEntries);

View File

@@ -11,13 +11,16 @@ interface FiltersProps {
setStatusFilter: (methods: Array<string>) => void;
pathFilter: string
setPathFilter: (val: string) => void;
serviceFilter: string
setServiceFilter: (val: string) => void;
}
export const Filters: React.FC<FiltersProps> = ({methodsFilter, setMethodsFilter, statusFilter, setStatusFilter, pathFilter, setPathFilter}) => {
export const Filters: React.FC<FiltersProps> = ({methodsFilter, setMethodsFilter, statusFilter, setStatusFilter, pathFilter, setPathFilter, serviceFilter, setServiceFilter}) => {
return <div className={styles.container}>
<MethodFilter methodsFilter={methodsFilter} setMethodsFilter={setMethodsFilter}/>
<StatusTypesFilter statusFilter={statusFilter} setStatusFilter={setStatusFilter}/>
<ServiceFilter serviceFilter={serviceFilter} setServiceFilter={setServiceFilter}/>
<PathFilter pathFilter={pathFilter} setPathFilter={setPathFilter}/>
</div>;
};
@@ -117,3 +120,18 @@ const PathFilter: React.FC<PathFilterProps> = ({pathFilter, setPathFilter}) => {
</FilterContainer>;
};
interface ServiceFilterProps {
serviceFilter: string;
setServiceFilter: (val: string) => void;
}
const ServiceFilter: React.FC<ServiceFilterProps> = ({serviceFilter, setServiceFilter}) => {
return <FilterContainer>
<div className={styles.filterLabel}>Service</div>
<div>
<TextField value={serviceFilter} variant="outlined" className={styles.filterText} style={{minWidth: '150px'}} onChange={(e: any) => setServiceFilter(e.target.value)}/>
</div>
</FilterContainer>;
};

View File

@@ -58,6 +58,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
const [methodsFilter, setMethodsFilter] = useState([]);
const [statusFilter, setStatusFilter] = useState([]);
const [pathFilter, setPathFilter] = useState("");
const [serviceFilter, setServiceFilter] = useState("");
const [tappingStatus, setTappingStatus] = useState(null);
@@ -192,6 +193,8 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
setStatusFilter={setStatusFilter}
pathFilter={pathFilter}
setPathFilter={setPathFilter}
serviceFilter={serviceFilter}
setServiceFilter={setServiceFilter}
/>
<div className={styles.container}>
<EntriesList entries={entries}
@@ -206,6 +209,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
methodsFilter={methodsFilter}
statusFilter={statusFilter}
pathFilter={pathFilter}
serviceFilter={serviceFilter}
listEntryREF={listEntry}
onScrollEvent={onScrollEvent}
scrollableList={disableScrollList}