Compare commits

..

61 Commits

Author SHA1 Message Date
gadotroee
256006ca3e Merge pull request #332 - update download link fix
#minor
2021-10-07 19:45:05 +03:00
Roee Gadot
213528c619 no message 2021-10-07 19:41:51 +03:00
Igor Gov
8b47dba05d Merge pull request #326 from up9inc/develop
Develop -> Main
2021-10-07 12:28:21 +03:00
RoyUP9
f1a2ee7fb4 removed enforce policy file deprecated flag (#328) 2021-10-07 11:08:48 +03:00
lirazyehezkel
15021daa2e service path filter (#327) 2021-10-07 10:51:30 +03:00
RoyUP9
f83e565cd4 fixed policy rules readme (#321) 2021-10-07 09:11:24 +03:00
RoyUP9
8636a4731e fixed ignored user agents (#322) 2021-10-06 17:16:47 +03:00
lirazyehezkel
aa3510e936 service filter (#324) 2021-10-06 16:22:08 +03:00
Igor Gov
fd48cc6d87 Renaming ignored user agents var (#320) 2021-10-06 13:52:30 +03:00
RoyUP9
111d000c12 added interface conversion check (#318) 2021-10-06 13:38:32 +03:00
RoyUP9
9c98a4c2b1 Revert "Connecting Mizu to the application (#313)" (#316) 2021-10-06 10:41:23 +03:00
RoyUP9
d2d4ed5aee Connecting Mizu to the application (#313) 2021-10-05 16:35:16 +03:00
Igor Gov
30fce5d765 Supporting Mizu view from given url (#312)
* Supporting Mizu view from given url
2021-10-05 12:24:50 +03:00
Igor Gov
90040798b8 Adding additional error handling to api server watch (#311) 2021-09-30 11:49:31 +03:00
M. Mert Yıldıran
9eecddddd5 Start the tapper after the API server is ready (#309) 2021-09-30 11:22:07 +03:00
M. Mert Yıldıran
cc49e815d6 Watch the tapper pod after starting it (#310)
* Watch the tapper pod after starting it

* Improve the logic in `watchTapperPod` method
2021-09-30 09:32:27 +03:00
Selton Fiuza
c26eb843e3 [refactor/TRA-3693] type:latency to slo and latency field to response-time (#282)
* type:latency to slo and latency field to response-time

* remove comment from import

* Friendly message on ignored rules and format

* formatting

* change conditional to catch negative values and ignore it

* Fix Bug Alon Reported

* sliceUtils to shared
2021-09-29 11:51:03 -03:00
M. Mert Yıldıran
26efaa101d Fix a 500 error caused by an interface conversion in Redis (#308) 2021-09-29 14:37:50 +03:00
M. Mert Yıldıran
352567c56e Fix the typo in protocolAbbreviation field of MizuEntry (#307) 2021-09-28 16:45:04 +03:00
lirazyehezkel
51fc3307be Mizu rules font (#306) 2021-09-27 14:18:10 +03:00
M. Mert Yıldıran
cdf1c39a52 Omit the RULES tab if the policy rules feature is inactive (#303)
* Omit the `RULES` tab if the policy rules feature is inactive (WIP)

* Propagate the boolean value `isRulesEnabled` from file read error to UI

* Remove the debug log
2021-09-25 18:15:54 +03:00
M. Mert Yıldıran
db1f7d34cf Omit the RESPONSE tab and elapsedTime if the response is empty (#298)
* Omit the `RESPONSE` tab and `elapsedTime` if the `response` is empty

* Use the `hidden` attribute instead
2021-09-24 13:49:20 +03:00
Nimrod Gilboa Markevich
9212c195b4 Improve cloud resources cleanup (#215) 2021-09-23 20:51:37 +03:00
M. Mert Yıldıran
7b333556d0 Add Redis Serialization Protocol support (#290)
* Bring in the files

* Add request-response pair matcher for Redis

* Implement the `Represent` method

* Update `representGeneric` method signature

* Don't export `IntToByteArr`

* Remove unused `newRedisInputStream` method

* Return the errors as string

* Adapt to the latest change in the `develop`
2021-09-23 17:09:00 +03:00
M. Mert Yıldıran
8ba96acf05 Don't omit the Latency field in BaseEntryDetails (#302) 2021-09-23 14:58:20 +03:00
RoyUP9
f164e54fee changed test rules config to readonly (#301) 2021-09-23 09:13:17 +03:00
M. Mert Yıldıran
649b733ba1 Don't redact :authority pseudo-header field (#300) 2021-09-23 09:12:04 +03:00
M. Mert Yıldıran
e8ea93cb64 Upgrade react-scrollable-feed-virtualized version from 1.4.2 to 1.4.3 (#299) 2021-09-23 08:36:44 +03:00
RoyUP9
5e5d5de91a Merge pull request #297 from up9inc/develop
Develop -> main
2021-09-22 12:14:07 +03:00
RoyUP9
5dacd41ba9 renamed traffic-validation to traffic-validation-file (#296) 2021-09-22 11:21:43 +03:00
Igor Gov
7f837fe947 Improving logs and cleaning un-referenced code (#295) 2021-09-22 11:14:20 +03:00
RoyUP9
02bd7883cb fixed general stats (#293) 2021-09-22 10:52:53 +03:00
RoyUP9
1841798646 Fix: analysis feature (#292) 2021-09-22 09:44:02 +03:00
Igor Gov
749bee6d55 Using rlog in agent and tapper & removing unreferenced code (#289)
* .
2021-09-22 06:24:19 +03:00
Igor Gov
680ea71958 Merge branch 'develop'
# Conflicts:
#	acceptanceTests/tap_test.go
#	cli/apiserver/provider.go
#	cli/cmd/common.go
#	cli/cmd/fetch.go
#	cli/cmd/fetchRunner.go
#	cli/cmd/tapRunner.go
#	cli/cmd/viewRunner.go
#	cli/config/config.go
#	cli/mizu/fsUtils/mizuLogsUtils.go
2021-09-02 12:17:57 +03:00
Igor Gov
5fb5dbbbf5 Fixing call to analysis (#248) 2021-08-30 11:16:55 +03:00
RoyUP9
b3fe448ff1 added custom config path option (#247) 2021-08-30 11:16:55 +03:00
RoyUP9
101a54e8da added tap acceptance tests, fixed duplicate namespace problem (#244) 2021-08-30 11:16:55 +03:00
Igor Gov
3308cab826 Introducing API server provider (#243) 2021-08-30 11:16:55 +03:00
RoyUP9
5fdd8288f4 added tapper count route and wait time for tappers in test (#226) 2021-08-30 11:16:55 +03:00
Alon Girmonsky
4cb32b40e6 some changes in the read me (#241)
change prerequisite to permissions and kubeconfig. These are more FYIs as Mizu requires very little prerequisites. 
Change the description to match getmizu.io
2021-08-30 11:16:55 +03:00
Igor Gov
afa81c7ec2 Fixing bad conflict resolution 2021-08-19 13:33:14 +03:00
Igor Gov
e84c7d3310 Merge branch 'develop' 2021-08-19 13:18:06 +03:00
Igor Gov
7d0a90cb78 Merge branch 'main' into develop
# Conflicts:
#	cli/config/configStruct.go
#	cli/mizu/config.go
#	tap/http_reader.go
2021-08-19 13:16:19 +03:00
Nimrod Gilboa Markevich
24f79922e9 Add to periodic stats print in tapper (#221)
#patch
2021-08-16 15:50:04 +03:00
RoyUP9
c3995009ee Hotfix - ignore not allowed set flags (#192)
#patch
2021-08-10 14:21:16 +03:00
RoyUP9
6e9fe2986e removed duplicate har page header (#187) 2021-08-09 13:31:53 +03:00
RoyUP9
603240fedb temp fix - ignore agent image in config command (#185) 2021-08-09 11:55:45 +03:00
Igor Gov
e61871a68e Merge pull request #182 from up9inc/develop
Release 2021-08-08
2021-08-08 14:50:30 +03:00
nimrod-up9
379af59f07 Merge pull request #121 from up9inc/develop
Missing request body (#120)
2021-07-19 13:53:49 +03:00
gadotroee
ef9afe31a4 Merge pull request #119 from up9inc/develop
Mizu release
2021-07-18 16:54:31 +03:00
gadotroee
dca636b0fd Merge pull request #94 from up9inc/develop
Mizu release
2021-07-06 21:05:40 +03:00
Roee Gadot
9b72cc7aa6 Merge branch 'develop' into main
# Conflicts:
#	README.md
#	api/main.go
#	api/pkg/api/main.go
#	api/pkg/models/models.go
#	api/pkg/resolver/resolver.go
#	cli/Makefile
#	cli/cmd/tap.go
#	cli/cmd/tapRunner.go
#	tap/http_matcher.go
#	tap/http_reader.go
#	tap/tcp_stream_factory.go
2021-06-29 11:16:47 +03:00
Alex Haiut
d3c023b3ba mizu release 2021-06-21 (#79)
* Show pod name and namespace (#61)

* WIP

* Update main.go, consts.go, and 2 more files...

* Update messageSensitiveDataCleaner.go

* Update consts.go and messageSensitiveDataCleaner.go

* Update messageSensitiveDataCleaner.go

* Update main.go, consts.go, and 3 more files...

* WIP

* Update main.go, messageSensitiveDataCleaner.go, and 6 more files...

* Update main.go, messageSensitiveDataCleaner.go, and 3 more files...

* Update consts.go, messageSensitiveDataCleaner.go, and tap.go

* Update provider.go

* Update serializableRegexp.go

* Update tap.go

* TRA-3234 fetch with _source + no hard limit (#64)

* remove the HARD limit of 5000

* TRA-3299 Reduce footprint and Add Tolerances(#65)

* Use lib const for DNSClusterFirstWithHostNet.

* Whitespace.

* Break lines.

* Added affinity to pod names.

* Added tolerations to NoExecute and NoSchedule taints.

* Implementation of Mizu view command

* .

* .

* Update main.go and messageSensitiveDataCleaner.go

* Update main.go

* String and not pointers (#68)

* TRA-3318 - Cookies not null and fix har file names  (#69)

* no message

* TRA-3212 Passive-Tapper and Mizu share code (#70)

* Use log in tap package instead of fmt.

* Moved api/pkg/tap to root.

* Added go.mod and go.sum for tap.

* Added replace for shared.

* api uses tap module instead of tap package.

* Removed dependency of tap in shared by moving env var out of tap.

* Fixed compilation bugs.

* Fixed: Forgot to export struct field HostMode.

* Removed unused flag.

* Close har output channel when done.

* Moved websocket out of mizu and into passive-tapper.

* Send connection details over har output channel.

* Fixed compilation errors.

* Removed unused info from request response cache.

* Renamed connection -> connectionID.

* Fixed rename bug.

* Export setters and getters for filter ips and ports.

* Added tap dependency to Dockerfile.

* Uncomment error messages.

* Renamed `filterIpAddresses` -> `filterAuthorities`.

* Renamed ConnectionID -> ConnectionInfo.

* Fixed: Missed one replace.

* TRA-3342 Mizu/tap dump to har directory fails on Linux (#71)

* Instead of saving incomplete temp har files in a temp dir, save them in the output dir with a *.har.tmp suffix.

* API only loads har from *.har files (by extension).

* Add export entries endpoint for better up9 connect funcionality  (#72)

* no message
* no message
* no message

* Filter 'cookie' header

* Release action  (#73)

* Create main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* trying new approach

* no message

* yaml error

* no message

* no message

* no message

* missing )

* no message

* no message

* remove main.yml and fix branches

* Create tag-temp.yaml

* Update tag-temp.yaml

* Update tag-temp.yaml

* no message

* no message

* no message

* no message

* no message

* no message

* no message

* #minor

* no message

* no message

* added checksum calc to CLI makefile

* fixed build error - created bin directory upfront

* using markdown for release text

* use separate checksum files

* fixed release readme

* #minor

* readme updated

Co-authored-by: Alex Haiut <alex@up9.com>

* TRA-3360 Fix: Mizu ignores -n namespace flag and records traffic from all pods (#75)

Do not tap pods in namespaces which were not requested.

* added apple/m1 binary, updated readme (#77)

Co-authored-by: Alex Haiut <alex@up9.com>

* Update README.md (#78)

Co-authored-by: lirazyehezkel <61656597+lirazyehezkel@users.noreply.github.com>
Co-authored-by: RamiBerm <rami.berman@up9.com>
Co-authored-by: RamiBerm <54766858+RamiBerm@users.noreply.github.com>
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: nimrod-up9 <59927337+nimrod-up9@users.noreply.github.com>
Co-authored-by: Igor Gov <igor.govorov1@gmail.com>
Co-authored-by: Alex Haiut <alex@up9.com>
2021-06-21 15:17:31 +03:00
Roee Gadot
5f2a4deb19 remove file 2021-05-26 18:08:37 +03:00
Roee Gadot
91f290987e Merge branch 'develop' into main
# Conflicts:
#	cli/cmd/tap.go
#	cli/cmd/version.go
#	cli/kubernetes/provider.go
#	cli/mizu/consts.go
#	cli/mizu/mizuRunner.go
#	debug.Dockerfile
#	ui/src/components/HarPage.tsx
2021-05-26 17:58:17 +03:00
gadotroee
2f3215b71a Fix mizu image parameter (#53) 2021-05-23 13:34:32 +03:00
Alex Haiut
2e87a01346 end of week - develop to master (#50)
* Provide cli version as git hash from makefile

* Update Makefile, version.go, and 3 more files...

* Update mizuRunner.go

* Update README.md, resolver.go, and 2 more files...

* Update provider.go

* Feature/UI/light theme (#44)

* light theme

* css polish

* unused code

* css

* text shadow

* footer style

* Update mizuRunner.go

* Handle nullable vars (#47)

* Decode gRPC body (#48)

* Decode grpc.

* Better variable names.

* Added protobuf-decoder dependency.

* Updated protobuf-decoder's version.

Co-authored-by: RamiBerm <rami.berman@up9.com>
Co-authored-by: RamiBerm <54766858+RamiBerm@users.noreply.github.com>
Co-authored-by: lirazyehezkel <61656597+lirazyehezkel@users.noreply.github.com>
Co-authored-by: nimrod-up9 <59927337+nimrod-up9@users.noreply.github.com>
2021-05-13 20:29:31 +03:00
gadotroee
453003bf14 remove leftovers (#43) 2021-05-10 17:35:59 +03:00
Roee Gadot
80ca377668 Merge branch 'develop' into main
# Conflicts:
#	Dockerfile
#	Makefile
#	api/go.mod
#	api/go.sum
#	api/main.go
#	api/pkg/controllers/entries_controller.go
#	api/pkg/inserter/main.go
#	api/pkg/models/models.go
#	api/pkg/tap/grpc_assembler.go
#	api/pkg/tap/har_writer.go
#	api/pkg/tap/http_matcher.go
#	api/pkg/tap/http_reader.go
#	api/pkg/tap/passive_tapper.go
#	api/pkg/utils/utils.go
#	cli/Makefile
#	cli/cmd/tap.go
#	cli/cmd/version.go
#	cli/config/config.go
#	cli/kubernetes/provider.go
#	cli/mizu/mizuRunner.go
2021-05-10 17:27:32 +03:00
gadotroee
d21297bc9c 0.9 (#37)
* Update .gitignore

* WIP

* WIP

* Update README.md, root.go, and 4 more files...

* Update README.md

* Update README.md

* Update root.go

* Update provider.go

* Update provider.go

* Update root.go, go.mod, and go.sum

* Update mizu.go

* Update go.sum and provider.go

* Update portForward.go, watch.go, and mizu.go

* Update README.md

* Update watch.go

* Update mizu.go

* Update mizu.go

* no message

* no message

* remove unused things and use external for object id (instead of copy)

* no message

* Update mizu.go

* Update go.mod, go.sum, and 2 more files...

* no message

* Update README.md, go.mod, and resolver.go

* Update README.md

* Update go.mod

* Update loader.go

* some refactor

* Update loader.go

* no message

* status to statusCode

* return data directly

* Traffic viewer

* cleaning

* css

* no message

* Clean warnings

* Makefile - first draft

* Update Makefile

* Update Makefile

* Update Makefile, README.md, and 4 more files...

* Add api build and clean to makefile (files restructure) (#9)

* no message
* add clean api command

* no message

* stating with web socket

* Add tap as a separate executable (#10)

* Added tap.

* Ignore build directories.

* Added tapper build to Makefile.

* Improvements  (#12)

* no message

* no message

* Feature/makefile (#11)

* minor fixes

* makefile fixes - docker build

* minor fix in Makefile
Co-authored-by: Alex Haiut <alex@up9.com>

* Update Dockerfile, multi-runner.sh, and 31 more files...

* Update multi-runner.sh

* no message

* Update .dockerignore, Dockerfile, and 30 more files...

* Update cleaner.go, grpc_assembler.go, and 2 more files...

* start the pod with host network and privileged

* fix multi runner passive tapper command

* add HOST_MODE env var

* do not return true in the should tap function

* remove line in the end

* default value in api is input
fix description and pass the parameter in the multi runner script

* missing flag.parse

* no message

* fix image

* Create main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Small fixes - permission + har writing exception (#17)

* Select node by pod (#18)

* Select node by pod.

* Removed watch pod by regex. Irrelevant for now.

* Changed default image to develop:latest.

* Features/clifix (#19)

* makefile fixes - docker build

* readme update, CLI usage fix

* added chmod

Co-authored-by: Alex Haiut <alex@up9.com>

* meta information

* Only record traffic of the requested pod. Filtered by pod IP. (#21)

* fixed readme and reduced batch size to 5 (#22)

Co-authored-by: Alex Haiut <alex@up9.com>

* API and TAP in single process  (#24)

* no message
* no message

* CLI make --pod required flag and faster api image build (#25)

* makefile fixes - docker build

* readme update, CLI usage fix

* added chmod

* typo

* run example incorreect in makefile

* no message

* no message

* no message

Co-authored-by: Alex Haiut <alex@up9.com>

* Reduce delay between tap and UI - Skip dump to file (#26)

* Pass HARs between tap and api via channel.

* Fixed make docker commad.

* Various fixes.

* Added .DS_Store to .gitignore.

* Parse flags in Mizu main instead of in tap_output.go.

* Use channel to pass HAR by default instead of files.

* Infinite scroll (#28)

* no message

* infinite scroll + new ws implementation

* no message

* scrolling top

* fetch button

* more Backend changes

* fix go mod and sum

* mire fixes against develop

* unused code

* small ui refactor

Co-authored-by: Roee Gadot <roee.gadot@up9.com>

* Fix gRPC crash, display gRPC as base64, display gRPC URL and status code (#27)

* Added Method (POST) and URL (emtpy) to gRPC requests.

* Removed quickfix that skips writing HTTP/2 to HAR.

* Use HTTP/2 body to fill out http.Request and htt.Response.

* Make sure that in HARs request.postData.mimeType and response.content.mimeType are application/grpc in case of grpc.

* Comment.

* Add URL and status code for gRPC.

* Don't assume http scheme.

* Use http.Header.Set instead of manually acccessing the underlaying map.

* General stats api fix  (#29)

* refactor and validation

* Show gRPC as ASCII (#31)

* Moved try-catch up one block.

* Display grpc as ASCII.

* Better code in entries fetch endpoint (#30)

* no message
* no message

* Feature/UI/filters (#32)

* UI filters

* refactor

* Revert "refactor"

This reverts commit 70e7d4b6ac.

* remove recursive func

* CLI cleanup (#33)

* Moved cli root command to tap subcommand.

* tap subcommand works.

* Added view and fetch placeholders.

* Updated descriptions.

* Fixed indentation.

* Added versio subcommand.

* Removed version flag.

* gofmt.

* Changed pod from flag to arg.

* Commented out "all namespaces" flag.

* CLI cleanup 2 (#34)

* Renamed dashboard -> GUI/web interface.

* Commented out --quiet, removed unused config variables.

* Quiter output when calling unimplemented subcommands.

* Leftovers from PR #30 (#36)

Co-authored-by: up9-github <info@up9.com>
Co-authored-by: RamiBerm <54766858+RamiBerm@users.noreply.github.com>
Co-authored-by: Liraz Yehezkel <lirazy@up9.com>
Co-authored-by: Alex Haiut <alex@testr.io>
Co-authored-by: lirazyehezkel <61656597+lirazyehezkel@users.noreply.github.com>
Co-authored-by: Alex Haiut <alex@up9.com>
Co-authored-by: nimrod-up9 <59927337+nimrod-up9@users.noreply.github.com>
Co-authored-by: RamiBerm <rami.berman@up9.com>
Co-authored-by: Alex Haiut <alex.haiut@gmail.com>
2021-05-09 11:45:39 +03:00
55 changed files with 1786 additions and 560 deletions

View File

@@ -152,12 +152,12 @@ Web interface is now available at http://localhost:8899
```
Any request that contains `User-Agent` header with one of the specified values (`kube-probe` or `prometheus`) will not be captured
### API Rules validation
### Traffic validation rules
This feature allows you to define set of simple rules, and test the API against them.
This feature allows you to define set of simple rules, and test the traffic against them.
Such validation may test response for specific JSON fields, headers, etc.
Please see [API RULES](docs/POLICY_RULES.md) page for more details and syntax.
Please see [TRAFFIC RULES](docs/POLICY_RULES.md) page for more details and syntax.
## How to Run local UI

View File

@@ -762,6 +762,116 @@ func TestTapRegexMasking(t *testing.T) {
}
}
func TestTapIgnoredUserAgents(t *testing.T) {
if testing.Short() {
t.Skip("ignored acceptance test")
}
cliPath, cliPathErr := getCliPath()
if cliPathErr != nil {
t.Errorf("failed to get cli path, err: %v", cliPathErr)
return
}
tapCmdArgs := getDefaultTapCommandArgs()
tapNamespace := getDefaultTapNamespace()
tapCmdArgs = append(tapCmdArgs, tapNamespace...)
ignoredUserAgentValue := "ignore"
tapCmdArgs = append(tapCmdArgs, "--set", fmt.Sprintf("tap.ignored-user-agents=%v", ignoredUserAgentValue))
tapCmd := exec.Command(cliPath, tapCmdArgs...)
t.Logf("running command: %v", tapCmd.String())
t.Cleanup(func() {
if err := cleanupCommand(tapCmd); err != nil {
t.Logf("failed to cleanup tap command, err: %v", err)
}
})
if err := tapCmd.Start(); err != nil {
t.Errorf("failed to start tap command, err: %v", err)
return
}
apiServerUrl := getApiServerUrl(defaultApiServerPort)
if err := waitTapPodsReady(apiServerUrl); err != nil {
t.Errorf("failed to start tap pods on time, err: %v", err)
return
}
proxyUrl := getProxyUrl(defaultNamespaceName, defaultServiceName)
ignoredUserAgentCustomHeader := "Ignored-User-Agent"
headers := map[string]string {"User-Agent": ignoredUserAgentValue, ignoredUserAgentCustomHeader: ""}
for i := 0; i < defaultEntriesCount; i++ {
if _, requestErr := executeHttpGetRequestWithHeaders(fmt.Sprintf("%v/get", proxyUrl), headers); requestErr != nil {
t.Errorf("failed to send proxy request, err: %v", requestErr)
return
}
}
for i := 0; i < defaultEntriesCount; i++ {
if _, requestErr := executeHttpGetRequest(fmt.Sprintf("%v/get", proxyUrl)); requestErr != nil {
t.Errorf("failed to send proxy request, err: %v", requestErr)
return
}
}
ignoredUserAgentsCheckFunc := func() error {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
entriesUrl := fmt.Sprintf("%v/api/entries?limit=%v&operator=lt&timestamp=%v", apiServerUrl, defaultEntriesCount * 2, timestamp)
requestResult, requestErr := executeHttpGetRequest(entriesUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entries, err: %v", requestErr)
}
entries := requestResult.([]interface{})
if len(entries) == 0 {
return fmt.Errorf("unexpected entries result - Expected more than 0 entries")
}
for _, entryInterface := range entries {
entryUrl := fmt.Sprintf("%v/api/entries/%v", apiServerUrl, entryInterface.(map[string]interface{})["id"])
requestResult, requestErr = executeHttpGetRequest(entryUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
}
data := requestResult.(map[string]interface{})["data"].(map[string]interface{})
entryJson := data["entry"].(string)
var entry map[string]interface{}
if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil {
return fmt.Errorf("failed to parse entry, err: %v", parseErr)
}
entryRequest := entry["request"].(map[string]interface{})
entryPayload := entryRequest["payload"].(map[string]interface{})
entryDetails := entryPayload["details"].(map[string]interface{})
entryHeaders := entryDetails["headers"].([]interface{})
for _, headerInterface := range entryHeaders {
header := headerInterface.(map[string]interface{})
if header["name"].(string) != ignoredUserAgentCustomHeader {
continue
}
return fmt.Errorf("unexpected result - user agent is not ignored")
}
}
return nil
}
if err := retriesExecute(shortRetriesCount, ignoredUserAgentsCheckFunc); err != nil {
t.Errorf("%v", err)
return
}
}
func TestTapDumpLogs(t *testing.T) {
if testing.Short() {
t.Skip("ignored acceptance test")

View File

@@ -172,6 +172,21 @@ func executeHttpRequest(response *http.Response, requestErr error) (interface{},
return jsonBytesToInterface(data)
}
func executeHttpGetRequestWithHeaders(url string, headers map[string]string) (interface{}, error) {
request, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
for headerKey, headerValue := range headers {
request.Header.Add(headerKey, headerValue)
}
client := &http.Client{}
response, requestErr := client.Do(request)
return executeHttpRequest(response, requestErr)
}
func executeHttpGetRequest(url string) (interface{}, error) {
response, requestErr := http.Get(url)
return executeHttpRequest(response, requestErr)

View File

@@ -4,6 +4,13 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
"io/ioutil"
"log"
"mizuserver/pkg/api"
@@ -18,15 +25,6 @@ import (
"path/filepath"
"plugin"
"sort"
"strings"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
)
var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API")
@@ -59,12 +57,12 @@ func main() {
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions, filteringOptions)
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
go filterItems(outputItemsChannel, filteredOutputItemsChannel)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
// go api.StartReadingOutbound(outboundLinkOutputChannel)
hostApi(nil)
} else if *tapperMode {
rlog.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
if *apiServerAddress == "" {
panic("API server address must be provided with --api-server-address when using --tap")
}
@@ -77,20 +75,20 @@ func main() {
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
socketConnection, err := shared.ConnectToSocketServer(*apiServerAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false)
socketConnection, _, err := websocket.DefaultDialer.Dial(*apiServerAddress, nil)
if err != nil {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
}
rlog.Infof("Connected successfully to websocket %s", *apiServerAddress)
go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel)
// go pipeOutboundLinksChannelToSocket(socketConnection, outboundLinkOutputChannel)
} else if *apiServerMode {
api.StartResolving(*namespace)
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
go filterItems(outputItemsChannel, filteredOutputItemsChannel)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
hostApi(outputItemsChannel)
@@ -98,7 +96,7 @@ func main() {
outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000)
filteredHarChannel := make(chan *tapApi.OutputChannelItem)
go filterItems(outputItemsChannel, filteredHarChannel, filteringOptions)
go filterItems(outputItemsChannel, filteredHarChannel)
go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap)
hostApi(nil)
}
@@ -122,7 +120,7 @@ func loadExtensions() {
extensionsMap = make(map[string]*tapApi.Extension)
for i, file := range files {
filename := file.Name()
log.Printf("Loading extension: %s\n", filename)
rlog.Infof("Loading extension: %s\n", filename)
extension := &tapApi.Extension{
Path: path.Join(extensionsDir, filename),
}
@@ -230,7 +228,7 @@ func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions {
filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar)
if filteringOptionsJson == "" {
return &tapApi.TrafficFilteringOptions{
HealthChecksUserAgentHeaders: []string{},
IgnoredUserAgents: []string{},
}
}
var filteringOptions tapApi.TrafficFilteringOptions
@@ -242,42 +240,16 @@ func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions {
return &filteringOptions
}
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *tapApi.TrafficFilteringOptions) {
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
}
// TODO: move this to tappers https://up9.atlassian.net/browse/TRA-3441
if isHealthCheckByUserAgent(message, filterOptions.HealthChecksUserAgentHeaders) {
continue
}
outChannel <- message
}
}
func isHealthCheckByUserAgent(item *tapApi.OutputChannelItem, userAgentsToIgnore []string) bool {
if item.Protocol.Name != "http" {
return false
}
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
for _, header := range reqDetails["headers"].([]interface{}) {
h := header.(map[string]interface{})
if strings.ToLower(h["name"].(string)) == "user-agent" {
for _, userAgent := range userAgentsToIgnore {
if strings.Contains(strings.ToLower(h["value"].(string)), strings.ToLower(userAgent)) {
return true
}
}
return false
}
}
return false
}
func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tapApi.OutputChannelItem) {
if connection == nil {
panic("Websocket connection is nil")
@@ -290,7 +262,7 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
for messageData := range messageDataChannel {
marshaledData, err := models.CreateWebsocketTappedEntryMessage(messageData)
if err != nil {
rlog.Infof("error converting message to json %s, (%v,%+v)\n", err, err, err)
rlog.Errorf("error converting message to json %v, err: %s, (%v,%+v)", messageData, err, err, err)
continue
}
@@ -298,26 +270,8 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
// and goes into the intermediate WebSocket.
err = connection.WriteMessage(websocket.TextMessage, marshaledData)
if err != nil {
rlog.Infof("error sending message through socket server %s, (%v,%+v)\n", err, err, err)
rlog.Errorf("error sending message through socket server %v, err: %s, (%v,%+v)", messageData, err, err, err)
continue
}
}
}
func pipeOutboundLinksChannelToSocket(connection *websocket.Conn, outboundLinkChannel <-chan *tap.OutboundLink) {
for outboundLink := range outboundLinkChannel {
if outboundLink.SuggestedProtocol == tap.TLSProtocol {
marshaledData, err := models.CreateWebsocketOutboundLinkMessage(outboundLink)
if err != nil {
rlog.Infof("Error converting outbound link to json %s, (%v,%+v)", err, err, err)
continue
}
err = connection.WriteMessage(websocket.TextMessage, marshaledData)
if err != nil {
rlog.Infof("error sending outbound link message through socket server %s, (%v,%+v)", err, err, err)
continue
}
}
}
}

View File

@@ -7,7 +7,7 @@ import (
"fmt"
"mizuserver/pkg/database"
"mizuserver/pkg/holder"
"net/url"
"mizuserver/pkg/providers"
"os"
"path"
"sort"
@@ -18,7 +18,6 @@ import (
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/models"
@@ -59,8 +58,10 @@ func StartReadingEntries(harChannel <-chan *tapApi.OutputChannelItem, workingDir
}
func startReadingFiles(workingDir string) {
err := os.MkdirAll(workingDir, os.ModePerm)
utils.CheckErr(err)
if err := os.MkdirAll(workingDir, os.ModePerm); err != nil {
rlog.Errorf("Failed to make dir: %s, err: %v", workingDir, err)
return
}
for true {
dir, _ := os.Open(workingDir)
@@ -88,17 +89,6 @@ func startReadingFiles(workingDir string) {
decErr := json.NewDecoder(bufio.NewReader(file)).Decode(&inputHar)
utils.CheckErr(decErr)
// for _, entry := range inputHar.Log.Entries {
// time.Sleep(time.Millisecond * 250)
// // connectionInfo := &tap.ConnectionInfo{
// // ClientIP: fileInfo.Name(),
// // ClientPort: "",
// // ServerIP: "",
// // ServerPort: "",
// // IsOutgoing: false,
// // }
// // saveHarToDb(entry, connectionInfo)
// }
rmErr := os.Remove(inputFilePath)
utils.CheckErr(rmErr)
}
@@ -110,6 +100,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
}
for item := range outputItems {
providers.EntryAdded()
extension := extensionsMap[item.Protocol.Name]
resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo)
mizuEntry := extension.Dissector.Analyze(item, primitive.NewObjectID().Hex(), resolvedSource, resolvedDestionation)
@@ -121,9 +113,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
json.Unmarshal([]byte(mizuEntry.Entry), &pair)
harEntry, err := utils.NewEntry(&pair)
if err == nil {
rules, _ := models.RunValidationRulesState(*harEntry, mizuEntry.Service)
rules, _, _ := models.RunValidationRulesState(*harEntry, mizuEntry.Service)
baseEntry.Rules = rules
baseEntry.Latency = mizuEntry.ElapsedTime
}
}
@@ -132,13 +123,6 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
}
}
func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
// tcpStreamFactory will block on write to channel. Empty channel to unblock.
// TODO: Make write to channel optional.
for range outboundLinkChannel {
}
}
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string) {
if k8sResolver != nil {
unresolvedSource := connectionInfo.ClientIP
@@ -161,12 +145,6 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
return resolvedSource, resolvedDestination
}
func getServiceNameFromUrl(inputUrl string) (string, string) {
parsed, err := url.Parse(inputUrl)
utils.CheckErr(err)
return fmt.Sprintf("%s://%s", parsed.Scheme, parsed.Host), parsed.Path
}
func CheckIsServiceIP(address string) bool {
if k8sResolver == nil {
return false

View File

@@ -143,11 +143,13 @@ func GetEntry(c *gin.Context) {
protocol, representation, bodySize, _ := extension.Dissector.Represent(&entryData)
var rules []map[string]interface{}
var isRulesEnabled bool
if entryData.ProtocolName == "http" {
var pair tapApi.RequestResponsePair
json.Unmarshal([]byte(entryData.Entry), &pair)
harEntry, _ := utils.NewEntry(&pair)
_, rulesMatched := models.RunValidationRulesState(*harEntry, entryData.Service)
_, rulesMatched, _isRulesEnabled := models.RunValidationRulesState(*harEntry, entryData.Service)
isRulesEnabled = _isRulesEnabled
inrec, _ := json.Marshal(rulesMatched)
json.Unmarshal(inrec, &rules)
}
@@ -158,6 +160,7 @@ func GetEntry(c *gin.Context) {
BodySize: bodySize,
Data: entryData,
Rules: rules,
IsRulesEnabled: isRulesEnabled,
})
}

View File

@@ -61,7 +61,7 @@ func GetEntriesFromDb(timestampFrom int64, timestampTo int64, protocolName *stri
order := OrderDesc
protocolNameCondition := "1 = 1"
if protocolName != nil {
protocolNameCondition = fmt.Sprintf("protocolKey = '%s'", *protocolName)
protocolNameCondition = fmt.Sprintf("protocolName = '%s'", *protocolName)
}
var entries []tapApi.MizuEntry

View File

@@ -97,8 +97,8 @@ type ExtendedCreator struct {
Source *string `json:"_source"`
}
func RunValidationRulesState(harEntry har.Entry, service string) (tapApi.ApplicableRules, []rules.RulesMatched) {
resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
func RunValidationRulesState(harEntry har.Entry, service string) (tapApi.ApplicableRules, []rules.RulesMatched, bool) {
resultPolicyToSend, isEnabled := rules.MatchRequestPolicy(harEntry, service)
statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend)
return tapApi.ApplicableRules{Status: statusPolicyToSend, Latency: latency, NumberOfRules: numberOfRules}, resultPolicyToSend
return tapApi.ApplicableRules{Status: statusPolicyToSend, Latency: latency, NumberOfRules: numberOfRules}, resultPolicyToSend, isEnabled
}

View File

@@ -8,6 +8,8 @@ import (
"regexp"
"strings"
"github.com/romana/rlog"
"github.com/google/martian/har"
"github.com/up9inc/mizu/shared"
jsonpath "github.com/yalp/jsonpath"
@@ -42,9 +44,11 @@ func ValidateService(serviceFromRule string, service string) bool {
return true
}
func MatchRequestPolicy(harEntry har.Entry, service string) []RulesMatched {
enforcePolicy, _ := shared.DecodeEnforcePolicy(fmt.Sprintf("%s/%s", shared.RulePolicyPath, shared.RulePolicyFileName))
var resultPolicyToSend []RulesMatched
func MatchRequestPolicy(harEntry har.Entry, service string) (resultPolicyToSend []RulesMatched, isEnabled bool) {
enforcePolicy, err := shared.DecodeEnforcePolicy(fmt.Sprintf("%s/%s", shared.RulePolicyPath, shared.RulePolicyFileName))
if err == nil {
isEnabled = true
}
for _, rule := range enforcePolicy.Rules {
if !ValidatePath(rule.Path, harEntry.Request.URL) || !ValidateService(rule.Service, service) {
continue
@@ -65,7 +69,7 @@ func MatchRequestPolicy(harEntry har.Entry, service string) []RulesMatched {
if err != nil {
continue
}
fmt.Println(matchValue, rule.Value)
rlog.Info(matchValue, rule.Value)
} else {
val := fmt.Sprint(out)
matchValue, err = regexp.MatchString(rule.Value, val)
@@ -92,12 +96,12 @@ func MatchRequestPolicy(harEntry har.Entry, service string) []RulesMatched {
resultPolicyToSend = appendRulesMatched(resultPolicyToSend, true, rule)
}
}
return resultPolicyToSend
return
}
func PassedValidationRules(rulesMatched []RulesMatched) (bool, int64, int) {
var numberOfRulesMatched = len(rulesMatched)
var latency int64 = -1
var responseTime int64 = -1
if numberOfRulesMatched == 0 {
return false, 0, numberOfRulesMatched
@@ -105,15 +109,15 @@ func PassedValidationRules(rulesMatched []RulesMatched) (bool, int64, int) {
for _, rule := range rulesMatched {
if rule.Matched == false {
return false, latency, numberOfRulesMatched
return false, responseTime, numberOfRulesMatched
} else {
if strings.ToLower(rule.Rule.Type) == "latency" {
if rule.Rule.Latency < latency || latency == -1 {
latency = rule.Rule.Latency
if strings.ToLower(rule.Rule.Type) == "responseTime" {
if rule.Rule.ResponseTime < responseTime || responseTime == -1 {
responseTime = rule.Rule.ResponseTime
}
}
}
}
return true, latency, numberOfRulesMatched
return true, responseTime, numberOfRulesMatched
}

View File

@@ -4,13 +4,12 @@ import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api"
"strconv"
"strings"
"time"
)
// Keep it because we might want cookies in the future

View File

@@ -4,7 +4,6 @@ import (
"context"
"github.com/gin-gonic/gin"
"github.com/romana/rlog"
"log"
"net/http"
"net/url"
"os"
@@ -18,8 +17,8 @@ import (
func StartServer(app *gin.Engine) {
signals := make(chan os.Signal, 2)
signal.Notify(signals,
os.Interrupt, // this catch ctrl + c
syscall.SIGTSTP, // this catch ctrl + z
os.Interrupt, // this catch ctrl + c
syscall.SIGTSTP, // this catch ctrl + z
)
srv := &http.Server{
@@ -36,8 +35,9 @@ func StartServer(app *gin.Engine) {
}()
// Run server.
rlog.Infof("Starting the server...")
if err := app.Run(":8899"); err != nil {
log.Printf("Oops... Server is not running! Reason: %v", err)
rlog.Errorf("Server is not running! Reason: %v", err)
}
}
@@ -54,15 +54,14 @@ func ReverseSlice(data interface{}) {
func CheckErr(e error) {
if e != nil {
log.Printf("%v", e)
//panic(e)
rlog.Errorf("%v", e)
}
}
func SetHostname(address, newHostname string) string {
replacedUrl, err := url.Parse(address)
if err != nil{
log.Printf("error replacing hostname to %s in address %s, returning original %v",newHostname, address, err)
if err != nil {
rlog.Errorf("error replacing hostname to %s in address %s, returning original %v", newHostname, address, err)
return address
}
replacedUrl.Host = newHostname

View File

@@ -32,7 +32,7 @@ var logsCmd = &cobra.Command{
logger.Log.Debugf("Using file path %s", config.Config.Logs.FilePath())
if dumpLogsErr := fsUtils.DumpLogs(kubernetesProvider, ctx, config.Config.Logs.FilePath()); dumpLogsErr != nil {
if dumpLogsErr := fsUtils.DumpLogs(ctx, kubernetesProvider, config.Config.Logs.FilePath()); dumpLogsErr != nil {
logger.Log.Errorf("Failed dump logs %v", dumpLogsErr)
}

View File

@@ -2,7 +2,6 @@ package cmd
import (
"errors"
"fmt"
"os"
"github.com/up9inc/mizu/cli/config"
@@ -67,8 +66,5 @@ func init() {
tapCmd.Flags().Bool(configStructs.DisableRedactionTapName, defaultTapConfig.DisableRedaction, "Disables redaction of potentially sensitive request/response headers and body values")
tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size")
tapCmd.Flags().Bool(configStructs.DryRunTapName, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file with policy rules")
tapCmd.Flags().String(configStructs.EnforcePolicyFileDeprecated, defaultTapConfig.EnforcePolicyFileDeprecated, "Yaml file with policy rules")
tapCmd.Flags().MarkDeprecated(configStructs.EnforcePolicyFileDeprecated, fmt.Sprintf("Use --%s instead", configStructs.EnforcePolicyFile))
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file path with policy rules")
}

View File

@@ -8,6 +8,10 @@ import (
"strings"
"time"
"gopkg.in/yaml.v3"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
@@ -22,9 +26,6 @@ import (
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/tap/api"
yaml "gopkg.in/yaml.v3"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
)
const (
@@ -36,7 +37,6 @@ type tapState struct {
apiServerService *core.Service
currentlyTappedPods []core.Pod
mizuServiceAccountExists bool
doNotRemoveConfigMap bool
}
var state tapState
@@ -49,15 +49,8 @@ func RunMizuTap() {
}
var mizuValidationRules string
if config.Config.Tap.EnforcePolicyFile != "" || config.Config.Tap.EnforcePolicyFileDeprecated != "" {
var trafficValidation string
if config.Config.Tap.EnforcePolicyFile != "" {
trafficValidation = config.Config.Tap.EnforcePolicyFile
} else {
trafficValidation = config.Config.Tap.EnforcePolicyFileDeprecated
}
mizuValidationRules, err = readValidationRules(trafficValidation)
if config.Config.Tap.EnforcePolicyFile != "" {
mizuValidationRules, err = readValidationRules(config.Config.Tap.EnforcePolicyFile)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading policy file: %v", errormessage.FormatError(err)))
return
@@ -76,7 +69,7 @@ func RunMizuTap() {
targetNamespaces := getNamespaces(kubernetesProvider)
if config.Config.IsNsRestrictedMode() {
if len(targetNamespaces) != 1 || !mizu.Contains(targetNamespaces, config.Config.MizuResourcesNamespace) {
if len(targetNamespaces) != 1 || !shared.Contains(targetNamespaces, config.Config.MizuResourcesNamespace) {
logger.Log.Errorf("Not supported mode. Mizu can't resolve IPs in other namespaces when running in namespace restricted mode.\n"+
"You can use the same namespace for --%s and --%s", configStructs.NamespacesTapName, config.MizuResourcesNamespaceConfigName)
return
@@ -84,7 +77,7 @@ func RunMizuTap() {
}
var namespacesStr string
if !mizu.Contains(targetNamespaces, mizu.K8sAllNamespaces) {
if !shared.Contains(targetNamespaces, mizu.K8sAllNamespaces) {
namespacesStr = fmt.Sprintf("namespaces \"%s\"", strings.Join(targetNamespaces, "\", \""))
} else {
namespacesStr = "all namespaces"
@@ -99,7 +92,7 @@ func RunMizuTap() {
if len(state.currentlyTappedPods) == 0 {
var suggestionStr string
if !mizu.Contains(targetNamespaces, mizu.K8sAllNamespaces) {
if !shared.Contains(targetNamespaces, mizu.K8sAllNamespaces) {
suggestionStr = ". Select a different namespace with -n or tap all namespaces with -A"
}
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any pods matching the regex argument%s", suggestionStr))
@@ -109,18 +102,17 @@ func RunMizuTap() {
return
}
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
defer finishMizuExecution(kubernetesProvider)
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions, mizuValidationRules); err != nil {
if err := createMizuResources(ctx, kubernetesProvider, mizuApiFilteringOptions, mizuValidationRules); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
return
}
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel, mizuApiFilteringOptions)
go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchPodsForTapping, ctx, kubernetesProvider, targetNamespaces, cancel, mizuApiFilteringOptions)
//block until exit signal or error
// block until exit signal or error
waitForFinish(ctx, cancel)
}
@@ -133,7 +125,7 @@ func readValidationRules(file string) (string, error) {
return string(newContent), nil
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
if !config.Config.IsNsRestrictedMode() {
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
return err
@@ -144,15 +136,8 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
return err
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil {
return err
}
if err := createMizuConfigmap(ctx, kubernetesProvider, mizuValidationRules); err != nil {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err)))
state.doNotRemoveConfigMap = true
} else if mizuValidationRules == "" {
state.doNotRemoveConfigMap = true
}
return nil
@@ -224,13 +209,15 @@ func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
}
return &api.TrafficFilteringOptions{
PlainTextMaskingRegexes: compiledRegexSlice,
HealthChecksUserAgentHeaders: config.Config.Tap.HealthChecksUserAgentHeaders,
DisableRedaction: config.Config.Tap.DisableRedaction,
PlainTextMaskingRegexes: compiledRegexSlice,
IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents,
DisableRedaction: config.Config.Tap.DisableRedaction,
}, nil
}
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
if len(nodeToTappedPodIPMap) > 0 {
var serviceAccountName string
if state.mizuServiceAccountExists {
@@ -268,75 +255,108 @@ func finishMizuExecution(kubernetesProvider *kubernetes.Provider) {
telemetry.ReportAPICalls()
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
defer cancel()
dumpLogsIfNeeded(kubernetesProvider, removalCtx)
cleanUpMizuResources(kubernetesProvider, removalCtx, cancel)
dumpLogsIfNeeded(removalCtx, kubernetesProvider)
cleanUpMizuResources(removalCtx, cancel, kubernetesProvider)
}
func dumpLogsIfNeeded(kubernetesProvider *kubernetes.Provider, removalCtx context.Context) {
func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provider) {
if !config.Config.DumpLogs {
return
}
mizuDir := mizu.GetMizuFolderPath()
filePath := path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
if err := fsUtils.DumpLogs(kubernetesProvider, removalCtx, filePath); err != nil {
if err := fsUtils.DumpLogs(ctx, kubernetesProvider, filePath); err != nil {
logger.Log.Errorf("Failed dump logs %v", err)
}
}
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider, removalCtx context.Context, cancel context.CancelFunc) {
func cleanUpMizuResources(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
logger.Log.Infof("\nRemoving mizu resources\n")
if !config.Config.IsNsRestrictedMode() {
if err := kubernetesProvider.RemoveNamespace(removalCtx, config.Config.MizuResourcesNamespace); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Namespace %s: %v", config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
return
}
var leftoverResources []string
if config.Config.IsNsRestrictedMode() {
leftoverResources = cleanUpRestrictedMode(ctx, kubernetesProvider)
} else {
if err := kubernetesProvider.RemovePod(removalCtx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Pod %s in namespace %s: %v", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
}
if err := kubernetesProvider.RemoveService(removalCtx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service %s in namespace %s: %v", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
}
if err := kubernetesProvider.RemoveDaemonSet(removalCtx, config.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing DaemonSet %s in namespace %s: %v", mizu.TapperDaemonSetName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
}
if !state.doNotRemoveConfigMap {
if err := kubernetesProvider.RemoveConfigMap(removalCtx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing ConfigMap %s in namespace %s: %v", mizu.ConfigMapName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
}
}
leftoverResources = cleanUpNonRestrictedMode(ctx, cancel, kubernetesProvider)
}
if state.mizuServiceAccountExists {
if !config.Config.IsNsRestrictedMode() {
if err := kubernetesProvider.RemoveNonNamespacedResources(removalCtx, mizu.ClusterRoleName, mizu.ClusterRoleBindingName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing non-namespaced resources: %v", errormessage.FormatError(err)))
return
}
} else {
if err := kubernetesProvider.RemoveServicAccount(removalCtx, config.Config.MizuResourcesNamespace, mizu.ServiceAccountName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service Account %s in namespace %s: %v", mizu.ServiceAccountName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
return
}
if err := kubernetesProvider.RemoveRole(removalCtx, config.Config.MizuResourcesNamespace, mizu.RoleName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Role %s in namespace %s: %v", mizu.RoleName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
}
if err := kubernetesProvider.RemoveRoleBinding(removalCtx, config.Config.MizuResourcesNamespace, mizu.RoleBindingName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing RoleBinding %s in namespace %s: %v", mizu.RoleBindingName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
}
if len(leftoverResources) > 0 {
errMsg := fmt.Sprintf("Failed to remove the following resources, for more info check logs at %s:", logger.GetLogFilePath())
for _, resource := range leftoverResources {
errMsg += "\n- " + resource
}
logger.Log.Errorf(uiUtils.Error, errMsg)
}
}
func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.Provider) []string {
leftoverResources := make([]string, 0)
if err := kubernetesProvider.RemovePod(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
resourceDesc := fmt.Sprintf("Pod %s in namespace %s", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if !config.Config.IsNsRestrictedMode() {
waitUntilNamespaceDeleted(removalCtx, cancel, kubernetesProvider)
if err := kubernetesProvider.RemoveService(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
resourceDesc := fmt.Sprintf("Service %s in namespace %s", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if err := kubernetesProvider.RemoveDaemonSet(ctx, config.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
resourceDesc := fmt.Sprintf("DaemonSet %s in namespace %s", mizu.TapperDaemonSetName, config.Config.MizuResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if err := kubernetesProvider.RemoveConfigMap(ctx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName); err != nil {
resourceDesc := fmt.Sprintf("ConfigMap %s in namespace %s", mizu.ConfigMapName, config.Config.MizuResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if err := kubernetesProvider.RemoveServicAccount(ctx, config.Config.MizuResourcesNamespace, mizu.ServiceAccountName); err != nil {
resourceDesc := fmt.Sprintf("Service Account %s in namespace %s", mizu.ServiceAccountName, config.Config.MizuResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if err := kubernetesProvider.RemoveRole(ctx, config.Config.MizuResourcesNamespace, mizu.RoleName); err != nil {
resourceDesc := fmt.Sprintf("Role %s in namespace %s", mizu.RoleName, config.Config.MizuResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if err := kubernetesProvider.RemoveRoleBinding(ctx, config.Config.MizuResourcesNamespace, mizu.RoleBindingName); err != nil {
resourceDesc := fmt.Sprintf("RoleBinding %s in namespace %s", mizu.RoleBindingName, config.Config.MizuResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)
}
return leftoverResources
}
func cleanUpNonRestrictedMode(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) []string {
leftoverResources := make([]string, 0)
if err := kubernetesProvider.RemoveNamespace(ctx, config.Config.MizuResourcesNamespace); err != nil {
resourceDesc := fmt.Sprintf("Namespace %s", config.Config.MizuResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)
} else {
defer waitUntilNamespaceDeleted(ctx, cancel, kubernetesProvider)
}
if err := kubernetesProvider.RemoveClusterRole(ctx, mizu.ClusterRoleName); err != nil {
resourceDesc := fmt.Sprintf("ClusterRole %s", mizu.ClusterRoleName)
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if err := kubernetesProvider.RemoveClusterRoleBinding(ctx, mizu.ClusterRoleBindingName); err != nil {
resourceDesc := fmt.Sprintf("ClusterRoleBinding %s", mizu.ClusterRoleBindingName)
handleDeletionError(err, resourceDesc, &leftoverResources)
}
return leftoverResources
}
func handleDeletionError(err error, resourceDesc string, leftoverResources *[]string) {
logger.Log.Debugf("Error removing %s: %v", resourceDesc, errormessage.FormatError(err))
*leftoverResources = append(*leftoverResources, resourceDesc)
}
func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
@@ -376,13 +396,8 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error building node to ips map: %v", errormessage.FormatError(err)))
cancel()
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating daemonset: %v", errormessage.FormatError(err)))
if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating tappers: %v", errormessage.FormatError(err)))
cancel()
}
}
@@ -500,7 +515,7 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
return missingPods
}
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, mizuApiFilteringOptions *api.TrafficFilteringOptions) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
isPodReady := false
@@ -530,16 +545,38 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodPending {
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", logger.GetLogFilePath()))
cancel()
break
}
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", logger.GetLogFilePath()))
cancel()
break
}
}
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
url := GetApiServerUrl()
if err := apiserver.Provider.InitAndTestConnection(url); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", logger.GetLogFilePath()))
cancel()
break
}
if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating tappers: %v", errormessage.FormatError(err)))
cancel()
}
logger.Log.Infof("Mizu is available at %s\n", url)
openBrowser(url)
requestForAnalysisIfNeeded()
@@ -547,13 +584,13 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
}
case _, ok := <-errorChan:
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace, error: %v", config.Config.MizuResourcesNamespace, err)
cancel()
case <-timeAfter:
@@ -568,6 +605,72 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
}
}
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", mizu.TapperDaemonSetName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
var prevPodPhase core.PodPhase
for {
select {
case addedPod, ok := <-added:
if !ok {
added = nil
continue
}
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
case removedPod, ok := <-removed:
if !ok {
removed = nil
continue
}
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
case modifiedPod, ok := <-modified:
if !ok {
modified = nil
continue
}
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, "Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message)
cancel()
break
}
podStatus := modifiedPod.Status
if podStatus.Phase == core.PodPending && prevPodPhase == podStatus.Phase {
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
continue
}
prevPodPhase = podStatus.Phase
if podStatus.Phase == core.PodRunning {
state := podStatus.ContainerStatuses[0].State
if state.Terminated != nil {
switch state.Terminated.Reason {
case "OOMKilled":
logger.Log.Infof(uiUtils.Red, "Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name)
}
}
}
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Debugf("[Error] Error in mizu tapper watch, err: %v", err)
cancel()
case <-ctx.Done():
logger.Log.Debugf("Watching tapper pod loop, ctx done")
return
}
}
}
func requestForAnalysisIfNeeded() {
if !config.Config.Tap.Analysis {
return
@@ -609,7 +712,7 @@ func getNamespaces(kubernetesProvider *kubernetes.Provider) []string {
if config.Config.Tap.AllNamespaces {
return []string{mizu.K8sAllNamespaces}
} else if len(config.Config.Tap.Namespaces) > 0 {
return mizu.Unique(config.Config.Tap.Namespaces)
return shared.Unique(config.Config.Tap.Namespaces)
} else {
return []string{kubernetesProvider.CurrentNamespace()}
}

View File

@@ -25,4 +25,7 @@ func init() {
defaults.Set(&defaultViewConfig)
viewCmd.Flags().Uint16P(configStructs.GuiPortViewName, "p", defaultViewConfig.GuiPort, "Provide a custom port for the web interface webserver")
viewCmd.Flags().StringP(configStructs.UrlViewName, "u", defaultViewConfig.Url, "Provide a custom host")
viewCmd.Flags().MarkHidden(configStructs.UrlViewName)
}

View File

@@ -24,34 +24,39 @@ func runMizuView() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
exists, err := kubernetesProvider.DoesServicesExist(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
if err != nil {
logger.Log.Errorf("Failed to found mizu service %v", err)
cancel()
return
}
if !exists {
logger.Log.Infof("%s service not found, you should run `mizu tap` command first", mizu.ApiServerPodName)
cancel()
return
}
url := config.Config.View.Url
url := GetApiServerUrl()
if url == "" {
exists, err := kubernetesProvider.DoesServicesExist(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
if err != nil {
logger.Log.Errorf("Failed to found mizu service %v", err)
cancel()
return
}
if !exists {
logger.Log.Infof("%s service not found, you should run `mizu tap` command first", mizu.ApiServerPodName)
cancel()
return
}
response, err := http.Get(fmt.Sprintf("%s/", url))
if err == nil && response.StatusCode == 200 {
logger.Log.Infof("Found a running service %s and open port %d", mizu.ApiServerPodName, config.Config.View.GuiPort)
return
}
logger.Log.Infof("Establishing connection to k8s cluster...")
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
url = GetApiServerUrl()
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
return
response, err := http.Get(fmt.Sprintf("%s/", url))
if err == nil && response.StatusCode == 200 {
logger.Log.Infof("Found a running service %s and open port %d", mizu.ApiServerPodName, config.Config.View.GuiPort)
return
}
logger.Log.Infof("Establishing connection to k8s cluster...")
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", logger.GetLogFilePath()))
return
}
}
logger.Log.Infof("Mizu is available at %s\n", url)
openBrowser(url)
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
logger.Log.Errorf("Failed to check versions compatibility %v", err)

View File

@@ -4,7 +4,7 @@ import (
"errors"
"fmt"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/shared"
"io/ioutil"
"os"
"reflect"
@@ -89,7 +89,7 @@ func initFlag(f *pflag.Flag) {
configElemValue := reflect.ValueOf(&Config).Elem()
var flagPath []string
if mizu.Contains([]string{ConfigFilePathCommandName}, f.Name) {
if shared.Contains([]string{ConfigFilePathCommandName}, f.Name) {
flagPath = []string{f.Name}
} else {
flagPath = []string{cmdName, f.Name}

View File

@@ -16,27 +16,25 @@ const (
DisableRedactionTapName = "no-redact"
HumanMaxEntriesDBSizeTapName = "max-entries-db-size"
DryRunTapName = "dry-run"
EnforcePolicyFile = "traffic-validation"
EnforcePolicyFileDeprecated = "test-rules"
EnforcePolicyFile = "traffic-validation-file"
)
type TapConfig struct {
AnalysisDestination string `yaml:"dest" default:"up9.app"`
SleepIntervalSec int `yaml:"upload-interval" default:"10"`
PodRegexStr string `yaml:"regex" default:".*"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
Namespaces []string `yaml:"namespaces"`
Analysis bool `yaml:"analysis" default:"false"`
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
PlainTextFilterRegexes []string `yaml:"regex-masking"`
HealthChecksUserAgentHeaders []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
DryRun bool `yaml:"dry-run" default:"false"`
EnforcePolicyFile string `yaml:"traffic-validation"`
EnforcePolicyFileDeprecated string `yaml:"test-rules"`
ApiServerResources Resources `yaml:"api-server-resources"`
TapperResources Resources `yaml:"tapper-resources"`
AnalysisDestination string `yaml:"dest" default:"up9.app"`
SleepIntervalSec int `yaml:"upload-interval" default:"10"`
PodRegexStr string `yaml:"regex" default:".*"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
Namespaces []string `yaml:"namespaces"`
Analysis bool `yaml:"analysis" default:"false"`
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
PlainTextFilterRegexes []string `yaml:"regex-masking"`
IgnoredUserAgents []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
DryRun bool `yaml:"dry-run" default:"false"`
EnforcePolicyFile string `yaml:"traffic-validation-file"`
ApiServerResources Resources `yaml:"api-server-resources"`
TapperResources Resources `yaml:"tapper-resources"`
}
type Resources struct {

View File

@@ -2,8 +2,10 @@ package configStructs
const (
GuiPortViewName = "gui-port"
UrlViewName = "url"
)
type ViewConfig struct {
GuiPort uint16 `yaml:"gui-port" default:"8899"`
Url string `yaml:"url,omitempty" readonly:""`
}

View File

@@ -268,67 +268,21 @@ func (provider *Provider) CreateService(ctx context.Context, namespace string, s
return provider.clientSet.CoreV1().Services(namespace).Create(ctx, &service, metav1.CreateOptions{})
}
func (provider *Provider) DoesServiceAccountExist(ctx context.Context, namespace string, serviceAccountName string) (bool, error) {
serviceAccount, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Get(ctx, serviceAccountName, metav1.GetOptions{})
return provider.doesResourceExist(serviceAccount, err)
}
func (provider *Provider) DoesConfigMapExist(ctx context.Context, namespace string, name string) (bool, error) {
resource, err := provider.clientSet.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(resource, err)
}
func (provider *Provider) DoesServicesExist(ctx context.Context, namespace string, name string) (bool, error) {
resource, err := provider.clientSet.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(resource, err)
}
func (provider *Provider) DoesNamespaceExist(ctx context.Context, name string) (bool, error) {
resource, err := provider.clientSet.CoreV1().Namespaces().Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(resource, err)
}
func (provider *Provider) DoesClusterRoleExist(ctx context.Context, name string) (bool, error) {
resource, err := provider.clientSet.RbacV1().ClusterRoles().Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(resource, err)
}
func (provider *Provider) DoesClusterRoleBindingExist(ctx context.Context, name string) (bool, error) {
resource, err := provider.clientSet.RbacV1().ClusterRoleBindings().Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(resource, err)
}
func (provider *Provider) DoesRoleExist(ctx context.Context, namespace string, name string) (bool, error) {
resource, err := provider.clientSet.RbacV1().Roles(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(resource, err)
}
func (provider *Provider) DoesRoleBindingExist(ctx context.Context, namespace string, name string) (bool, error) {
resource, err := provider.clientSet.RbacV1().RoleBindings(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(resource, err)
}
func (provider *Provider) DoesPodExist(ctx context.Context, namespace string, name string) (bool, error) {
resource, err := provider.clientSet.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(resource, err)
}
func (provider *Provider) DoesDaemonSetExist(ctx context.Context, namespace string, name string) (bool, error) {
resource, err := provider.clientSet.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(resource, err)
}
func (provider *Provider) doesResourceExist(resource interface{}, err error) (bool, error) {
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) {
// expected behavior when resource does not exist
if statusError.ErrStatus.Reason == metav1.StatusReasonNotFound {
return false, nil
}
// Getting NotFound error is the expected behavior when a resource does not exist.
if k8serrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
return resource != nil, nil
}
@@ -441,115 +395,63 @@ func (provider *Provider) CreateMizuRBACNamespaceRestricted(ctx context.Context,
}
func (provider *Provider) RemoveNamespace(ctx context.Context, name string) error {
if isFound, err := provider.DoesNamespaceExist(ctx, name); err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.CoreV1().Namespaces().Delete(ctx, name, metav1.DeleteOptions{})
}
func (provider *Provider) RemoveNonNamespacedResources(ctx context.Context, clusterRoleName string, clusterRoleBindingName string) error {
if err := provider.RemoveClusterRole(ctx, clusterRoleName); err != nil {
return err
}
if err := provider.RemoveClusterRoleBinding(ctx, clusterRoleBindingName); err != nil {
return err
}
return nil
err := provider.clientSet.CoreV1().Namespaces().Delete(ctx, name, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveClusterRole(ctx context.Context, name string) error {
if isFound, err := provider.DoesClusterRoleExist(ctx, name); err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.RbacV1().ClusterRoles().Delete(ctx, name, metav1.DeleteOptions{})
err := provider.clientSet.RbacV1().ClusterRoles().Delete(ctx, name, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveClusterRoleBinding(ctx context.Context, name string) error {
if isFound, err := provider.DoesClusterRoleBindingExist(ctx, name); err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.RbacV1().ClusterRoleBindings().Delete(ctx, name, metav1.DeleteOptions{})
err := provider.clientSet.RbacV1().ClusterRoleBindings().Delete(ctx, name, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveRoleBinding(ctx context.Context, namespace string, name string) error {
if isFound, err := provider.DoesRoleBindingExist(ctx, namespace, name); err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.RbacV1().RoleBindings(namespace).Delete(ctx, name, metav1.DeleteOptions{})
err := provider.clientSet.RbacV1().RoleBindings(namespace).Delete(ctx, name, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveRole(ctx context.Context, namespace string, name string) error {
if isFound, err := provider.DoesRoleExist(ctx, namespace, name); err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.RbacV1().Roles(namespace).Delete(ctx, name, metav1.DeleteOptions{})
err := provider.clientSet.RbacV1().Roles(namespace).Delete(ctx, name, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveServicAccount(ctx context.Context, namespace string, name string) error {
if isFound, err := provider.DoesServiceAccountExist(ctx, namespace, name); err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.CoreV1().ServiceAccounts(namespace).Delete(ctx, name, metav1.DeleteOptions{})
err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Delete(ctx, name, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemovePod(ctx context.Context, namespace string, podName string) error {
if isFound, err := provider.DoesPodExist(ctx, namespace, podName); err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{})
err := provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveConfigMap(ctx context.Context, namespace string, configMapName string) error {
if isFound, err := provider.DoesConfigMapExist(ctx, namespace, configMapName); err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.CoreV1().ConfigMaps(namespace).Delete(ctx, configMapName, metav1.DeleteOptions{})
err := provider.clientSet.CoreV1().ConfigMaps(namespace).Delete(ctx, configMapName, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveService(ctx context.Context, namespace string, serviceName string) error {
if isFound, err := provider.DoesServicesExist(ctx, namespace, serviceName); err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{})
err := provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string, daemonSetName string) error {
if isFound, err := provider.DoesDaemonSetExist(ctx, namespace, daemonSetName); err != nil {
return err
} else if !isFound {
err := provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) handleRemovalError(err error) error {
// Ignore NotFound - There is nothing to delete.
// Ignore Forbidden - Assume that a user could not have created the resource in the first place.
if k8serrors.IsNotFound(err) || k8serrors.IsForbidden(err) {
return nil
}
return provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{})
return err
}
func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, configMapName string, data string) error {
@@ -731,7 +633,7 @@ func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, r
return matchingPods, nil
}
func (provider *Provider) GetPodLogs(namespace string, podName string, ctx context.Context) (string, error) {
func (provider *Provider) GetPodLogs(ctx context.Context, namespace string, podName string) (string, error) {
podLogOpts := core.PodLogOptions{}
req := provider.clientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts)
podLogs, err := req.Stream(ctx)
@@ -747,7 +649,7 @@ func (provider *Provider) GetPodLogs(namespace string, podName string, ctx conte
return str, nil
}
func (provider *Provider) GetNamespaceEvents(namespace string, ctx context.Context) (string, error) {
func (provider *Provider) GetNamespaceEvents(ctx context.Context, namespace string) (string, error) {
eventsOpts := metav1.ListOptions{}
eventList, err := provider.clientSet.CoreV1().Events(namespace).List(ctx, eventsOpts)
if err != nil {

View File

@@ -33,7 +33,10 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
return
}
pod := e.Object.(*corev1.Pod)
pod, ok := e.Object.(*corev1.Pod)
if !ok {
continue
}
if !podFilter.MatchString(pod.Name) {
continue

View File

@@ -1,42 +0,0 @@
package mizu
import (
"encoding/json"
"github.com/gorilla/websocket"
"github.com/up9inc/mizu/shared"
core "k8s.io/api/core/v1"
"time"
)
type ControlSocket struct {
connection *websocket.Conn
}
func CreateControlSocket(socketServerAddress string) (*ControlSocket, error) {
connection, err := shared.ConnectToSocketServer(socketServerAddress, 30, 2 * time.Second, true)
if err != nil {
return nil, err
} else {
return &ControlSocket{connection: connection}, nil
}
}
func (controlSocket *ControlSocket) SendNewTappedPodsListMessage(pods []core.Pod) error {
podInfos := make([]shared.PodInfo, 0)
for _, pod := range pods {
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace})
}
tapStatus := shared.TapStatus{Pods: podInfos}
socketMessage := shared.CreateWebSocketStatusMessage(tapStatus)
jsonMessage, err := json.Marshal(socketMessage)
if err != nil {
return err
}
err = controlSocket.connection.WriteMessage(websocket.TextMessage, jsonMessage)
if err != nil {
return err
}
return nil
}

View File

@@ -12,7 +12,7 @@ import (
"regexp"
)
func DumpLogs(provider *kubernetes.Provider, ctx context.Context, filePath string) error {
func DumpLogs(ctx context.Context, provider *kubernetes.Provider, filePath string) error {
podExactRegex := regexp.MustCompile("^" + mizu.MizuResourcesPrefix)
pods, err := provider.ListAllPodsMatchingRegex(ctx, podExactRegex, []string{config.Config.MizuResourcesNamespace})
if err != nil {
@@ -32,7 +32,7 @@ func DumpLogs(provider *kubernetes.Provider, ctx context.Context, filePath strin
defer zipWriter.Close()
for _, pod := range pods {
logs, err := provider.GetPodLogs(pod.Namespace, pod.Name, ctx)
logs, err := provider.GetPodLogs(ctx, pod.Namespace, pod.Name)
if err != nil {
logger.Log.Errorf("Failed to get logs, %v", err)
continue
@@ -47,7 +47,7 @@ func DumpLogs(provider *kubernetes.Provider, ctx context.Context, filePath strin
}
}
events, err := provider.GetNamespaceEvents(config.Config.MizuResourcesNamespace, ctx)
events, err := provider.GetNamespaceEvents(ctx, config.Config.MizuResourcesNamespace)
if err != nil {
logger.Log.Debugf("Failed to get k8b events, %v", err)
} else {

View File

@@ -9,6 +9,7 @@ import (
"io/ioutil"
"net/http"
"runtime"
"strings"
"time"
"github.com/google/go-github/v37/github"
@@ -77,7 +78,7 @@ func CheckNewerVersion(versionChan chan string) {
logger.Log.Debugf("Finished version validation, github version %v, current version %v, took %v", gitHubVersion, currentSemVer, time.Since(start))
if gitHubVersionSemVer.GreaterThan(currentSemVer) {
versionChan <- fmt.Sprintf("Update available! %v -> %v (curl -Lo mizu %v/mizu_%s_amd64 && chmod 755 mizu)", mizu.SemVer, gitHubVersion, *latestRelease.HTMLURL, runtime.GOOS)
versionChan <- fmt.Sprintf("Update available! %v -> %v (curl -Lo mizu %v/mizu_%s_amd64 && chmod 755 mizu)", mizu.SemVer, gitHubVersion, strings.Replace(*latestRelease.HTMLURL, "tag", "download", 1), runtime.GOOS)
} else {
versionChan <- ""
}

View File

@@ -1,45 +1,42 @@
# API rules validation
# Traffic validation rules
This feature allows you to define set of simple rules, and test the API against them.
This feature allows you to define set of simple rules, and test the traffic against them.
Such validation may test response for specific JSON fields, headers, etc.
## Examples
Example 1: HTTP request (REST API call) that didnt pass validation is highlighted in red
Example 1: HTTP request (REST API call) that didn't pass validation is highlighted in red
![Simple UI](../assets/validation-example1.png)
- - -
Example 2: Details pane shows the validation rule details and whether it passed or failed
![Simple UI](../assets/validation-example2.png)
## How to use
To use this feature - create simple rules file (see details below) and pass this file as parameter to `mizu tap` command. For example, if rules are stored in file named `rules.yaml` — run the following command:
```shell
mizu tap --traffic-validation rules.yaml PODNAME
mizu tap --traffic-validation-file rules.yaml
```
## Rules file structure
The structure of the traffic-validation-file is:
* `name`: string, name of the rule
* `type`: string, type of the rule, must be `json` or `header` or `latency`
* `type`: string, type of the rule, must be `json` or `header` or `slo`
* `key`: string, [jsonpath](https://code.google.com/archive/p/jsonpath/wikis/Javascript.wiki) used only in `json` or `header` type
* `value`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) used only in `json` or `header` type
* `service`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) service name to filter
* `path`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) URL path to filter
* `latency`: integer, time in ms of the expected latency.
* `response-time`: integer, time in ms of the expected latency.
### For example:
@@ -57,11 +54,12 @@ rules:
key: "Content-Le.*"
value: "(\\d+(?:\\.\\d+)?)"
- name: latency-test
type: latency
latency: 1
type: slo
response-time: 1
service: "carts.*"
```
### Explanation:
* First rule `holy-in-name-property`:
@@ -74,5 +72,4 @@ rules:
* Third rule `latency-test`:
> This rule will be applied to all request made to `carts.*` services. If the latency of the response is greater than `1` will be marked as failure, marked as success otherwise.
> This rule will be applied to all request made to `carts.*` services. If the latency of the response is greater than `1ms` will be marked as failure, marked as success otherwise.

View File

@@ -1,8 +1,8 @@
package shared
import (
"fmt"
"io/ioutil"
"log"
"strings"
"gopkg.in/yaml.v3"
@@ -83,14 +83,14 @@ type RulesPolicy struct {
}
type RulePolicy struct {
Type string `yaml:"type"`
Service string `yaml:"service"`
Path string `yaml:"path"`
Method string `yaml:"method"`
Key string `yaml:"key"`
Value string `yaml:"value"`
Latency int64 `yaml:"latency"`
Name string `yaml:"name"`
Type string `yaml:"type"`
Service string `yaml:"service"`
Path string `yaml:"path"`
Method string `yaml:"method"`
Key string `yaml:"key"`
Value string `yaml:"value"`
ResponseTime int64 `yaml:"response-time"`
Name string `yaml:"name"`
}
type RulesMatched struct {
@@ -99,14 +99,17 @@ type RulesMatched struct {
}
func (r *RulePolicy) validateType() bool {
permitedTypes := []string{"json", "header", "latency"}
permitedTypes := []string{"json", "header", "slo"}
_, found := Find(permitedTypes, r.Type)
if !found {
fmt.Printf("\nRule with name %s will be ignored. Err: only json, header and latency types are supported on rule definition.\n", r.Name)
log.Printf("Error: %s. ", r.Name)
log.Printf("Only json, header and slo types are supported on rule definition. This rule will be ignored\n")
found = false
}
if strings.ToLower(r.Type) == "latency" {
if r.Latency == 0 {
fmt.Printf("\nRule with name %s will be ignored. Err: when type=latency, the field Latency should be specified and have a value >= 1\n\n", r.Name)
if strings.ToLower(r.Type) == "slo" {
if r.ResponseTime <= 0 {
log.Printf("Error: %s. ", r.Name)
log.Printf("When type=slo, the field response-time should be specified and have a value >= 1\n\n")
found = false
}
}
@@ -124,10 +127,6 @@ func (rules *RulesPolicy) ValidateRulesPolicy() []int {
return invalidIndex
}
func (rules *RulesPolicy) RemoveRule(idx int) {
rules.Rules = append(rules.Rules[:idx], rules.Rules[idx+1:]...)
}
func Find(slice []string, val string) (int, bool) {
for i, item := range slice {
if item == val {
@@ -148,10 +147,15 @@ func DecodeEnforcePolicy(path string) (RulesPolicy, error) {
return enforcePolicy, err
}
invalidIndex := enforcePolicy.ValidateRulesPolicy()
var k = 0
if len(invalidIndex) != 0 {
for i := range invalidIndex {
enforcePolicy.RemoveRule(invalidIndex[i])
for i, rule := range enforcePolicy.Rules {
if !ContainsInt(invalidIndex, i) {
enforcePolicy.Rules[k] = rule
k++
}
}
enforcePolicy.Rules = enforcePolicy.Rules[:k]
}
return enforcePolicy, nil
}

View File

@@ -1,4 +1,4 @@
package mizu
package shared
func Contains(slice []string, containsValue string) bool {
for _, sliceValue := range slice {
@@ -10,6 +10,16 @@ func Contains(slice []string, containsValue string) bool {
return false
}
func ContainsInt(slice []int, containsValue int) bool {
for _, sliceValue := range slice {
if sliceValue == containsValue {
return true
}
}
return false
}
func Unique(slice []string) []string {
keys := make(map[string]bool)
var list []string

View File

@@ -1,8 +1,8 @@
package mizu_test
package shared_test
import (
"fmt"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/shared"
"reflect"
"testing"
)
@@ -21,7 +21,7 @@ func TestContainsExists(t *testing.T) {
for _, test := range tests {
t.Run(test.ContainsValue, func(t *testing.T) {
actual := mizu.Contains(test.Slice, test.ContainsValue)
actual := shared.Contains(test.Slice, test.ContainsValue)
if actual != test.Expected {
t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual)
}
@@ -43,7 +43,7 @@ func TestContainsNotExists(t *testing.T) {
for _, test := range tests {
t.Run(test.ContainsValue, func(t *testing.T) {
actual := mizu.Contains(test.Slice, test.ContainsValue)
actual := shared.Contains(test.Slice, test.ContainsValue)
if actual != test.Expected {
t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual)
}
@@ -63,7 +63,7 @@ func TestContainsEmptySlice(t *testing.T) {
for _, test := range tests {
t.Run(test.ContainsValue, func(t *testing.T) {
actual := mizu.Contains(test.Slice, test.ContainsValue)
actual := shared.Contains(test.Slice, test.ContainsValue)
if actual != test.Expected {
t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual)
}
@@ -83,7 +83,7 @@ func TestContainsNilSlice(t *testing.T) {
for _, test := range tests {
t.Run(test.ContainsValue, func(t *testing.T) {
actual := mizu.Contains(test.Slice, test.ContainsValue)
actual := shared.Contains(test.Slice, test.ContainsValue)
if actual != test.Expected {
t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual)
}
@@ -102,7 +102,7 @@ func TestUniqueNoDuplicateValues(t *testing.T) {
for index, test := range tests {
t.Run(fmt.Sprintf("%v", index), func(t *testing.T) {
actual := mizu.Unique(test.Slice)
actual := shared.Unique(test.Slice)
if !reflect.DeepEqual(test.Expected, actual) {
t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual)
}
@@ -121,7 +121,7 @@ func TestUniqueDuplicateValues(t *testing.T) {
for index, test := range tests {
t.Run(fmt.Sprintf("%v", index), func(t *testing.T) {
actual := mizu.Unique(test.Slice)
actual := shared.Unique(test.Slice)
if !reflect.DeepEqual(test.Expected, actual) {
t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual)
}

View File

@@ -1,39 +0,0 @@
package shared
import (
"fmt"
"github.com/gorilla/websocket"
"time"
)
const (
DEFAULT_SOCKET_RETRIES = 3
DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10
)
func ConnectToSocketServer(address string, retries int, retrySleepTime time.Duration, hideTimeoutErrors bool) (*websocket.Conn, error) {
var err error
var connection *websocket.Conn
try := 0
// Connection to server fails if client pod is up before server.
// Retries solve this issue.
for try < retries {
connection, _, err = websocket.DefaultDialer.Dial(address, nil)
if err != nil {
try++
if !hideTimeoutErrors {
fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err)
}
} else {
break
}
time.Sleep(retrySleepTime)
}
if err != nil {
return nil, err
}
return connection, nil
}

View File

@@ -106,7 +106,7 @@ type MizuEntry struct {
UpdatedAt time.Time
ProtocolName string `json:"protocolName" gorm:"column:protocolName"`
ProtocolLongName string `json:"protocolLongName" gorm:"column:protocolLongName"`
ProtocolAbbreviation string `json:"protocolAbbreviation" gorm:"column:protocolVersion"`
ProtocolAbbreviation string `json:"protocolAbbreviation" gorm:"column:protocolAbbreviation"`
ProtocolVersion string `json:"protocolVersion" gorm:"column:protocolVersion"`
ProtocolBackgroundColor string `json:"protocolBackgroundColor" gorm:"column:protocolBackgroundColor"`
ProtocolForegroundColor string `json:"protocolForegroundColor" gorm:"column:protocolForegroundColor"`
@@ -138,6 +138,7 @@ type MizuEntryWrapper struct {
BodySize int64 `json:"bodySize"`
Data MizuEntry `json:"data"`
Rules []map[string]interface{} `json:"rulesMatched,omitempty"`
IsRulesEnabled bool `json:"isRulesEnabled"`
}
type BaseEntryDetails struct {
@@ -156,7 +157,7 @@ type BaseEntryDetails struct {
SourcePort string `json:"sourcePort,omitempty"`
DestinationPort string `json:"destinationPort,omitempty"`
IsOutgoing bool `json:"isOutgoing,omitempty"`
Latency int64 `json:"latency,omitempty"`
Latency int64 `json:"latency"`
Rules ApplicableRules `json:"rules,omitempty"`
}
@@ -190,6 +191,7 @@ func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error {
bed.Timestamp = entry.Timestamp
bed.RequestSenderIp = entry.RequestSenderIp
bed.IsOutgoing = entry.IsOutgoing
bed.Latency = entry.ElapsedTime
return nil
}

View File

@@ -1,7 +1,7 @@
package api
type TrafficFilteringOptions struct {
HealthChecksUserAgentHeaders []string
PlainTextMaskingRegexes []*SerializableRegexp
DisableRedaction bool
IgnoredUserAgents []string
PlainTextMaskingRegexes []*SerializableRegexp
DisableRedaction bool
}

View File

@@ -312,7 +312,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
SourcePort: entry.SourcePort,
DestinationPort: entry.DestinationPort,
IsOutgoing: entry.IsOutgoing,
Latency: 0,
Latency: entry.ElapsedTime,
Rules: api.ApplicableRules{
Latency: 0,
Status: false,

View File

@@ -14,9 +14,14 @@ import (
)
func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *api.TrafficFilteringOptions) {
if IsIgnoredUserAgent(item, options) {
return
}
if !options.DisableRedaction {
FilterSensitiveData(item, options)
}
emitter.Emit(item)
}

View File

@@ -223,7 +223,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
SourcePort: entry.SourcePort,
DestinationPort: entry.DestinationPort,
IsOutgoing: entry.IsOutgoing,
Latency: 0,
Latency: entry.ElapsedTime,
Rules: api.ApplicableRules{
Latency: 0,
Status: false,

View File

@@ -25,6 +25,30 @@ var personallyIdentifiableDataFields = []string{"token", "authorization", "authe
"zip", "zipcode", "address", "country", "firstname", "lastname",
"middlename", "fname", "lname", "birthdate"}
func IsIgnoredUserAgent(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) bool {
if item.Protocol.Name != "http" {
return false
}
request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request)
for headerKey, headerValues := range request.Header {
if strings.ToLower(headerKey) == "user-agent" {
for _, userAgent := range options.IgnoredUserAgents {
for _, headerValue := range headerValues {
if strings.Contains(strings.ToLower(headerValue), strings.ToLower(userAgent)) {
return true
}
}
}
return false
}
}
return false
}
func FilterSensitiveData(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) {
request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request)
response := item.Pair.Response.Payload.(HTTPPayload).Data.(*http.Response)
@@ -86,6 +110,10 @@ func getContentTypeHeaderValue(headers http.Header) string {
}
func isFieldNameSensitive(fieldName string) bool {
if fieldName == ":authority" {
return false
}
name := strings.ToLower(fieldName)
name = strings.ReplaceAll(name, "_", "")
name = strings.ReplaceAll(name, "-", "")

View File

@@ -186,7 +186,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
SourcePort: entry.SourcePort,
DestinationPort: entry.DestinationPort,
IsOutgoing: entry.IsOutgoing,
Latency: 0,
Latency: entry.ElapsedTime,
Rules: api.ApplicableRules{
Latency: 0,
Status: false,

View File

@@ -0,0 +1,14 @@
package main
//ConnectError redis connection error,such as io timeout
type ConnectError struct {
Message string
}
func newConnectError(message string) *ConnectError {
return &ConnectError{Message: message}
}
func (e *ConnectError) Error() string {
return e.Message
}

View File

@@ -0,0 +1,9 @@
module github.com/up9inc/mizu/tap/extensions/redis
go 1.16
require (
github.com/up9inc/mizu/tap/api v0.0.0
)
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api

View File

@@ -0,0 +1,55 @@
package main
import (
"fmt"
"github.com/up9inc/mizu/tap/api"
)
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error {
counterPair.Request++
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
counterPair.Request,
)
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
ClientPort: tcpID.SrcPort,
ServerIP: tcpID.DstIP,
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
emitter.Emit(item)
}
return nil
}
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error {
counterPair.Response++
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
tcpID.SrcPort,
counterPair.Response,
)
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,
ClientPort: tcpID.DstPort,
ServerIP: tcpID.SrcIP,
ServerPort: tcpID.SrcPort,
IsOutgoing: false,
}
emitter.Emit(item)
}
return nil
}

View File

@@ -0,0 +1,57 @@
package main
import (
"encoding/json"
"github.com/up9inc/mizu/tap/api"
)
type RedisPayload struct {
Data interface{}
}
type RedisPayloader interface {
MarshalJSON() ([]byte, error)
}
func (h RedisPayload) MarshalJSON() ([]byte, error) {
return json.Marshal(h.Data)
}
type RedisWrapper struct {
Method string `json:"method"`
Url string `json:"url"`
Details interface{} `json:"details"`
}
func representGeneric(generic map[string]interface{}) (representation []interface{}) {
details, _ := json.Marshal([]map[string]string{
{
"name": "Type",
"value": generic["type"].(string),
},
{
"name": "Command",
"value": generic["command"].(string),
},
{
"name": "Key",
"value": generic["key"].(string),
},
{
"name": "Value",
"value": generic["value"].(string),
},
{
"name": "Keyword",
"value": generic["keyword"].(string),
},
})
representation = append(representation, map[string]string{
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
return
}

View File

@@ -0,0 +1,154 @@
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"github.com/up9inc/mizu/tap/api"
)
var protocol api.Protocol = api.Protocol{
Name: "redis",
LongName: "Redis Serialization Protocol",
Abbreviation: "REDIS",
Version: "3.x",
BackgroundColor: "#a41e11",
ForegroundColor: "#ffffff",
FontSize: 11,
ReferenceLink: "https://redis.io/topics/protocol",
Ports: []string{"6379"},
Priority: 3,
}
func init() {
log.Println("Initializing Redis extension...")
}
type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
is := &RedisInputStream{
Reader: b,
Buf: make([]byte, 8192),
}
proto := NewProtocol(is)
for {
redisPacket, err := proto.Read()
if err != nil {
return err
}
if isClient {
handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket)
} else {
handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket)
}
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
service := "redis"
if resolvedDestination != "" {
service = resolvedDestination
} else if resolvedSource != "" {
service = resolvedSource
}
method := ""
if reqDetails["command"] != nil {
method = reqDetails["command"].(string)
}
summary := ""
if reqDetails["key"] != nil {
summary = reqDetails["key"].(string)
}
request["url"] = summary
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: protocol.Name,
ProtocolLongName: protocol.LongName,
ProtocolAbbreviation: protocol.Abbreviation,
ProtocolVersion: protocol.Version,
ProtocolBackgroundColor: protocol.BackgroundColor,
ProtocolForegroundColor: protocol.ForegroundColor,
ProtocolFontSize: protocol.FontSize,
ProtocolReferenceLink: protocol.ReferenceLink,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: method,
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: 0,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}
func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
return &api.BaseEntryDetails{
Id: entry.EntryId,
Protocol: protocol,
Url: entry.Url,
RequestSenderIp: entry.RequestSenderIp,
Service: entry.Service,
Summary: entry.Path,
StatusCode: entry.Status,
Method: entry.Method,
Timestamp: entry.Timestamp,
SourceIp: entry.SourceIp,
DestinationIp: entry.DestinationIp,
SourcePort: entry.SourcePort,
DestinationPort: entry.DestinationPort,
IsOutgoing: entry.IsOutgoing,
Latency: entry.ElapsedTime,
Rules: api.ApplicableRules{
Latency: 0,
Status: false,
},
}
}
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
p = protocol
bodySize = 0
var root map[string]interface{}
json.Unmarshal([]byte(entry.Entry), &root)
representation := make(map[string]interface{}, 0)
request := root["request"].(map[string]interface{})["payload"].(map[string]interface{})
response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
resDetails := response["details"].(map[string]interface{})
repRequest := representGeneric(reqDetails)
repResponse := representGeneric(resDetails)
representation["request"] = repRequest
representation["response"] = repResponse
object, err = json.Marshal(representation)
return
}
var Dissector dissecting

View File

@@ -0,0 +1,102 @@
package main
import (
"fmt"
"strings"
"sync"
"time"
"github.com/up9inc/mizu/tap/api"
)
var reqResMatcher = createResponseRequestMatcher() // global
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func createResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
split := splitIdent(ident)
key := genKey(split)
requestRedisMessage := api.GenericMessage{
IsRequest: true,
CaptureTime: captureTime,
Payload: RedisPayload{
Data: &RedisWrapper{
Method: string(request.Command),
Url: "",
Details: request,
},
},
}
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
responseRedisMessage := response.(*api.GenericMessage)
if responseRedisMessage.IsRequest {
return nil
}
return matcher.preparePair(&requestRedisMessage, responseRedisMessage)
}
matcher.openMessagesMap.Store(key, &requestRedisMessage)
return nil
}
func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
split := splitIdent(ident)
key := genKey(split)
responseRedisMessage := api.GenericMessage{
IsRequest: false,
CaptureTime: captureTime,
Payload: RedisPayload{
Data: &RedisWrapper{
Method: string(response.Command),
Url: "",
Details: response,
},
},
}
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
requestRedisMessage := request.(*api.GenericMessage)
if !requestRedisMessage.IsRequest {
return nil
}
return matcher.preparePair(requestRedisMessage, &responseRedisMessage)
}
matcher.openMessagesMap.Store(key, &responseRedisMessage)
return nil
}
func (matcher *requestResponseMatcher) preparePair(requestRedisMessage *api.GenericMessage, responseRedisMessage *api.GenericMessage) *api.OutputChannelItem {
return &api.OutputChannelItem{
Protocol: protocol,
Timestamp: requestRedisMessage.CaptureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: nil,
Pair: &api.RequestResponsePair{
Request: *requestRedisMessage,
Response: *responseRedisMessage,
},
}
}
func splitIdent(ident string) []string {
ident = strings.Replace(ident, "->", " ", -1)
return strings.Split(ident, " ")
}
func genKey(split []string) string {
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
return key
}

View File

@@ -0,0 +1,474 @@
package main
import (
"bufio"
"errors"
"fmt"
"log"
"math"
"reflect"
"strconv"
"strings"
"time"
)
const (
askPrefix = "ASK "
movedPrefix = "MOVED "
clusterDownPrefix = "CLUSTERDOWN "
busyPrefix = "BUSY "
noscriptPrefix = "NOSCRIPT "
defaultHost = "localhost"
defaultPort = 6379
defaultSentinelPort = 26379
defaultTimeout = 5 * time.Second
defaultDatabase = 2 * time.Second
dollarByte = '$'
asteriskByte = '*'
plusByte = '+'
minusByte = '-'
colonByte = ':'
notApplicableByte = '0'
sentinelMasters = "masters"
sentinelGetMasterAddrByName = "get-master-addr-by-name"
sentinelReset = "reset"
sentinelSlaves = "slaves"
sentinelFailOver = "failover"
sentinelMonitor = "monitor"
sentinelRemove = "remove"
sentinelSet = "set"
clusterNodes = "nodes"
clusterMeet = "meet"
clusterReset = "reset"
clusterAddSlots = "addslots"
clusterDelSlots = "delslots"
clusterInfo = "info"
clusterGetKeysInSlot = "getkeysinslot"
clusterSetSlot = "setslot"
clusterSetSlotNode = "node"
clusterSetSlotMigrating = "migrating"
clusterSetSlotImporting = "importing"
clusterSetSlotStable = "stable"
clusterForget = "forget"
clusterFlushSlot = "flushslots"
clusterKeySlot = "keyslot"
clusterCountKeyInSlot = "countkeysinslot"
clusterSaveConfig = "saveconfig"
clusterReplicate = "replicate"
clusterSlaves = "slaves"
clusterFailOver = "failover"
clusterSlots = "slots"
pubSubChannels = "channels"
pubSubNumSub = "numsub"
pubSubNumPat = "numpat"
)
//intToByteArr convert int to byte array
func intToByteArr(a int) []byte {
buf := make([]byte, 0)
return strconv.AppendInt(buf, int64(a), 10)
}
var (
bytesTrue = intToByteArr(1)
bytesFalse = intToByteArr(0)
bytesTilde = []byte("~")
positiveInfinityBytes = []byte("+inf")
negativeInfinityBytes = []byte("-inf")
)
var (
sizeTable = []int{9, 99, 999, 9999, 99999, 999999, 9999999, 99999999,
999999999, math.MaxInt32}
digitTens = []byte{'0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1',
'1', '1', '1', '1', '1', '1', '1', '1', '1', '2', '2', '2', '2', '2', '2', '2', '2', '2',
'2', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '4', '4', '4', '4', '4', '4', '4',
'4', '4', '4', '5', '5', '5', '5', '5', '5', '5', '5', '5', '5', '6', '6', '6', '6', '6',
'6', '6', '6', '6', '6', '7', '7', '7', '7', '7', '7', '7', '7', '7', '7', '8', '8', '8',
'8', '8', '8', '8', '8', '8', '8', '9', '9', '9', '9', '9', '9', '9', '9', '9', '9'}
digitOnes = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0',
'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8',
'9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6',
'7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4',
'5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2',
'3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
digits = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a',
'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's',
't', 'u', 'v', 'w', 'x', 'y', 'z'}
)
// receive message from redis
type RedisInputStream struct {
*bufio.Reader
Buf []byte
count int
limit int
}
func (r *RedisInputStream) readByte() (byte, error) {
err := r.ensureFill()
if err != nil {
return 0, err
}
ret := r.Buf[r.count]
r.count++
return ret, nil
}
func (r *RedisInputStream) ensureFill() error {
if r.count < r.limit {
return nil
}
var err error
r.limit, err = r.Read(r.Buf)
if err != nil {
return newConnectError(err.Error())
}
r.count = 0
if r.limit == -1 {
return newConnectError("Unexpected end of stream")
}
return nil
}
func (r *RedisInputStream) readLine() (string, error) {
buf := ""
for {
err := r.ensureFill()
if err != nil {
return "", err
}
b := r.Buf[r.count]
r.count++
if b == '\r' {
err := r.ensureFill()
if err != nil {
return "", err
}
c := r.Buf[r.count]
r.count++
if c == '\n' {
break
}
buf += string(b)
buf += string(c)
} else {
buf += string(b)
}
}
if buf == "" {
return "", newConnectError("It seems like server has closed the connection.")
}
return buf, nil
}
func (r *RedisInputStream) readLineBytes() ([]byte, error) {
err := r.ensureFill()
if err != nil {
return nil, err
}
pos := r.count
buf := r.Buf
for {
if pos == r.limit {
return r.readLineBytesSlowly()
}
p := buf[pos]
pos++
if p == '\r' {
if pos == r.limit {
return r.readLineBytesSlowly()
}
p := buf[pos]
pos++
if p == '\n' {
break
}
}
}
N := pos - r.count - 2
line := make([]byte, N)
j := 0
for i := r.count; i <= N; i++ {
line[j] = buf[i]
j++
}
r.count = pos
return line, nil
}
func (r *RedisInputStream) readLineBytesSlowly() ([]byte, error) {
buf := make([]byte, 0)
for {
err := r.ensureFill()
if err != nil {
return nil, err
}
b := r.Buf[r.count]
r.count++
if b == 'r' {
err := r.ensureFill()
if err != nil {
return nil, err
}
c := r.Buf[r.count]
r.count++
if c == '\n' {
break
}
buf = append(buf, b)
buf = append(buf, c)
} else {
buf = append(buf, b)
}
}
return buf, nil
}
func (r *RedisInputStream) readIntCrLf() (int64, error) {
err := r.ensureFill()
if err != nil {
return 0, err
}
buf := r.Buf
isNeg := false
if buf[r.count] == '-' {
isNeg = true
}
if isNeg {
r.count++
}
value := int64(0)
for {
err := r.ensureFill()
if err != nil {
return 0, err
}
b := buf[r.count]
r.count++
if b == '\r' {
err := r.ensureFill()
if err != nil {
return 0, err
}
c := buf[r.count]
r.count++
if c != '\n' {
return 0, newConnectError("Unexpected character!")
}
break
} else {
value = value*10 + int64(b) - int64('0')
}
}
if isNeg {
return -value, nil
}
return value, nil
}
type RedisProtocol struct {
is *RedisInputStream
}
func NewProtocol(is *RedisInputStream) *RedisProtocol {
return &RedisProtocol{
is: is,
}
}
func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
x, r, err := p.process()
if err != nil {
return
}
packet = &RedisPacket{}
packet.Type = r
switch x.(type) {
case []interface{}:
array := x.([]interface{})
packet.Command = RedisCommand(strings.ToUpper(string(array[0].([]uint8))))
if len(array) > 1 {
packet.Key = string(array[1].([]uint8))
}
if len(array) > 2 {
packet.Value = string(array[2].([]uint8))
}
case []uint8:
val := string(x.([]uint8))
if packet.Type == types[plusByte] {
packet.Keyword = RedisKeyword(strings.ToUpper(val))
if !isValidRedisKeyword(keywords, packet.Keyword) {
err = errors.New(fmt.Sprintf("Unrecognized keyword: %s", string(packet.Command)))
return
}
} else {
packet.Value = val
}
case string:
packet.Value = x.(string)
default:
msg := fmt.Sprintf("Unrecognized Redis data type: %v\n", reflect.TypeOf(x))
log.Printf(msg)
err = errors.New(msg)
return
}
if packet.Command != "" {
if !isValidRedisCommand(commands, packet.Command) {
err = errors.New(fmt.Sprintf("Unrecognized command: %s", string(packet.Command)))
return
}
}
return
}
func (p *RedisProtocol) process() (v interface{}, r RedisType, err error) {
b, err := p.is.readByte()
if err != nil {
return nil, types[notApplicableByte], newConnectError(err.Error())
}
switch b {
case plusByte:
v, err = p.processSimpleString()
r = types[plusByte]
return
case dollarByte:
v, err = p.processBulkString()
r = types[dollarByte]
return
case asteriskByte:
v, err = p.processArray()
r = types[asteriskByte]
return
case colonByte:
v, err = p.processInteger()
r = types[colonByte]
return
case minusByte:
v, err = p.processError()
r = types[minusByte]
return
default:
return nil, types[notApplicableByte], newConnectError(fmt.Sprintf("Unknown reply: %b", b))
}
}
func (p *RedisProtocol) processSimpleString() ([]byte, error) {
return p.is.readLineBytes()
}
func (p *RedisProtocol) processBulkString() ([]byte, error) {
l, err := p.is.readIntCrLf()
if err != nil {
return nil, newConnectError(err.Error())
}
if l == -1 {
return nil, nil
}
line := make([]byte, 0)
for {
err := p.is.ensureFill()
if err != nil {
return nil, err
}
b := p.is.Buf[p.is.count]
p.is.count++
if b == '\r' {
err := p.is.ensureFill()
if err != nil {
return nil, err
}
c := p.is.Buf[p.is.count]
p.is.count++
if c != '\n' {
return nil, newConnectError("Unexpected character!")
}
break
} else {
line = append(line, b)
}
}
return line, nil
}
func (p *RedisProtocol) processArray() ([]interface{}, error) {
l, err := p.is.readIntCrLf()
if err != nil {
return nil, newConnectError(err.Error())
}
if l == -1 {
return nil, nil
}
ret := make([]interface{}, 0)
for i := 0; i < int(l); i++ {
if obj, _, err := p.process(); err != nil {
ret = append(ret, err)
} else {
ret = append(ret, obj)
}
}
return ret, nil
}
func (p *RedisProtocol) processInteger() (int64, error) {
return p.is.readIntCrLf()
}
func (p *RedisProtocol) processError() (interface{}, error) {
msg, err := p.is.readLine()
if err != nil {
return nil, newConnectError(err.Error())
}
if strings.HasPrefix(msg, movedPrefix) {
host, port, slot, err := p.parseTargetHostAndSlot(msg)
if err != nil {
return nil, err
}
return fmt.Sprintf("MovedDataError: %s host: %s port: %d slot: %d", msg, host, port, slot), nil
} else if strings.HasPrefix(msg, askPrefix) {
host, port, slot, err := p.parseTargetHostAndSlot(msg)
if err != nil {
return nil, err
}
return fmt.Sprintf("AskDataError: %s host: %s port: %d slot: %d", msg, host, port, slot), nil
} else if strings.HasPrefix(msg, clusterDownPrefix) {
return fmt.Sprintf("ClusterError: %s", msg), nil
} else if strings.HasPrefix(msg, busyPrefix) {
return fmt.Sprintf("BusyError: %s", msg), nil
} else if strings.HasPrefix(msg, noscriptPrefix) {
return fmt.Sprintf("NoScriptError: %s", msg), nil
}
return fmt.Sprintf("DataError: %s", msg), nil
}
func (p *RedisProtocol) parseTargetHostAndSlot(clusterRedirectResponse string) (host string, po int, slot int, err error) {
arr := strings.Split(clusterRedirectResponse, " ")
host, port := p.extractParts(arr[2])
slot, err = strconv.Atoi(arr[1])
po, err = strconv.Atoi(port)
return
}
func (p *RedisProtocol) extractParts(from string) (string, string) {
idx := strings.LastIndex(from, ":")
host := from
if idx != -1 {
host = from[0:idx]
}
port := ""
if idx != -1 {
port = from[idx+1:]
}
return host, port
}

View File

@@ -0,0 +1,290 @@
package main
type RedisType string
type RedisCommand string
type RedisKeyword string
var types map[rune]RedisType = map[rune]RedisType{
plusByte: "Simple String",
dollarByte: "Bulk String",
asteriskByte: "Array",
colonByte: "Integer",
minusByte: "Error",
notApplicableByte: "N/A",
}
var commands []RedisCommand = []RedisCommand{
"PING",
"SET",
"GET",
"QUIT",
"EXISTS",
"DEL",
"UNLINK",
"TYPE",
"FLUSHDB",
"KEYS",
"RANDOMKEY",
"RENAME",
"RENAMENX",
"RENAMEX",
"DBSIZE",
"EXPIRE",
"EXPIREAT",
"TTL",
"SELECT",
"MOVE",
"FLUSHALL",
"GETSET",
"MGET",
"SETNX",
"SETEX",
"MSET",
"MSETNX",
"DECRBY",
"DECR",
"INCRBY",
"INCR",
"APPEND",
"SUBSTR",
"HSET",
"HGET",
"HSETNX",
"HMSET",
"HMGET",
"HINCRBY",
"HEXISTS",
"HDEL",
"HLEN",
"HKEYS",
"HVALS",
"HGETALL",
"RPUSH",
"LPUSH",
"LLEN",
"LRANGE",
"LTRIM",
"LINDEX",
"LSET",
"LREM",
"LPOP",
"RPOP",
"RPOPLPUSH",
"SADD",
"SMEMBERS",
"SREM",
"SPOP",
"SMOVE",
"SCARD",
"SISMEMBER",
"SINTER",
"SINTERSTORE",
"SUNION",
"SUNIONSTORE",
"SDIFF",
"SDIFFSTORE",
"SRANDMEMBER",
"ZADD",
"ZRANGE",
"ZREM",
"ZINCRBY",
"ZRANK",
"ZREVRANK",
"ZREVRANGE",
"ZCARD",
"ZSCORE",
"MULTI",
"DISCARD",
"EXEC",
"WATCH",
"UNWATCH",
"SORT",
"BLPOP",
"BRPOP",
"AUTH",
"SUBSCRIBE",
"PUBLISH",
"UNSUBSCRIBE",
"PSUBSCRIBE",
"PUNSUBSCRIBE",
"PUBSUB",
"ZCOUNT",
"ZRANGEBYSCORE",
"ZREVRANGEBYSCORE",
"ZREMRANGEBYRANK",
"ZREMRANGEBYSCORE",
"ZUNIONSTORE",
"ZINTERSTORE",
"ZLEXCOUNT",
"ZRANGEBYLEX",
"ZREVRANGEBYLEX",
"ZREMRANGEBYLEX",
"SAVE",
"BGSAVE",
"BGREWRITEAOF",
"LASTSAVE",
"SHUTDOWN",
"INFO",
"MONITOR",
"SLAVEOF",
"CONFIG",
"STRLEN",
"SYNC",
"LPUSHX",
"PERSIST",
"RPUSHX",
"ECHO",
"LINSERT",
"DEBUG",
"BRPOPLPUSH",
"SETBIT",
"GETBIT",
"BITPOS",
"SETRANGE",
"GETRANGE",
"EVAL",
"EVALSHA",
"SCRIPT",
"SLOWLOG",
"OBJECT",
"BITCOUNT",
"BITOP",
"SENTINEL",
"DUMP",
"RESTORE",
"PEXPIRE",
"PEXPIREAT",
"PTTL",
"INCRBYFLOAT",
"PSETEX",
"CLIENT",
"TIME",
"MIGRATE",
"HINCRBYFLOAT",
"SCAN",
"HSCAN",
"SSCAN",
"ZSCAN",
"WAIT",
"CLUSTER",
"ASKING",
"PFADD",
"PFCOUNT",
"PFMERGE",
"READONLY",
"GEOADD",
"GEODIST",
"GEOHASH",
"GEOPOS",
"GEORADIUS",
"GEORADIUS_RO",
"GEORADIUSBYMEMBER",
"GEORADIUSBYMEMBER_RO",
"MODULE",
"BITFIELD",
"HSTRLEN",
"TOUCH",
"SWAPDB",
"MEMORY",
"XADD",
"XLEN",
"XDEL",
"XTRIM",
"XRANGE",
"XREVRANGE",
"XREAD",
"XACK",
"XGROUP",
"XREADGROUP",
"XPENDING",
"XCLAIM",
}
var keywords []RedisKeyword = []RedisKeyword{
"AGGREGATE",
"ALPHA",
"ASC",
"BY",
"DESC",
"GET",
"LIMIT",
"MESSAGE",
"NO",
"NOSORT",
"PMESSAGE",
"PSUBSCRIBE",
"PUNSUBSCRIBE",
"OK",
"ONE",
"QUEUED",
"SET",
"STORE",
"SUBSCRIBE",
"UNSUBSCRIBE",
"WEIGHTS",
"WITHSCORES",
"RESETSTAT",
"REWRITE",
"RESET",
"FLUSH",
"EXISTS",
"LOAD",
"KILL",
"LEN",
"REFCOUNT",
"ENCODING",
"IDLETIME",
"GETNAME",
"SETNAME",
"LIST",
"MATCH",
"COUNT",
"PING",
"PONG",
"UNLOAD",
"REPLACE",
"KEYS",
"PAUSE",
"DOCTOR",
"BLOCK",
"NOACK",
"STREAMS",
"KEY",
"CREATE",
"MKSTREAM",
"SETID",
"DESTROY",
"DELCONSUMER",
"MAXLEN",
"GROUP",
"IDLE",
"TIME",
"RETRYCOUNT",
"FORCE",
}
type RedisPacket struct {
Type RedisType `json:"type"`
Command RedisCommand `json:"command"`
Key string `json:"key"`
Value string `json:"value"`
Keyword RedisKeyword `json:"keyword"`
}
func isValidRedisCommand(s []RedisCommand, c RedisCommand) bool {
for _, v := range s {
if v == c {
return true
}
}
return false
}
func isValidRedisKeyword(s []RedisKeyword, c RedisKeyword) bool {
for _, v := range s {
if v == c {
return true
}
}
return false
}

6
ui/package-lock.json generated
View File

@@ -13454,9 +13454,9 @@
}
},
"react-scrollable-feed-virtualized": {
"version": "1.4.2",
"resolved": "https://registry.npmjs.org/react-scrollable-feed-virtualized/-/react-scrollable-feed-virtualized-1.4.2.tgz",
"integrity": "sha512-j6M80ETqhSRBlygWe491gMPqCiAkVUQsLd/JR7DCKsZF44IYlJiunZWjdWe3//gxddTL68pjqq8pMvd1YN1bgw=="
"version": "1.4.3",
"resolved": "https://registry.npmjs.org/react-scrollable-feed-virtualized/-/react-scrollable-feed-virtualized-1.4.3.tgz",
"integrity": "sha512-M9WgJKr57jCyWKNCksc3oi+xhtO0YbL9d7Ll8Sdc5ZWOIstNvdNbNX0k4Nq6kXUVaHCJ9qE8omdSI/CxT3MLAQ=="
},
"react-syntax-highlighter": {
"version": "15.4.3",

View File

@@ -21,7 +21,7 @@
"react-copy-to-clipboard": "^5.0.3",
"react-dom": "^17.0.2",
"react-scripts": "4.0.3",
"react-scrollable-feed-virtualized": "^1.4.2",
"react-scrollable-feed-virtualized": "^1.4.3",
"react-syntax-highlighter": "^15.4.3",
"typescript": "^4.2.4",
"web-vitals": "^1.1.1"

View File

@@ -91,7 +91,7 @@ const App = () => {
</table>
</span>
return (
<div className="mizuApp">
<div className="header">

View File

@@ -19,7 +19,8 @@ interface EntriesListProps {
setNoMoreDataBottom: (flag: boolean) => void;
methodsFilter: Array<string>;
statusFilter: Array<string>;
pathFilter: string
pathFilter: string;
serviceFilter: string;
listEntryREF: any;
onScrollEvent: (isAtBottom:boolean) => void;
scrollableList: boolean;
@@ -32,7 +33,7 @@ enum FetchOperator {
const api = new Api();
export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, focusedEntryId, setFocusedEntryId, connectionOpen, noMoreDataTop, setNoMoreDataTop, noMoreDataBottom, setNoMoreDataBottom, methodsFilter, statusFilter, pathFilter, listEntryREF, onScrollEvent, scrollableList}) => {
export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, focusedEntryId, setFocusedEntryId, connectionOpen, noMoreDataTop, setNoMoreDataTop, noMoreDataBottom, setNoMoreDataBottom, methodsFilter, statusFilter, pathFilter, serviceFilter, listEntryREF, onScrollEvent, scrollableList}) => {
const [loadMoreTop, setLoadMoreTop] = useState(false);
const [isLoadingTop, setIsLoadingTop] = useState(false);
@@ -54,10 +55,11 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
const filterEntries = useCallback((entry) => {
if(methodsFilter.length > 0 && !methodsFilter.includes(entry.method.toLowerCase())) return;
if(pathFilter && entry.path?.toLowerCase()?.indexOf(pathFilter) === -1) return;
if(serviceFilter && entry.service?.toLowerCase()?.indexOf(serviceFilter) === -1) return;
if(statusFilter.includes(StatusType.SUCCESS) && entry.statusCode >= 400) return;
if(statusFilter.includes(StatusType.ERROR) && entry.statusCode < 400) return;
return entry;
},[methodsFilter, pathFilter, statusFilter])
},[methodsFilter, pathFilter, statusFilter, serviceFilter])
const filteredEntries = useMemo(() => {
return entries.filter(filterEntries);

View File

@@ -41,7 +41,7 @@ const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
<Protocol protocol={protocol} horizontal={true}/>
<div style={{right: "30px", position: "absolute", display: "flex"}}>
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(bodySize)}</div>}
<div style={{marginRight: 18, opacity: 0.5}}>{Math.round(elapsedTime)}ms</div>
{response.payload && <div style={{marginRight: 18, opacity: 0.5}}>{Math.round(elapsedTime)}ms</div>}
</div>
</div>;
};
@@ -71,7 +71,7 @@ export const EntryDetailed: React.FC<EntryDetailedProps> = ({entryData}) => {
/>
{entryData.data && <EntrySummary data={entryData.data}/>}
<>
{entryData.data && <EntryViewer representation={entryData.representation} rulesMatched={entryData.rulesMatched} elapsedTime={entryData.data.elapsedTime} color={entryData.protocol.backgroundColor}/>}
{entryData.data && <EntryViewer representation={entryData.representation} isRulesEnabled={entryData.isRulesEnabled} rulesMatched={entryData.rulesMatched} elapsedTime={entryData.data.elapsedTime} color={entryData.protocol.backgroundColor}/>}
</>
</>
};

View File

@@ -153,10 +153,8 @@ export const EntryTableSection: React.FC<EntrySectionProps> = ({title, color, ar
interface EntryPolicySectionProps {
service: string,
title: string,
color: string,
response: any,
latency?: number,
arrayToIterate: any[],
}
@@ -200,7 +198,7 @@ export const EntryPolicySectionContainer: React.FC<EntryPolicySectionContainerPr
</CollapsibleContainer>
}
export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({service, title, color, response, latency, arrayToIterate}) => {
export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({title, color, latency, arrayToIterate}) => {
return <React.Fragment>
{
arrayToIterate && arrayToIterate.length > 0 ?

View File

@@ -33,8 +33,8 @@ const SectionsRepresentation: React.FC<any> = ({data, color}) => {
return <>{sections}</>;
}
const AutoRepresentation: React.FC<any> = ({representation, rulesMatched, elapsedTime, color}) => {
const TABS = [
const AutoRepresentation: React.FC<any> = ({representation, isRulesEnabled, rulesMatched, elapsedTime, color}) => {
var TABS = [
{
tab: 'request'
},
@@ -54,6 +54,14 @@ const AutoRepresentation: React.FC<any> = ({representation, rulesMatched, elapse
const {request, response} = JSON.parse(representation);
if (!response) {
TABS[1]['hidden'] = true;
}
if (!isRulesEnabled) {
TABS.pop()
}
return <div className={styles.Entry}>
{<div className={styles.body}>
<div className={styles.bodyHeader}>
@@ -63,11 +71,11 @@ const AutoRepresentation: React.FC<any> = ({representation, rulesMatched, elapse
{currentTab === TABS[0].tab && <React.Fragment>
<SectionsRepresentation data={request} color={color}/>
</React.Fragment>}
{currentTab === TABS[1].tab && <React.Fragment>
{response && currentTab === TABS[1].tab && <React.Fragment>
<SectionsRepresentation data={response} color={color}/>
</React.Fragment>}
{currentTab === TABS[2].tab && <React.Fragment>
<EntryTablePolicySection service={representation.service} title={'Rule'} color={color} latency={elapsedTime} response={response} arrayToIterate={rulesMatched ? rulesMatched : []}/>
{TABS.length > 2 && currentTab === TABS[2].tab && <React.Fragment>
<EntryTablePolicySection title={'Rule'} color={color} latency={elapsedTime} arrayToIterate={rulesMatched ? rulesMatched : []}/>
</React.Fragment>}
</div>}
</div>;
@@ -75,13 +83,14 @@ const AutoRepresentation: React.FC<any> = ({representation, rulesMatched, elapse
interface Props {
representation: any;
isRulesEnabled: boolean;
rulesMatched: any;
color: string;
elapsedTime: number;
}
const EntryViewer: React.FC<Props> = ({representation, rulesMatched, elapsedTime, color}) => {
return <AutoRepresentation representation={representation} rulesMatched={rulesMatched} elapsedTime={elapsedTime} color={color}/>
const EntryViewer: React.FC<Props> = ({representation, isRulesEnabled, rulesMatched, elapsedTime, color}) => {
return <AutoRepresentation representation={representation} isRulesEnabled={isRulesEnabled} rulesMatched={rulesMatched} elapsedTime={elapsedTime} color={color}/>
};
export default EntryViewer;

View File

@@ -36,8 +36,8 @@
border-left: 5px $failure-color solid
.ruleNumberText
font-size: 12px;
font-style: italic;
font-size: 12px
font-weight: 600
.ruleNumberTextFailure
color: #DB2156

View File

@@ -68,7 +68,7 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
let rule = 'latency' in entry.rules
if (rule) {
if (entry.rules.latency !== -1) {
if (entry.rules.latency >= entry.latency) {
if (entry.rules.latency >= entry.latency || !('latency' in entry)) {
additionalRulesProperties = styles.ruleSuccessRow
ruleSuccess = true
} else {

View File

@@ -11,13 +11,16 @@ interface FiltersProps {
setStatusFilter: (methods: Array<string>) => void;
pathFilter: string
setPathFilter: (val: string) => void;
serviceFilter: string
setServiceFilter: (val: string) => void;
}
export const Filters: React.FC<FiltersProps> = ({methodsFilter, setMethodsFilter, statusFilter, setStatusFilter, pathFilter, setPathFilter}) => {
export const Filters: React.FC<FiltersProps> = ({methodsFilter, setMethodsFilter, statusFilter, setStatusFilter, pathFilter, setPathFilter, serviceFilter, setServiceFilter}) => {
return <div className={styles.container}>
<MethodFilter methodsFilter={methodsFilter} setMethodsFilter={setMethodsFilter}/>
<StatusTypesFilter statusFilter={statusFilter} setStatusFilter={setStatusFilter}/>
<ServiceFilter serviceFilter={serviceFilter} setServiceFilter={setServiceFilter}/>
<PathFilter pathFilter={pathFilter} setPathFilter={setPathFilter}/>
</div>;
};
@@ -117,3 +120,18 @@ const PathFilter: React.FC<PathFilterProps> = ({pathFilter, setPathFilter}) => {
</FilterContainer>;
};
interface ServiceFilterProps {
serviceFilter: string;
setServiceFilter: (val: string) => void;
}
const ServiceFilter: React.FC<ServiceFilterProps> = ({serviceFilter, setServiceFilter}) => {
return <FilterContainer>
<div className={styles.filterLabel}>Service</div>
<div>
<TextField value={serviceFilter} variant="outlined" className={styles.filterText} style={{minWidth: '150px'}} onChange={(e: any) => setServiceFilter(e.target.value)}/>
</div>
</FilterContainer>;
};

View File

@@ -58,6 +58,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
const [methodsFilter, setMethodsFilter] = useState([]);
const [statusFilter, setStatusFilter] = useState([]);
const [pathFilter, setPathFilter] = useState("");
const [serviceFilter, setServiceFilter] = useState("");
const [tappingStatus, setTappingStatus] = useState(null);
@@ -192,6 +193,8 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
setStatusFilter={setStatusFilter}
pathFilter={pathFilter}
setPathFilter={setPathFilter}
serviceFilter={serviceFilter}
setServiceFilter={setServiceFilter}
/>
<div className={styles.container}>
<EntriesList entries={entries}
@@ -206,6 +209,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
methodsFilter={methodsFilter}
statusFilter={statusFilter}
pathFilter={pathFilter}
serviceFilter={serviceFilter}
listEntryREF={listEntry}
onScrollEvent={onScrollEvent}
scrollableList={disableScrollList}