Compare commits

...

24 Commits

Author SHA1 Message Date
RoyUP9
56e801a582 changed workspace remote url (#341) 2021-10-11 17:25:23 +03:00
RoyUP9
04c0f8cbcd tap to workspace (#315) 2021-10-11 15:42:41 +03:00
RoyUP9
da846da334 api server support sync workspace (#340) 2021-10-11 13:09:23 +03:00
RoyUP9
ba6b5c868c added semver isvalid check in version update checker (#338) 2021-10-11 11:32:41 +03:00
RoyUP9
9d378ed75b refactor login (#339) 2021-10-11 11:31:12 +03:00
M. Mert Yıldıran
70982c2844 Fix the interface conversion errors in Redis (#334) 2021-10-10 08:34:47 +03:00
M. Mert Yıldıran
61f24320b8 Fix the issues in the Tabs React component (#335)
* Fix the issues in the `Tabs` React component

* Update the boolean expression as well
2021-10-09 13:16:08 +03:00
M. Mert Yıldıran
eb4a541376 Fix the interface conversion errors in Kafka (#333) 2021-10-08 07:35:20 +03:00
M. Mert Yıldıran
77710cc411 Format the strings in watchTapperPod method (#331) 2021-10-07 21:06:17 +03:00
RoyUP9
8b8c4609ce renamed upload entries to sync entries (#330) 2021-10-07 18:33:14 +03:00
RoyUP9
14b616a856 Connecting Mizu to the application (#317) 2021-10-07 17:28:28 +03:00
RoyUP9
82d603c0fd fixed version update http route (#329) 2021-10-07 17:26:38 +03:00
RoyUP9
f1a2ee7fb4 removed enforce policy file deprecated flag (#328) 2021-10-07 11:08:48 +03:00
lirazyehezkel
15021daa2e service path filter (#327) 2021-10-07 10:51:30 +03:00
RoyUP9
f83e565cd4 fixed policy rules readme (#321) 2021-10-07 09:11:24 +03:00
RoyUP9
8636a4731e fixed ignored user agents (#322) 2021-10-06 17:16:47 +03:00
lirazyehezkel
aa3510e936 service filter (#324) 2021-10-06 16:22:08 +03:00
Igor Gov
fd48cc6d87 Renaming ignored user agents var (#320) 2021-10-06 13:52:30 +03:00
RoyUP9
111d000c12 added interface conversion check (#318) 2021-10-06 13:38:32 +03:00
RoyUP9
9c98a4c2b1 Revert "Connecting Mizu to the application (#313)" (#316) 2021-10-06 10:41:23 +03:00
RoyUP9
d2d4ed5aee Connecting Mizu to the application (#313) 2021-10-05 16:35:16 +03:00
Igor Gov
30fce5d765 Supporting Mizu view from given url (#312)
* Supporting Mizu view from given url
2021-10-05 12:24:50 +03:00
Igor Gov
90040798b8 Adding additional error handling to api server watch (#311) 2021-09-30 11:49:31 +03:00
M. Mert Yıldıran
9eecddddd5 Start the tapper after the API server is ready (#309) 2021-09-30 11:22:07 +03:00
40 changed files with 810 additions and 321 deletions

View File

@@ -762,6 +762,116 @@ func TestTapRegexMasking(t *testing.T) {
}
}
func TestTapIgnoredUserAgents(t *testing.T) {
if testing.Short() {
t.Skip("ignored acceptance test")
}
cliPath, cliPathErr := getCliPath()
if cliPathErr != nil {
t.Errorf("failed to get cli path, err: %v", cliPathErr)
return
}
tapCmdArgs := getDefaultTapCommandArgs()
tapNamespace := getDefaultTapNamespace()
tapCmdArgs = append(tapCmdArgs, tapNamespace...)
ignoredUserAgentValue := "ignore"
tapCmdArgs = append(tapCmdArgs, "--set", fmt.Sprintf("tap.ignored-user-agents=%v", ignoredUserAgentValue))
tapCmd := exec.Command(cliPath, tapCmdArgs...)
t.Logf("running command: %v", tapCmd.String())
t.Cleanup(func() {
if err := cleanupCommand(tapCmd); err != nil {
t.Logf("failed to cleanup tap command, err: %v", err)
}
})
if err := tapCmd.Start(); err != nil {
t.Errorf("failed to start tap command, err: %v", err)
return
}
apiServerUrl := getApiServerUrl(defaultApiServerPort)
if err := waitTapPodsReady(apiServerUrl); err != nil {
t.Errorf("failed to start tap pods on time, err: %v", err)
return
}
proxyUrl := getProxyUrl(defaultNamespaceName, defaultServiceName)
ignoredUserAgentCustomHeader := "Ignored-User-Agent"
headers := map[string]string {"User-Agent": ignoredUserAgentValue, ignoredUserAgentCustomHeader: ""}
for i := 0; i < defaultEntriesCount; i++ {
if _, requestErr := executeHttpGetRequestWithHeaders(fmt.Sprintf("%v/get", proxyUrl), headers); requestErr != nil {
t.Errorf("failed to send proxy request, err: %v", requestErr)
return
}
}
for i := 0; i < defaultEntriesCount; i++ {
if _, requestErr := executeHttpGetRequest(fmt.Sprintf("%v/get", proxyUrl)); requestErr != nil {
t.Errorf("failed to send proxy request, err: %v", requestErr)
return
}
}
ignoredUserAgentsCheckFunc := func() error {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
entriesUrl := fmt.Sprintf("%v/api/entries?limit=%v&operator=lt&timestamp=%v", apiServerUrl, defaultEntriesCount * 2, timestamp)
requestResult, requestErr := executeHttpGetRequest(entriesUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entries, err: %v", requestErr)
}
entries := requestResult.([]interface{})
if len(entries) == 0 {
return fmt.Errorf("unexpected entries result - Expected more than 0 entries")
}
for _, entryInterface := range entries {
entryUrl := fmt.Sprintf("%v/api/entries/%v", apiServerUrl, entryInterface.(map[string]interface{})["id"])
requestResult, requestErr = executeHttpGetRequest(entryUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
}
data := requestResult.(map[string]interface{})["data"].(map[string]interface{})
entryJson := data["entry"].(string)
var entry map[string]interface{}
if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil {
return fmt.Errorf("failed to parse entry, err: %v", parseErr)
}
entryRequest := entry["request"].(map[string]interface{})
entryPayload := entryRequest["payload"].(map[string]interface{})
entryDetails := entryPayload["details"].(map[string]interface{})
entryHeaders := entryDetails["headers"].([]interface{})
for _, headerInterface := range entryHeaders {
header := headerInterface.(map[string]interface{})
if header["name"].(string) != ignoredUserAgentCustomHeader {
continue
}
return fmt.Errorf("unexpected result - user agent is not ignored")
}
}
return nil
}
if err := retriesExecute(shortRetriesCount, ignoredUserAgentsCheckFunc); err != nil {
t.Errorf("%v", err)
return
}
}
func TestTapDumpLogs(t *testing.T) {
if testing.Short() {
t.Skip("ignored acceptance test")

View File

@@ -172,6 +172,21 @@ func executeHttpRequest(response *http.Response, requestErr error) (interface{},
return jsonBytesToInterface(data)
}
func executeHttpGetRequestWithHeaders(url string, headers map[string]string) (interface{}, error) {
request, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
for headerKey, headerValue := range headers {
request.Header.Add(headerKey, headerValue)
}
client := &http.Client{}
response, requestErr := client.Do(request)
return executeHttpRequest(response, requestErr)
}
func executeHttpGetRequest(url string) (interface{}, error) {
response, requestErr := http.Get(url)
return executeHttpRequest(response, requestErr)

View File

@@ -4,6 +4,13 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
"io/ioutil"
"log"
"mizuserver/pkg/api"
@@ -18,15 +25,6 @@ import (
"path/filepath"
"plugin"
"sort"
"strings"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
)
var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API")
@@ -59,7 +57,7 @@ func main() {
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions, filteringOptions)
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
go filterItems(outputItemsChannel, filteredOutputItemsChannel)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
hostApi(nil)
@@ -77,7 +75,7 @@ func main() {
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
socketConnection, err := utils.ConnectToSocketServer(*apiServerAddress)
socketConnection, _, err := websocket.DefaultDialer.Dial(*apiServerAddress, nil)
if err != nil {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
}
@@ -90,7 +88,7 @@ func main() {
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
go filterItems(outputItemsChannel, filteredOutputItemsChannel)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
hostApi(outputItemsChannel)
@@ -98,7 +96,7 @@ func main() {
outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000)
filteredHarChannel := make(chan *tapApi.OutputChannelItem)
go filterItems(outputItemsChannel, filteredHarChannel, filteringOptions)
go filterItems(outputItemsChannel, filteredHarChannel)
go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap)
hostApi(nil)
}
@@ -230,7 +228,7 @@ func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions {
filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar)
if filteringOptionsJson == "" {
return &tapApi.TrafficFilteringOptions{
HealthChecksUserAgentHeaders: []string{},
IgnoredUserAgents: []string{},
}
}
var filteringOptions tapApi.TrafficFilteringOptions
@@ -242,42 +240,16 @@ func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions {
return &filteringOptions
}
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *tapApi.TrafficFilteringOptions) {
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
}
// TODO: move this to tappers https://up9.atlassian.net/browse/TRA-3441
if isHealthCheckByUserAgent(message, filterOptions.HealthChecksUserAgentHeaders) {
continue
}
outChannel <- message
}
}
func isHealthCheckByUserAgent(item *tapApi.OutputChannelItem, userAgentsToIgnore []string) bool {
if item.Protocol.Name != "http" {
return false
}
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
for _, header := range reqDetails["headers"].([]interface{}) {
h := header.(map[string]interface{})
if strings.ToLower(h["name"].(string)) == "user-agent" {
for _, userAgent := range userAgentsToIgnore {
if strings.Contains(strings.ToLower(h["value"].(string)), strings.ToLower(userAgent)) {
return true
}
}
return false
}
}
return false
}
func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tapApi.OutputChannelItem) {
if connection == nil {
panic("Websocket connection is nil")

View File

@@ -10,6 +10,7 @@ import (
"mizuserver/pkg/utils"
"mizuserver/pkg/validation"
"net/http"
"regexp"
"time"
"github.com/google/martian/har"
@@ -64,31 +65,54 @@ func GetEntries(c *gin.Context) {
c.JSON(http.StatusOK, baseEntries)
}
func UploadEntries(c *gin.Context) {
rlog.Infof("Upload entries - started\n")
func SyncEntries(c *gin.Context) {
rlog.Infof("Sync entries - started\n")
uploadParams := &models.UploadEntriesRequestQuery{}
if err := c.BindQuery(uploadParams); err != nil {
syncParams := &models.SyncEntriesRequestQuery{}
if err := c.BindQuery(syncParams); err != nil {
c.JSON(http.StatusBadRequest, err)
return
}
if err := validation.Validate(uploadParams); err != nil {
if err := validation.Validate(syncParams); err != nil {
c.JSON(http.StatusBadRequest, err)
return
}
if up9.GetAnalyzeInfo().IsAnalyzing {
c.String(http.StatusBadRequest, "Cannot analyze, mizu is already analyzing")
return
}
rlog.Infof("Upload entries - creating token. dest %s\n", uploadParams.Dest)
token, err := up9.CreateAnonymousToken(uploadParams.Dest)
if err != nil {
c.String(http.StatusServiceUnavailable, "Cannot analyze, mizu is already analyzing")
var (
token, model string
guestMode bool
)
if syncParams.Token == "" {
rlog.Infof("Sync entries - creating token. env %s\n", syncParams.Env)
guestToken, err := up9.CreateAnonymousToken(syncParams.Env)
if err != nil {
c.String(http.StatusServiceUnavailable, "Failed creating anonymous token")
return
}
token = guestToken.Token
model = guestToken.Model
guestMode = true
} else {
token = fmt.Sprintf("bearer %s", syncParams.Token)
model = syncParams.Workspace
guestMode = false
}
modelRegex, _ := regexp.Compile("[A-Za-z0-9][-A-Za-z0-9_.]*[A-Za-z0-9]+$")
if len(model) > 63 || !modelRegex.MatchString(model) {
c.String(http.StatusBadRequest, "Invalid model name")
return
}
rlog.Infof("Upload entries - uploading. token: %s model: %s\n", token.Token, token.Model)
go up9.UploadEntriesImpl(token.Token, token.Model, uploadParams.Dest, uploadParams.SleepIntervalSec)
rlog.Infof("Sync entries - syncing. token: %s, model: %s, guest mode: %v\n", token, model, guestMode)
go up9.SyncEntriesImpl(token, model, syncParams.Env, syncParams.UploadIntervalSec, guestMode)
c.String(http.StatusOK, "OK")
}

View File

@@ -22,9 +22,11 @@ type EntriesFilter struct {
Timestamp int64 `form:"timestamp" validate:"required,min=1"`
}
type UploadEntriesRequestQuery struct {
Dest string `form:"dest"`
SleepIntervalSec int `form:"interval"`
type SyncEntriesRequestQuery struct {
Token string `form:"token"`
Env string `form:"env"`
Workspace string `form:"workspace"`
UploadIntervalSec int `form:"interval"`
}
type HarFetchRequestQuery struct {

View File

@@ -12,7 +12,7 @@ func EntriesRoutes(ginApp *gin.Engine) {
routeGroup.GET("/entries", controllers.GetEntries) // get entries (base/thin entries)
routeGroup.GET("/entries/:entryId", controllers.GetEntry) // get single (full) entry
routeGroup.GET("/exportEntries", controllers.GetFullEntries)
routeGroup.GET("/uploadEntries", controllers.UploadEntries)
routeGroup.GET("/syncEntries", controllers.SyncEntries)
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
routeGroup.GET("/resetDB", controllers.DeleteAllEntries) // get single (full) entry

View File

@@ -55,18 +55,24 @@ func CreateAnonymousToken(envPrefix string) (*GuestToken, error) {
return token, nil
}
func GetRemoteUrl(analyzeDestination string, analyzeToken string) string {
return fmt.Sprintf("https://%s/share/%s", analyzeDestination, analyzeToken)
func GetRemoteUrl(analyzeDestination string, analyzeModel string, analyzeToken string, guestMode bool) string {
if guestMode {
return fmt.Sprintf("https://%s/share/%s", analyzeDestination, analyzeToken)
}
return fmt.Sprintf("https://%s/app/workspaces/%s", analyzeDestination, analyzeModel)
}
func CheckIfModelReady(analyzeDestination string, analyzeModel string, analyzeToken string) bool {
func CheckIfModelReady(analyzeDestination string, analyzeModel string, analyzeToken string, guestMode bool) bool {
statusUrl, _ := url.Parse(fmt.Sprintf("https://trcc.%s/models/%s/status", analyzeDestination, analyzeModel))
authHeader := getAuthHeader(guestMode)
req := &http.Request{
Method: http.MethodGet,
URL: statusUrl,
Header: map[string][]string{
"Content-Type": {"application/json"},
"Guest-Auth": {analyzeToken},
authHeader: {analyzeToken},
},
}
statusResp, err := http.DefaultClient.Do(req)
@@ -81,6 +87,14 @@ func CheckIfModelReady(analyzeDestination string, analyzeModel string, analyzeTo
return target.LastMajorGeneration > 0
}
func getAuthHeader(guestMode bool) string {
if guestMode {
return "Guest-Auth"
}
return "Authorization"
}
func GetTrafficDumpUrl(analyzeDestination string, analyzeModel string) *url.URL {
strUrl := fmt.Sprintf("https://traffic.%s/dumpTrafficBulk/%s", analyzeDestination, analyzeModel)
if strings.HasPrefix(analyzeDestination, "http") {
@@ -92,6 +106,7 @@ func GetTrafficDumpUrl(analyzeDestination string, analyzeModel string) *url.URL
type AnalyzeInformation struct {
IsAnalyzing bool
GuestMode bool
SentCount int
AnalyzedModel string
AnalyzeToken string
@@ -100,6 +115,7 @@ type AnalyzeInformation struct {
func (info *AnalyzeInformation) Reset() {
info.IsAnalyzing = false
info.GuestMode = true
info.AnalyzedModel = ""
info.AnalyzeToken = ""
info.AnalyzeDestination = ""
@@ -111,20 +127,21 @@ var analyzeInformation = &AnalyzeInformation{}
func GetAnalyzeInfo() *shared.AnalyzeStatus {
return &shared.AnalyzeStatus{
IsAnalyzing: analyzeInformation.IsAnalyzing,
RemoteUrl: GetRemoteUrl(analyzeInformation.AnalyzeDestination, analyzeInformation.AnalyzeToken),
IsRemoteReady: CheckIfModelReady(analyzeInformation.AnalyzeDestination, analyzeInformation.AnalyzedModel, analyzeInformation.AnalyzeToken),
RemoteUrl: GetRemoteUrl(analyzeInformation.AnalyzeDestination, analyzeInformation.AnalyzedModel, analyzeInformation.AnalyzeToken, analyzeInformation.GuestMode),
IsRemoteReady: CheckIfModelReady(analyzeInformation.AnalyzeDestination, analyzeInformation.AnalyzedModel, analyzeInformation.AnalyzeToken, analyzeInformation.GuestMode),
SentCount: analyzeInformation.SentCount,
}
}
func UploadEntriesImpl(token string, model string, envPrefix string, sleepIntervalSec int) {
func SyncEntriesImpl(token string, model string, envPrefix string, uploadIntervalSec int, guestMode bool) {
analyzeInformation.IsAnalyzing = true
analyzeInformation.GuestMode = guestMode
analyzeInformation.AnalyzedModel = model
analyzeInformation.AnalyzeToken = token
analyzeInformation.AnalyzeDestination = envPrefix
analyzeInformation.SentCount = 0
sleepTime := time.Second * time.Duration(sleepIntervalSec)
sleepTime := time.Second * time.Duration(uploadIntervalSec)
var timestampFrom int64 = 0
@@ -160,7 +177,7 @@ func UploadEntriesImpl(token string, model string, envPrefix string, sleepInterv
body, jMarshalErr := json.Marshal(result)
if jMarshalErr != nil {
analyzeInformation.Reset()
rlog.Infof("Stopping analyzing")
rlog.Infof("Stopping sync entries")
log.Fatal(jMarshalErr)
}
@@ -170,20 +187,21 @@ func UploadEntriesImpl(token string, model string, envPrefix string, sleepInterv
_ = w.Close()
reqBody := ioutil.NopCloser(bytes.NewReader(in.Bytes()))
authHeader := getAuthHeader(guestMode)
req := &http.Request{
Method: http.MethodPost,
URL: GetTrafficDumpUrl(envPrefix, model),
Header: map[string][]string{
"Content-Encoding": {"deflate"},
"Content-Type": {"application/octet-stream"},
"Guest-Auth": {token},
authHeader: {token},
},
Body: reqBody,
}
if _, postErr := http.DefaultClient.Do(req); postErr != nil {
analyzeInformation.Reset()
rlog.Info("Stopping analyzing")
rlog.Info("Stopping sync entries")
log.Fatal(postErr)
}
analyzeInformation.SentCount += len(entriesArray)

View File

@@ -1,38 +0,0 @@
package utils
import (
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"time"
)
const (
DEFAULT_SOCKET_RETRIES = 3
DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10
)
func ConnectToSocketServer(address string) (*websocket.Conn, error) {
var err error
var connection *websocket.Conn
try := 0
// Connection to server fails if client pod is up before server.
// Retries solve this issue.
for try < DEFAULT_SOCKET_RETRIES {
rlog.Infof("Trying to connect to websocket: %s, attempt: %v/%v", address, try, DEFAULT_SOCKET_RETRIES)
connection, _, err = websocket.DefaultDialer.Dial(address, nil)
if err != nil {
rlog.Warnf("Failed connecting to websocket: %s, attempt: %v/%v, err: %s, (%v,%+v)", address, try, DEFAULT_SOCKET_RETRIES, err, err, err)
try++
} else {
break
}
time.Sleep(DEFAULT_SOCKET_RETRY_SLEEP_TIME)
}
if err != nil {
return nil, err
}
return connection, nil
}

View File

@@ -82,23 +82,23 @@ func (provider *apiServerProvider) ReportTappedPods(pods []core.Pod) error {
}
}
func (provider *apiServerProvider) RequestAnalysis(analysisDestination string, sleepIntervalSec int) error {
func (provider *apiServerProvider) RequestSyncEntries(envName string, workspace string, uploadIntervalSec int, token string) error {
if !provider.isReady {
return fmt.Errorf("trying to reach api server when not initialized yet")
}
urlPath := fmt.Sprintf("%s/api/uploadEntries?dest=%s&interval=%v", provider.url, url.QueryEscape(analysisDestination), sleepIntervalSec)
u, parseErr := url.ParseRequestURI(urlPath)
urlPath := fmt.Sprintf("%s/api/syncEntries?env=%s&workspace=%s&token=%s&interval=%v", provider.url, url.QueryEscape(envName), url.QueryEscape(workspace), url.QueryEscape(token), uploadIntervalSec)
syncEntriesUrl, parseErr := url.ParseRequestURI(urlPath)
if parseErr != nil {
logger.Log.Fatal("Failed parsing the URL (consider changing the analysis dest URL), err: %v", parseErr)
logger.Log.Fatal("Failed parsing the URL (consider changing the env name), err: %v", parseErr)
}
logger.Log.Debugf("Analysis url %v", u.String())
if response, requestErr := http.Get(u.String()); requestErr != nil {
return fmt.Errorf("failed to notify agent for analysis, err: %w", requestErr)
logger.Log.Debugf("Sync entries url %v", syncEntriesUrl.String())
if response, requestErr := http.Get(syncEntriesUrl.String()); requestErr != nil {
return fmt.Errorf("failed to notify api server for sync entries, err: %w", requestErr)
} else if response.StatusCode != 200 {
return fmt.Errorf("failed to notify agent for analysis, status code: %v", response.StatusCode)
return fmt.Errorf("failed to notify api server for sync entries, status code: %v", response.StatusCode)
} else {
logger.Log.Infof(uiUtils.Purple, "Traffic is uploading to UP9 for further analysis")
logger.Log.Infof(uiUtils.Purple, "Entries are syncing to UP9 for further analysis")
return nil
}
}

175
cli/auth/authProvider.go Normal file
View File

@@ -0,0 +1,175 @@
package auth
import (
"context"
"errors"
"fmt"
"github.com/golang-jwt/jwt/v4"
"github.com/google/uuid"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/uiUtils"
"golang.org/x/oauth2"
"net"
"net/http"
"os"
"time"
)
const loginTimeoutInMin = 2
// Ports are configured in keycloak "cli" client as valid redirect URIs. A change here must be reflected there as well.
var listenPorts = []int{3141, 4001, 5002, 6003, 7004, 8005, 9006, 10007}
func IsTokenExpired(tokenString string) (bool, error) {
token, _, err := new(jwt.Parser).ParseUnverified(tokenString, jwt.MapClaims{})
if err != nil {
return true, fmt.Errorf("failed to parse token, err: %v", err)
}
claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
return true, fmt.Errorf("can't convert token's claims to standard claims")
}
expiry := time.Unix(int64(claims["exp"].(float64)), 0)
return time.Now().After(expiry), nil
}
func Login() error {
token, loginErr := loginInteractively()
if loginErr != nil {
return fmt.Errorf("failed login interactively, err: %v", loginErr)
}
authConfig := configStructs.AuthConfig{
EnvName: config.Config.Auth.EnvName,
Token: token.AccessToken,
}
configFile, defaultConfigErr := config.GetConfigWithDefaults()
if defaultConfigErr != nil {
return fmt.Errorf("failed getting config with defaults, err: %v", defaultConfigErr)
}
if err := config.LoadConfigFile(config.Config.ConfigFilePath, configFile); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed getting config file, err: %v", err)
}
configFile.Auth = authConfig
if err := config.WriteConfig(configFile); err != nil {
return fmt.Errorf("failed writing config with auth, err: %v", err)
}
config.Config.Auth = authConfig
logger.Log.Infof("Login successfully, token stored in config path: %s", fmt.Sprintf(uiUtils.Purple, config.Config.ConfigFilePath))
return nil
}
func loginInteractively() (*oauth2.Token, error) {
tokenChannel := make(chan *oauth2.Token)
errorChannel := make(chan error)
server := http.Server{}
go startLoginServer(tokenChannel, errorChannel, &server)
defer func() {
if err := server.Shutdown(context.Background()); err != nil {
logger.Log.Debugf("Error shutting down server, err: %v", err)
}
}()
select {
case <-time.After(loginTimeoutInMin * time.Minute):
return nil, errors.New("auth timed out")
case err := <-errorChannel:
return nil, err
case token := <-tokenChannel:
return token, nil
}
}
func startLoginServer(tokenChannel chan *oauth2.Token, errorChannel chan error, server *http.Server) {
for _, port := range listenPorts {
var authConfig = &oauth2.Config{
ClientID: "cli",
RedirectURL: fmt.Sprintf("http://localhost:%v/callback", port),
Endpoint: oauth2.Endpoint{
AuthURL: fmt.Sprintf("https://auth.%s/auth/realms/testr/protocol/openid-connect/auth", config.Config.Auth.EnvName),
TokenURL: fmt.Sprintf("https://auth.%s/auth/realms/testr/protocol/openid-connect/token", config.Config.Auth.EnvName),
},
}
state := uuid.New()
mux := http.NewServeMux()
server.Handler = mux
mux.Handle("/callback", loginCallbackHandler(tokenChannel, errorChannel, authConfig, state))
listener, listenErr := net.Listen("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", port))
if listenErr != nil {
logger.Log.Debugf("failed to start listening on port %v, err: %v", port, listenErr)
continue
}
authorizationUrl := authConfig.AuthCodeURL(state.String())
uiUtils.OpenBrowser(authorizationUrl)
serveErr := server.Serve(listener)
if serveErr == http.ErrServerClosed {
logger.Log.Debugf("received server shutdown, server on port %v is closed", port)
return
} else if serveErr != nil {
logger.Log.Debugf("failed to start serving on port %v, err: %v", port, serveErr)
continue
}
logger.Log.Debugf("didn't receive server closed on port %v", port)
return
}
errorChannel <- fmt.Errorf("failed to start serving on all listen ports, ports: %v", listenPorts)
}
func loginCallbackHandler(tokenChannel chan *oauth2.Token, errorChannel chan error, authConfig *oauth2.Config, state uuid.UUID) http.Handler {
return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
if err := request.ParseForm(); err != nil {
errorMsg := fmt.Sprintf("failed to parse form, err: %v", err)
http.Error(writer, errorMsg, http.StatusBadRequest)
errorChannel <- fmt.Errorf(errorMsg)
return
}
requestState := request.Form.Get("state")
if requestState != state.String() {
errorMsg := fmt.Sprintf("state invalid, requestState: %v, authState:%v", requestState, state.String())
http.Error(writer, errorMsg, http.StatusBadRequest)
errorChannel <- fmt.Errorf(errorMsg)
return
}
code := request.Form.Get("code")
if code == "" {
errorMsg := "code not found"
http.Error(writer, errorMsg, http.StatusBadRequest)
errorChannel <- fmt.Errorf(errorMsg)
return
}
token, err := authConfig.Exchange(context.Background(), code)
if err != nil {
errorMsg := fmt.Sprintf("failed to create token, err: %v", err)
http.Error(writer, errorMsg, http.StatusInternalServerError)
errorChannel <- fmt.Errorf(errorMsg)
return
}
tokenChannel <- token
http.Redirect(writer, request, fmt.Sprintf("https://%s/CliLogin", config.Config.Auth.EnvName), http.StatusFound)
})
}

View File

@@ -4,9 +4,7 @@ import (
"context"
"fmt"
"os"
"os/exec"
"os/signal"
"runtime"
"syscall"
"github.com/up9inc/mizu/cli/config"
@@ -49,21 +47,3 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
}
}
func openBrowser(url string) {
var err error
switch runtime.GOOS {
case "linux":
err = exec.Command("xdg-open", url).Start()
case "windows":
err = exec.Command("rundll32", "url.dll,FileProtocolHandler", url).Start()
case "darwin":
err = exec.Command("open", url).Start()
default:
err = fmt.Errorf("unsupported platform")
}
if err != nil {
logger.Log.Errorf("error while opening browser, %v", err)
}
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils"
"io/ioutil"
)
var configCmd = &cobra.Command{
@@ -18,22 +17,30 @@ var configCmd = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
go telemetry.ReportRun("config", config.Config.Config)
template, err := config.GetConfigWithDefaults()
configWithDefaults, err := config.GetConfigWithDefaults()
if err != nil {
logger.Log.Errorf("Failed generating config with defaults %v", err)
logger.Log.Errorf("Failed generating config with defaults, err: %v", err)
return nil
}
if config.Config.Config.Regenerate {
data := []byte(template)
if err := ioutil.WriteFile(config.Config.ConfigFilePath, data, 0644); err != nil {
logger.Log.Errorf("Failed writing config %v", err)
if err := config.WriteConfig(configWithDefaults); err != nil {
logger.Log.Errorf("Failed writing config with defaults, err: %v", err)
return nil
}
logger.Log.Infof(fmt.Sprintf("Template File written to %s", fmt.Sprintf(uiUtils.Purple, config.Config.ConfigFilePath)))
} else {
template, err := uiUtils.PrettyYaml(configWithDefaults)
if err != nil {
logger.Log.Errorf("Failed converting config with defaults to yaml, err: %v", err)
return nil
}
logger.Log.Debugf("Writing template config.\n%v", template)
fmt.Printf("%v", template)
}
return nil
},
}

View File

@@ -3,20 +3,19 @@ package cmd
import (
"errors"
"fmt"
"os"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/creasty/defaults"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/auth"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils"
"os"
)
const analysisMessageToConfirm = `NOTE: running mizu with --analysis flag will upload recorded traffic for further analysis and enriched presentation options.`
const uploadTrafficMessageToConfirm = `NOTE: running mizu with --%s flag will upload recorded traffic for further analysis and enriched presentation options.`
var tapCmd = &cobra.Command{
Use: "tap [POD REGEX]",
@@ -39,20 +38,52 @@ Supported protocols are HTTP and gRPC.`,
return errormessage.FormatError(err)
}
logger.Log.Infof("Mizu will store up to %s of traffic, old traffic will be cleared once the limit is reached.", config.Config.Tap.HumanMaxEntriesDBSize)
if config.Config.Tap.Workspace != "" {
askConfirmation(configStructs.WorkspaceTapName)
if config.Config.Tap.Analysis {
logger.Log.Infof(analysisMessageToConfirm)
if !uiUtils.AskForConfirmation("Would you like to proceed [Y/n]: ") {
logger.Log.Infof("You can always run mizu without analysis, aborting")
os.Exit(0)
if config.Config.Auth.Token == "" {
logger.Log.Infof("This action requires authentication, please log in to continue")
if err := auth.Login(); err != nil {
logger.Log.Errorf("failed to log in, err: %v", err)
return nil
}
} else {
tokenExpired, err := auth.IsTokenExpired(config.Config.Auth.Token)
if err != nil {
logger.Log.Errorf("failed to check if token is expired, err: %v", err)
return nil
}
if tokenExpired {
logger.Log.Infof("Token expired, please log in again to continue")
if err := auth.Login(); err != nil {
logger.Log.Errorf("failed to log in, err: %v", err)
return nil
}
}
}
}
if config.Config.Tap.Analysis {
askConfirmation(configStructs.AnalysisTapName)
config.Config.Auth.Token = ""
}
logger.Log.Infof("Mizu will store up to %s of traffic, old traffic will be cleared once the limit is reached.", config.Config.Tap.HumanMaxEntriesDBSize)
return nil
},
}
func askConfirmation(flagName string) {
logger.Log.Infof(fmt.Sprintf(uploadTrafficMessageToConfirm, flagName))
if !uiUtils.AskForConfirmation("Would you like to proceed [Y/n]: ") {
logger.Log.Infof("You can always run mizu without %s, aborting", flagName)
os.Exit(0)
}
}
func init() {
rootCmd.AddCommand(tapCmd)
@@ -67,8 +98,6 @@ func init() {
tapCmd.Flags().Bool(configStructs.DisableRedactionTapName, defaultTapConfig.DisableRedaction, "Disables redaction of potentially sensitive request/response headers and body values")
tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size")
tapCmd.Flags().Bool(configStructs.DryRunTapName, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
tapCmd.Flags().StringP(configStructs.WorkspaceTapName, "w", defaultTapConfig.Workspace, "Uploads traffic to your UP9 workspace for further analysis (requires auth)")
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file path with policy rules")
tapCmd.Flags().String(configStructs.EnforcePolicyFileDeprecated, defaultTapConfig.EnforcePolicyFileDeprecated, "Yaml file with policy rules")
tapCmd.Flags().MarkDeprecated(configStructs.EnforcePolicyFileDeprecated, fmt.Sprintf("Use --%s instead", configStructs.EnforcePolicyFile))
}

View File

@@ -49,15 +49,8 @@ func RunMizuTap() {
}
var mizuValidationRules string
if config.Config.Tap.EnforcePolicyFile != "" || config.Config.Tap.EnforcePolicyFileDeprecated != "" {
var trafficValidation string
if config.Config.Tap.EnforcePolicyFile != "" {
trafficValidation = config.Config.Tap.EnforcePolicyFile
} else {
trafficValidation = config.Config.Tap.EnforcePolicyFileDeprecated
}
mizuValidationRules, err = readValidationRules(trafficValidation)
if config.Config.Tap.EnforcePolicyFile != "" {
mizuValidationRules, err = readValidationRules(config.Config.Tap.EnforcePolicyFile)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading policy file: %v", errormessage.FormatError(err)))
return
@@ -109,19 +102,17 @@ func RunMizuTap() {
return
}
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
defer finishMizuExecution(kubernetesProvider)
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions, mizuValidationRules); err != nil {
if err := createMizuResources(ctx, kubernetesProvider, mizuApiFilteringOptions, mizuValidationRules); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
return
}
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel, nodeToTappedPodIPMap)
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel, mizuApiFilteringOptions)
go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchPodsForTapping, ctx, kubernetesProvider, targetNamespaces, cancel, mizuApiFilteringOptions)
//block until exit signal or error
// block until exit signal or error
waitForFinish(ctx, cancel)
}
@@ -134,7 +125,7 @@ func readValidationRules(file string) (string, error) {
return string(newContent), nil
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
if !config.Config.IsNsRestrictedMode() {
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
return err
@@ -145,10 +136,6 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
return err
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil {
return err
}
if err := createMizuConfigmap(ctx, kubernetesProvider, mizuValidationRules); err != nil {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err)))
}
@@ -222,13 +209,15 @@ func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
}
return &api.TrafficFilteringOptions{
PlainTextMaskingRegexes: compiledRegexSlice,
HealthChecksUserAgentHeaders: config.Config.Tap.HealthChecksUserAgentHeaders,
DisableRedaction: config.Config.Tap.DisableRedaction,
PlainTextMaskingRegexes: compiledRegexSlice,
IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents,
DisableRedaction: config.Config.Tap.DisableRedaction,
}, nil
}
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
if len(nodeToTappedPodIPMap) > 0 {
var serviceAccountName string
if state.mizuServiceAccountExists {
@@ -407,13 +396,8 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error building node to ips map: %v", errormessage.FormatError(err)))
cancel()
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating daemonset: %v", errormessage.FormatError(err)))
if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating tappers: %v", errormessage.FormatError(err)))
cancel()
}
}
@@ -531,7 +515,7 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
return missingPods
}
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, mizuApiFilteringOptions *api.TrafficFilteringOptions) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
isPodReady := false
@@ -561,6 +545,23 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodPending {
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", logger.GetLogFilePath()))
cancel()
break
}
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", logger.GetLogFilePath()))
cancel()
break
}
}
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
@@ -571,21 +572,25 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
cancel()
break
}
if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating tappers: %v", errormessage.FormatError(err)))
cancel()
}
logger.Log.Infof("Mizu is available at %s\n", url)
openBrowser(url)
requestForAnalysisIfNeeded()
uiUtils.OpenBrowser(url)
requestForSyncEntriesIfNeeded()
if err := apiserver.Provider.ReportTappedPods(state.currentlyTappedPods); err != nil {
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
}
case _, ok := <-errorChan:
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace, error: %v", config.Config.MizuResourcesNamespace, err)
cancel()
case <-timeAfter:
@@ -600,14 +605,10 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
}
}
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, nodeToTappedPodIPMap map[string][]string) {
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", mizu.TapperDaemonSetName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
var prevPodPhase core.PodPhase
var appendMetaname bool
if len(nodeToTappedPodIPMap) > 1 {
appendMetaname = true
}
for {
select {
case addedPod, ok := <-added:
@@ -616,22 +617,14 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
continue
}
if appendMetaname {
logger.Log.Debugf("Tapper is created [%s]", addedPod.ObjectMeta.Name)
} else {
logger.Log.Debugf("Tapper is created")
}
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
case removedPod, ok := <-removed:
if !ok {
removed = nil
continue
}
if appendMetaname {
logger.Log.Debugf("Tapper is removed [%s]", removedPod.ObjectMeta.Name)
} else {
logger.Log.Debugf("Tapper is removed")
}
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
case modifiedPod, ok := <-modified:
if !ok {
modified = nil
@@ -639,13 +632,14 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
}
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, "Cannot deploy the tapper. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message))
cancel()
break
}
podStatus := modifiedPod.Status
if podStatus.Phase == core.PodPending && prevPodPhase == podStatus.Phase {
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
continue
}
prevPodPhase = podStatus.Phase
@@ -655,22 +649,19 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
if state.Terminated != nil {
switch state.Terminated.Reason {
case "OOMKilled":
logger.Log.Infof(uiUtils.Red, "Tapper is terminated! OOMKilled. Increase pod resources.")
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name))
}
} else {
logger.Log.Debugf("Tapper is %s", strings.ToLower(string(podStatus.Phase)))
}
} else {
logger.Log.Debugf("Tapper is %s", strings.ToLower(string(podStatus.Phase)))
}
case _, ok := <-errorChan:
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Errorf("[ERROR] Tapper creation, watching %v namespace", config.Config.MizuResourcesNamespace)
logger.Log.Debugf("[Error] Error in mizu tapper watch, err: %v", err)
cancel()
case <-ctx.Done():
@@ -680,12 +671,13 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
}
}
func requestForAnalysisIfNeeded() {
if !config.Config.Tap.Analysis {
func requestForSyncEntriesIfNeeded() {
if !config.Config.Tap.Analysis && config.Config.Tap.Workspace == "" {
return
}
if err := apiserver.Provider.RequestAnalysis(config.Config.Tap.AnalysisDestination, config.Config.Tap.SleepIntervalSec); err != nil {
logger.Log.Debugf("[Error] failed requesting for analysis %v", err)
if err := apiserver.Provider.RequestSyncEntries(config.Config.Auth.EnvName, config.Config.Tap.Workspace, config.Config.Tap.UploadIntervalSec, config.Config.Auth.Token); err != nil {
logger.Log.Debugf("[Error] failed requesting for sync entries, err: %v", err)
}
}

View File

@@ -25,4 +25,7 @@ func init() {
defaults.Set(&defaultViewConfig)
viewCmd.Flags().Uint16P(configStructs.GuiPortViewName, "p", defaultViewConfig.GuiPort, "Provide a custom port for the web interface webserver")
viewCmd.Flags().StringP(configStructs.UrlViewName, "u", defaultViewConfig.Url, "Provide a custom host")
viewCmd.Flags().MarkHidden(configStructs.UrlViewName)
}

View File

@@ -24,35 +24,41 @@ func runMizuView() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
exists, err := kubernetesProvider.DoesServicesExist(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
if err != nil {
logger.Log.Errorf("Failed to found mizu service %v", err)
cancel()
return
}
if !exists {
logger.Log.Infof("%s service not found, you should run `mizu tap` command first", mizu.ApiServerPodName)
cancel()
return
}
url := config.Config.View.Url
url := GetApiServerUrl()
if url == "" {
exists, err := kubernetesProvider.DoesServicesExist(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
if err != nil {
logger.Log.Errorf("Failed to found mizu service %v", err)
cancel()
return
}
if !exists {
logger.Log.Infof("%s service not found, you should run `mizu tap` command first", mizu.ApiServerPodName)
cancel()
return
}
response, err := http.Get(fmt.Sprintf("%s/", url))
if err == nil && response.StatusCode == 200 {
logger.Log.Infof("Found a running service %s and open port %d", mizu.ApiServerPodName, config.Config.View.GuiPort)
return
}
logger.Log.Infof("Establishing connection to k8s cluster...")
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
url = GetApiServerUrl()
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", logger.GetLogFilePath()))
return
response, err := http.Get(fmt.Sprintf("%s/", url))
if err == nil && response.StatusCode == 200 {
logger.Log.Infof("Found a running service %s and open port %d", mizu.ApiServerPodName, config.Config.View.GuiPort)
return
}
logger.Log.Infof("Establishing connection to k8s cluster...")
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", logger.GetLogFilePath()))
return
}
}
logger.Log.Infof("Mizu is available at %s\n", url)
openBrowser(url)
uiUtils.OpenBrowser(url)
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
logger.Log.Errorf("Failed to check versions compatibility %v", err)
cancel()

View File

@@ -39,7 +39,7 @@ func InitConfig(cmd *cobra.Command) error {
configFilePathFlag := cmd.Flags().Lookup(ConfigFilePathCommandName)
configFilePath := configFilePathFlag.Value.String()
if err := mergeConfigFile(configFilePath); err != nil {
if err := LoadConfigFile(configFilePath, &Config); err != nil {
if configFilePathFlag.Changed || !os.IsNotExist(err) {
return fmt.Errorf("invalid config, %w\n"+
"you can regenerate the file by removing it (%v) and using `mizu config -r`", err, configFilePath)
@@ -54,19 +54,33 @@ func InitConfig(cmd *cobra.Command) error {
return nil
}
func GetConfigWithDefaults() (string, error) {
func GetConfigWithDefaults() (*ConfigStruct, error) {
defaultConf := ConfigStruct{}
if err := defaults.Set(&defaultConf); err != nil {
return "", err
return nil, err
}
configElem := reflect.ValueOf(&defaultConf).Elem()
setZeroForReadonlyFields(configElem)
return uiUtils.PrettyYaml(defaultConf)
return &defaultConf, nil
}
func mergeConfigFile(configFilePath string) error {
func WriteConfig(config *ConfigStruct) error {
template, err := uiUtils.PrettyYaml(config)
if err != nil {
return fmt.Errorf("failed converting config to yaml, err: %v", err)
}
data := []byte(template)
if err := ioutil.WriteFile(Config.ConfigFilePath, data, 0644); err != nil {
return fmt.Errorf("failed writing config, err: %v", err)
}
return nil
}
func LoadConfigFile(configFilePath string, config *ConfigStruct) error {
reader, openErr := os.Open(configFilePath)
if openErr != nil {
return openErr
@@ -77,10 +91,11 @@ func mergeConfigFile(configFilePath string) error {
return readErr
}
if err := yaml.Unmarshal(buf, &Config); err != nil {
if err := yaml.Unmarshal(buf, config); err != nil {
return err
}
logger.Log.Debugf("Found config file, merged to default options")
logger.Log.Debugf("Found config file, config path: %s", configFilePath)
return nil
}

View File

@@ -21,6 +21,7 @@ type ConfigStruct struct {
Version configStructs.VersionConfig `yaml:"version"`
View configStructs.ViewConfig `yaml:"view"`
Logs configStructs.LogsConfig `yaml:"logs"`
Auth configStructs.AuthConfig `yaml:"auth"`
Config configStructs.ConfigConfig `yaml:"config,omitempty"`
AgentImage string `yaml:"agent-image,omitempty" readonly:""`
ImagePullPolicyStr string `yaml:"image-pull-policy" default:"Always"`

View File

@@ -0,0 +1,6 @@
package configStructs
type AuthConfig struct {
EnvName string `yaml:"env-name" default:"up9.app"`
Token string `yaml:"token"`
}

View File

@@ -16,27 +16,26 @@ const (
DisableRedactionTapName = "no-redact"
HumanMaxEntriesDBSizeTapName = "max-entries-db-size"
DryRunTapName = "dry-run"
WorkspaceTapName = "workspace"
EnforcePolicyFile = "traffic-validation-file"
EnforcePolicyFileDeprecated = "test-rules"
)
type TapConfig struct {
AnalysisDestination string `yaml:"dest" default:"up9.app"`
SleepIntervalSec int `yaml:"upload-interval" default:"10"`
PodRegexStr string `yaml:"regex" default:".*"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
Namespaces []string `yaml:"namespaces"`
Analysis bool `yaml:"analysis" default:"false"`
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
PlainTextFilterRegexes []string `yaml:"regex-masking"`
HealthChecksUserAgentHeaders []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
DryRun bool `yaml:"dry-run" default:"false"`
EnforcePolicyFile string `yaml:"traffic-validation-file"`
EnforcePolicyFileDeprecated string `yaml:"test-rules,omitempty" readonly:""`
ApiServerResources Resources `yaml:"api-server-resources"`
TapperResources Resources `yaml:"tapper-resources"`
UploadIntervalSec int `yaml:"upload-interval" default:"10"`
PodRegexStr string `yaml:"regex" default:".*"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
Namespaces []string `yaml:"namespaces"`
Analysis bool `yaml:"analysis" default:"false"`
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
PlainTextFilterRegexes []string `yaml:"regex-masking"`
IgnoredUserAgents []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
DryRun bool `yaml:"dry-run" default:"false"`
Workspace string `yaml:"workspace"`
EnforcePolicyFile string `yaml:"traffic-validation-file"`
ApiServerResources Resources `yaml:"api-server-resources"`
TapperResources Resources `yaml:"tapper-resources"`
}
type Resources struct {
@@ -67,5 +66,16 @@ func (config *TapConfig) Validate() error {
return errors.New(fmt.Sprintf("Could not parse --%s value %s", HumanMaxEntriesDBSizeTapName, config.HumanMaxEntriesDBSize))
}
if config.Workspace != "" {
workspaceRegex, _ := regexp.Compile("[A-Za-z0-9][-A-Za-z0-9_.]*[A-Za-z0-9]+$")
if len(config.Workspace) > 63 || !workspaceRegex.MatchString(config.Workspace) {
return errors.New("invalid workspace name")
}
}
if config.Analysis && config.Workspace != "" {
return errors.New(fmt.Sprintf("Can't run with both --%s and --%s flags", AnalysisTapName, WorkspaceTapName))
}
return nil
}

View File

@@ -2,8 +2,10 @@ package configStructs
const (
GuiPortViewName = "gui-port"
UrlViewName = "url"
)
type ViewConfig struct {
GuiPort uint16 `yaml:"gui-port" default:"8899"`
Url string `yaml:"url,omitempty" readonly:""`
}

View File

@@ -3,6 +3,7 @@ package config_test
import (
"fmt"
"github.com/up9inc/mizu/cli/config"
"gopkg.in/yaml.v3"
"reflect"
"strings"
"testing"
@@ -15,10 +16,11 @@ func TestConfigWriteIgnoresReadonlyFields(t *testing.T) {
getFieldsWithReadonlyTag(configElem, &readonlyFields)
configWithDefaults, _ := config.GetConfigWithDefaults()
configWithDefaultsBytes, _ := yaml.Marshal(configWithDefaults)
for _, readonlyField := range readonlyFields {
t.Run(readonlyField, func(t *testing.T) {
readonlyFieldToCheck := fmt.Sprintf("\n%s:", readonlyField)
if strings.Contains(configWithDefaults, readonlyFieldToCheck) {
readonlyFieldToCheck := fmt.Sprintf(" %s:", readonlyField)
if strings.Contains(string(configWithDefaultsBytes), readonlyFieldToCheck) {
t.Errorf("unexpected result - readonly field: %v, config: %v", readonlyField, configWithDefaults)
}
})

View File

@@ -5,13 +5,15 @@ go 1.16
require (
github.com/creasty/defaults v1.5.1
github.com/denisbrodbeck/machineid v1.0.1
github.com/golang-jwt/jwt/v4 v4.1.0
github.com/google/go-github/v37 v37.0.0
github.com/gorilla/websocket v1.4.2
github.com/google/uuid v1.1.2
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
k8s.io/api v0.21.2
k8s.io/apimachinery v0.21.2

View File

@@ -175,6 +175,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v4 v4.1.0 h1:XUgk2Ex5veyVFVeLm0xhusUTQybEbexJXrvPNOKkSY0=
github.com/golang-jwt/jwt/v4 v4.1.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
@@ -237,7 +239,6 @@ github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyyc
github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=

View File

@@ -33,7 +33,10 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
return
}
pod := e.Object.(*corev1.Pod)
pod, ok := e.Object.(*corev1.Pod)
if !ok {
continue
}
if !podFilter.MatchString(pod.Name) {
continue

View File

@@ -9,6 +9,7 @@ import (
"io/ioutil"
"net/http"
"runtime"
"strings"
"time"
"github.com/google/go-github/v37/github"
@@ -74,10 +75,16 @@ func CheckNewerVersion(versionChan chan string) {
gitHubVersionSemVer := semver.SemVersion(gitHubVersion)
currentSemVer := semver.SemVersion(mizu.SemVer)
if !gitHubVersionSemVer.IsValid() || !currentSemVer.IsValid() {
logger.Log.Debugf("[ERROR] Semver version is not valid, github version %v, current version %v", gitHubVersion, currentSemVer)
versionChan <- ""
return
}
logger.Log.Debugf("Finished version validation, github version %v, current version %v, took %v", gitHubVersion, currentSemVer, time.Since(start))
if gitHubVersionSemVer.GreaterThan(currentSemVer) {
versionChan <- fmt.Sprintf("Update available! %v -> %v (curl -Lo mizu %v/mizu_%s_amd64 && chmod 755 mizu)", mizu.SemVer, gitHubVersion, *latestRelease.HTMLURL, runtime.GOOS)
versionChan <- fmt.Sprintf("Update available! %v -> %v (curl -Lo mizu %v/mizu_%s_amd64 && chmod 755 mizu)", mizu.SemVer, gitHubVersion, strings.Replace(*latestRelease.HTMLURL, "tag", "download", 1), runtime.GOOS)
} else {
versionChan <- ""
}

View File

@@ -0,0 +1,27 @@
package uiUtils
import (
"fmt"
"github.com/up9inc/mizu/cli/logger"
"os/exec"
"runtime"
)
func OpenBrowser(url string) {
var err error
switch runtime.GOOS {
case "linux":
err = exec.Command("xdg-open", url).Start()
case "windows":
err = exec.Command("rundll32", "url.dll,FileProtocolHandler", url).Start()
case "darwin":
err = exec.Command("open", url).Start()
default:
err = fmt.Errorf("unsupported platform")
}
if err != nil {
logger.Log.Errorf("error while opening browser, %v", err)
}
}

View File

@@ -31,12 +31,12 @@ mizu tap --traffic-validation-file rules.yaml
The structure of the traffic-validation-file is:
* `name`: string, name of the rule
* `type`: string, type of the rule, must be `json` or `header` or `latency`
* `type`: string, type of the rule, must be `json` or `header` or `slo`
* `key`: string, [jsonpath](https://code.google.com/archive/p/jsonpath/wikis/Javascript.wiki) used only in `json` or `header` type
* `value`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) used only in `json` or `header` type
* `service`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) service name to filter
* `path`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) URL path to filter
* `latency`: integer, time in ms of the expected latency.
* `response-time`: integer, time in ms of the expected latency.
### For example:
@@ -54,8 +54,8 @@ rules:
key: "Content-Le.*"
value: "(\\d+(?:\\.\\d+)?)"
- name: latency-test
type: latency
latency: 1
type: slo
response-time: 1
service: "carts.*"
```

View File

@@ -6,9 +6,17 @@ import (
type SemVersion string
func (v SemVersion) IsValid() bool {
re := regexp.MustCompile(`\d+`)
breakdown := re.FindAllString(string(v), 3)
return len(breakdown) == 3
}
func (v SemVersion) Breakdown() (string, string, string) {
re := regexp.MustCompile(`\d+`)
breakdown := re.FindAllString(string(v), 3)
return breakdown[0], breakdown[1], breakdown[2]
}

View File

@@ -1,7 +1,7 @@
package api
type TrafficFilteringOptions struct {
HealthChecksUserAgentHeaders []string
PlainTextMaskingRegexes []*SerializableRegexp
DisableRedaction bool
IgnoredUserAgents []string
PlainTextMaskingRegexes []*SerializableRegexp
DisableRedaction bool
}

View File

@@ -14,9 +14,14 @@ import (
)
func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *api.TrafficFilteringOptions) {
if IsIgnoredUserAgent(item, options) {
return
}
if !options.DisableRedaction {
FilterSensitiveData(item, options)
}
emitter.Emit(item)
}

View File

@@ -25,6 +25,30 @@ var personallyIdentifiableDataFields = []string{"token", "authorization", "authe
"zip", "zipcode", "address", "country", "firstname", "lastname",
"middlename", "fname", "lname", "birthdate"}
func IsIgnoredUserAgent(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) bool {
if item.Protocol.Name != "http" {
return false
}
request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request)
for headerKey, headerValues := range request.Header {
if strings.ToLower(headerKey) == "user-agent" {
for _, userAgent := range options.IgnoredUserAgents {
for _, headerValue := range headerValues {
if strings.Contains(strings.ToLower(headerValue), strings.ToLower(userAgent)) {
return true
}
}
}
return false
}
}
return false
}
func FilterSensitiveData(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) {
request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request)
response := item.Pair.Response.Payload.(HTTPPayload).Data.(*http.Response)

View File

@@ -130,8 +130,16 @@ func representMetadataResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{}))
brokers, _ := json.Marshal(payload["Brokers"].([]interface{}))
topics := ""
if payload["Topics"] != nil {
_topics, _ := json.Marshal(payload["Topics"].([]interface{}))
topics = string(_topics)
}
brokers := ""
if payload["Brokers"] != nil {
_brokers, _ := json.Marshal(payload["Brokers"].([]interface{}))
brokers = string(_brokers)
}
controllerID := ""
clusterID := ""
throttleTimeMs := ""
@@ -155,7 +163,7 @@ func representMetadataResponse(data map[string]interface{}) []interface{} {
},
{
"name": "Brokers",
"value": string(brokers),
"value": brokers,
},
{
"name": "Cluster ID",
@@ -167,7 +175,7 @@ func representMetadataResponse(data map[string]interface{}) []interface{} {
},
{
"name": "Topics",
"value": string(topics),
"value": topics,
},
{
"name": "Cluster Authorized Operations",
@@ -303,7 +311,11 @@ func representProduceResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
responses, _ := json.Marshal(payload["Responses"].([]interface{}))
responses := ""
if payload["Responses"] != nil {
_responses, _ := json.Marshal(payload["Responses"].([]interface{}))
responses = string(_responses)
}
throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
@@ -333,7 +345,11 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{}))
topics := ""
if payload["Topics"] != nil {
_topics, _ := json.Marshal(payload["Topics"].([]interface{}))
topics = string(_topics)
}
replicaId := ""
if payload["ReplicaId"] != nil {
replicaId = fmt.Sprintf("%d", int(payload["ReplicaId"].(float64)))
@@ -361,7 +377,7 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
}
rackId := ""
if payload["RackId"] != nil {
rackId = fmt.Sprintf("%d", int(payload["RackId"].(float64)))
rackId = payload["RackId"].(string)
}
repPayload, _ := json.Marshal([]map[string]string{
{
@@ -394,7 +410,7 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
},
{
"name": "Topics",
"value": string(topics),
"value": topics,
},
{
"name": "Forgotten Topics Data",
@@ -420,7 +436,11 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
rep = representResponseHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
responses, _ := json.Marshal(payload["Responses"].([]interface{}))
responses := ""
if payload["Responses"] != nil {
_responses, _ := json.Marshal(payload["Responses"].([]interface{}))
responses = string(_responses)
}
throttleTimeMs := ""
if payload["ThrottleTimeMs"] != nil {
throttleTimeMs = fmt.Sprintf("%d", int(payload["ThrottleTimeMs"].(float64)))
@@ -448,7 +468,7 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
},
{
"name": "Responses",
"value": string(responses),
"value": responses,
},
})
rep = append(rep, map[string]string{
@@ -466,7 +486,11 @@ func representListOffsetsRequest(data map[string]interface{}) []interface{} {
rep = representRequestHeader(data, rep)
payload := data["Payload"].(map[string]interface{})
topics, _ := json.Marshal(payload["Topics"].([]interface{}))
topics := ""
if payload["Topics"] != nil {
_topics, _ := json.Marshal(payload["Topics"].([]interface{}))
topics = string(_topics)
}
repPayload, _ := json.Marshal([]map[string]string{
{
"name": "Replica ID",
@@ -474,7 +498,7 @@ func representListOffsetsRequest(data map[string]interface{}) []interface{} {
},
{
"name": "Topics",
"value": string(topics),
"value": topics,
},
})
rep = append(rep, map[string]string{

View File

@@ -104,7 +104,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
}
break
case Fetch:
topics := reqDetails["Payload"].(map[string]interface{})["Topics"].([]interface{})
_topics := reqDetails["Payload"].(map[string]interface{})["Topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Topic"].(string))
}
@@ -113,7 +117,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
}
break
case ListOffsets:
topics := reqDetails["Payload"].(map[string]interface{})["Topics"].([]interface{})
_topics := reqDetails["Payload"].(map[string]interface{})["Topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["Name"].(string))
}

View File

@@ -296,12 +296,28 @@ func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
switch x.(type) {
case []interface{}:
array := x.([]interface{})
packet.Command = RedisCommand(strings.ToUpper(string(array[0].([]uint8))))
if len(array) > 1 {
packet.Key = string(array[1].([]uint8))
}
if len(array) > 2 {
packet.Value = string(array[2].([]uint8))
switch array[0].(type) {
case []uint8:
packet.Command = RedisCommand(strings.ToUpper(string(array[0].([]uint8))))
if len(array) > 1 {
packet.Key = string(array[1].([]uint8))
}
if len(array) > 2 {
packet.Value = string(array[2].([]uint8))
}
if len(array) > 3 {
packet.Value = fmt.Sprintf("[%s", packet.Value)
for _, item := range array[3:] {
packet.Value = fmt.Sprintf("%s, %s", packet.Value, item.([]uint8))
}
packet.Value = strings.TrimSuffix(packet.Value, ", ")
packet.Value = fmt.Sprintf("%s]", packet.Value)
}
default:
msg := fmt.Sprintf("Unrecognized element in Redis array: %v\n", reflect.TypeOf(array[0]))
log.Printf(msg)
err = errors.New(msg)
return
}
case []uint8:
val := string(x.([]uint8))
@@ -316,6 +332,8 @@ func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
}
case string:
packet.Value = x.(string)
case int64:
packet.Value = fmt.Sprintf("%d", x.(int64))
default:
msg := fmt.Sprintf("Unrecognized Redis data type: %v\n", reflect.TypeOf(x))
log.Printf(msg)

View File

@@ -19,7 +19,8 @@ interface EntriesListProps {
setNoMoreDataBottom: (flag: boolean) => void;
methodsFilter: Array<string>;
statusFilter: Array<string>;
pathFilter: string
pathFilter: string;
serviceFilter: string;
listEntryREF: any;
onScrollEvent: (isAtBottom:boolean) => void;
scrollableList: boolean;
@@ -32,7 +33,7 @@ enum FetchOperator {
const api = new Api();
export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, focusedEntryId, setFocusedEntryId, connectionOpen, noMoreDataTop, setNoMoreDataTop, noMoreDataBottom, setNoMoreDataBottom, methodsFilter, statusFilter, pathFilter, listEntryREF, onScrollEvent, scrollableList}) => {
export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, focusedEntryId, setFocusedEntryId, connectionOpen, noMoreDataTop, setNoMoreDataTop, noMoreDataBottom, setNoMoreDataBottom, methodsFilter, statusFilter, pathFilter, serviceFilter, listEntryREF, onScrollEvent, scrollableList}) => {
const [loadMoreTop, setLoadMoreTop] = useState(false);
const [isLoadingTop, setIsLoadingTop] = useState(false);
@@ -54,10 +55,11 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
const filterEntries = useCallback((entry) => {
if(methodsFilter.length > 0 && !methodsFilter.includes(entry.method.toLowerCase())) return;
if(pathFilter && entry.path?.toLowerCase()?.indexOf(pathFilter) === -1) return;
if(serviceFilter && entry.service?.toLowerCase()?.indexOf(serviceFilter) === -1) return;
if(statusFilter.includes(StatusType.SUCCESS) && entry.statusCode >= 400) return;
if(statusFilter.includes(StatusType.ERROR) && entry.statusCode < 400) return;
return entry;
},[methodsFilter, pathFilter, statusFilter])
},[methodsFilter, pathFilter, statusFilter, serviceFilter])
const filteredEntries = useMemo(() => {
return entries.filter(filterEntries);

View File

@@ -36,14 +36,8 @@ const SectionsRepresentation: React.FC<any> = ({data, color}) => {
const AutoRepresentation: React.FC<any> = ({representation, isRulesEnabled, rulesMatched, elapsedTime, color}) => {
var TABS = [
{
tab: 'request'
},
{
tab: 'response',
},
{
tab: 'Rules',
},
tab: 'Request'
}
];
const [currentTab, setCurrentTab] = useState(TABS[0].tab);
@@ -54,12 +48,25 @@ const AutoRepresentation: React.FC<any> = ({representation, isRulesEnabled, rule
const {request, response} = JSON.parse(representation);
if (!response) {
TABS[1]['hidden'] = true;
var responseTabIndex = 0;
var rulesTabIndex = 0;
if (response) {
TABS.push(
{
tab: 'Response',
}
);
responseTabIndex = TABS.length - 1;
}
if (!isRulesEnabled) {
TABS.pop()
if (isRulesEnabled) {
TABS.push(
{
tab: 'Rules',
}
);
rulesTabIndex = TABS.length - 1;
}
return <div className={styles.Entry}>
@@ -71,10 +78,10 @@ const AutoRepresentation: React.FC<any> = ({representation, isRulesEnabled, rule
{currentTab === TABS[0].tab && <React.Fragment>
<SectionsRepresentation data={request} color={color}/>
</React.Fragment>}
{response && currentTab === TABS[1].tab && <React.Fragment>
{response && currentTab === TABS[responseTabIndex].tab && <React.Fragment>
<SectionsRepresentation data={response} color={color}/>
</React.Fragment>}
{TABS.length > 2 && currentTab === TABS[2].tab && <React.Fragment>
{isRulesEnabled && currentTab === TABS[rulesTabIndex].tab && <React.Fragment>
<EntryTablePolicySection title={'Rule'} color={color} latency={elapsedTime} arrayToIterate={rulesMatched ? rulesMatched : []}/>
</React.Fragment>}
</div>}

View File

@@ -11,13 +11,16 @@ interface FiltersProps {
setStatusFilter: (methods: Array<string>) => void;
pathFilter: string
setPathFilter: (val: string) => void;
serviceFilter: string
setServiceFilter: (val: string) => void;
}
export const Filters: React.FC<FiltersProps> = ({methodsFilter, setMethodsFilter, statusFilter, setStatusFilter, pathFilter, setPathFilter}) => {
export const Filters: React.FC<FiltersProps> = ({methodsFilter, setMethodsFilter, statusFilter, setStatusFilter, pathFilter, setPathFilter, serviceFilter, setServiceFilter}) => {
return <div className={styles.container}>
<MethodFilter methodsFilter={methodsFilter} setMethodsFilter={setMethodsFilter}/>
<StatusTypesFilter statusFilter={statusFilter} setStatusFilter={setStatusFilter}/>
<ServiceFilter serviceFilter={serviceFilter} setServiceFilter={setServiceFilter}/>
<PathFilter pathFilter={pathFilter} setPathFilter={setPathFilter}/>
</div>;
};
@@ -117,3 +120,18 @@ const PathFilter: React.FC<PathFilterProps> = ({pathFilter, setPathFilter}) => {
</FilterContainer>;
};
interface ServiceFilterProps {
serviceFilter: string;
setServiceFilter: (val: string) => void;
}
const ServiceFilter: React.FC<ServiceFilterProps> = ({serviceFilter, setServiceFilter}) => {
return <FilterContainer>
<div className={styles.filterLabel}>Service</div>
<div>
<TextField value={serviceFilter} variant="outlined" className={styles.filterText} style={{minWidth: '150px'}} onChange={(e: any) => setServiceFilter(e.target.value)}/>
</div>
</FilterContainer>;
};

View File

@@ -58,6 +58,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
const [methodsFilter, setMethodsFilter] = useState([]);
const [statusFilter, setStatusFilter] = useState([]);
const [pathFilter, setPathFilter] = useState("");
const [serviceFilter, setServiceFilter] = useState("");
const [tappingStatus, setTappingStatus] = useState(null);
@@ -192,6 +193,8 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
setStatusFilter={setStatusFilter}
pathFilter={pathFilter}
setPathFilter={setPathFilter}
serviceFilter={serviceFilter}
setServiceFilter={setServiceFilter}
/>
<div className={styles.container}>
<EntriesList entries={entries}
@@ -206,6 +209,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
methodsFilter={methodsFilter}
statusFilter={statusFilter}
pathFilter={pathFilter}
serviceFilter={serviceFilter}
listEntryREF={listEntry}
onScrollEvent={onScrollEvent}
scrollableList={disableScrollList}

View File

@@ -5,7 +5,6 @@ import variables from '../../variables.module.scss';
interface Tab {
tab: string,
hidden?: boolean,
disabled?: boolean,
disabledMessage?: string,
highlight?: boolean,
@@ -66,7 +65,8 @@ const useTabsStyles = makeStyles((theme) => ({
borderRight: "1px solid " + theme.palette.primary.dark,
height: 20,
verticalAlign: 'middle',
margin: '0 20px'
margin: '0 20px',
cursor: 'unset',
}
}));
@@ -75,7 +75,7 @@ const useTabsStyles = makeStyles((theme) => ({
const Tabs: React.FC<Props> = ({classes={}, tabs, currentTab, color, onChange, leftAligned, dark}) => {
const _classes = {...useTabsStyles(), ...classes};
return <div className={`${_classes.root} ${leftAligned ? _classes.tabsAlignLeft : ''}`}>
{tabs.filter((tab) => !tab.hidden).map(({tab, disabled, disabledMessage, highlight, badge}, index) => {
{tabs.map(({tab, disabled, disabledMessage, highlight, badge}, index) => {
const active = currentTab === tab;
const tabLink = <span
key={tab}